Redirector/redirector.py
Sazonov Andrey 3ad59ee75c fixik
2026-03-18 17:43:55 +03:00

182 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import asyncio
import configparser
import ipaddress
import os
from asyncio import Semaphore
import dns.asyncresolver
import httpx
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def read_config(cfg_file):
config = configparser.ConfigParser()
config.read(cfg_file)
cfg = config['Redirector']
return {
"threads": int(cfg.get("threads", "50")),
"filename": os.path.join(BASE_DIR, cfg.get("filename", "forced_list.txt")),
"script": os.path.join(BASE_DIR, cfg.get("script", "forced_vpn_ON.sh")),
"rollback_script": os.path.join(BASE_DIR, cfg.get("rollback_script", "forced_vpn_OFF.sh")),
"exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"],
"gateway": cfg.get("gateway", ""),
"table": cfg.get("table", "1010"),
"run": cfg.get("run", "")
}
def load_forced_list(cfg):
domains = []
ips = set()
with open(cfg["filename"], "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
try:
ip = ipaddress.ip_address(line)
ips.add(str(ip))
continue
except ValueError:
pass
try:
net = ipaddress.ip_network(line, strict=False)
ips.add(str(net))
continue
except ValueError:
pass
domains.append(line)
return domains, ips
def get_resolvers():
system_dns = dns.asyncresolver.Resolver().nameservers
return [
("system", system_dns),
("google", ["8.8.8.8"]),
("yandex", ["77.88.8.8"]),
("cloudflare", ["1.1.1.1"]),
]
async def resolve_domain(domain, resolver, semaphore):
async with semaphore:
try:
answer = await resolver.resolve(domain)
return [r.address for r in answer]
except Exception:
return []
async def get_cloudflare_ips():
async with httpx.AsyncClient() as client:
r = await client.get("https://www.cloudflare.com/ips-v4/")
r.raise_for_status()
result = set()
for line in r.text.splitlines():
line = line.strip()
if "/" in line:
net = ipaddress.ip_network(line)
for ip in net:
result.add(str(ip))
return result
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", default=os.path.join(BASE_DIR, "config.ini"))
args = parser.parse_args()
cfg = read_config(args.config)
semaphore = Semaphore(cfg["threads"])
domains, ready_ips = load_forced_list(cfg)
resolvers = get_resolvers()
print(f"Домены: {len(domains)} | IP/CIDR: {len(ready_ips)}")
tasks = []
for name, servers in resolvers:
resolver = dns.asyncresolver.Resolver()
resolver.nameservers = servers
for domain in domains:
tasks.append(resolve_domain(domain, resolver, semaphore))
results = await asyncio.gather(*tasks)
all_ips = set(ready_ips)
for res in results:
for ip in res:
all_ips.add(ip)
print(f"Всего IP до фильтра: {len(all_ips)}")
if cfg["exclude_cloudflare"]:
cf_ips = await get_cloudflare_ips()
before = len(all_ips)
all_ips = {ip for ip in all_ips if ip not in cf_ips}
print(f"Удалено Cloudflare IP: {before - len(all_ips)}")
print(f"Финальных IP: {len(all_ips)}")
with open(cfg["script"], "w", encoding="utf-8") as f:
f.write(
f'#!/bin/bash\n\n'
f'# Добавляем правило если нет\n'
f'if ! ip rule list | grep -q "lookup {cfg["table"]}"; then\n'
f' ip rule add table {cfg["table"]} priority 101\n'
f'fi\n\n'
f'# Чистим таблицу\n'
f'ip route flush table {cfg["table"]}\n\n'
)
for ip in sorted(all_ips):
if "/" in ip:
f.write(f'ip route replace {ip} via {cfg["gateway"]} table {cfg["table"]}\n')
else:
f.write(f'ip route replace {ip}/32 via {cfg["gateway"]} table {cfg["table"]}\n')
os.chmod(cfg["script"], 0o755)
with open(cfg["rollback_script"], "w", encoding="utf-8") as f:
f.write(
f'#!/bin/bash\n\n'
f'# Чистим таблицу\n'
f'ip route flush table {cfg["table"]}\n\n'
f'# Удаляем правило если есть\n'
f'ip rule del table {cfg["table"]} priority 101 2>/dev/null\n'
)
os.chmod(cfg["rollback_script"], 0o755)
print(f"ON script: {cfg['script']}")
print(f"OFF script: {cfg['rollback_script']}")
if cfg["run"]:
print("Запуск скрипта...")
os.system(cfg["run"])
if __name__ == "__main__":
asyncio.run(main())