Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from typing import Optional | |
| from fastapi import FastAPI, Request, HTTPException | |
| from telegram import Bot, Update | |
| from telegram.constants import ParseMode | |
| from google import genai | |
| from google.genai import types | |
| # --- Налады --- | |
| OWNER_ID = os.environ.get("SPACE_OWNER_ID") | |
| TELEGRAM_TOKEN = os.environ.get("bottok") | |
| BOT = Bot(token=TELEGRAM_TOKEN) | |
| WEBHOOK_PATH = "/webhook" | |
| # Дазваляем усе чаты і трэды | |
| ALLOWED_CHATS = None # None азначае — без абмежаванняў | |
| # --- НАЛАДА ЛАГАВАННЯ --- | |
| logging.basicConfig(level=logging.INFO) | |
| # --- Утыліты --- | |
| def is_allowed_chat_and_thread(chat_id: int, thread_id: Optional[int]) -> bool: | |
| return True | |
| def get_chat_link(chat) -> str: | |
| return f"https://t.me/{chat.username}" if chat.username else f"ID:{chat.id}" | |
| def get_user_mention(user) -> str: | |
| if user.username: | |
| return f"@{user.username}" | |
| return f'<a href="tg://user?id={user.id}">user_{user.id}</a>' | |
| # --- АНАЛІЗ З GEMINI --- | |
| def analyze_with_gemini(text: str) -> dict: | |
| api_key = os.environ.get("gemtok") | |
| if not api_key: | |
| raise RuntimeError("Не зададзены gemtok у сакрэтах Space") | |
| client = genai.Client(api_key=api_key) | |
| model = "gemini-2.0-flash-lite" | |
| contents = [ | |
| types.Content(role="user", parts=[types.Part.from_text(text=text)]) | |
| ] | |
| config = types.GenerateContentConfig( | |
| temperature=0.45, | |
| response_mime_type="application/json", | |
| system_instruction=[ types.Part.from_text(text=( | |
| "ты лінгвіст і мовазнаўца\n" | |
| "вызначаеш мову (language: be, ru, eng, other) і махлярства (is_scam: yes, no)" | |
| )) ] | |
| ) | |
| response = "" | |
| for chunk in client.models.generate_content_stream( | |
| model=model, contents=contents, config=config | |
| ): | |
| response += chunk.text | |
| try: | |
| parsed = __import__("json").loads(response) | |
| if isinstance(parsed, list) and parsed: | |
| return parsed[0] | |
| if isinstance(parsed, dict): | |
| return parsed | |
| raise ValueError("Unexpected format") | |
| except Exception as e: | |
| logging.error(f"Gemini parsing error: {e}, raw={response}") | |
| return {"language": "unknown", "is_scam": "no"} | |
| # --- FASTAPI APP --- | |
| app = FastAPI() | |
| async def root(): | |
| return {"status": "ok", "info": "Anti-Scam Bot"} | |
| async def telegram_webhook(request: Request): | |
| data = await request.json() | |
| try: | |
| update = Update.de_json(data, BOT) | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Bad update: {e}") | |
| await handle_message(update) | |
| return {"ok": True} | |
| async def handle_message(update: Update): | |
| logging.info(f"🔥 handle_message called, update: {update}") | |
| msg = update.message | |
| if not msg or not msg.text: | |
| return | |
| user, chat, text = msg.from_user, msg.chat, msg.text | |
| thread_id = getattr(msg, "message_thread_id", None) | |
| # --- Фільтр па тэмам форума --- | |
| is_topic = getattr(msg, "is_topic_message", False) | |
| if is_topic and thread_id and thread_id > 1: | |
| logging.info( | |
| f"Фільтр: паведамленне ў іншай тэме (thread_id={thread_id}) — прапуск" | |
| ) | |
| return | |
| # --- Фільтр па мове карыстальніка (language_code='be') --- | |
| user_lang = getattr(user, "language_code", None) | |
| if user_lang == "be": | |
| logging.info("Паведамленне ад карыстальніка з language_code='be' — прапуск без аналізу") | |
| return | |
| # --- Аналіз з Gemini --- | |
| try: | |
| gm = analyze_with_gemini(text) | |
| lang = gm.get("language", "unknown") | |
| scam = gm.get("is_scam", "no") | |
| except Exception as e: | |
| logging.error(f"Gemini error: {e}") | |
| lang, scam = "unknown", "no" | |
| # Калі паведамленне на беларускай — нічога не робім | |
| if lang == "be": | |
| logging.info("Паведамленне на беларускай мове (па версіі Gemini) — прапуск без апрацоўкі") | |
| return | |
| # --- Адладка: даслаць адказ Gemini уладальніку --- | |
| try: | |
| debug_text = ( | |
| f"[АДЛАДКА Gemini]\n" | |
| f"Дата: {msg.date.strftime('%Y-%m-%d %H:%M:%S')}\n" | |
| f"Карыстальнік: {get_user_mention(user)}\n" | |
| f"Чат: {chat.title if chat.title else chat.id}\n" | |
| f"Тэкст: {text}\n" | |
| f"Адказ Gemini: {gm}\n" | |
| f"Мова: {lang}, Махлярства: {scam}" | |
| ) | |
| owner = OWNER_ID | |
| try: | |
| chat_to = int(owner) | |
| except ValueError: | |
| chat_to = owner if owner.startswith('@') else f"@{owner}" | |
| logging.info(f"Debug message: OWNER_ID={owner!r}, chat_to={chat_to!r}") | |
| await BOT.send_message(chat_id=chat_to, text=debug_text, parse_mode=ParseMode.HTML) | |
| except Exception as e: | |
| logging.error(f"Debug message error: {e}") | |
| # --- Апрацоўка ў групах і супергрупах --- | |
| if chat.type in ["group", "supergroup"] and is_allowed_chat_and_thread(chat.id, thread_id): | |
| if scam == "yes": | |
| try: | |
| # Банім карыстальніка | |
| await BOT.ban_chat_member(chat.id, user.id) | |
| # Выдаляем ягонае паведамленне | |
| await msg.delete() | |
| # Паведамляем у чат | |
| await msg.reply_text( | |
| f"Карыстальнік {get_user_mention(user)} забанены за махлярства і паведамленне выдалена.", | |
| parse_mode=ParseMode.HTML | |
| ) | |
| # Інфарміруем уладальніка | |
| info = ( | |
| f"[Анты-Махлярства]\n" | |
| f"Дата: {msg.date.strftime('%Y-%m-%d %H:%M:%S')}\n" | |
| f"Група: {chat.title} ({get_chat_link(chat)})\n" | |
| f"Карыстальнік: {get_user_mention(user)}\n" | |
| f"Мова: {lang}\n" | |
| f"Тэкст: {text}\n" | |
| f"Статус: махлярскае" | |
| ) | |
| try: | |
| chat_to = int(owner) | |
| except ValueError: | |
| chat_to = owner if owner.startswith('@') else f"@{owner}" | |
| logging.info(f"Scam report: OWNER_ID={owner!r}, chat_to={chat_to!r}") | |
| await BOT.send_message(chat_id=chat_to, text=info, parse_mode=ParseMode.HTML) | |
| except Exception as e: | |
| logging.error(f"Error handling scam: {e}") | |
| elif lang == "ru": | |
| try: | |
| # Выдаляем рускі тэкст | |
| await msg.delete() | |
| logging.info(f"Выдаліў рускі тэкст ад {get_user_mention(user)}") | |
| except Exception as e: | |
| logging.error(f"Delete error: {e}") | |