mrsk_bot/main.py
2025-10-05 21:48:25 +03:00

167 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import re
from datetime import datetime
import requests
from pathlib import Path
from urllib3.exceptions import InsecureRequestWarning
import urllib3
urllib3.disable_warnings(InsecureRequestWarning)
logging.basicConfig(
level=logging.INFO,
filename="logs/parser.log",
format="%(asctime)s - %(module)s - %(levelname)s - %(funcName)s: %(lineno)d - %(message)s",
datefmt='%H:%M:%S',
)
logger = logging.getLogger(__name__)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Referer": "https://www.mrsk-1.ru/customers/customer-service/power-outage/",
"X-Requested-With": "XMLHttpRequest",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
MAIN_URL = 'https://www.mrsk-1.ru/customers/customer-service/power-outage/ajax.php?request='
DISTRICT_CODE = '0ECC8970-2702-4C14-9E4D-956606EFFEE3'
LOCALITY_CODE = '02E9C019-AB4D-4FA0-928E-D6C0A41DC256'
URL_PLAN = f'{MAIN_URL}2&regionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}'
URL_VNEREGLAMENT = f'{MAIN_URL}3&regionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}'
URL_AVAR = f'{MAIN_URL}4&regionCode=31&districtCode={DISTRICT_CODE}&locality=Белгород&localityCode={LOCALITY_CODE}&Street=-'
PLAN_FILE = Path("plan.txt")
VN_FILE = Path("vnereglament.txt")
AVAR_FILE = Path("avar.txt")
def clean_street(street: str) -> str:
if not street:
return ""
street = re.sub(r'^Белгород г;?\s*', '', street, flags=re.IGNORECASE)
parts = re.split(r'[;\n]+', street)
cleaned_lines = []
for part in parts:
part = part.strip()
if not part:
continue
part = re.sub(r'\b0\s+(гараж|КНС|ГСК)\b', '', part, flags=re.IGNORECASE)
part = re.sub(r'\СК[-\s]*\d*\b', '', part, flags=re.IGNORECASE)
part = re.sub(r'\bКНС\b', '', part, flags=re.IGNORECASE)
part = re.sub(r'\bКотельная\b', '', part, flags=re.IGNORECASE)
part = re.sub(r'[\s,]+', ' ', part).strip(' ,.')
if not part:
continue
cleaned_lines.append(part)
seen = set()
unique_lines = []
for line in cleaned_lines:
key = re.sub(r'[^а-яёa-z0-9]', '', line.lower())
if key and key not in seen:
seen.add(key)
unique_lines.append(line)
formatted = []
for line in unique_lines:
formatted.append(line)
return '\n'.join(formatted) if formatted else "[Нет данных]"
def format_time(dt_str: str) -> str:
try:
dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
return dt.strftime('%d.%m.%Y %H:%M')
except Exception:
return dt_str.replace('T', ' ')
def parse_and_save(data, file_path: Path, mode: str):
if not data or data == 0:
file_path.write_text("[Нет отключений]", encoding='utf-8')
return
records = data if isinstance(data, list) else [data]
lines = []
for item in records:
if mode in ('plan', 'vnereglament'):
street = clean_street(item.get("DisconnectionObject", ""))
time_down = format_time(item.get("DisconnectionDateTime", ""))
time_up = format_time(item.get("EnergyOnPlanningDateTime", ""))
lines.append(f"{street}\nВремя: с {time_down} по {time_up}\n{'-' * 30}")
elif mode == 'avar':
street = clean_street(item.get("StreetHome", ""))
time_up = format_time(item.get("ScheduledTimeRemoval", ""))
lines.append(f"{street}\nВосстановление: ~{time_up}\n{'' * 30}")
file_path.write_text('\n'.join(lines), encoding='utf-8')
def get_plan():
try:
r = requests.get(URL_PLAN, headers=HEADERS, verify=False, timeout=15)
r.raise_for_status()
data = r.json()
parse_and_save(data, PLAN_FILE, 'plan')
logger.info("[Плановые отключения] обновлены")
except Exception as e:
logger.error(f"Ошибка при получении плановых отключений: {e}")
if 'r' in locals():
logger.error(f"Статус: {r.status_code} | URL: {r.url}")
logger.error(f"Ответ: {r.text[:500]}")
PLAN_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8')
def get_vnereglament():
try:
r = requests.get(URL_VNEREGLAMENT, headers=HEADERS, verify=False, timeout=15)
r.raise_for_status()
data = r.json()
parse_and_save(data, VN_FILE, 'vnereglament')
logger.info("[Внерегламентные отключения] обновлены")
except Exception as e:
logger.error(f"Ошибка при получении внерегламентных отключений: {e}")
if 'r' in locals():
logger.error(f"Статус: {r.status_code} | URL: {r.url}")
logger.error(f"Ответ: {r.text[:500]}")
VN_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8')
def get_avar():
try:
r = requests.get(URL_AVAR, headers=HEADERS, verify=False, timeout=15)
r.raise_for_status()
data = r.json()
parse_and_save(data, AVAR_FILE, 'avar')
logger.info("[Аварийные отключения] обновлены")
except Exception as e:
logger.error(f"Ошибка при получении аварийных отключений: {e}")
if 'r' in locals():
logger.error(f"Статус: {r.status_code} | URL: {r.url}")
logger.error(f"Ответ: {r.text[:500]}")
AVAR_FILE.write_text("[Ошибка загрузки данных]", encoding='utf-8')
def start_parser():
get_plan()
get_vnereglament()
get_avar()
logger.info("Парсер отработал")