#!/usr/bin/env python3
"""Build shell scripts that route a list of hosts through a dedicated table.

The script reads a list of domains, IP addresses and CIDR networks, resolves
the domains through several DNS servers, optionally drops addresses that
belong to Cloudflare, and writes two bash scripts: one that installs
``ip route`` entries into a routing table and one that rolls them back.
"""
import argparse
import asyncio
import configparser
import ipaddress
import os
from asyncio import Semaphore

import dns.asyncresolver
import httpx

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

CLOUDFLARE_IPV4_URL = "https://www.cloudflare.com/ips-v4/"


def read_config(cfg_file):
    """Read the ``[Redirector]`` section of *cfg_file* into a plain dict.

    Relative file names are resolved against the directory of this script.

    Returns:
        dict with keys ``threads`` (int), ``filename``, ``script``,
        ``rollback_script`` (absolute paths), ``exclude_cloudflare`` (bool,
        true for ``yes``/``y`` in any case), ``gateway``, ``table`` and
        ``run`` (strings).

    Raises:
        KeyError: if the file has no ``[Redirector]`` section (this includes
            a missing file, which ``ConfigParser.read`` silently skips).
        ValueError: if ``threads`` is not an integer.
    """
    config = configparser.ConfigParser()
    config.read(cfg_file)
    cfg = config['Redirector']

    return {
        "threads": int(cfg.get("threads", "50")),
        "filename": os.path.join(BASE_DIR, cfg.get("filename", "forced_list.txt")),
        "script": os.path.join(BASE_DIR, cfg.get("script", "forced_vpn_ON.sh")),
        "rollback_script": os.path.join(
            BASE_DIR, cfg.get("rollback_script", "forced_vpn_OFF.sh")
        ),
        "exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"],
        "gateway": cfg.get("gateway", ""),
        "table": cfg.get("table", "1010"),
        "run": cfg.get("run", ""),
    }


