TG_info / app.py
Ivan000's picture
Update app.py
05af3ab verified
import gradio as gr
import feedparser
import requests
import hashlib
import time
import threading
import os
import random
import base64
import json
from datetime import datetime
import pytz
from html import unescape
import re
# Пути к файлам
RSS_FILE = "rss.txt"
TOKEN_FILE = "token.txt"
SEEN_FILE = "seen.txt"
HF_LINK_FILE = "hf_link.txt"
CITY_FILE = "city.txt"
HOLIDAYS_FILE = "prazdniki.txt"
ROUTES_FILE = "routes.json"
# Пароль в base64 (ivangrozzni_ne_dast_slomat)
ENCODED_PASSWORD = "aXZhbmdyb3p6bmlfbmVfZGFzdF9zbG9tYXQ="
# Московский часовой пояс
MOSCOW_TZ = pytz.timezone('Europe/Moscow')
# Максимальный размер сообщения
MAX_MESSAGE_LENGTH = 2000
# Количество попыток для погоды
WEATHER_RETRY_COUNT = 3
WEATHER_RETRY_DELAY = 10
# Глобальные переменные
logs = []
is_running = False
working_methods = {}
last_morning_date = None
def verify_password(password):
try:
correct_password = base64.b64decode(ENCODED_PASSWORD).decode('utf-8')
return password == correct_password
except:
return False
def log(message):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
logs.append(log_entry)
if len(logs) > 100:
logs.pop(0)
print(log_entry)
def load_file(filepath):
if os.path.exists(filepath):
with open(filepath, "r", encoding="utf-8") as f:
return f.read().strip()
return ""
def save_file(filepath, content):
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
def load_json(filepath):
if os.path.exists(filepath):
try:
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
return {}
def save_json(filepath, data):
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# === Каналы ===
def get_channels():
content = load_file(TOKEN_FILE)
if not content:
return []
channels = []
for i, line in enumerate(content.split("\n")):
url = line.strip()
if url and not url.startswith("#"):
channels.append({"id": f"channel_{i}", "url": url, "name": f"Канал {i + 1}"})
return channels
def get_channel_by_index(index):
channels = get_channels()
if 0 <= index < len(channels):
return channels[index]
return None
# === Маршруты ===
def get_routes():
data = load_json(ROUTES_FILE)
if not data:
channels = get_channels()
all_indices = list(range(len(channels)))
data = {"morning": all_indices, "rss": {}}
save_json(ROUTES_FILE, data)
return data
def save_routes(routes):
save_json(ROUTES_FILE, routes)
def get_morning_channel_indices():
routes = get_routes()
channels = get_channels()
indices = routes.get("morning", list(range(len(channels))))
return [i for i in indices if i < len(channels)]
def get_rss_channel_indices(rss_url):
routes = get_routes()
channels = get_channels()
rss_routes = routes.get("rss", {})
if rss_url in rss_routes:
indices = rss_routes[rss_url]
return [i for i in indices if i < len(channels)]
return list(range(len(channels)))
def set_morning_channels(indices):
routes = get_routes()
routes["morning"] = indices
save_routes(routes)
def set_rss_channels(rss_url, indices):
routes = get_routes()
if "rss" not in routes:
routes["rss"] = {}
routes["rss"][rss_url] = indices
save_routes(routes)
# === Остальные функции ===
def get_rss_feeds():
content = load_file(RSS_FILE)
if not content:
return []
return [url.strip() for url in content.split("\n") if url.strip() and not url.strip().startswith("#")]
def get_hf_link():
return load_file(HF_LINK_FILE)
def get_city():
return load_file(CITY_FILE) or "Moscow"
def get_seen_ids():
content = load_file(SEEN_FILE)
if not content:
return set()
return set(content.split("\n"))
def save_seen_id(article_id):
seen = get_seen_ids()
seen.add(article_id)
seen_list = list(seen)[-1000:]
save_file(SEEN_FILE, "\n".join(seen_list))
def generate_article_id(entry):
unique_string = entry.get("id", "") or entry.get("link", "") or entry.get("title", "")
return hashlib.md5(unique_string.encode()).hexdigest()
def clean_html(text):
if not text:
return ""
clean = re.sub(r'<[^>]+>', '', text)
clean = unescape(clean)
clean = re.sub(r'\s+', ' ', clean).strip()
return clean
def get_hf_reminder():
if random.random() < 0.25:
hf_link = get_hf_link()
if hf_link:
return f"\n\n💤 HF замораживает скрипты, которыми не пользуются люди. Пожалуйста, перейдите по ссылке, чтобы бот продолжал работать: {hf_link}"
return ""
def get_entry_content(entry):
if entry.get("summary"):
text = entry.get("summary")
if text and len(text.strip()) > 10:
return text
if entry.get("description"):
text = entry.get("description")
if text and len(text.strip()) > 10:
return text
if entry.get("content"):
content_list = entry.get("content")
if isinstance(content_list, list) and len(content_list) > 0:
text = content_list[0].get("value", "")
if text and len(text.strip()) > 10:
return text
if entry.get("summary_detail"):
text = entry.get("summary_detail", {}).get("value", "")
if text and len(text.strip()) > 10:
return text
return ""
def split_message(message, max_length=MAX_MESSAGE_LENGTH):
if len(message) <= max_length:
return [message]
parts = []
remaining = message
while len(remaining) > max_length:
search_area = remaining[:max_length]
last_dot = -1
for i in range(len(search_area) - 1, max_length // 4, -1):
if search_area[i] == '.':
last_dot = i
break
if last_dot == -1:
last_space = search_area.rfind(' ')
if last_space > max_length // 4:
last_dot = last_space
else:
last_dot = max_length - 1
part = remaining[:last_dot + 1].strip()
parts.append(part)
remaining = remaining[last_dot + 1:].strip()
if remaining:
parts.append(remaining)
return parts
def format_message_parts(entry, feed_title):
title = clean_html(entry.get("title", "Без заголовка"))
link = entry.get("link", "")
raw_content = get_entry_content(entry)
summary = clean_html(raw_content)
log(f"📄 Контент: {len(summary)} символов")
message = f"📰 {title}\n\n"
if summary:
message += f"{summary}\n\n"
if link:
message += f"🔗 {link}\n"
message += f"\n📡 Источник: {feed_title}"
message += get_hf_reminder()
message += "\n\nℹ️"
if len(message) <= MAX_MESSAGE_LENGTH:
return [message]
log(f"📏 Сообщение {len(message)} символов, разбиваю на части...")
content_parts = split_message(summary, MAX_MESSAGE_LENGTH - 200)
total_parts = len(content_parts)
result_parts = []
part_names = ["один", "два", "три", "четыре", "пять", "шесть", "семь", "восемь", "девять", "десять"]
for i, part in enumerate(content_parts):
part_name = part_names[i] if i < len(part_names) else str(i + 1)
part_marker = f"📄 Часть {part_name} из {total_parts}\n\n"
if i == 0:
formatted = f"📰 {title}\n\n{part_marker}{part}\n\nℹ️"
elif i == total_parts - 1:
footer = ""
if link:
footer += f"\n\n🔗 {link}"
footer += f"\n\n📡 Источник: {feed_title}"
footer += get_hf_reminder()
footer += "\n\nℹ️"
formatted = f"{part_marker}{part}{footer}"
else:
formatted = f"{part_marker}{part}\n\nℹ️"
result_parts.append(formatted)
return result_parts
# === Праздники ===
def get_holidays_for_today():
content = load_file(HOLIDAYS_FILE)
if not content:
return []
now = datetime.now(MOSCOW_TZ)
today_str = now.strftime("%d.%m.")
holidays = []
for line in content.split("\n"):
line = line.strip()
if not line:
continue
match = re.match(r'^(\d{2}\.\d{2}\.)\s*(.+)$', line)
if not match:
continue
date_part = match.group(1)
holiday_name = match.group(2).strip()
if date_part == today_str and holiday_name:
holidays.append(holiday_name)
return holidays
def format_holidays_section():
holidays = get_holidays_for_today()
if not holidays:
return ""
section = "\n🎊 Сегодня отмечают:\n\n"
for holiday in holidays:
section += f"🎉 {holiday}\n"
return section
# === Погода ===
def get_coordinates_single_attempt(city_name):
try:
url = f"https://geocoding-api.open-meteo.com/v1/search?name={city_name}&count=1&language=ru&format=json"
response = requests.get(url, timeout=15)
response.raise_for_status()
data = response.json()
if "results" in data and len(data["results"]) > 0:
result = data["results"][0]
return {"lat": result["latitude"], "lon": result["longitude"],
"name": result.get("name", city_name), "country": result.get("country", "")}
return None
except:
return None
def get_coordinates(city_name):
for attempt in range(WEATHER_RETRY_COUNT):
log(f"🔍 Геокодинг '{city_name}', попытка {attempt + 1}/{WEATHER_RETRY_COUNT}")
coords = get_coordinates_single_attempt(city_name)
if coords:
return coords
if attempt < WEATHER_RETRY_COUNT - 1:
time.sleep(WEATHER_RETRY_DELAY)
return None
def get_weather_single_attempt(lat, lon):
try:
url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m&daily=temperature_2m_max,temperature_2m_min,weather_code&timezone=Europe/Moscow&forecast_days=1"
response = requests.get(url, timeout=15)
response.raise_for_status()
data = response.json()
current = data.get("current", {})
daily = data.get("daily", {})
if current.get("temperature_2m") is None:
return None
return {"temp": current.get("temperature_2m"), "humidity": current.get("relative_humidity_2m"),
"weather_code": current.get("weather_code"), "wind_speed": current.get("wind_speed_10m"),
"temp_max": daily.get("temperature_2m_max", [None])[0],
"temp_min": daily.get("temperature_2m_min", [None])[0]}
except:
return None
def get_weather(lat, lon):
for attempt in range(WEATHER_RETRY_COUNT):
log(f"🌤️ Запрос погоды, попытка {attempt + 1}/{WEATHER_RETRY_COUNT}")
weather = get_weather_single_attempt(lat, lon)
if weather:
return weather
if attempt < WEATHER_RETRY_COUNT - 1:
time.sleep(WEATHER_RETRY_DELAY)
return None
def weather_code_to_emoji(code):
weather_codes = {
0: ("☀️", "ясно"), 1: ("🌤️", "малооблачно"), 2: ("⛅", "переменная облачность"),
3: ("☁️", "облачно"), 45: ("🌫️", "туман"), 48: ("🌫️", "туман с изморозью"),
51: ("🌧️", "лёгкая морось"), 53: ("🌧️", "морось"), 55: ("🌧️", "сильная морось"),
61: ("🌧️", "небольшой дождь"), 63: ("🌧️", "дождь"), 65: ("🌧️", "сильный дождь"),
71: ("🌨️", "небольшой снег"), 73: ("🌨️", "снег"), 75: ("🌨️", "сильный снег"),
80: ("🌧️", "ливень"), 81: ("🌧️", "сильный ливень"), 95: ("⛈️", "гроза"),
}
return weather_codes.get(code, ("🌡️", "неизвестно"))
def get_date_string():
now = datetime.now(MOSCOW_TZ)
months = ["января", "февраля", "марта", "апреля", "мая", "июня",
"июля", "августа", "сентября", "октября", "ноября", "декабря"]
weekdays = ["понедельник", "вторник", "среда", "четверг", "пятница", "суббота", "воскресенье"]
return f"{now.day} {months[now.month - 1]} {now.year} года, {weekdays[now.weekday()]}"
def format_morning_message():
city = get_city()
date_str = get_date_string()
coords = get_coordinates(city)
if not coords:
return None, f"❌ Не удалось определить город: {city}"
weather = get_weather(coords["lat"], coords["lon"])
if not weather:
return None, f"❌ Не удалось получить погоду для {coords['name']}"
emoji, description = weather_code_to_emoji(weather.get("weather_code", 0))
message = f"☀️ Доброе утро!\n\n📅 Сегодня {date_str}\n\n"
message += f"🏙️ Погода в городе {coords['name']}"
if coords.get("country"):
message += f", {coords['country']}"
message += ":\n\n"
message += f"{emoji} {description.capitalize()}\n"
message += f"🌡️ Сейчас: {weather['temp']}°C\n"
if weather.get("temp_min") is not None and weather.get("temp_max") is not None:
message += f"📊 Мин/Макс: {weather['temp_min']}°C / {weather['temp_max']}°C\n"
if weather.get("humidity") is not None:
message += f"💧 Влажность: {weather['humidity']}%\n"
if weather.get("wind_speed") is not None:
message += f"💨 Ветер: {weather['wind_speed']} км/ч\n"
message += format_holidays_section()
message += "\nХорошего дня! 🌟\n\nℹ️"
return message, None
# === Отправка ===
def try_send_method(push_url, message, method_name, request_func):
try:
response = request_func()
success = response.status_code == 200 and "error" not in response.text.lower()
log(f" {method_name}: {response.status_code} | {response.text[:60]}")
return success
except Exception as e:
log(f" {method_name}: ❌ {str(e)[:40]}")
return False
def send_to_url(push_url, message):
global working_methods
if not push_url:
return False
push_url = push_url.strip()
if not push_url.startswith("http"):
push_url = "https://" + push_url
short_url = push_url.split("/")[-1][:10]
log(f"📤 [{short_url}] ({len(message)} симв.): {message[:50]}...")
methods = [
("raw_string", lambda: requests.post(push_url, data=message, timeout=30)),
("raw_bytes", lambda: requests.post(push_url, data=message.encode('utf-8'), timeout=30)),
("text_plain", lambda: requests.post(push_url, data=message.encode('utf-8'), headers={'Content-Type': 'text/plain; charset=utf-8'}, timeout=30)),
("form_d", lambda: requests.post(push_url, data={'d': message}, timeout=30)),
("form_data", lambda: requests.post(push_url, data={'data': message}, timeout=30)),
("form_text", lambda: requests.post(push_url, data={'text': message}, timeout=30)),
]
if push_url in working_methods:
method_name = working_methods[push_url]
for name, func in methods:
if name == method_name:
if try_send_method(push_url, message, name, func):
log(f"✅ [{short_url}] Отправлено")
return True
else:
working_methods.pop(push_url, None)
break
for name, func in methods:
if try_send_method(push_url, message, name, func):
working_methods[push_url] = name
log(f"✅ [{short_url}] Отправлено методом {name}")
return True
log(f"❌ [{short_url}] Все методы не сработали")
return False
def send_to_channels(message, channel_indices):
channels = get_channels()
success_count = 0
for idx in channel_indices:
if idx < len(channels):
if send_to_url(channels[idx]["url"], message):
success_count += 1
time.sleep(0.5)
return success_count
def send_message_parts_to_channels(parts, channel_indices):
channels = get_channels()
success = True
for idx in channel_indices:
if idx < len(channels):
for i, part in enumerate(parts):
if not send_to_url(channels[idx]["url"], part):
success = False
if i < len(parts) - 1:
time.sleep(1)
time.sleep(1)
return success
# === Проверки ===
def check_and_send_morning_message():
global last_morning_date
now = datetime.now(MOSCOW_TZ)
today = now.strftime("%Y-%m-%d")
if now.hour == 7 and now.minute < 30 and last_morning_date != today:
log("🌅 Время утреннего сообщения!")
channel_indices = get_morning_channel_indices()
if not channel_indices:
log("⚠️ Нет каналов для утреннего сообщения")
last_morning_date = today
return
message, error = format_morning_message()
if message:
count = send_to_channels(message, channel_indices)
if count > 0:
last_morning_date = today
log(f"✅ Утреннее сообщение отправлено в {count} каналов")
else:
log(f"❌ {error}")
error_msg = f"☀️ Доброе утро!\n\n📅 Сегодня {get_date_string()}\n\n{error}\n\nℹ️"
send_to_channels(error_msg, channel_indices)
last_morning_date = today
def check_feed(feed_url):
try:
feed = feedparser.parse(feed_url)
if feed.bozo and not feed.entries:
log(f"⚠️ Ошибка парсинга: {feed_url}")
return 0
feed_title = feed.feed.get("title", feed_url)
seen_ids = get_seen_ids()
channel_indices = get_rss_channel_indices(feed_url)
if not channel_indices:
log(f"⚠️ Нет каналов для RSS: {feed_url}")
return 0
new_count = 0
for entry in reversed(feed.entries[:10]):
article_id = generate_article_id(entry)
if article_id not in seen_ids:
parts = format_message_parts(entry, feed_title)
log(f"📝 Статья → {len(channel_indices)} каналов")
if send_message_parts_to_channels(parts, channel_indices):
save_seen_id(article_id)
new_count += 1
time.sleep(2)
return new_count
except Exception as e:
log(f"❌ Ошибка: {str(e)}")
return 0
def check_all_feeds():
feeds = get_rss_feeds()
channels = get_channels()
if not feeds:
return "Список RSS пуст"
if not channels:
return "Нет каналов в token.txt"
log(f"🔄 Проверка {len(feeds)} RSS...")
total_new = 0
for feed_url in feeds:
log(f"📡 {feed_url}")
total_new += check_feed(feed_url)
time.sleep(2)
log(f"✅ Готово. Новых: {total_new}")
return f"Проверено: {len(feeds)}, новых статей: {total_new}"
def background_checker():
global is_running
log("🔄 Фоновый процесс запущен")
while is_running:
try:
check_and_send_morning_message()
check_all_feeds()
except Exception as e:
log(f"❌ Ошибка: {str(e)}")
for _ in range(30):
if not is_running:
break
time.sleep(60)
try:
check_and_send_morning_message()
except:
pass
log("⏹️ Фоновый процесс остановлен")
# === Функции с паролем ===
def start_monitoring(password):
if not verify_password(password):
return "🔒 Неверный пароль!"
global is_running
if is_running:
return "⚠️ Мониторинг уже запущен"
is_running = True
threading.Thread(target=background_checker, daemon=True).start()
log("🚀 Мониторинг запущен")
return "✅ Мониторинг запущен"
def stop_monitoring(password):
if not verify_password(password):
return "🔒 Неверный пароль!"
global is_running
is_running = False
log("⏹️ Мониторинг остановлен")
return "⏹️ Мониторинг остановлен"
def manual_check(password):
if not verify_password(password):
return "🔒 Неверный пароль!"
return check_all_feeds()
def test_all_channels(password):
"""Тест всех каналов - отправляет по одному сообщению в каждый"""
if not verify_password(password):
return "🔒 Неверный пароль!"
channels = get_channels()
if not channels:
return "❌ Нет каналов в token.txt"
results = []
for i, ch in enumerate(channels):
msg = f"🧪 Тест канала {i + 1}\n\nПроверка работоспособности.\n\nℹ️"
if send_to_url(ch["url"], msg):
results.append(f"✅ Канал {i + 1}: работает")
else:
results.append(f"❌ Канал {i + 1}: ошибка")
time.sleep(1)
return "Проверка каналов:\n" + "\n".join(results)
def test_morning_message(password):
"""Тест утреннего сообщения - ТОЛЬКО по маршрутам"""
if not verify_password(password):
return "🔒 Неверный пароль!"
channels = get_channels()
if not channels:
return "❌ Нет каналов в token.txt"
channel_indices = get_morning_channel_indices()
if not channel_indices:
return "❌ Утреннее сообщение не настроено.\nПерейдите в 'Маршруты' и выберите каналы."
message, error = format_morning_message()
if error:
return error
channel_names = [f"Канал {i+1}" for i in channel_indices]
log(f"🌅 Тест утра → {channel_names}")
count = send_to_channels(message, channel_indices)
holidays = get_holidays_for_today()
return f"✅ Отправлено в {count}/{len(channel_indices)} каналов\n📢 Каналы: {', '.join(channel_names)}\n🎉 Праздников: {len(holidays)}"
def test_rss_message(password):
"""Тест RSS - отправляет тестовую новость по маршрутам первого RSS"""
if not verify_password(password):
return "🔒 Неверный пароль!"
channels = get_channels()
if not channels:
return "❌ Нет каналов в token.txt"
feeds = get_rss_feeds()
if not feeds:
return "❌ Нет RSS источников"
test_feed = feeds[0]
channel_indices = get_rss_channel_indices(test_feed)
if not channel_indices:
return "❌ RSS не настроен на каналы.\nПерейдите в 'Маршруты' и настройте."
channel_names = [f"Канал {i+1}" for i in channel_indices]
test_msg = f"📰 Тестовая новость\n\nПроверка маршрутов RSS.\n\n🔗 https://example.com/test\n\n📡 Источник: Тест\n\nℹ️"
log(f"🧪 Тест RSS → {channel_names}")
count = send_to_channels(test_msg, channel_indices)
short_feed = test_feed[:40] + "..." if len(test_feed) > 40 else test_feed
return f"✅ Отправлено в {count}/{len(channel_indices)} каналов\n📢 Каналы: {', '.join(channel_names)}\n📰 RSS: {short_feed}"
def test_holidays(password):
"""Тест праздников - только показывает, НЕ отправляет"""
if not verify_password(password):
return "🔒 Неверный пароль!"
now = datetime.now(MOSCOW_TZ)
holidays = get_holidays_for_today()
if holidays:
result = f"📅 Дата: {now.strftime('%d.%m.')}\n🎉 Найдено праздников: {len(holidays)}\n\n"
for h in holidays:
result += f"🎉 {h}\n"
return result
return f"📅 Дата: {now.strftime('%d.%m.')}\n😔 Праздников не найдено"
def find_working_method(password):
"""Найти рабочие методы для всех каналов"""
if not verify_password(password):
return "🔒 Неверный пароль!"
global working_methods
working_methods = {}
channels = get_channels()
if not channels:
return "❌ Нет каналов"
log("🔍 Поиск рабочих методов...")
results = []
for i, ch in enumerate(channels):
msg = f"🔧 Поиск метода для канала {i + 1}\n\nℹ️"
if send_to_url(ch["url"], msg):
method = working_methods.get(ch["url"], "найден")
results.append(f"✅ Канал {i + 1}: {method}")
else:
results.append(f"❌ Канал {i + 1}: не найден")
time.sleep(1)
return "Методы отправки:\n" + "\n".join(results)
# === Маршруты ===
def get_routes_display():
channels = get_channels()
routes = get_routes()
feeds = get_rss_feeds()
if not channels:
return "Добавьте каналы в token.txt (по одной ссылке на строку)"
lines = []
morning_idx = routes.get("morning", list(range(len(channels))))
morning_names = [f"Канал {i+1}" for i in morning_idx if i < len(channels)]
lines.append(f"🌅 Утреннее сообщение → {', '.join(morning_names) if morning_names else 'никуда'}")
lines.append("")
for feed in feeds:
rss_routes = routes.get("rss", {})
if feed in rss_routes:
idx_list = [i for i in rss_routes[feed] if i < len(channels)]
names = [f"Канал {i+1}" for i in idx_list]
target = ', '.join(names) if names else 'никуда'
else:
target = 'все каналы'
short = feed[:50] + "..." if len(feed) > 50 else feed
lines.append(f"📰 {short}\n → {target}")
return "\n\n".join(lines) if lines else "Нет маршрутов"
def get_channel_choices():
channels = get_channels()
return [(f"Канал {i+1}", i) for i in range(len(channels))]
def get_rss_choices():
feeds = get_rss_feeds()
return [(f[:50] + "..." if len(f) > 50 else f, f) for f in feeds]
def update_morning_route(password, indices):
if not verify_password(password):
return "🔒 Неверный пароль!", get_routes_display()
set_morning_channels(indices if indices else [])
names = [f"Канал {i+1}" for i in (indices or [])]
return f"✅ Утреннее → {', '.join(names) if names else 'никуда'}", get_routes_display()
def update_rss_route(password, rss_url, indices):
if not verify_password(password):
return "🔒 Неверный пароль!", get_routes_display()
if not rss_url:
return "❌ Выберите RSS", get_routes_display()
set_rss_channels(rss_url, indices if indices else [])
return "✅ Маршрут сохранён", get_routes_display()
def on_rss_select(rss_url):
if not rss_url:
return gr.update(value=[])
indices = get_rss_channel_indices(rss_url)
return gr.update(value=indices)
# === Настройки ===
def save_tokens(password, tokens_text):
if not verify_password(password):
return "🔒 Неверный пароль!"
save_file(TOKEN_FILE, tokens_text)
count = len(get_channels())
return f"✅ Сохранено {count} каналов"
def save_rss_list(password, rss_text):
if not verify_password(password):
return "🔒 Неверный пароль!"
save_file(RSS_FILE, rss_text)
return f"✅ Сохранено {len(get_rss_feeds())} RSS"
def save_hf_link(password, link):
if not verify_password(password):
return "🔒 Неверный пароль!"
save_file(HF_LINK_FILE, link)
return "✅ Ссылка на HF сохранена"
def save_city(password, city):
if not verify_password(password):
return "🔒 Неверный пароль!"
save_file(CITY_FILE, city.strip())
return f"✅ Город сохранён: {city.strip()}"
def clear_seen(password):
if not verify_password(password):
return "🔒 Неверный пароль!"
save_file(SEEN_FILE, "")
return "✅ История очищена"
# === Без пароля ===
def get_status():
channels = get_channels()
feeds = get_rss_feeds()
hf_link = get_hf_link()
city = get_city()
seen_count = len(get_seen_ids())
holidays_exist = os.path.exists(HOLIDAYS_FILE)
now = datetime.now(MOSCOW_TZ)
moscow_time = now.strftime("%H:%M:%S")
return f"""
📊 Статус системы
🔄 Мониторинг: {"✅ Активен" if is_running else "⏹️ Остановлен"}
📢 Каналов: {len(channels)}
📡 RSS источников: {len(feeds)}
🔗 Push.tg: {"✅ Настроен" if channels else "❌ Не настроен"}
🌐 HF ссылка: {"✅ Настроена" if hf_link else "❌ Не настроена"}
🏙️ Город для погоды: {city}
🎉 Файл праздников: {"✅ Найден" if holidays_exist else "❌ Не найден"}
📝 Обработано статей: {seen_count}
🕐 Московское время: {moscow_time}
🌅 Утреннее сообщение: 7:00 МСК
📅 Последнее утреннее: {last_morning_date or "ещё не отправлялось"}
⚠️ Автозапуск отключён - нужно запустить вручную
🔒 Для управления требуется пароль
"""
def get_logs():
if not logs:
return "Логов пока нет"
return "\n".join(logs[-50:])
def load_tokens():
return load_file(TOKEN_FILE)
def load_rss_list():
return load_file(RSS_FILE)
def load_hf_link():
return load_file(HF_LINK_FILE)
def load_city():
return load_file(CITY_FILE) or "Moscow"
# === Интерфейс Gradio ===
with gr.Blocks(title="RSS Telegram Notifier") as app:
gr.Markdown("# 📰 RSS Telegram Notifier")
gr.Markdown("Автоматическая отправка новостей из RSS в Telegram + утренняя погода + праздники")
gr.Markdown("⚠️ После запуска приложения нужно вручную запустить мониторинг!")
gr.Markdown("### 🔐 Авторизация")
password_input = gr.Textbox(
label="Пароль для управления",
placeholder="Введите пароль...",
type="password"
)
with gr.Tab("📊 Статус"):
status_output = gr.Markdown()
with gr.Row():
start_btn = gr.Button("▶️ Запустить мониторинг", variant="primary")
stop_btn = gr.Button("⏹️ Остановить", variant="stop")
refresh_btn = gr.Button("🔄 Обновить статус")
with gr.Row():
check_btn = gr.Button("🔍 Проверить RSS сейчас")
test_channels_btn = gr.Button("🧪 Тест всех каналов")
find_method_btn = gr.Button("🔧 Найти методы")
with gr.Row():
test_morning_btn = gr.Button("🌅 Тест утра (по маршрутам)", variant="secondary")
test_rss_btn = gr.Button("📰 Тест RSS (по маршрутам)", variant="secondary")
test_holidays_btn = gr.Button("🎉 Праздники (без отправки)", variant="secondary")
result_output = gr.Textbox(label="Результат", lines=5)
start_btn.click(start_monitoring, inputs=password_input, outputs=result_output)
stop_btn.click(stop_monitoring, inputs=password_input, outputs=result_output)
refresh_btn.click(get_status, outputs=status_output)
check_btn.click(manual_check, inputs=password_input, outputs=result_output)
test_channels_btn.click(test_all_channels, inputs=password_input, outputs=result_output)
find_method_btn.click(find_working_method, inputs=password_input, outputs=result_output)
test_morning_btn.click(test_morning_message, inputs=password_input, outputs=result_output)
test_rss_btn.click(test_rss_message, inputs=password_input, outputs=result_output)
test_holidays_btn.click(test_holidays, inputs=password_input, outputs=result_output)
with gr.Tab("🔀 Маршруты"):
gr.Markdown("### Настройка маршрутов контента")
gr.Markdown("Выберите, в какие каналы отправлять каждый тип контента")
routes_display = gr.Textbox(label="Текущие маршруты", lines=15, interactive=False)
gr.Markdown("### 🌅 Утреннее сообщение (погода + праздники)")
morning_channels_select = gr.CheckboxGroup(label="Отправлять в каналы:", choices=[])
update_morning_btn = gr.Button("💾 Сохранить маршрут утреннего сообщения")
morning_result = gr.Textbox(label="Результат", lines=1)
gr.Markdown("### 📰 RSS источники")
rss_select = gr.Dropdown(label="Выберите RSS", choices=[])
rss_channels_select = gr.CheckboxGroup(label="Отправлять в каналы:", choices=[])
update_rss_btn = gr.Button("💾 Сохранить маршрут RSS")
rss_result = gr.Textbox(label="Результат", lines=1)
def refresh_routes_ui():
channels = get_channels()
choices = [(f"Канал {i+1}", i) for i in range(len(channels))]
routes = get_routes()
morning_selected = routes.get("morning", list(range(len(channels))))
rss_choices = get_rss_choices()
return (get_routes_display(), gr.update(choices=choices, value=morning_selected),
gr.update(choices=rss_choices), gr.update(choices=choices))
app.load(refresh_routes_ui, outputs=[routes_display, morning_channels_select, rss_select, rss_channels_select])
update_morning_btn.click(update_morning_route, inputs=[password_input, morning_channels_select],
outputs=[morning_result, routes_display])
rss_select.change(on_rss_select, inputs=rss_select, outputs=rss_channels_select)
update_rss_btn.click(update_rss_route, inputs=[password_input, rss_select, rss_channels_select],
outputs=[rss_result, routes_display])
with gr.Tab("⚙️ Настройки"):
gr.Markdown("### 📢 Каналы (token.txt)")
gr.Markdown("По одной ссылке push.tg на строку. Каждая строка = отдельный канал.")
tokens_input = gr.Textbox(
label="Ссылки push.tg",
placeholder="https://push.tg/xxx\nhttps://push.tg/yyy",
lines=5
)
tokens_save_btn = gr.Button("💾 Сохранить каналы")
tokens_result = gr.Textbox(label="Результат", lines=1)
gr.Markdown("### 🏙️ Город для погоды")
gr.Markdown("Введите название города (на русском или английском)")
city_input = gr.Textbox(label="Город", placeholder="Москва")
city_save_btn = gr.Button("💾 Сохранить город")
city_result = gr.Textbox(label="Результат", lines=1)
gr.Markdown("### 🌐 Ссылка на HF Space (для напоминаний)")
hf_link_input = gr.Textbox(
label="Ссылка на этот Space",
placeholder="https://huggingface.co/spaces/username/space-name"
)
hf_save_btn = gr.Button("💾 Сохранить HF ссылку")
hf_result = gr.Textbox(label="Результат", lines=1)
gr.Markdown("### 📡 RSS каналы (по одному на строку)")
rss_input = gr.Textbox(
label="RSS каналы",
placeholder="https://example.com/feed.xml\nhttps://another.com/rss",
lines=10
)
rss_save_btn = gr.Button("💾 Сохранить RSS список")
rss_save_result = gr.Textbox(label="Результат", lines=1)
gr.Markdown("### 🗑️ Очистка")
clear_btn = gr.Button("🗑️ Очистить историю просмотренных")
clear_result = gr.Textbox(label="Результат", lines=1)
app.load(load_tokens, outputs=tokens_input)
app.load(load_city, outputs=city_input)
app.load(load_hf_link, outputs=hf_link_input)
app.load(load_rss_list, outputs=rss_input)
tokens_save_btn.click(save_tokens, inputs=[password_input, tokens_input], outputs=tokens_result)
city_save_btn.click(save_city, inputs=[password_input, city_input], outputs=city_result)
hf_save_btn.click(save_hf_link, inputs=[password_input, hf_link_input], outputs=hf_result)
rss_save_btn.click(save_rss_list, inputs=[password_input, rss_input], outputs=rss_save_result)
clear_btn.click(clear_seen, inputs=password_input, outputs=clear_result)
with gr.Tab("📜 Логи"):
logs_output = gr.Textbox(label="Последние логи", lines=25)
logs_refresh_btn = gr.Button("🔄 Обновить логи")
logs_refresh_btn.click(get_logs, outputs=logs_output)
app.load(get_status, outputs=status_output)
if __name__ == "__main__":
app.launch()