Redirector/redirector.py
Sazonov Andrey c38e6552fe fixik
2026-03-20 16:44:33 +03:00

247 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import asyncio
import configparser
import ipaddress
import os
import logging
from logging.handlers import RotatingFileHandler
from asyncio import Semaphore
import dns.asyncresolver
import httpx
# Directory that contains this script. The default config path, the log
# directory, the forced list, the generated scripts and the result file are
# all resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ================= LOGGING =================
def setup_logging():
    """Configure root logging to a rotating file plus the console.

    Log files live in ``<BASE_DIR>/logs/redirector.log``; the directory is
    created on demand. The file rotates at 2 MiB and keeps five backups.
    """
    log_dir = os.path.join(BASE_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "redirector.log")

    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=2 * 1024 * 1024,  # rotate once the file reaches 2 MiB
        backupCount=5,
        # Log messages in this script are Cyrillic; pin the encoding so the
        # file does not depend on the locale of the invoking environment.
        encoding="utf-8",
    )
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[file_handler, logging.StreamHandler()],
    )
# ================= CONFIG =================
def read_config(cfg_file, section=None):
    """Load runtime settings from an INI file.

    Args:
        cfg_file: Path to the INI file.
        section: Optional section name. It is capitalized before lookup
            (``"test"`` -> ``"Test"``). When it is omitted or not present,
            the first section of the file is used; if the file has no named
            sections at all, the parser's DEFAULT section is used.

    Returns:
        dict with the keys ``threads`` (int, at least 1), ``filename``,
        ``script``, ``rollback_script`` (paths joined onto ``BASE_DIR``),
        ``exclude_cloudflare`` (bool), ``gateway``, ``table`` and ``run``
        (strings).

    Raises:
        ValueError: if ``threads`` is not an integer.
    """
    config = configparser.ConfigParser()
    # Same encoding as the forced list, independent of the process locale.
    loaded_files = config.read(cfg_file, encoding="utf-8")
    if not loaded_files:
        logging.warning(f"Файл конфигурации не прочитан: {cfg_file}")

    if section:
        section = section.capitalize()

    if section and section in config:
        cfg = config[section]
        logging.info(f"Используется конфиг: {section}")
    else:
        named_sections = config.sections()
        # A missing file or a file with only [DEFAULT] has no named sections;
        # fall back to DEFAULT instead of failing with IndexError.
        default_section = named_sections[0] if named_sections else config.default_section
        cfg = config[default_section]
        logging.info(f"Используется дефолтный конфиг: {default_section}")

    return {
        # The value sizes a semaphore; zero would block every lookup forever
        # and a negative value is rejected, so clamp to one.
        "threads": max(1, int(cfg.get("threads", "50"))),
        "filename": os.path.join(BASE_DIR, cfg.get("filename", "forced_list.txt")),
        "script": os.path.join(BASE_DIR, cfg.get("script", "forced_vpn_ON.sh")),
        "rollback_script": os.path.join(BASE_DIR, cfg.get("rollback_script", "forced_vpn_OFF.sh")),
        "exclude_cloudflare": cfg.get("exclude_cloudflare", "no").lower() in ["yes", "y"],
        "gateway": cfg.get("gateway", ""),
        "table": cfg.get("table", "1010"),
        "run": cfg.get("run", ""),
    }
