Redirector/redirector.py
Sazonov Andrey d9d03daae7 fixik
2026-04-02 17:57:29 +03:00

304 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import asyncio
import configparser
import ipaddress
import os
import logging
from logging.handlers import RotatingFileHandler
from asyncio import Semaphore
import dns.asyncresolver
import httpx
# Directory that contains this script. The config file, list files, generated
# scripts and the log directory are all resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ================= LOGGING =================
def setup_logging():
    """Configure root logging to a rotating file plus a stream handler.

    The log file is ``<BASE_DIR>/logs/redirector.log``; the directory is
    created when missing. The file rotates at 2 MiB and keeps 5 backups.
    """
    log_dir = os.path.join(BASE_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    file_handler = RotatingFileHandler(
        os.path.join(log_dir, "redirector.log"),
        maxBytes=2 * 1024 * 1024,
        backupCount=5,
        # Log messages contain Cyrillic text; without an explicit encoding the
        # handler uses the locale's, which cannot encode them under C/POSIX.
        encoding="utf-8",
    )
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[file_handler, logging.StreamHandler()],
    )
# ================= CONFIG =================
def read_config(cfg_file, section=None):
    """Load routing profiles from an INI file.

    Args:
        cfg_file: Path to the INI file.
        section: Name of the single section to load. When falsy, every
            section returned by ``ConfigParser.sections()`` is loaded.

    Returns:
        A list with one dict per loaded section. Each dict has the keys
        ``name``, ``is_blackhole``, ``threads``, ``filename``, ``script``,
        ``rollback_script``, ``exclude_cloudflare``, ``gateway``,
        ``interface``, ``table``, ``priority`` and ``run``. File paths are
        joined onto ``BASE_DIR``.

    Raises:
        SystemExit: With code 1 when the file cannot be read, the requested
            section does not exist, or ``threads`` is not a positive integer.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() skips unreadable files silently and returns the list
    # of files it actually parsed, so an empty result means nothing was loaded.
    if not config.read(cfg_file, encoding="utf-8"):
        logging.error(f"Не удалось прочитать файл конфигурации: {cfg_file}")
        raise SystemExit(1)

    if section:
        if section not in config:
            logging.error(f"Секция {section} не найдена")
            raise SystemExit(1)
        sections = [section]
        logging.info(f"\n\n=== Используется конфиг: {section} ===")
    else:
        sections = config.sections()
        logging.info(f"\n\n=== Используются ВСЕ конфиги: {', '.join(sections)} ===")

    configs = []
    for sec in sections:
        cfg = config[sec]
        # A section literally named "blackhole" (any case) selects blackhole mode.
        is_blackhole = sec.lower() == "blackhole"

        # The value is later used as a semaphore size: zero would block every
        # lookup forever, so reject anything that is not a positive integer.
        raw_threads = cfg.get("threads", "50")
        try:
            threads = int(raw_threads)
        except ValueError:
            threads = 0
        if threads < 1:
            logging.error(f"Секция {sec}: некорректное значение threads: {raw_threads!r}")
            raise SystemExit(1)

        configs.append({
            "name": sec,
            "is_blackhole": is_blackhole,
            "threads": threads,
            "filename": os.path.join(BASE_DIR, cfg.get("filename", f"{sec}_forced_list.txt")),
            "script": os.path.join(BASE_DIR, cfg.get("script", f"{sec}_ON.sh")),
            "rollback_script": os.path.join(BASE_DIR, cfg.get("rollback_script", f"{sec}_OFF.sh")),
            "exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"],
            "gateway": cfg.get("gateway", ""),
            "interface": cfg.get("interface", ""),
            "table": cfg.get("table", "1010"),
            "priority": cfg.get("priority", "10" if is_blackhole else "101"),
            "run": cfg.get("run", ""),
        })
    return configs
def load_forced_list(cfg):
    """Read the section's list file and split it into domains and IP entries.

    Blank lines and lines starting with ``#`` are skipped. A line that parses
    as an IP address or as a network (non-strict, so host bits are masked) is
    stored in its canonical string form; every other line is treated as a
    domain name.

    Args:
        cfg: Section dict; only ``cfg["filename"]`` is read.

    Returns:
        ``(domains, ips)`` where ``domains`` is a list of unique names in
        first-seen order and ``ips`` is a set of address/CIDR strings.

    Raises:
        SystemExit: With code 1 when the file cannot be opened or decoded.
    """
    # A dict serves as an ordered set: a repeated domain would otherwise be
    # resolved once per occurrence against every DNS server.
    domains = {}
    ips = set()
    try:
        # "utf-8-sig" reads plain UTF-8 too, but drops a leading BOM that would
        # otherwise become part of the first entry.
        with open(cfg["filename"], "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                try:
                    ips.add(str(ipaddress.ip_address(entry)))
                    continue
                except ValueError:
                    pass
                try:
                    ips.add(str(ipaddress.ip_network(entry, strict=False)))
                    continue
                except ValueError:
                    pass
                domains.setdefault(entry, None)
    except (OSError, UnicodeError) as e:
        logging.error(f"Ошибка чтения входящих данных: {e}")
        raise SystemExit(1)
    return list(domains), ips
def get_resolvers():
    """Return the ``(label, nameservers)`` pairs to query.

    The first pair carries the nameservers of a default-constructed dnspython
    resolver; the remaining pairs are fixed public servers.
    """
    public_servers = [
        ("Google", ["8.8.8.8"]),
        ("Yandex", ["77.88.8.8"]),
        ("CloudFlare", ["1.1.1.1"]),
    ]
    default_resolver = dns.asyncresolver.Resolver()
    return [("System", default_resolver.nameservers), *public_servers]
def is_public_ip(ip_str):
    """Tell whether an address or CIDR string is outside the special ranges.

    Strings containing ``/`` are parsed as networks (non-strict), everything
    else as a single address. The result is ``False`` when the value is
    private, loopback, link-local, multicast or reserved; single addresses
    are additionally rejected when unspecified. Unparseable input yields
    ``False``.
    """
    try:
        is_network = "/" in ip_str
        if is_network:
            target = ipaddress.ip_network(ip_str, strict=False)
        else:
            target = ipaddress.ip_address(ip_str)
    except ValueError:
        return False

    excluded_flags = (
        "is_private",
        "is_loopback",
        "is_link_local",
        "is_multicast",
        "is_reserved",
    )
    if not is_network:
        # Only single addresses are checked for the unspecified address.
        excluded_flags += ("is_unspecified",)
    return not any(getattr(target, flag) for flag in excluded_flags)
async def resolve_domain(domain, dns_name, server, semaphore):
    """Resolve one domain against one nameserver, never raising.

    Args:
        domain: Name to look up.
        dns_name: Human-readable resolver label, used only in the log message.
        server: Nameserver address the query is sent to.
        semaphore: Async context manager bounding concurrent lookups.

    Returns:
        The ``address`` attribute of every record in the answer, or an empty
        list when the lookup fails for any reason (the failure is logged as a
        warning).
    """
    async with semaphore:
        lookup = dns.asyncresolver.Resolver()
        lookup.nameservers = [server]
        lookup.lifetime = 5
        try:
            records = await lookup.resolve(domain)
            addresses = [record.address for record in records]
        except Exception as e:
            # Best effort: one failed lookup must not abort the whole batch.
            logging.warning(f"DNS ошибка [{dns_name} - {server} | {domain}]: {e}")
            addresses = []
        return addresses
def normalize_ip(ip):
    """Attach a prefix length to a bare IPv4 address.

    Values that already contain ``/``, non-IPv4 addresses and unparseable
    strings are returned unchanged. An IPv4 address whose last octet is zero
    gets ``/24``; any other IPv4 address gets ``/32``.
    """
    if "/" in ip:
        return ip
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        return ip
    if not isinstance(parsed, ipaddress.IPv4Address):
        return ip
    prefix_len = 24 if parsed.packed[-1] == 0 else 32
    return f"{parsed}/{prefix_len}"
async def get_cloudflare_ips():
    """Download Cloudflare's IPv4 ranges and expand them to single addresses.

    Returns:
        A set with the string form of every address inside every listed
        network. Each network is fully enumerated, so the set grows with the
        size of the published prefixes.

    Raises:
        httpx.HTTPError: Propagated from the request or from
            ``raise_for_status()``.
        ValueError: When a non-empty line is not a valid network.
    """
    async with httpx.AsyncClient() as client:
        r = await client.get("https://www.cloudflare.com/ips-v4/")
        r.raise_for_status()
        result = set()
        for line in r.text.splitlines():
            entry = line.strip()
            if not entry:
                # ip_network("") raises ValueError; a stray blank line in the
                # response must not abort the run.
                continue
            result.update(str(ip) for ip in ipaddress.ip_network(entry))
        return result
# ================= SCRIPT GENERATORS =================
def generate_policy_script(cfg, ips):
    """Write the bash script that installs policy routes for ``ips``.

    The script adds an ``ip rule`` for the table when none is present, flushes
    the table, then emits one ``ip route replace`` per entry using the
    configured gateway (``via``), interface (``dev``) or both.

    Args:
        cfg: Section dict; reads ``script``, ``table``, ``priority``,
            ``gateway`` and ``interface``.
        ips: Iterable of address/CIDR strings to route.

    Raises:
        ValueError: When neither gateway nor interface is set. This is checked
            before the output file is opened, so no file is written.
    """
    gateway = cfg["gateway"]
    interface = cfg["interface"]
    if not gateway and not interface:
        # Without this guard the script would contain "via  dev ", which
        # `ip route` rejects.
        raise ValueError("generate_policy_script requires a gateway and/or an interface")

    # The next-hop part is identical for every route, so build it once.
    hop_parts = []
    if gateway:
        hop_parts.append(f"via {gateway}")
    if interface:
        hop_parts.append(f"dev {interface}")
    next_hop = " ".join(hop_parts)
    table = cfg["table"]

    with open(cfg["script"], "w") as f:
        f.write(
            f'#!/bin/bash\n\n'
            # -w matches whole words only, so table 1010 is not mistaken for
            # an existing rule on table 10100.
            f'if ! ip rule list | grep -qw "lookup {table}"; then\n'
            f' ip rule add table {table} priority {cfg["priority"]}\n'
            f'fi\n\n'
            f'ip route flush table {table}\n\n'
        )
        f.writelines(
            f'ip route replace {ip} {next_hop} table {table}\n' for ip in ips
        )
def generate_blackhole_script(cfg, ips):
    """Write the bash script that blackholes every entry in ``ips``.

    The script adds an ``ip rule`` for the table when none is present, flushes
    the table, then emits one ``ip route replace blackhole`` per entry.

    Args:
        cfg: Section dict; reads ``script``, ``table`` and ``priority``.
        ips: Iterable of address/CIDR strings to drop.
    """
    table = cfg["table"]
    with open(cfg["script"], "w") as f:
        f.write(
            f'#!/bin/bash\n\n'
            # -w matches whole words only, so table 1010 is not mistaken for
            # an existing rule on table 10100.
            f'if ! ip rule list | grep -qw "lookup {table}"; then\n'
            f' ip rule add table {table} priority {cfg["priority"]}\n'
            f'fi\n\n'
            f'ip route flush table {table}\n\n'
        )
        f.writelines(
            f'ip route replace blackhole {ip} table {table}\n' for ip in ips
        )
def generate_rollback(cfg):
    """Write the bash script that undoes the section's routing changes.

    The script flushes the routing table and deletes its ``ip rule``; errors
    from the rule deletion are discarded by the script itself.

    Args:
        cfg: Section dict; reads ``rollback_script`` and ``table``.
    """
    with open(cfg["rollback_script"], "w") as out:
        table = cfg["table"]
        script_lines = [
            "#!/bin/bash\n",
            "\n",
            f"ip route flush table {table}\n",
            f"ip rule del table {table} 2>/dev/null\n",
        ]
        out.write("".join(script_lines))
# ================= MAIN =================
async def main():
    """Build the ON/OFF routing scripts for every selected config section.

    For each section the list file is loaded, every domain is resolved
    against every configured nameserver, Cloudflare and non-public addresses
    are filtered out, and the ON and rollback scripts are written and made
    executable. The optional ``run`` command is then executed through the
    shell.

    Raises:
        SystemExit: With code 1 when a non-blackhole section has neither a
            gateway nor an interface.
    """
    setup_logging()
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", help="Config section")
    args = parser.parse_args()
    config_path = os.path.join(BASE_DIR, "config.ini")
    configs = read_config(config_path, args.env)

    for cfg in configs:
        logging.info(f"\n=== Обработка: {cfg['name']} ===")

        # Validate before doing any DNS work: a policy section without a next
        # hop cannot produce a usable script, so fail fast.
        if not (cfg["is_blackhole"] or cfg["gateway"] or cfg["interface"]):
            logging.error("Не указан Gateway или Interface")
            raise SystemExit(1)

        semaphore = Semaphore(cfg["threads"])
        domains, ready_ips = load_forced_list(cfg)
        resolvers = get_resolvers()
        logging.info(f"Домены: {len(domains)} | IP/CIDR: {len(ready_ips)}")

        # One lookup per (nameserver, domain) pair; the semaphore bounds how
        # many run at once.
        tasks = []
        for dns_name, servers in resolvers:
            for server in servers:
                for domain in domains:
                    tasks.append(resolve_domain(domain, dns_name, server, semaphore))
        results = await asyncio.gather(*tasks)

        all_ips = set(ready_ips)
        for res in results:
            all_ips.update(res)
        logging.info(f"Всего IP до фильтра: {len(all_ips)}")

        if cfg["exclude_cloudflare"]:
            # Exact string comparison: only single addresses can match, CIDR
            # entries taken from the list file are never removed here.
            cf_ips = await get_cloudflare_ips()
            before = len(all_ips)
            all_ips = {ip for ip in all_ips if ip not in cf_ips}
            logging.info(f"Удалено Cloudflare IP: {before - len(all_ips)}")

        before_private = len(all_ips)
        all_ips = {ip for ip in all_ips if is_public_ip(ip)}
        logging.info(f"Удалено приватных IP: {before_private - len(all_ips)}")

        normalized_ips = sorted({normalize_ip(ip) for ip in all_ips})
        # Report what is actually written: normalisation can merge entries.
        logging.info(f"Финальных IP: {len(normalized_ips)}")

        if cfg["is_blackhole"]:
            generate_blackhole_script(cfg, normalized_ips)
        else:
            generate_policy_script(cfg, normalized_ips)
        generate_rollback(cfg)

        os.chmod(cfg["script"], 0o755)
        os.chmod(cfg["rollback_script"], 0o755)
        logging.info(f"ON: {cfg['script']}")
        logging.info(f"OFF: {cfg['rollback_script']}")

        if cfg["run"]:
            # The command comes from the local config file and runs via the shell.
            status = os.system(cfg["run"])
            if status != 0:
                logging.warning(
                    f"Команда run завершилась с ненулевым статусом {status}: {cfg['run']}"
                )
# Script entry point: run the async main() coroutine when executed directly.
if __name__ == "__main__":
    asyncio.run(main())