abaronca / space_app.py
archivartaunik's picture
Update space_app.py
2d831df verified
import logging
import os
from typing import Optional
from fastapi import FastAPI, Request, HTTPException
from telegram import Bot, Update
from telegram.constants import ParseMode
from google import genai
from google.genai import types
# --- Налады ---
OWNER_ID = os.environ.get("SPACE_OWNER_ID")
TELEGRAM_TOKEN = os.environ.get("bottok")
BOT = Bot(token=TELEGRAM_TOKEN)
WEBHOOK_PATH = "/webhook"
# Дазваляем усе чаты і трэды
ALLOWED_CHATS = None # None азначае — без абмежаванняў
# --- НАЛАДА ЛАГАВАННЯ ---
logging.basicConfig(level=logging.INFO)
# --- Утыліты ---
def is_allowed_chat_and_thread(chat_id: int, thread_id: Optional[int]) -> bool:
return True
def get_chat_link(chat) -> str:
return f"https://t.me/{chat.username}" if chat.username else f"ID:{chat.id}"
def get_user_mention(user) -> str:
if user.username:
return f"@{user.username}"
return f'<a href="tg://user?id={user.id}">user_{user.id}</a>'
# --- АНАЛІЗ З GEMINI ---
def analyze_with_gemini(text: str) -> dict:
api_key = os.environ.get("gemtok")
if not api_key:
raise RuntimeError("Не зададзены gemtok у сакрэтах Space")
client = genai.Client(api_key=api_key)
model = "gemini-2.0-flash-lite"
contents = [
types.Content(role="user", parts=[types.Part.from_text(text=text)])
]
config = types.GenerateContentConfig(
temperature=0.45,
response_mime_type="application/json",
system_instruction=[ types.Part.from_text(text=(
"ты лінгвіст і мовазнаўца\n"
"вызначаеш мову (language: be, ru, eng, other) і махлярства (is_scam: yes, no)"
)) ]
)
response = ""
for chunk in client.models.generate_content_stream(
model=model, contents=contents, config=config
):
response += chunk.text
try:
parsed = __import__("json").loads(response)
if isinstance(parsed, list) and parsed:
return parsed[0]
if isinstance(parsed, dict):
return parsed
raise ValueError("Unexpected format")
except Exception as e:
logging.error(f"Gemini parsing error: {e}, raw={response}")
return {"language": "unknown", "is_scam": "no"}
# --- FASTAPI APP ---
app = FastAPI()
@app.get("/")
async def root():
return {"status": "ok", "info": "Anti-Scam Bot"}
@app.post(WEBHOOK_PATH)
async def telegram_webhook(request: Request):
data = await request.json()
try:
update = Update.de_json(data, BOT)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Bad update: {e}")
await handle_message(update)
return {"ok": True}
async def handle_message(update: Update):
logging.info(f"🔥 handle_message called, update: {update}")
msg = update.message
if not msg or not msg.text:
return
user, chat, text = msg.from_user, msg.chat, msg.text
thread_id = getattr(msg, "message_thread_id", None)
# --- Фільтр па тэмам форума ---
is_topic = getattr(msg, "is_topic_message", False)
if is_topic and thread_id and thread_id > 1:
logging.info(
f"Фільтр: паведамленне ў іншай тэме (thread_id={thread_id}) — прапуск"
)
return
# --- Фільтр па мове карыстальніка (language_code='be') ---
user_lang = getattr(user, "language_code", None)
if user_lang == "be":
logging.info("Паведамленне ад карыстальніка з language_code='be' — прапуск без аналізу")
return
# --- Аналіз з Gemini ---
try:
gm = analyze_with_gemini(text)
lang = gm.get("language", "unknown")
scam = gm.get("is_scam", "no")
except Exception as e:
logging.error(f"Gemini error: {e}")
lang, scam = "unknown", "no"
# Калі паведамленне на беларускай — нічога не робім
if lang == "be":
logging.info("Паведамленне на беларускай мове (па версіі Gemini) — прапуск без апрацоўкі")
return
# --- Адладка: даслаць адказ Gemini уладальніку ---
try:
debug_text = (
f"[АДЛАДКА Gemini]\n"
f"Дата: {msg.date.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Карыстальнік: {get_user_mention(user)}\n"
f"Чат: {chat.title if chat.title else chat.id}\n"
f"Тэкст: {text}\n"
f"Адказ Gemini: {gm}\n"
f"Мова: {lang}, Махлярства: {scam}"
)
owner = OWNER_ID
try:
chat_to = int(owner)
except ValueError:
chat_to = owner if owner.startswith('@') else f"@{owner}"
logging.info(f"Debug message: OWNER_ID={owner!r}, chat_to={chat_to!r}")
await BOT.send_message(chat_id=chat_to, text=debug_text, parse_mode=ParseMode.HTML)
except Exception as e:
logging.error(f"Debug message error: {e}")
# --- Апрацоўка ў групах і супергрупах ---
if chat.type in ["group", "supergroup"] and is_allowed_chat_and_thread(chat.id, thread_id):
if scam == "yes":
try:
# Банім карыстальніка
await BOT.ban_chat_member(chat.id, user.id)
# Выдаляем ягонае паведамленне
await msg.delete()
# Паведамляем у чат
await msg.reply_text(
f"Карыстальнік {get_user_mention(user)} забанены за махлярства і паведамленне выдалена.",
parse_mode=ParseMode.HTML
)
# Інфарміруем уладальніка
info = (
f"[Анты-Махлярства]\n"
f"Дата: {msg.date.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Група: {chat.title} ({get_chat_link(chat)})\n"
f"Карыстальнік: {get_user_mention(user)}\n"
f"Мова: {lang}\n"
f"Тэкст: {text}\n"
f"Статус: махлярскае"
)
try:
chat_to = int(owner)
except ValueError:
chat_to = owner if owner.startswith('@') else f"@{owner}"
logging.info(f"Scam report: OWNER_ID={owner!r}, chat_to={chat_to!r}")
await BOT.send_message(chat_id=chat_to, text=info, parse_mode=ParseMode.HTML)
except Exception as e:
logging.error(f"Error handling scam: {e}")
elif lang == "ru":
try:
# Выдаляем рускі тэкст
await msg.delete()
logging.info(f"Выдаліў рускі тэкст ад {get_user_mention(user)}")
except Exception as e:
logging.error(f"Delete error: {e}")