def load_forced_list(cfg):
    """Read the forced list and split it into domains and IP/CIDR entries.

    Blank lines and lines starting with ``#`` are skipped. A line that parses
    as an IP address or as a network (host bits allowed) is stored in its
    normalized text form; every other line is treated as a domain name.

    Args:
        cfg: Mapping with a ``"filename"`` key holding the list path.

    Returns:
        ``(domains, ips)``: ``domains`` is a list of unique names in file
        order, ``ips`` is a set of address/network strings. If the file
        cannot be read, the error is logged and whatever was collected so
        far (possibly nothing) is returned.
    """

    def parse_ip_entry(text):
        """Return the normalized address/network string, or None."""
        for parser in (
            ipaddress.ip_address,
            lambda value: ipaddress.ip_network(value, strict=False),
        ):
            try:
                return str(parser(text))
            except ValueError:
                continue
        return None

    unique_domains = {}  # dict keys keep insertion order and drop repeats
    ips = set()
    try:
        # utf-8-sig transparently drops a leading BOM, which would otherwise
        # glue itself to the first entry and turn it into a bogus domain.
        with open(cfg["filename"], "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue
                normalized = parse_ip_entry(entry)
                if normalized is not None:
                    ips.add(normalized)
                else:
                    unique_domains[entry] = None
    except (OSError, UnicodeError) as e:
        logging.error(f"Ошибка чтения входящих данных: {e}")
    return list(unique_domains), ips
def get_resolvers():
    """Return the ``(label, nameservers)`` pairs used for lookups.

    The first pair carries the nameservers of a freshly constructed
    ``dns.asyncresolver.Resolver``; the remaining pairs are fixed public
    resolvers.
    """
    resolver_sets = [("system", dns.asyncresolver.Resolver().nameservers)]
    public_resolvers = {
        "google": "8.8.8.8",
        "yandex": "77.88.8.8",
        "cloudflare": "1.1.1.1",
    }
    for label, address in public_resolvers.items():
        resolver_sets.append((label, [address]))
    return resolver_sets
def is_public_ip(ip_str):
    """Tell whether an address or CIDR string is outside the special ranges.

    A value containing ``/`` is parsed as a network (host bits allowed),
    anything else as a single address. The result is False when the parsed
    object is private, loopback, link-local, multicast or reserved, and for
    single addresses also when it is unspecified. Unparseable input yields
    False.
    """
    common_flags = (
        "is_private",
        "is_loopback",
        "is_link_local",
        "is_multicast",
        "is_reserved",
    )
    try:
        if "/" not in ip_str:
            parsed = ipaddress.ip_address(ip_str)
            checked_flags = common_flags + ("is_unspecified",)
        else:
            parsed = ipaddress.ip_network(ip_str, strict=False)
            checked_flags = common_flags
    except ValueError:
        return False
    # any() stops at the first flag that is set, like the original or-chain.
    return not any(getattr(parsed, flag) for flag in checked_flags)
async def resolve_domain(domain, resolver, semaphore):
    """Resolve one domain through ``resolver`` while holding ``semaphore``.

    Returns the ``address`` attribute of every record in the answer. Any
    exception raised by the lookup is logged as a warning and converted
    into an empty list, so one failing name never aborts the whole batch.
    """
    async with semaphore:
        try:
            records = await resolver.resolve(domain)
            addresses = [record.address for record in records]
        except Exception as err:
            logging.warning(f"DNS ошибка {domain}: {err}")
            addresses = []
    return addresses
async def get_cloudflare_ips():
    """Download Cloudflare's IPv4 range list and expand it to addresses.

    Returns:
        set[str]: the text form of every address contained in every network
        listed at ``https://www.cloudflare.com/ips-v4/``.

    Raises:
        httpx.HTTPError: if the request fails or returns an error status.
        ValueError: if a non-blank line is not a valid network.

    NOTE(review): every network is expanded address by address, so the set
    grows with the total size of the published ranges and can be large.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get("https://www.cloudflare.com/ips-v4/")
        response.raise_for_status()
        body = response.text

    # Expansion happens after the client is closed: it is pure CPU work and
    # does not need the connection.
    addresses = set()
    for raw_line in body.splitlines():
        cidr = raw_line.strip()
        if not cidr:
            # A blank line would make ip_network() raise and abort the run.
            continue
        addresses.update(str(ip) for ip in ipaddress.ip_network(cidr))
    return addresses
async def main():
    """Build the forced-routing IP list and generate the ON/OFF scripts.

    Steps:
      1. Set up logging, parse ``-c/--config`` and ``--env``, load settings.
      2. Load the forced list and resolve every domain through every
         resolver set returned by ``get_resolvers``.
      3. Merge resolved addresses with the listed IP/CIDR entries, drop
         Cloudflare addresses when configured, then drop non-public entries.
      4. Write ``result_ips.txt``.
      5. Write the ON and rollback shell scripts and mark them executable.
      6. Run the optional post-command from the ``run`` setting.

    Steps 5 and 6 are skipped, with an error in the log, when no gateway is
    configured, because the ON script would otherwise flush the routing
    table and then fail on every ``ip route replace``.
    """
    setup_logging()

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default=os.path.join(BASE_DIR, "config.ini"))
    parser.add_argument("--env", help="Config section: test/prod")
    args = parser.parse_args()

    cfg = read_config(args.config, args.env)
    semaphore = Semaphore(cfg["threads"])
    domains, ready_ips = load_forced_list(cfg)
    resolvers = get_resolvers()
    logging.info(f"Домены: {len(domains)} | IP/CIDR: {len(ready_ips)}")

    # One resolver object per nameserver set; every domain is queried on each.
    tasks = []
    for _label, servers in resolvers:
        resolver = dns.asyncresolver.Resolver()
        resolver.nameservers = servers
        tasks.extend(resolve_domain(domain, resolver, semaphore) for domain in domains)
    results = await asyncio.gather(*tasks)

    all_ips = set(ready_ips)
    for resolved in results:
        all_ips.update(resolved)
    logging.info(f"Всего IP до фильтра: {len(all_ips)}")

    if cfg["exclude_cloudflare"]:
        cf_ips = await get_cloudflare_ips()
        before = len(all_ips)
        all_ips = {ip for ip in all_ips if ip not in cf_ips}
        logging.info(f"Удалено Cloudflare IP: {before - len(all_ips)}")

    before_private = len(all_ips)
    all_ips = {ip for ip in all_ips if is_public_ip(ip)}
    logging.info(f"Удалено приватных IP: {before_private - len(all_ips)}")
    logging.info(f"Финальных IP: {len(all_ips)}")

    ordered_ips = sorted(all_ips)
    result_file = os.path.join(BASE_DIR, "result_ips.txt")
    with open(result_file, "w") as f:
        f.writelines(ip + "\n" for ip in ordered_ips)

    if not cfg["gateway"]:
        # "via  table ..." is not a valid route; do not emit a script that
        # flushes the table and then cannot add anything back.
        logging.error("Не задан gateway в конфиге: скрипты не сгенерированы")
        return

    _write_on_script(cfg, ordered_ips)
    _write_rollback_script(cfg)
    logging.info(f"ON: {cfg['script']}")
    logging.info(f"OFF: {cfg['rollback_script']}")

    if cfg["run"]:
        logging.info("Запуск post-команды")
        status = os.system(cfg["run"])
        if status != 0:
            logging.warning(f"Post-команда завершилась с ненулевым статусом: {status}")


def _route_destination(entry):
    """Return ``entry`` in CIDR form; bare addresses get a host prefix.

    IPv4 addresses become ``/32`` and IPv6 addresses ``/128``.
    """
    if "/" in entry:
        return entry
    return str(ipaddress.ip_network(entry))


def _write_on_script(cfg, ordered_ips):
    """Write the executable script that installs the policy rule and routes."""
    table = cfg["table"]
    gateway = cfg["gateway"]
    with open(cfg["script"], "w") as f:
        f.write(
            f'#!/bin/bash\n\n'
            f'if ! ip rule list | grep -q "lookup {table}"; then\n'
            f'  ip rule add table {table} priority 101\n'
            f'fi\n\n'
            f'ip route flush table {table}\n\n'
        )
        f.writelines(
            f'ip route replace {_route_destination(ip)} via {gateway} table {table}\n'
            for ip in ordered_ips
        )
    os.chmod(cfg["script"], 0o755)


def _write_rollback_script(cfg):
    """Write the executable script that flushes the table and drops the rule."""
    table = cfg["table"]
    with open(cfg["rollback_script"], "w") as f:
        f.write(
            f'#!/bin/bash\n\n'
            f'ip route flush table {table}\n'
            f'ip rule del table {table} priority 101 2>/dev/null\n'
        )
    os.chmod(cfg["rollback_script"], 0o755)
# Start the asyncio event loop and run main() only when the file is executed
# as a script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())