def load_forced_list(cfg):
    """Split the forced list file into domains and ready-made IP entries.

    Blank lines and lines starting with ``#`` are skipped. A line that parses
    as an IP address or as a network (host bits allowed) goes into the IP
    set in normalized string form; everything else is treated as a domain.

    Returns:
        ``(domains, ips)`` - a list of domain strings in file order and a set
        of address / CIDR strings.
    """
    domains = []
    ips = set()

    with open(cfg["filename"], "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            # Try a plain address first so that "1.2.3.4" is stored without
            # a prefix length, then fall back to network notation.
            try:
                ips.add(str(ipaddress.ip_address(line)))
                continue
            except ValueError:
                pass

            try:
                ips.add(str(ipaddress.ip_network(line, strict=False)))
                continue
            except ValueError:
                pass

            domains.append(line)

    return domains, ips


def get_resolvers():
    """Return ``(name, nameservers)`` pairs for every DNS source to query."""
    system_dns = dns.asyncresolver.Resolver().nameservers
    return [
        ("system", system_dns),
        ("google", ["8.8.8.8"]),
        ("yandex", ["77.88.8.8"]),
        ("cloudflare", ["1.1.1.1"]),
    ]


async def resolve_domain(domain, resolver, semaphore):
    """Resolve *domain* with *resolver*, limited by *semaphore*.

    Returns:
        list of address strings from the answer, or an empty list if the
        lookup fails for any reason.
    """
    async with semaphore:
        try:
            answer = await resolver.resolve(domain)
            return [r.address for r in answer]
        except Exception:
            # Best effort: a domain that one resolver cannot answer must not
            # abort the lookups running against the other resolvers.
            return []


async def get_cloudflare_networks():
    """Download Cloudflare's published IPv4 ranges as network objects.

    Returns:
        list of ``ipaddress`` network objects, one per line of the response
        that contains a ``/``.

    Raises:
        httpx.HTTPStatusError: if the server answers with an error status.
        ValueError: if a line with ``/`` is not a valid network.
    """
    async with httpx.AsyncClient() as client:
        r = await client.get(CLOUDFLARE_IPV4_URL)
        r.raise_for_status()

    return [
        ipaddress.ip_network(line.strip())
        for line in r.text.splitlines()
        if "/" in line
    ]


async def get_cloudflare_ips():
    """Return every address of Cloudflare's IPv4 ranges as a set of strings.

    Kept for backward compatibility. Expanding each range into individual
    addresses is memory-hungry; ``main`` uses ``get_cloudflare_networks``
    with containment checks instead.
    """
    networks = await get_cloudflare_networks()
    return {str(ip) for net in networks for ip in net}


def _is_in_networks(entry, networks):
    """Return True if the single address *entry* lies inside any network.

    CIDR entries (anything containing ``/``) and strings that are not valid
    addresses are never considered a match, so they are always kept.
    """
    if "/" in entry:
        return False
    try:
        addr = ipaddress.ip_address(entry)
    except ValueError:
        return False
    return any(addr in net for net in networks)


def _route_target(entry):
    """Return *entry* in CIDR form as used in the ``ip route`` command.

    Networks are passed through unchanged; single addresses get the host
    prefix length of their family (``/32`` for IPv4, ``/128`` for IPv6).
    """
    if "/" in entry:
        return entry
    return f"{entry}/{ipaddress.ip_address(entry).max_prefixlen}"


async def _resolve_all(domains, resolvers, semaphore):
    """Resolve every domain through every resolver and merge the answers."""
    tasks = []
    for _name, servers in resolvers:
        resolver = dns.asyncresolver.Resolver()
        resolver.nameservers = servers
        tasks.extend(resolve_domain(domain, resolver, semaphore) for domain in domains)

    resolved = set()
    for res in await asyncio.gather(*tasks):
        resolved.update(res)
    return resolved


def _write_on_script(cfg, all_ips):
    """Write the executable script that installs the rule and all routes."""
    table = cfg["table"]
    gateway = cfg["gateway"]

    with open(cfg["script"], "w", encoding="utf-8") as f:
        f.write(
            f'#!/bin/bash\n\n'
            f'# Добавляем правило если нет\n'
            f'if ! ip rule list | grep -q "lookup {table}"; then\n'
            f' ip rule add table {table} priority 101\n'
            f'fi\n\n'
            f'# Чистим таблицу\n'
            f'ip route flush table {table}\n\n'
        )
        f.writelines(
            f'ip route replace {_route_target(ip)} via {gateway} table {table}\n'
            for ip in sorted(all_ips)
        )

    os.chmod(cfg["script"], 0o755)


def _write_rollback_script(cfg):
    """Write the executable script that flushes the table and drops the rule."""
    table = cfg["table"]

    with open(cfg["rollback_script"], "w", encoding="utf-8") as f:
        f.write(
            f'#!/bin/bash\n\n'
            f'# Чистим таблицу\n'
            f'ip route flush table {table}\n\n'
            f'# Удаляем правило если есть\n'
            f'ip rule del table {table} priority 101 2>/dev/null\n'
        )

    os.chmod(cfg["rollback_script"], 0o755)


async def main():
    """Entry point: collect addresses and generate the ON / OFF scripts.

    Command line: ``-c/--config`` selects the config file (default
    ``config.ini`` next to this script). If the ``run`` option is set, it is
    executed as a shell command after the scripts are written.

    Raises:
        SystemExit: if no gateway is configured, because every generated
            ``ip route replace ... via`` line would be invalid.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default=os.path.join(BASE_DIR, "config.ini"))
    args = parser.parse_args()

    cfg = read_config(args.config)
    if not cfg["gateway"]:
        raise SystemExit("Ошибка: в конфигурации не задан gateway")

    semaphore = Semaphore(cfg["threads"])

    domains, ready_ips = load_forced_list(cfg)
    resolvers = get_resolvers()

    print(f"Домены: {len(domains)} | IP/CIDR: {len(ready_ips)}")

    all_ips = set(ready_ips)
    all_ips.update(await _resolve_all(domains, resolvers, semaphore))

    print(f"Всего IP до фильтра: {len(all_ips)}")

    if cfg["exclude_cloudflare"]:
        cf_networks = await get_cloudflare_networks()
        before = len(all_ips)
        # Containment checks against the ranges avoid materializing every
        # single Cloudflare address just to test membership.
        all_ips = {ip for ip in all_ips if not _is_in_networks(ip, cf_networks)}
        print(f"Удалено Cloudflare IP: {before - len(all_ips)}")

    print(f"Финальных IP: {len(all_ips)}")

    _write_on_script(cfg, all_ips)
    _write_rollback_script(cfg)

    print(f"ON script: {cfg['script']}")
    print(f"OFF script: {cfg['rollback_script']}")

    if cfg["run"]:
        print("Запуск скрипта...")
        # The command comes from the local config file and is run through
        # the shell as written.
        os.system(cfg["run"])


if __name__ == "__main__":
    asyncio.run(main())