#!/usr/bin/env python3 import argparse import asyncio import configparser import ipaddress import os import logging from logging.handlers import RotatingFileHandler from asyncio import Semaphore import dns.asyncresolver import httpx BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # ================= LOGGING ================= def setup_logging(): log_dir = os.path.join(BASE_DIR, "logs") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "redirector.log") handler = RotatingFileHandler( log_file, maxBytes=2 * 1024 * 1024, backupCount=5 ) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[handler, logging.StreamHandler()] ) # ================= CONFIG ================= def read_config(cfg_file, section=None): config = configparser.ConfigParser() config.read(cfg_file) configs = [] if section: if section not in config: logging.error(f"Секция {section} не найдена") exit(1) sections = [section] logging.info(f"Используется конфиг: {section}") else: sections = config.sections() logging.info(f"Используются ВСЕ конфиги: {', '.join(sections)}") for sec in sections: cfg = config[sec] configs.append({ "name": sec, "threads": int(cfg.get("threads", "50")), "filename": os.path.join(BASE_DIR, cfg.get("filename", f"{sec}_forced_list.txt")), "script": os.path.join(BASE_DIR, cfg.get("script", f"{sec}_ON.sh")), "rollback_script": os.path.join(BASE_DIR, cfg.get("rollback_script", f"{sec}_OFF.sh")), "exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"], "gateway": cfg.get("gateway", ""), "interface": cfg.get("interface", ""), "table": cfg.get("table", "1010"), "priority": cfg.get("priority", "101"), "run": cfg.get("run", "") }) return configs def load_forced_list(cfg): domains = [] ips = set() try: with open(cfg["filename"], "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue try: ip = ipaddress.ip_address(line) ips.add(str(ip)) continue except ValueError: pass try: net = ipaddress.ip_network(line, strict=False) ips.add(str(net)) continue except ValueError: pass domains.append(line) except Exception as e: logging.error(f"Ошибка чтения входящих данных: {e}") exit(1) if len(domains) == 0 and len(ips) == 0: logging.info(f"В файле {cfg['filename']} не найдено доменов и IP") return domains, ips def get_resolvers(): system_dns = dns.asyncresolver.Resolver().nameservers return [ ("system", system_dns), ("google", ["8.8.8.8"]), ("yandex", ["77.88.8.8"]), ("cloudflare", ["1.1.1.1"]), ] def is_public_ip(ip_str): try: if "/" in ip_str: net = ipaddress.ip_network(ip_str, strict=False) return not ( net.is_private or net.is_loopback or net.is_link_local or net.is_multicast or net.is_reserved ) else: ip = ipaddress.ip_address(ip_str) return not ( ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_reserved or ip.is_unspecified ) except ValueError: return False async def resolve_domain(domain, resolver, semaphore): async with semaphore: try: answer = await resolver.resolve(domain) return [r.address for r in answer] except Exception as e: logging.warning(f"DNS ошибка {domain}: {e}") return [] async def get_cloudflare_ips(): async with httpx.AsyncClient() as client: r = await client.get("https://www.cloudflare.com/ips-v4/") r.raise_for_status() result = set() for line in r.text.splitlines(): net = ipaddress.ip_network(line.strip()) for ip in net: result.add(str(ip)) return result async def main(): setup_logging() parser = argparse.ArgumentParser() parser.add_argument("--env", help="Config section") args = parser.parse_args() config_path = os.path.join(BASE_DIR, "config.ini") configs = read_config(config_path, args.env) for cfg in configs: logging.info(f"\n=== Обработка: {cfg['name']} ===") semaphore = Semaphore(cfg["threads"]) domains, ready_ips = load_forced_list(cfg) resolvers = get_resolvers() logging.info(f"Домены: {len(domains)} | IP/CIDR: {len(ready_ips)}") tasks = [] for _, servers in resolvers: resolver = dns.asyncresolver.Resolver() resolver.nameservers = servers for domain in domains: tasks.append(resolve_domain(domain, resolver, semaphore)) results = await asyncio.gather(*tasks) all_ips = set(ready_ips) for res in results: all_ips.update(res) logging.info(f"Всего IP до фильтра: {len(all_ips)}") if cfg["exclude_cloudflare"]: cf_ips = await get_cloudflare_ips() before = len(all_ips) all_ips = {ip for ip in all_ips if ip not in cf_ips} logging.info(f"Удалено Cloudflare IP: {before - len(all_ips)}") before_private = len(all_ips) all_ips = {ip for ip in all_ips if is_public_ip(ip)} logging.info(f"Удалено приватных IP: {before_private - len(all_ips)}") logging.info(f"Финальных IP: {len(all_ips)}") result_file = os.path.join(BASE_DIR, f"{cfg['name']}_result_ips.txt") with open(result_file, "w") as f: for ip in sorted(all_ips): f.write(ip + "\n") if not cfg["gateway"] and not cfg["interface"]: logging.error("Не указан Gateway и Interface, выход.") exit(1) with open(cfg["script"], "w") as f: f.write( f'#!/bin/bash\n\n' f'if ! ip rule list | grep -q "lookup {cfg["table"]}"; then\n' f' ip rule add table {cfg["table"]} priority {cfg["priority"]}\n' f'fi\n\n' f'ip route flush table {cfg["table"]}\n\n' ) for ip in sorted(all_ips): if "/" in ip: if cfg["gateway"]: f.write(f'ip route replace {ip} via {cfg["gateway"]} table {cfg["table"]}\n') else: f.write(f'ip route replace {ip} dev {cfg["interface"]} table {cfg["table"]}\n') else: if cfg["gateway"]: f.write(f'ip route replace {ip}/32 via {cfg["gateway"]} table {cfg["table"]}\n') else: f.write(f'ip route replace {ip}/32 dev {cfg["interface"]} table {cfg["table"]}\n') os.chmod(cfg["script"], 0o755) with open(cfg["rollback_script"], "w") as f: f.write( f'#!/bin/bash\n\n' f'ip route flush table {cfg["table"]}\n' f'ip rule del table {cfg["table"]} 2>/dev/null\n' ) os.chmod(cfg["rollback_script"], 0o755) logging.info(f"ON: {cfg['script']}") logging.info(f"OFF: {cfg['rollback_script']}") if cfg["run"]: logging.info("Запуск post-команды") os.system(cfg["run"]) if __name__ == "__main__": asyncio.run(main())