#!/usr/bin/env python3 import argparse import asyncio import configparser import ipaddress import os import logging from logging.handlers import RotatingFileHandler from asyncio import Semaphore import dns.asyncresolver import httpx BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # ================= LOGGING ================= def setup_logging(): log_dir = os.path.join(BASE_DIR, "logs") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "redirector.log") handler = RotatingFileHandler( log_file, maxBytes=2 * 1024 * 1024, backupCount=5 ) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[handler, logging.StreamHandler()] ) # ================= CONFIG ================= def read_config(cfg_file, section=None): config = configparser.ConfigParser() config.read(cfg_file) configs = [] if section: if section not in config: logging.error(f"Секция {section} не найдена") exit(1) sections = [section] logging.info(f"Используется конфиг: {section}") else: sections = config.sections() logging.info(f"Используются ВСЕ конфиги: {', '.join(sections)}") for sec in sections: cfg = config[sec] is_blackhole = sec.lower() == "blackhole" configs.append({ "name": sec, "is_blackhole": is_blackhole, "threads": int(cfg.get("threads", "50")), "filename": os.path.join(BASE_DIR, cfg.get("filename", f"{sec}_forced_list.txt")), "script": os.path.join(BASE_DIR, cfg.get("script", f"{sec}_ON.sh")), "rollback_script": os.path.join(BASE_DIR, cfg.get("rollback_script", f"{sec}_OFF.sh")), "exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"], "gateway": cfg.get("gateway", ""), "interface": cfg.get("interface", ""), "table": cfg.get("table", "1010"), "priority": cfg.get("priority", "10" if is_blackhole else "101"), "run": cfg.get("run", "") }) return configs def load_forced_list(cfg): domains = [] ips = set() try: with open(cfg["filename"], "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue try: ip = ipaddress.ip_address(line) ips.add(str(ip)) continue except ValueError: pass try: net = ipaddress.ip_network(line, strict=False) ips.add(str(net)) continue except ValueError: pass domains.append(line) except Exception as e: logging.error(f"Ошибка чтения входящих данных: {e}") exit(1) return domains, ips def get_resolvers(): system_dns = dns.asyncresolver.Resolver().nameservers return [ ("System", system_dns), ("Google", ["8.8.8.8"]), ("Yandex", ["77.88.8.8"]), ("CloudFlare", ["1.1.1.1"]), ] def is_public_ip(ip_str): try: if "/" in ip_str: net = ipaddress.ip_network(ip_str, strict=False) return not ( net.is_private or net.is_loopback or net.is_link_local or net.is_multicast or net.is_reserved ) else: ip = ipaddress.ip_address(ip_str) return not ( ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_reserved or ip.is_unspecified ) except ValueError: return False async def resolve_domain(domain, dns_name, server, semaphore): async with semaphore: resolver = dns.asyncresolver.Resolver() resolver.nameservers = [server] resolver.lifetime = 5 try: answer = await resolver.resolve(domain) return [r.address for r in answer] except Exception as e: logging.warning(f"DNS ошибка [{dns_name} - {server} | {domain}]: {e}") return [] def normalize_ip(ip): try: if "/" in ip: return ip addr = ipaddress.ip_address(ip) if isinstance(addr, ipaddress.IPv4Address): if addr.packed[-1] == 0: return f"{addr}/24" else: return f"{addr}/32" return ip except ValueError: return ip async def get_cloudflare_ips(): async with httpx.AsyncClient() as client: r = await client.get("https://www.cloudflare.com/ips-v4/") r.raise_for_status() result = set() for line in r.text.splitlines(): net = ipaddress.ip_network(line.strip()) for ip in net: result.add(str(ip)) return result # ================= SCRIPT GENERATORS ================= def generate_policy_script(cfg, ips): with open(cfg["script"], "w") as f: f.write( f'#!/bin/bash\n\n' f'if ! ip rule list | grep -q "lookup {cfg["table"]}"; then\n' f' ip rule add table {cfg["table"]} priority {cfg["priority"]}\n' f'fi\n\n' f'ip route flush table {cfg["table"]}\n\n' ) for ip in ips: if cfg["gateway"]: f.write(f'ip route replace {ip} via {cfg["gateway"]} table {cfg["table"]}\n') else: f.write(f'ip route replace {ip} dev {cfg["interface"]} table {cfg["table"]}\n') def generate_blackhole_script(cfg, ips): with open(cfg["script"], "w") as f: f.write( f'#!/bin/bash\n\n' f'if ! ip rule list | grep -q "lookup {cfg["table"]}"; then\n' f' ip rule add table {cfg["table"]} priority {cfg["priority"]}\n' f'fi\n\n' f'ip route flush table {cfg["table"]}\n\n' ) for ip in ips: f.write(f'ip route replace blackhole {ip} table {cfg["table"]}\n') def generate_rollback(cfg): with open(cfg["rollback_script"], "w") as f: f.write( f'#!/bin/bash\n\n' f'ip route flush table {cfg["table"]}\n' f'ip rule del table {cfg["table"]} 2>/dev/null\n' ) # ================= MAIN ================= async def main(): setup_logging() parser = argparse.ArgumentParser() parser.add_argument("--env", help="Config section") args = parser.parse_args() config_path = os.path.join(BASE_DIR, "config.ini") configs = read_config(config_path, args.env) for cfg in configs: logging.info(f"\n\n=== Обработка: {cfg['name']} ===") semaphore = Semaphore(cfg["threads"]) domains, ready_ips = load_forced_list(cfg) resolvers = get_resolvers() tasks = [] for dns_name, servers in resolvers: for server in servers: for domain in domains: tasks.append(resolve_domain(domain, dns_name, server, semaphore)) results = await asyncio.gather(*tasks) all_ips = set(ready_ips) for res in results: all_ips.update(res) if cfg["exclude_cloudflare"]: cf_ips = await get_cloudflare_ips() all_ips = {ip for ip in all_ips if ip not in cf_ips} all_ips = {ip for ip in all_ips if is_public_ip(ip)} normalized_ips = sorted({normalize_ip(ip) for ip in all_ips}) if cfg["is_blackhole"]: generate_blackhole_script(cfg, normalized_ips) else: if not cfg["gateway"] and not cfg["interface"]: logging.error("Не указан Gateway или Interface") exit(1) generate_policy_script(cfg, normalized_ips) generate_rollback(cfg) os.chmod(cfg["script"], 0o755) os.chmod(cfg["rollback_script"], 0o755) logging.info(f"ON: {cfg['script']}") logging.info(f"OFF: {cfg['rollback_script']}") if cfg["run"]: os.system(cfg["run"]) if __name__ == "__main__": asyncio.run(main())