| import gradio as gr |
| import feedparser |
| import requests |
| import hashlib |
| import time |
| import threading |
| import os |
| import random |
| import base64 |
| import json |
| from datetime import datetime |
| import pytz |
| from html import unescape |
| import re |
|
|
| |
| RSS_FILE = "rss.txt" |
| TOKEN_FILE = "token.txt" |
| SEEN_FILE = "seen.txt" |
| HF_LINK_FILE = "hf_link.txt" |
| CITY_FILE = "city.txt" |
| HOLIDAYS_FILE = "prazdniki.txt" |
| ROUTES_FILE = "routes.json" |
|
|
| |
| ENCODED_PASSWORD = "aXZhbmdyb3p6bmlfbmVfZGFzdF9zbG9tYXQ=" |
|
|
| |
| MOSCOW_TZ = pytz.timezone('Europe/Moscow') |
|
|
| |
| MAX_MESSAGE_LENGTH = 2000 |
|
|
| |
| WEATHER_RETRY_COUNT = 3 |
| WEATHER_RETRY_DELAY = 10 |
|
|
| |
| logs = [] |
| is_running = False |
| working_methods = {} |
| last_morning_date = None |
|
|
|
|
| def verify_password(password): |
| try: |
| correct_password = base64.b64decode(ENCODED_PASSWORD).decode('utf-8') |
| return password == correct_password |
| except: |
| return False |
|
|
|
|
| def log(message): |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| log_entry = f"[{timestamp}] {message}" |
| logs.append(log_entry) |
| if len(logs) > 100: |
| logs.pop(0) |
| print(log_entry) |
|
|
|
|
| def load_file(filepath): |
| if os.path.exists(filepath): |
| with open(filepath, "r", encoding="utf-8") as f: |
| return f.read().strip() |
| return "" |
|
|
|
|
| def save_file(filepath, content): |
| with open(filepath, "w", encoding="utf-8") as f: |
| f.write(content) |
|
|
|
|
| def load_json(filepath): |
| if os.path.exists(filepath): |
| try: |
| with open(filepath, "r", encoding="utf-8") as f: |
| return json.load(f) |
| except: |
| return {} |
| return {} |
|
|
|
|
| def save_json(filepath, data): |
| with open(filepath, "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
|
|
|
|
| |
|
|
| def get_channels(): |
| content = load_file(TOKEN_FILE) |
| if not content: |
| return [] |
| channels = [] |
| for i, line in enumerate(content.split("\n")): |
| url = line.strip() |
| if url and not url.startswith("#"): |
| channels.append({"id": f"channel_{i}", "url": url, "name": f"Канал {i + 1}"}) |
| return channels |
|
|
|
|
| def get_channel_by_index(index): |
| channels = get_channels() |
| if 0 <= index < len(channels): |
| return channels[index] |
| return None |
|
|
|
|
| |
|
|
| def get_routes(): |
| data = load_json(ROUTES_FILE) |
| if not data: |
| channels = get_channels() |
| all_indices = list(range(len(channels))) |
| data = {"morning": all_indices, "rss": {}} |
| save_json(ROUTES_FILE, data) |
| return data |
|
|
|
|
| def save_routes(routes): |
| save_json(ROUTES_FILE, routes) |
|
|
|
|
| def get_morning_channel_indices(): |
| routes = get_routes() |
| channels = get_channels() |
| indices = routes.get("morning", list(range(len(channels)))) |
| return [i for i in indices if i < len(channels)] |
|
|
|
|
| def get_rss_channel_indices(rss_url): |
| routes = get_routes() |
| channels = get_channels() |
| rss_routes = routes.get("rss", {}) |
| if rss_url in rss_routes: |
| indices = rss_routes[rss_url] |
| return [i for i in indices if i < len(channels)] |
| return list(range(len(channels))) |
|
|
|
|
| def set_morning_channels(indices): |
| routes = get_routes() |
| routes["morning"] = indices |
| save_routes(routes) |
|
|
|
|
| def set_rss_channels(rss_url, indices): |
| routes = get_routes() |
| if "rss" not in routes: |
| routes["rss"] = {} |
| routes["rss"][rss_url] = indices |
| save_routes(routes) |
|
|
|
|
| |
|
|
| def get_rss_feeds(): |
| content = load_file(RSS_FILE) |
| if not content: |
| return [] |
| return [url.strip() for url in content.split("\n") if url.strip() and not url.strip().startswith("#")] |
|
|
|
|
| def get_hf_link(): |
| return load_file(HF_LINK_FILE) |
|
|
|
|
| def get_city(): |
| return load_file(CITY_FILE) or "Moscow" |
|
|
|
|
| def get_seen_ids(): |
| content = load_file(SEEN_FILE) |
| if not content: |
| return set() |
| return set(content.split("\n")) |
|
|
|
|
| def save_seen_id(article_id): |
| seen = get_seen_ids() |
| seen.add(article_id) |
| seen_list = list(seen)[-1000:] |
| save_file(SEEN_FILE, "\n".join(seen_list)) |
|
|
|
|
| def generate_article_id(entry): |
| unique_string = entry.get("id", "") or entry.get("link", "") or entry.get("title", "") |
| return hashlib.md5(unique_string.encode()).hexdigest() |
|
|
|
|
| def clean_html(text): |
| if not text: |
| return "" |
| clean = re.sub(r'<[^>]+>', '', text) |
| clean = unescape(clean) |
| clean = re.sub(r'\s+', ' ', clean).strip() |
| return clean |
|
|
|
|
| def get_hf_reminder(): |
| if random.random() < 0.25: |
| hf_link = get_hf_link() |
| if hf_link: |
| return f"\n\n💤 HF замораживает скрипты, которыми не пользуются люди. Пожалуйста, перейдите по ссылке, чтобы бот продолжал работать: {hf_link}" |
| return "" |
|
|
|
|
| def get_entry_content(entry): |
| if entry.get("summary"): |
| text = entry.get("summary") |
| if text and len(text.strip()) > 10: |
| return text |
| if entry.get("description"): |
| text = entry.get("description") |
| if text and len(text.strip()) > 10: |
| return text |
| if entry.get("content"): |
| content_list = entry.get("content") |
| if isinstance(content_list, list) and len(content_list) > 0: |
| text = content_list[0].get("value", "") |
| if text and len(text.strip()) > 10: |
| return text |
| if entry.get("summary_detail"): |
| text = entry.get("summary_detail", {}).get("value", "") |
| if text and len(text.strip()) > 10: |
| return text |
| return "" |
|
|
|
|
| def split_message(message, max_length=MAX_MESSAGE_LENGTH): |
| if len(message) <= max_length: |
| return [message] |
| parts = [] |
| remaining = message |
| while len(remaining) > max_length: |
| search_area = remaining[:max_length] |
| last_dot = -1 |
| for i in range(len(search_area) - 1, max_length // 4, -1): |
| if search_area[i] == '.': |
| last_dot = i |
| break |
| if last_dot == -1: |
| last_space = search_area.rfind(' ') |
| if last_space > max_length // 4: |
| last_dot = last_space |
| else: |
| last_dot = max_length - 1 |
| part = remaining[:last_dot + 1].strip() |
| parts.append(part) |
| remaining = remaining[last_dot + 1:].strip() |
| if remaining: |
| parts.append(remaining) |
| return parts |
|
|
|
|
| def format_message_parts(entry, feed_title): |
| title = clean_html(entry.get("title", "Без заголовка")) |
| link = entry.get("link", "") |
| raw_content = get_entry_content(entry) |
| summary = clean_html(raw_content) |
| |
| log(f"📄 Контент: {len(summary)} символов") |
| |
| message = f"📰 {title}\n\n" |
| if summary: |
| message += f"{summary}\n\n" |
| if link: |
| message += f"🔗 {link}\n" |
| message += f"\n📡 Источник: {feed_title}" |
| message += get_hf_reminder() |
| message += "\n\nℹ️" |
| |
| if len(message) <= MAX_MESSAGE_LENGTH: |
| return [message] |
| |
| log(f"📏 Сообщение {len(message)} символов, разбиваю на части...") |
| content_parts = split_message(summary, MAX_MESSAGE_LENGTH - 200) |
| total_parts = len(content_parts) |
| result_parts = [] |
| part_names = ["один", "два", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять"] |
| |
| for i, part in enumerate(content_parts): |
| part_name = part_names[i] if i < len(part_names) else str(i + 1) |
| part_marker = f"📄 Часть {part_name} из {total_parts}\n\n" |
| if i == 0: |
| formatted = f"📰 {title}\n\n{part_marker}{part}\n\nℹ️" |
| elif i == total_parts - 1: |
| footer = "" |
| if link: |
| footer += f"\n\n🔗 {link}" |
| footer += f"\n\n📡 Источник: {feed_title}" |
| footer += get_hf_reminder() |
| footer += "\n\nℹ️" |
| formatted = f"{part_marker}{part}{footer}" |
| else: |
| formatted = f"{part_marker}{part}\n\nℹ️" |
| result_parts.append(formatted) |
| return result_parts |
|
|
|
|
| |
|
|
| def get_holidays_for_today(): |
| content = load_file(HOLIDAYS_FILE) |
| if not content: |
| return [] |
| now = datetime.now(MOSCOW_TZ) |
| today_str = now.strftime("%d.%m.") |
| holidays = [] |
| for line in content.split("\n"): |
| line = line.strip() |
| if not line: |
| continue |
| match = re.match(r'^(\d{2}\.\d{2}\.)\s*(.+)$', line) |
| if not match: |
| continue |
| date_part = match.group(1) |
| holiday_name = match.group(2).strip() |
| if date_part == today_str and holiday_name: |
| holidays.append(holiday_name) |
| return holidays |
|
|
|
|
| def format_holidays_section(): |
| holidays = get_holidays_for_today() |
| if not holidays: |
| return "" |
| section = "\n🎊 Сегодня отмечают:\n\n" |
| for holiday in holidays: |
| section += f"🎉 {holiday}\n" |
| return section |
|
|
|
|
| |
|
|
| def get_coordinates_single_attempt(city_name): |
| try: |
| url = f"https://geocoding-api.open-meteo.com/v1/search?name={city_name}&count=1&language=ru&format=json" |
| response = requests.get(url, timeout=15) |
| response.raise_for_status() |
| data = response.json() |
| if "results" in data and len(data["results"]) > 0: |
| result = data["results"][0] |
| return {"lat": result["latitude"], "lon": result["longitude"], |
| "name": result.get("name", city_name), "country": result.get("country", "")} |
| return None |
| except: |
| return None |
|
|
|
|
| def get_coordinates(city_name): |
| for attempt in range(WEATHER_RETRY_COUNT): |
| log(f"🔍 Геокодинг '{city_name}', попытка {attempt + 1}/{WEATHER_RETRY_COUNT}") |
| coords = get_coordinates_single_attempt(city_name) |
| if coords: |
| return coords |
| if attempt < WEATHER_RETRY_COUNT - 1: |
| time.sleep(WEATHER_RETRY_DELAY) |
| return None |
|
|
|
|
| def get_weather_single_attempt(lat, lon): |
| try: |
| url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m&daily=temperature_2m_max,temperature_2m_min,weather_code&timezone=Europe/Moscow&forecast_days=1" |
| response = requests.get(url, timeout=15) |
| response.raise_for_status() |
| data = response.json() |
| current = data.get("current", {}) |
| daily = data.get("daily", {}) |
| if current.get("temperature_2m") is None: |
| return None |
| return {"temp": current.get("temperature_2m"), "humidity": current.get("relative_humidity_2m"), |
| "weather_code": current.get("weather_code"), "wind_speed": current.get("wind_speed_10m"), |
| "temp_max": daily.get("temperature_2m_max", [None])[0], |
| "temp_min": daily.get("temperature_2m_min", [None])[0]} |
| except: |
| return None |
|
|
|
|
| def get_weather(lat, lon): |
| for attempt in range(WEATHER_RETRY_COUNT): |
| log(f"🌤️ Запрос погоды, попытка {attempt + 1}/{WEATHER_RETRY_COUNT}") |
| weather = get_weather_single_attempt(lat, lon) |
| if weather: |
| return weather |
| if attempt < WEATHER_RETRY_COUNT - 1: |
| time.sleep(WEATHER_RETRY_DELAY) |
| return None |
|
|
|
|
| def weather_code_to_emoji(code): |
| weather_codes = { |
| 0: ("☀️", "ясно"), 1: ("🌤️", "малооблачно"), 2: ("⛅", "переменная облачность"), |
| 3: ("☁️", "облачно"), 45: ("🌫️", "туман"), 48: ("🌫️", "туман с изморозью"), |
| 51: ("🌧️", "лёгкая морось"), 53: ("🌧️", "морось"), 55: ("🌧️", "сильная морось"), |
| 61: ("🌧️", "небольшой дождь"), 63: ("🌧️", "дождь"), 65: ("🌧️", "сильный дождь"), |
| 71: ("🌨️", "небольшой снег"), 73: ("🌨️", "снег"), 75: ("🌨️", "сильный снег"), |
| 80: ("🌧️", "ливень"), 81: ("🌧️", "сильный ливень"), 95: ("⛈️", "гроза"), |
| } |
| return weather_codes.get(code, ("🌡️", "неизвестно")) |
|
|
|
|
| def get_date_string(): |
| now = datetime.now(MOSCOW_TZ) |
| months = ["января", "февраля", "марта", "апреля", "мая", "июня", |
| "июля", "августа", "сентября", "октября", "ноября", "декабря"] |
| weekdays = ["понедельник", "вторник", "среда", "четверг", "пятница", "суббота", "воскресенье"] |
| return f"{now.day} {months[now.month - 1]} {now.year} года, {weekdays[now.weekday()]}" |
|
|
|
|
| def format_morning_message(): |
| city = get_city() |
| date_str = get_date_string() |
| |
| coords = get_coordinates(city) |
| if not coords: |
| return None, f"❌ Не удалось определить город: {city}" |
| |
| weather = get_weather(coords["lat"], coords["lon"]) |
| if not weather: |
| return None, f"❌ Не удалось получить погоду для {coords['name']}" |
| |
| emoji, description = weather_code_to_emoji(weather.get("weather_code", 0)) |
| |
| message = f"☀️ Доброе утро!\n\n📅 Сегодня {date_str}\n\n" |
| message += f"🏙️ Погода в городе {coords['name']}" |
| if coords.get("country"): |
| message += f", {coords['country']}" |
| message += ":\n\n" |
| message += f"{emoji} {description.capitalize()}\n" |
| message += f"🌡️ Сейчас: {weather['temp']}°C\n" |
| if weather.get("temp_min") is not None and weather.get("temp_max") is not None: |
| message += f"📊 Мин/Макс: {weather['temp_min']}°C / {weather['temp_max']}°C\n" |
| if weather.get("humidity") is not None: |
| message += f"💧 Влажность: {weather['humidity']}%\n" |
| if weather.get("wind_speed") is not None: |
| message += f"💨 Ветер: {weather['wind_speed']} км/ч\n" |
| message += format_holidays_section() |
| message += "\nХорошего дня! 🌟\n\nℹ️" |
| |
| return message, None |
|
|
|
|
| |
|
|
| def try_send_method(push_url, message, method_name, request_func): |
| try: |
| response = request_func() |
| success = response.status_code == 200 and "error" not in response.text.lower() |
| log(f" {method_name}: {response.status_code} | {response.text[:60]}") |
| return success |
| except Exception as e: |
| log(f" {method_name}: ❌ {str(e)[:40]}") |
| return False |
|
|
|
|
| def send_to_url(push_url, message): |
| global working_methods |
| |
| if not push_url: |
| return False |
| |
| push_url = push_url.strip() |
| if not push_url.startswith("http"): |
| push_url = "https://" + push_url |
| |
| short_url = push_url.split("/")[-1][:10] |
| log(f"📤 [{short_url}] ({len(message)} симв.): {message[:50]}...") |
| |
| methods = [ |
| ("raw_string", lambda: requests.post(push_url, data=message, timeout=30)), |
| ("raw_bytes", lambda: requests.post(push_url, data=message.encode('utf-8'), timeout=30)), |
| ("text_plain", lambda: requests.post(push_url, data=message.encode('utf-8'), headers={'Content-Type': 'text/plain; charset=utf-8'}, timeout=30)), |
| ("form_d", lambda: requests.post(push_url, data={'d': message}, timeout=30)), |
| ("form_data", lambda: requests.post(push_url, data={'data': message}, timeout=30)), |
| ("form_text", lambda: requests.post(push_url, data={'text': message}, timeout=30)), |
| ] |
| |
| if push_url in working_methods: |
| method_name = working_methods[push_url] |
| for name, func in methods: |
| if name == method_name: |
| if try_send_method(push_url, message, name, func): |
| log(f"✅ [{short_url}] Отправлено") |
| return True |
| else: |
| working_methods.pop(push_url, None) |
| break |
| |
| for name, func in methods: |
| if try_send_method(push_url, message, name, func): |
| working_methods[push_url] = name |
| log(f"✅ [{short_url}] Отправлено методом {name}") |
| return True |
| |
| log(f"❌ [{short_url}] Все методы не сработали") |
| return False |
|
|
|
|
| def send_to_channels(message, channel_indices): |
| channels = get_channels() |
| success_count = 0 |
| for idx in channel_indices: |
| if idx < len(channels): |
| if send_to_url(channels[idx]["url"], message): |
| success_count += 1 |
| time.sleep(0.5) |
| return success_count |
|
|
|
|
| def send_message_parts_to_channels(parts, channel_indices): |
| channels = get_channels() |
| success = True |
| for idx in channel_indices: |
| if idx < len(channels): |
| for i, part in enumerate(parts): |
| if not send_to_url(channels[idx]["url"], part): |
| success = False |
| if i < len(parts) - 1: |
| time.sleep(1) |
| time.sleep(1) |
| return success |
|
|
|
|
| |
|
|
| def check_and_send_morning_message(): |
| global last_morning_date |
| |
| now = datetime.now(MOSCOW_TZ) |
| today = now.strftime("%Y-%m-%d") |
| |
| if now.hour == 7 and now.minute < 30 and last_morning_date != today: |
| log("🌅 Время утреннего сообщения!") |
| |
| channel_indices = get_morning_channel_indices() |
| if not channel_indices: |
| log("⚠️ Нет каналов для утреннего сообщения") |
| last_morning_date = today |
| return |
| |
| message, error = format_morning_message() |
| |
| if message: |
| count = send_to_channels(message, channel_indices) |
| if count > 0: |
| last_morning_date = today |
| log(f"✅ Утреннее сообщение отправлено в {count} каналов") |
| else: |
| log(f"❌ {error}") |
| error_msg = f"☀️ Доброе утро!\n\n📅 Сегодня {get_date_string()}\n\n{error}\n\nℹ️" |
| send_to_channels(error_msg, channel_indices) |
| last_morning_date = today |
|
|
|
|
| def check_feed(feed_url): |
| try: |
| feed = feedparser.parse(feed_url) |
| if feed.bozo and not feed.entries: |
| log(f"⚠️ Ошибка парсинга: {feed_url}") |
| return 0 |
| |
| feed_title = feed.feed.get("title", feed_url) |
| seen_ids = get_seen_ids() |
| channel_indices = get_rss_channel_indices(feed_url) |
| |
| if not channel_indices: |
| log(f"⚠️ Нет каналов для RSS: {feed_url}") |
| return 0 |
| |
| new_count = 0 |
| for entry in reversed(feed.entries[:10]): |
| article_id = generate_article_id(entry) |
| if article_id not in seen_ids: |
| parts = format_message_parts(entry, feed_title) |
| log(f"📝 Статья → {len(channel_indices)} каналов") |
| if send_message_parts_to_channels(parts, channel_indices): |
| save_seen_id(article_id) |
| new_count += 1 |
| time.sleep(2) |
| |
| return new_count |
| except Exception as e: |
| log(f"❌ Ошибка: {str(e)}") |
| return 0 |
|
|
|
|
| def check_all_feeds(): |
| feeds = get_rss_feeds() |
| channels = get_channels() |
| |
| if not feeds: |
| return "Список RSS пуст" |
| if not channels: |
| return "Нет каналов в token.txt" |
| |
| log(f"🔄 Проверка {len(feeds)} RSS...") |
| total_new = 0 |
| for feed_url in feeds: |
| log(f"📡 {feed_url}") |
| total_new += check_feed(feed_url) |
| time.sleep(2) |
| |
| log(f"✅ Готово. Новых: {total_new}") |
| return f"Проверено: {len(feeds)}, новых статей: {total_new}" |
|
|
|
|
| def background_checker(): |
| global is_running |
| log("🔄 Фоновый процесс запущен") |
| |
| while is_running: |
| try: |
| check_and_send_morning_message() |
| check_all_feeds() |
| except Exception as e: |
| log(f"❌ Ошибка: {str(e)}") |
| |
| for _ in range(30): |
| if not is_running: |
| break |
| time.sleep(60) |
| try: |
| check_and_send_morning_message() |
| except: |
| pass |
| |
| log("⏹️ Фоновый процесс остановлен") |
|
|
|
|
| |
|
|
| def start_monitoring(password): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| global is_running |
| if is_running: |
| return "⚠️ Мониторинг уже запущен" |
| is_running = True |
| threading.Thread(target=background_checker, daemon=True).start() |
| log("🚀 Мониторинг запущен") |
| return "✅ Мониторинг запущен" |
|
|
|
|
| def stop_monitoring(password): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| global is_running |
| is_running = False |
| log("⏹️ Мониторинг остановлен") |
| return "⏹️ Мониторинг остановлен" |
|
|
|
|
| def manual_check(password): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| return check_all_feeds() |
|
|
|
|
| def test_all_channels(password): |
| """Тест всех каналов - отправляет по одному сообщению в каждый""" |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| |
| channels = get_channels() |
| if not channels: |
| return "❌ Нет каналов в token.txt" |
| |
| results = [] |
| for i, ch in enumerate(channels): |
| msg = f"🧪 Тест канала {i + 1}\n\nПроверка работоспособности.\n\nℹ️" |
| if send_to_url(ch["url"], msg): |
| results.append(f"✅ Канал {i + 1}: работает") |
| else: |
| results.append(f"❌ Канал {i + 1}: ошибка") |
| time.sleep(1) |
| |
| return "Проверка каналов:\n" + "\n".join(results) |
|
|
|
|
| def test_morning_message(password): |
| """Тест утреннего сообщения - ТОЛЬКО по маршрутам""" |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| |
| channels = get_channels() |
| if not channels: |
| return "❌ Нет каналов в token.txt" |
| |
| channel_indices = get_morning_channel_indices() |
| if not channel_indices: |
| return "❌ Утреннее сообщение не настроено.\nПерейдите в 'Маршруты' и выберите каналы." |
| |
| message, error = format_morning_message() |
| if error: |
| return error |
| |
| channel_names = [f"Канал {i+1}" for i in channel_indices] |
| log(f"🌅 Тест утра → {channel_names}") |
| |
| count = send_to_channels(message, channel_indices) |
| holidays = get_holidays_for_today() |
| |
| return f"✅ Отправлено в {count}/{len(channel_indices)} каналов\n📢 Каналы: {', '.join(channel_names)}\n🎉 Праздников: {len(holidays)}" |
|
|
|
|
| def test_rss_message(password): |
| """Тест RSS - отправляет тестовую новость по маршрутам первого RSS""" |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| |
| channels = get_channels() |
| if not channels: |
| return "❌ Нет каналов в token.txt" |
| |
| feeds = get_rss_feeds() |
| if not feeds: |
| return "❌ Нет RSS источников" |
| |
| test_feed = feeds[0] |
| channel_indices = get_rss_channel_indices(test_feed) |
| |
| if not channel_indices: |
| return "❌ RSS не настроен на каналы.\nПерейдите в 'Маршруты' и настройте." |
| |
| channel_names = [f"Канал {i+1}" for i in channel_indices] |
| test_msg = f"📰 Тестовая новость\n\nПроверка маршрутов RSS.\n\n🔗 https://example.com/test\n\n📡 Источник: Тест\n\nℹ️" |
| |
| log(f"🧪 Тест RSS → {channel_names}") |
| |
| count = send_to_channels(test_msg, channel_indices) |
| |
| short_feed = test_feed[:40] + "..." if len(test_feed) > 40 else test_feed |
| return f"✅ Отправлено в {count}/{len(channel_indices)} каналов\n📢 Каналы: {', '.join(channel_names)}\n📰 RSS: {short_feed}" |
|
|
|
|
| def test_holidays(password): |
| """Тест праздников - только показывает, НЕ отправляет""" |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| |
| now = datetime.now(MOSCOW_TZ) |
| holidays = get_holidays_for_today() |
| |
| if holidays: |
| result = f"📅 Дата: {now.strftime('%d.%m.')}\n🎉 Найдено праздников: {len(holidays)}\n\n" |
| for h in holidays: |
| result += f"🎉 {h}\n" |
| return result |
| return f"📅 Дата: {now.strftime('%d.%m.')}\n😔 Праздников не найдено" |
|
|
|
|
| def find_working_method(password): |
| """Найти рабочие методы для всех каналов""" |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| |
| global working_methods |
| working_methods = {} |
| |
| channels = get_channels() |
| if not channels: |
| return "❌ Нет каналов" |
| |
| log("🔍 Поиск рабочих методов...") |
| results = [] |
| |
| for i, ch in enumerate(channels): |
| msg = f"🔧 Поиск метода для канала {i + 1}\n\nℹ️" |
| if send_to_url(ch["url"], msg): |
| method = working_methods.get(ch["url"], "найден") |
| results.append(f"✅ Канал {i + 1}: {method}") |
| else: |
| results.append(f"❌ Канал {i + 1}: не найден") |
| time.sleep(1) |
| |
| return "Методы отправки:\n" + "\n".join(results) |
|
|
|
|
| |
|
|
| def get_routes_display(): |
| channels = get_channels() |
| routes = get_routes() |
| feeds = get_rss_feeds() |
| |
| if not channels: |
| return "Добавьте каналы в token.txt (по одной ссылке на строку)" |
| |
| lines = [] |
| |
| morning_idx = routes.get("morning", list(range(len(channels)))) |
| morning_names = [f"Канал {i+1}" for i in morning_idx if i < len(channels)] |
| lines.append(f"🌅 Утреннее сообщение → {', '.join(morning_names) if morning_names else 'никуда'}") |
| lines.append("") |
| |
| for feed in feeds: |
| rss_routes = routes.get("rss", {}) |
| if feed in rss_routes: |
| idx_list = [i for i in rss_routes[feed] if i < len(channels)] |
| names = [f"Канал {i+1}" for i in idx_list] |
| target = ', '.join(names) if names else 'никуда' |
| else: |
| target = 'все каналы' |
| short = feed[:50] + "..." if len(feed) > 50 else feed |
| lines.append(f"📰 {short}\n → {target}") |
| |
| return "\n\n".join(lines) if lines else "Нет маршрутов" |
|
|
|
|
| def get_channel_choices(): |
| channels = get_channels() |
| return [(f"Канал {i+1}", i) for i in range(len(channels))] |
|
|
|
|
| def get_rss_choices(): |
| feeds = get_rss_feeds() |
| return [(f[:50] + "..." if len(f) > 50 else f, f) for f in feeds] |
|
|
|
|
| def update_morning_route(password, indices): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!", get_routes_display() |
| set_morning_channels(indices if indices else []) |
| names = [f"Канал {i+1}" for i in (indices or [])] |
| return f"✅ Утреннее → {', '.join(names) if names else 'никуда'}", get_routes_display() |
|
|
|
|
| def update_rss_route(password, rss_url, indices): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!", get_routes_display() |
| if not rss_url: |
| return "❌ Выберите RSS", get_routes_display() |
| set_rss_channels(rss_url, indices if indices else []) |
| return "✅ Маршрут сохранён", get_routes_display() |
|
|
|
|
| def on_rss_select(rss_url): |
| if not rss_url: |
| return gr.update(value=[]) |
| indices = get_rss_channel_indices(rss_url) |
| return gr.update(value=indices) |
|
|
|
|
| |
|
|
| def save_tokens(password, tokens_text): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| save_file(TOKEN_FILE, tokens_text) |
| count = len(get_channels()) |
| return f"✅ Сохранено {count} каналов" |
|
|
|
|
| def save_rss_list(password, rss_text): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| save_file(RSS_FILE, rss_text) |
| return f"✅ Сохранено {len(get_rss_feeds())} RSS" |
|
|
|
|
| def save_hf_link(password, link): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| save_file(HF_LINK_FILE, link) |
| return "✅ Ссылка на HF сохранена" |
|
|
|
|
| def save_city(password, city): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| save_file(CITY_FILE, city.strip()) |
| return f"✅ Город сохранён: {city.strip()}" |
|
|
|
|
| def clear_seen(password): |
| if not verify_password(password): |
| return "🔒 Неверный пароль!" |
| save_file(SEEN_FILE, "") |
| return "✅ История очищена" |
|
|
|
|
| |
|
|
| def get_status(): |
| channels = get_channels() |
| feeds = get_rss_feeds() |
| hf_link = get_hf_link() |
| city = get_city() |
| seen_count = len(get_seen_ids()) |
| holidays_exist = os.path.exists(HOLIDAYS_FILE) |
| |
| now = datetime.now(MOSCOW_TZ) |
| moscow_time = now.strftime("%H:%M:%S") |
| |
| return f""" |
| 📊 Статус системы |
| |
| 🔄 Мониторинг: {"✅ Активен" if is_running else "⏹️ Остановлен"} |
| 📢 Каналов: {len(channels)} |
| 📡 RSS источников: {len(feeds)} |
| 🔗 Push.tg: {"✅ Настроен" if channels else "❌ Не настроен"} |
| 🌐 HF ссылка: {"✅ Настроена" if hf_link else "❌ Не настроена"} |
| 🏙️ Город для погоды: {city} |
| 🎉 Файл праздников: {"✅ Найден" if holidays_exist else "❌ Не найден"} |
| 📝 Обработано статей: {seen_count} |
| |
| 🕐 Московское время: {moscow_time} |
| 🌅 Утреннее сообщение: 7:00 МСК |
| 📅 Последнее утреннее: {last_morning_date or "ещё не отправлялось"} |
| |
| ⚠️ Автозапуск отключён - нужно запустить вручную |
| 🔒 Для управления требуется пароль |
| """ |
|
|
|
|
| def get_logs(): |
| if not logs: |
| return "Логов пока нет" |
| return "\n".join(logs[-50:]) |
|
|
|
|
| def load_tokens(): |
| return load_file(TOKEN_FILE) |
|
|
|
|
| def load_rss_list(): |
| return load_file(RSS_FILE) |
|
|
|
|
| def load_hf_link(): |
| return load_file(HF_LINK_FILE) |
|
|
|
|
| def load_city(): |
| return load_file(CITY_FILE) or "Moscow" |
|
|
|
|
| |
|
|
| with gr.Blocks(title="RSS Telegram Notifier") as app: |
| gr.Markdown("# 📰 RSS Telegram Notifier") |
| gr.Markdown("Автоматическая отправка новостей из RSS в Telegram + утренняя погода + праздники") |
| gr.Markdown("⚠️ После запуска приложения нужно вручную запустить мониторинг!") |
| |
| gr.Markdown("### 🔐 Авторизация") |
| password_input = gr.Textbox( |
| label="Пароль для управления", |
| placeholder="Введите пароль...", |
| type="password" |
| ) |
| |
| with gr.Tab("📊 Статус"): |
| status_output = gr.Markdown() |
| |
| with gr.Row(): |
| start_btn = gr.Button("▶️ Запустить мониторинг", variant="primary") |
| stop_btn = gr.Button("⏹️ Остановить", variant="stop") |
| refresh_btn = gr.Button("🔄 Обновить статус") |
| |
| with gr.Row(): |
| check_btn = gr.Button("🔍 Проверить RSS сейчас") |
| test_channels_btn = gr.Button("🧪 Тест всех каналов") |
| find_method_btn = gr.Button("🔧 Найти методы") |
| |
| with gr.Row(): |
| test_morning_btn = gr.Button("🌅 Тест утра (по маршрутам)", variant="secondary") |
| test_rss_btn = gr.Button("📰 Тест RSS (по маршрутам)", variant="secondary") |
| test_holidays_btn = gr.Button("🎉 Праздники (без отправки)", variant="secondary") |
| |
| result_output = gr.Textbox(label="Результат", lines=5) |
| |
| start_btn.click(start_monitoring, inputs=password_input, outputs=result_output) |
| stop_btn.click(stop_monitoring, inputs=password_input, outputs=result_output) |
| refresh_btn.click(get_status, outputs=status_output) |
| check_btn.click(manual_check, inputs=password_input, outputs=result_output) |
| test_channels_btn.click(test_all_channels, inputs=password_input, outputs=result_output) |
| find_method_btn.click(find_working_method, inputs=password_input, outputs=result_output) |
| test_morning_btn.click(test_morning_message, inputs=password_input, outputs=result_output) |
| test_rss_btn.click(test_rss_message, inputs=password_input, outputs=result_output) |
| test_holidays_btn.click(test_holidays, inputs=password_input, outputs=result_output) |
| |
| with gr.Tab("🔀 Маршруты"): |
| gr.Markdown("### Настройка маршрутов контента") |
| gr.Markdown("Выберите, в какие каналы отправлять каждый тип контента") |
| |
| routes_display = gr.Textbox(label="Текущие маршруты", lines=15, interactive=False) |
| |
| gr.Markdown("### 🌅 Утреннее сообщение (погода + праздники)") |
| morning_channels_select = gr.CheckboxGroup(label="Отправлять в каналы:", choices=[]) |
| update_morning_btn = gr.Button("💾 Сохранить маршрут утреннего сообщения") |
| morning_result = gr.Textbox(label="Результат", lines=1) |
| |
| gr.Markdown("### 📰 RSS источники") |
| rss_select = gr.Dropdown(label="Выберите RSS", choices=[]) |
| rss_channels_select = gr.CheckboxGroup(label="Отправлять в каналы:", choices=[]) |
| update_rss_btn = gr.Button("💾 Сохранить маршрут RSS") |
| rss_result = gr.Textbox(label="Результат", lines=1) |
| |
| def refresh_routes_ui(): |
| channels = get_channels() |
| choices = [(f"Канал {i+1}", i) for i in range(len(channels))] |
| routes = get_routes() |
| morning_selected = routes.get("morning", list(range(len(channels)))) |
| rss_choices = get_rss_choices() |
| return (get_routes_display(), gr.update(choices=choices, value=morning_selected), |
| gr.update(choices=rss_choices), gr.update(choices=choices)) |
| |
| app.load(refresh_routes_ui, outputs=[routes_display, morning_channels_select, rss_select, rss_channels_select]) |
| |
| update_morning_btn.click(update_morning_route, inputs=[password_input, morning_channels_select], |
| outputs=[morning_result, routes_display]) |
| rss_select.change(on_rss_select, inputs=rss_select, outputs=rss_channels_select) |
| update_rss_btn.click(update_rss_route, inputs=[password_input, rss_select, rss_channels_select], |
| outputs=[rss_result, routes_display]) |
| |
| with gr.Tab("⚙️ Настройки"): |
| gr.Markdown("### 📢 Каналы (token.txt)") |
| gr.Markdown("По одной ссылке push.tg на строку. Каждая строка = отдельный канал.") |
| tokens_input = gr.Textbox( |
| label="Ссылки push.tg", |
| placeholder="https://push.tg/xxx\nhttps://push.tg/yyy", |
| lines=5 |
| ) |
| tokens_save_btn = gr.Button("💾 Сохранить каналы") |
| tokens_result = gr.Textbox(label="Результат", lines=1) |
| |
| gr.Markdown("### 🏙️ Город для погоды") |
| gr.Markdown("Введите название города (на русском или английском)") |
| city_input = gr.Textbox(label="Город", placeholder="Москва") |
| city_save_btn = gr.Button("💾 Сохранить город") |
| city_result = gr.Textbox(label="Результат", lines=1) |
| |
| gr.Markdown("### 🌐 Ссылка на HF Space (для напоминаний)") |
| hf_link_input = gr.Textbox( |
| label="Ссылка на этот Space", |
| placeholder="https://huggingface.co/spaces/username/space-name" |
| ) |
| hf_save_btn = gr.Button("💾 Сохранить HF ссылку") |
| hf_result = gr.Textbox(label="Результат", lines=1) |
| |
| gr.Markdown("### 📡 RSS каналы (по одному на строку)") |
| rss_input = gr.Textbox( |
| label="RSS каналы", |
| placeholder="https://example.com/feed.xml\nhttps://another.com/rss", |
| lines=10 |
| ) |
| rss_save_btn = gr.Button("💾 Сохранить RSS список") |
| rss_save_result = gr.Textbox(label="Результат", lines=1) |
| |
| gr.Markdown("### 🗑️ Очистка") |
| clear_btn = gr.Button("🗑️ Очистить историю просмотренных") |
| clear_result = gr.Textbox(label="Результат", lines=1) |
| |
| app.load(load_tokens, outputs=tokens_input) |
| app.load(load_city, outputs=city_input) |
| app.load(load_hf_link, outputs=hf_link_input) |
| app.load(load_rss_list, outputs=rss_input) |
| |
| tokens_save_btn.click(save_tokens, inputs=[password_input, tokens_input], outputs=tokens_result) |
| city_save_btn.click(save_city, inputs=[password_input, city_input], outputs=city_result) |
| hf_save_btn.click(save_hf_link, inputs=[password_input, hf_link_input], outputs=hf_result) |
| rss_save_btn.click(save_rss_list, inputs=[password_input, rss_input], outputs=rss_save_result) |
| clear_btn.click(clear_seen, inputs=password_input, outputs=clear_result) |
| |
| with gr.Tab("📜 Логи"): |
| logs_output = gr.Textbox(label="Последние логи", lines=25) |
| logs_refresh_btn = gr.Button("🔄 Обновить логи") |
| logs_refresh_btn.click(get_logs, outputs=logs_output) |
| |
| app.load(get_status, outputs=status_output) |
|
|
| if __name__ == "__main__": |
| app.launch() |