import json import logging import re from datetime import datetime import requests from pathlib import Path from urllib3.exceptions import InsecureRequestWarning import urllib3 urllib3.disable_warnings(InsecureRequestWarning) logging.basicConfig( level=logging.INFO, filename="logs/parser.log", format="%(asctime)s - %(module)s - %(levelname)s - %(funcName)s: %(lineno)d - %(message)s", datefmt='%H:%M:%S', ) logger = logging.getLogger(__name__) HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7", "Accept-Encoding": "gzip, deflate, br", "Referer": "https://www.mrsk-1.ru/customers/customer-service/power-outage/", "X-Requested-With": "XMLHttpRequest", "Connection": "keep-alive", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "Pragma": "no-cache", "Cache-Control": "no-cache" } MAIN_URL = 'https://www.mrsk-1.ru/customers/customer-service/power-outage/ajax.php?request=' DISTRICT_CODE = '0ECC8970-2702-4C14-9E4D-956606EFFEE3' LOCALITY_CODE = '02E9C019-AB4D-4FA0-928E-D6C0A41DC256' URL_PLAN = f'{MAIN_URL}2®ionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}' URL_VNEREGLAMENT = f'{MAIN_URL}3®ionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}' URL_AVAR = f'{MAIN_URL}4®ionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}&Street=-' PLAN_FILE = Path("plan.txt") VN_FILE = Path("vnereglament.txt") AVAR_FILE = Path("avar.txt") def clean_street(street: str) -> str: if not street: return "" street = re.sub(r'^Белгород г;?\s*', '', street, flags=re.IGNORECASE) parts = re.split(r'[;\n]+', street) cleaned_lines = [] for part in parts: part = part.strip() if not part: continue part = re.sub(r'\b0\s+(гараж|КНС|ГСК)\b', '', part, flags=re.IGNORECASE) part = re.sub(r'\bГСК[-\s]*\d*\b', '', part, flags=re.IGNORECASE) part = re.sub(r'\bКНС\b', '', part, flags=re.IGNORECASE) part = re.sub(r'\bКотельная\b', '', part, flags=re.IGNORECASE) part = re.sub(r'[\s,]+', ' ', part).strip(' ,.') if not part: continue cleaned_lines.append(part) seen = set() unique_lines = [] for line in cleaned_lines: key = re.sub(r'[^а-яёa-z0-9]', '', line.lower()) if key and key not in seen: seen.add(key) unique_lines.append(line) formatted = [] for line in unique_lines: formatted.append(line) return '\n'.join(formatted) if formatted else "[Нет данных]" def format_time(dt_str: str) -> str: try: dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00')) return dt.strftime('%d.%m.%Y %H:%M') except: return dt_str.replace('T', ' ') def parse_and_save(data, file_path: Path, mode: str): if not data or data == 0: file_path.write_text("[Нет отключений]", encoding='utf-8') return records = data if isinstance(data, list) else [data] lines = [] for item in records: if mode in ('plan', 'vnereglament'): street = clean_street(item.get("DisconnectionObject", "")) time_down = format_time(item.get("DisconnectionDateTime", "")) time_up = format_time(item.get("EnergyOnPlanningDateTime", "")) lines.append(f"{street}\nОтключение: {time_down} — {time_up}\n{'─' * 30}") elif mode == 'avar': street = clean_street(item.get("StreetHome", "")) time_up = format_time(item.get("ScheduledTimeRemoval", "")) lines.append(f"{street}\nВосстановление: ~{time_up}\n{'─' * 30}") file_path.write_text('\n'.join(lines), encoding='utf-8') def get_plan(): try: r = requests.get(URL_PLAN, headers=HEADERS, verify=False, timeout=15) r.raise_for_status() data = r.json() parse_and_save(data, PLAN_FILE, 'plan') logger.info("[Плановые отключения] обновлены") except Exception as e: logger.error(f"Ошибка при получении плановых отключений: {e}") if 'r' in locals(): logger.error(f"Статус: {r.status_code} | URL: {r.url}") logger.error(f"Ответ: {r.text[:500]}") PLAN_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8') def get_vnereglament(): try: r = requests.get(URL_VNEREGLAMENT, headers=HEADERS, verify=False, timeout=15) r.raise_for_status() data = r.json() parse_and_save(data, VN_FILE, 'vnereglament') logger.info("[Внерегламентные отключения] обновлены") except Exception as e: logger.error(f"Ошибка при получении внерегламентных отключений: {e}") if 'r' in locals(): logger.error(f"Статус: {r.status_code} | URL: {r.url}") logger.error(f"Ответ: {r.text[:500]}") VN_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8') def get_avar(): try: r = requests.get(URL_AVAR, headers=HEADERS, verify=False, timeout=15) r.raise_for_status() data = r.json() parse_and_save(data, AVAR_FILE, 'avar') logger.info("[Аварийные отключения] обновлены") except Exception as e: logger.error(f"Ошибка при получении аварийных отключений: {e}") if 'r' in locals(): logger.error(f"Статус: {r.status_code} | URL: {r.url}") logger.error(f"Ответ: {r.text[:500]}") AVAR_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8') def start_parser(): get_plan() get_vnereglament() get_avar() logger.info("Парсер отработал")