TelegramGuard / bot.py
ilang-ai
fix: webhook mode starts listening immediately, no AI test blocking
57a8795
import logging
import sys
import os
import time as _time
import hashlib as _hashlib
import asyncio
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application, CommandHandler, MessageHandler,
CallbackQueryHandler, ChatMemberHandler, filters, ContextTypes
)
import config
from modules.db import db_exec
from modules.database import init_db
from modules.chat import (
ai_text, ai_vision, ai_voice, ai_group_reply,
ai_judge_group_message, ai_judge_group_image, ai_group_vision,
GROUP_WELCOME
)
from modules.admin import is_admin, is_bot_admin, register_group
from modules.prefilter import prefilter
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
level=logging.INFO
)
logger = logging.getLogger(__name__)
TOS_TEXT = (
"I-Lang Guard Terms of Service\n\n"
"This bot provides:\n"
"- Auto spam detection and cleanup\n"
"- AI-powered message analysis\n\n"
"Usage:\n"
"- Bot analyzes group messages for spam detection\n"
"- No personal data is stored\n"
"- Admins can remove the bot at any time\n\n"
"Group admins: tap the button below to accept"
)
async def check_tos(chat_id):
async def _do(db):
cur = await db.execute("SELECT 1 FROM tos_consent WHERE chat_id=?", (chat_id,))
return await cur.fetchone() is not None
return await db_exec(_do)
async def record_tos(chat_id, user_id):
async def _do(db):
await db.execute(
"INSERT OR REPLACE INTO tos_consent (chat_id, accepted_by) VALUES (?, ?)",
(chat_id, user_id)
)
await db.commit()
await db_exec(_do)
async def delete_tos(chat_id):
async def _do(db):
await db.execute("DELETE FROM tos_consent WHERE chat_id=?", (chat_id,))
await db.commit()
await db_exec(_do)
def _ctx_info(context):
parts = []
history = context.user_data.get("history", [])
if not history:
parts.append("NEW_SESSION:New conversation, greet casually, ask what they need")
return " | ".join(parts)
async def _handle_ai_result(intent, device, reply, msg, user_id, context):
history = context.user_data.setdefault("history", [])
await msg.reply_text(reply)
history.append({"role": "assistant", "text": reply})
if len(history) > 20:
history[:] = history[-20:]
# ==================== Commands ====================
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not config.ADMIN_USER_ID:
config.ADMIN_USER_ID = update.effective_user.id
if update.effective_chat.type == "private":
context.user_data["history"] = []
intent, device, reply = await ai_text(
"/start",
history=None,
context_info="NEW_SESSION:User just opened chat, greet briefly, say what you can do"
)
await update.message.reply_text(reply)
context.user_data.setdefault("history", []).append({"role": "assistant", "text": reply})
else:
chat_id = update.effective_chat.id
await register_group(chat_id, update.effective_chat.title)
if not await check_tos(chat_id):
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("Accept & Enable", callback_data="tos_accept_" + str(chat_id))],
[InlineKeyboardButton("Decline", callback_data="tos_decline_" + str(chat_id))]
])
await update.message.reply_text(TOS_TEXT, reply_markup=keyboard)
else:
await update.message.reply_text(GROUP_WELCOME)
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_chat.type == "private":
await update.message.reply_text(
"Just talk to me, no commands needed\n\n"
"Group admin → Add me to a group, give me admin permissions\n"
"Anything else → Just chat"
)
else:
await update.message.reply_text(
"I work automatically in groups. No config needed.\n\nAdmin commands:\n/ban — Reply to a message to ban the user"
)
async def cmd_ban(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_chat.type == "private" or not update.message.reply_to_message:
return
if not await is_admin(update, context):
return
try:
t = update.message.reply_to_message.from_user
await context.bot.ban_chat_member(update.effective_chat.id, t.id)
await update.message.reply_text("done")
except Exception as e:
await update.message.reply_text(str(e))
# ==================== Group ====================
async def handle_group_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.message
if not msg:
return
chat_id = msg.chat.id
user = msg.from_user
if not user:
return
text = msg.text or msg.caption or ""
await register_group(chat_id, msg.chat.title)
tos_ok = await check_tos(chat_id)
# @mention or reply to bot
is_mention = text and context.bot.username and ("@" + context.bot.username) in text
is_reply_to_bot = msg.reply_to_message and msg.reply_to_message.from_user and msg.reply_to_message.from_user.id == context.bot.id
has_media = bool(msg.photo or msg.video or msg.document)
if (text or has_media) and (is_mention or is_reply_to_bot):
clean = text.replace("@" + context.bot.username, "").strip() if (text and is_mention) else (text.strip() if text else "")
if not tos_ok:
reply = "I haven't been enabled yet. Ask an admin to tap the Accept & Enable button above."
else:
g_history = context.chat_data.setdefault("group_history", [])
if msg.photo:
try:
f = await context.bot.get_file(msg.photo[-1].file_id)
img_data = bytes(await f.download_as_bytearray())
reply = await ai_group_vision(img_data, caption=clean, history=g_history)
except Exception:
reply = "Couldn't read that image. Try sending another one?"
elif msg.video:
if msg.video.thumbnail:
try:
vf = await context.bot.get_file(msg.video.thumbnail.file_id)
vimg = bytes(await vf.download_as_bytearray())
reply = await ai_group_vision(vimg, caption=clean, history=g_history)
except Exception:
if clean:
g_history.append({"role": "user", "text": "[video] " + clean})
reply = await ai_group_reply("[video] " + clean, g_history)
else:
reply = "Couldn't read the video thumbnail. What's it about?"
elif clean:
g_history.append({"role": "user", "text": "[video] " + clean})
reply = await ai_group_reply("[video] " + clean, g_history)
else:
reply = "Can't process videos directly. What's it about?"
else:
g_history.append({"role": "user", "text": clean})
reply = await ai_group_reply(clean, g_history)
g_history.append({"role": "assistant", "text": reply})
if len(g_history) > 20:
g_history[:] = g_history[-20:]
await msg.reply_text(reply)
return
if not tos_ok:
return
# Admin check
is_admin_user = False
try:
member = await context.bot.get_chat_member(chat_id, user.id)
if member.status in ("administrator", "creator"):
is_admin_user = True
except Exception:
pass
# Track recent messages: (msg_id, content_hash, timestamp)
user_msgs = context.chat_data.setdefault("user_recent_msgs", {})
uid = user.id
if uid not in user_msgs:
user_msgs[uid] = []
content_hash = _hashlib.md5(text.encode()).hexdigest() if text else ""
now = _time.time()
user_msgs[uid].append((msg.message_id, content_hash, now))
if len(user_msgs[uid]) > 20:
user_msgs[uid] = user_msgs[uid][-20:]
# Admin bypass
if is_admin_user:
return
# Duplicate message detection
spam = False
if content_hash:
recent_same = [
e for e in user_msgs[uid]
if e[1] == content_hash and (now - e[2]) < config.SPAM_REPEAT_WINDOW
]
if len(recent_same) >= config.SPAM_REPEAT_THRESHOLD:
spam = True
logger.info("REPEAT SPAM: user=" + str(uid) + " chat=" + str(chat_id) + " count=" + str(len(recent_same)))
# Pre-filter: keyword/regex/forward/new-account (zero API cost)
if not spam:
verdict = prefilter(msg, user, text)
if verdict == "spam":
spam = True
elif verdict == "ai":
# Needs AI analysis (API budget available)
if msg.photo:
try:
f = await context.bot.get_file(msg.photo[-1].file_id)
data = bytes(await f.download_as_bytearray())
spam = await ai_judge_group_image(data, text)
except Exception:
if text:
spam = await ai_judge_group_message(text)
elif msg.video:
if msg.video.thumbnail:
try:
vf = await context.bot.get_file(msg.video.thumbnail.file_id)
vdata = bytes(await vf.download_as_bytearray())
spam = await ai_judge_group_image(vdata, text)
except Exception:
if text:
spam = await ai_judge_group_message(text)
elif text:
spam = await ai_judge_group_message(text)
elif msg.forward_date:
spam = True
elif msg.document or msg.sticker:
if text:
spam = await ai_judge_group_message(text)
elif msg.forward_date:
spam = True
elif text:
spam = await ai_judge_group_message(text)
# verdict == "clean" → skip AI, let it through
if spam:
try:
tasks = []
for entry in user_msgs.get(uid, []):
mid = entry[0] if isinstance(entry, tuple) else entry
tasks.append(context.bot.delete_message(chat_id, mid))
tasks.append(context.bot.ban_chat_member(chat_id, uid))
results = await asyncio.gather(*tasks, return_exceptions=True)
user_msgs.pop(uid, None)
logger.info("SPAM nuked: user=" + str(uid) + " chat=" + str(chat_id) + " tasks=" + str(len(tasks)))
except Exception as e:
logger.warning("Anti-spam action failed: " + str(e))
last_remind = context.bot_data.get("perm_remind_" + str(chat_id), 0)
if _time.time() - last_remind > 3600:
context.bot_data["perm_remind_" + str(chat_id)] = _time.time()
try:
await msg.reply_text(
"\u26a0\ufe0f Detected spam but I don't have permissions to act.\n\n"
"Tap group name → Admins → Add Admin → Find me → Enable Delete Messages and Ban Users → Done"
)
except Exception:
pass
return
# ==================== Private ====================
async def handle_private_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.message
if not msg or not msg.text:
return
user_id = msg.from_user.id
history = context.user_data.setdefault("history", [])
history.append({"role": "user", "text": msg.text})
intent, device, reply = await ai_text(msg.text, history, _ctx_info(context))
await _handle_ai_result(intent, device, reply, msg, user_id, context)
async def handle_private_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.message
if not msg or not msg.photo:
return
user_id = msg.from_user.id
history = context.user_data.setdefault("history", [])
caption = msg.caption or ""
history.append({"role": "user", "text": "[photo] " + caption if caption else "[photo]"})
try:
file = await context.bot.get_file(msg.photo[-1].file_id)
img_bytes = bytes(await file.download_as_bytearray())
except Exception:
await msg.reply_text("Didn't receive that image. Try again?")
return
intent, device, reply = await ai_vision(img_bytes, caption, history, _ctx_info(context))
await _handle_ai_result(intent, device, reply, msg, user_id, context)
async def handle_private_voice(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg = update.message
if not msg or not msg.voice:
return
user_id = msg.from_user.id
history = context.user_data.setdefault("history", [])
history.append({"role": "user", "text": "[voice]"})
try:
file = await context.bot.get_file(msg.voice.file_id)
audio_bytes = bytes(await file.download_as_bytearray())
mime = msg.voice.mime_type or "audio/ogg"
except Exception:
await msg.reply_text("Didn't catch that voice message. Try again or just type it out.")
return
intent, device, reply = await ai_voice(audio_bytes, mime, history, _ctx_info(context))
await _handle_ai_result(intent, device, reply, msg, user_id, context)
# ==================== Events ====================
async def handle_my_chat_member(update: Update, context: ContextTypes.DEFAULT_TYPE):
result = update.my_chat_member
if not result:
return
chat_id = result.chat.id
old = result.old_chat_member.status if result.old_chat_member else "left"
new = result.new_chat_member.status if result.new_chat_member else "left"
if old in ("left", "kicked") and new in ("member", "administrator"):
await delete_tos(chat_id)
await register_group(chat_id, result.chat.title)
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("Accept & Enable", callback_data="tos_accept_" + str(chat_id))],
[InlineKeyboardButton("Decline", callback_data="tos_decline_" + str(chat_id))]
])
await context.bot.send_message(chat_id, TOS_TEXT, reply_markup=keyboard)
elif old in ("member", "administrator") and new in ("left", "kicked"):
await delete_tos(chat_id)
async def handle_tos_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
if not query or not query.data:
return
is_accept = query.data.startswith("tos_accept_")
is_decline = query.data.startswith("tos_decline_")
if not is_accept and not is_decline:
return
chat_id = query.message.chat.id
user_id = query.from_user.id
try:
admins = await context.bot.get_chat_administrators(chat_id)
admin_ids = [a.user.id for a in admins]
if user_id not in admin_ids:
await query.answer("Only admins can do this", show_alert=True)
return
except Exception:
pass
if is_accept:
await record_tos(chat_id, user_id)
await query.answer("Enabled")
has_perms = False
try:
bot_member = await context.bot.get_chat_member(chat_id, context.bot.id)
if hasattr(bot_member, 'can_delete_messages') and bot_member.can_delete_messages and hasattr(bot_member, 'can_restrict_members') and bot_member.can_restrict_members:
has_perms = True
except Exception:
pass
if has_perms:
await query.edit_message_text("I-Lang Guard enabled ✅\n\nSpam cleanup is now automatic.")
else:
await query.edit_message_text(
"I-Lang Guard enabled\n\n"
"⚠️ I need admin permissions to work:\n\n"
"1. Tap the group name\n"
"2. Tap Administrators\n"
"3. Tap Add Admin\n"
"4. Find I-Lang Guard\n"
"5. Enable Delete Messages and Ban Users\n"
"6. Tap Done"
)
else:
await query.answer("OK, goodbye")
await query.edit_message_text("Left the group")
await context.bot.leave_chat(chat_id)
# ==================== Health Check (HF Space) ====================
class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
def log_message(self, *args):
pass
def start_health_server():
port = int(os.environ.get("PORT", 7860))
server = HTTPServer(("0.0.0.0", port), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
logger.info("Health check on port " + str(port))
# ==================== Main ====================
def main():
# Ensure data directory exists
db_dir = os.path.dirname(config.DB_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
app = Application.builder().token(config.BOT_TOKEN).connect_timeout(30).read_timeout(30).write_timeout(30).pool_timeout(30).build()
for cmd, fn in [
("start", cmd_start), ("help", cmd_help), ("ban", cmd_ban),
]:
app.add_handler(CommandHandler(cmd, fn))
app.add_handler(ChatMemberHandler(handle_my_chat_member, ChatMemberHandler.MY_CHAT_MEMBER))
app.add_handler(CallbackQueryHandler(handle_tos_callback, pattern="^tos_"))
# Group
app.add_handler(MessageHandler(
(filters.TEXT | filters.PHOTO | filters.VIDEO | filters.Document.ALL | filters.Sticker.ALL) & filters.ChatType.GROUPS & ~filters.COMMAND,
handle_group_message
))
# Private
app.add_handler(MessageHandler(filters.TEXT & filters.ChatType.PRIVATE & ~filters.COMMAND, handle_private_text))
app.add_handler(MessageHandler(filters.PHOTO & filters.ChatType.PRIVATE, handle_private_photo))
app.add_handler(MessageHandler(filters.VOICE & filters.ChatType.PRIVATE, handle_private_voice))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(init_db())
# Detect mode: webhook (Cloud Run / Railway) or polling (VPS)
webhook_url = os.environ.get("WEBHOOK_URL", "")
port = int(os.environ.get("PORT", 8080))
if webhook_url:
# Webhook mode: start listening IMMEDIATELY (Cloud Run health check is strict)
logger.info("I-Lang Guard starting (webhook: " + webhook_url + ")")
app.run_webhook(
listen="0.0.0.0",
port=port,
url_path="webhook",
webhook_url=webhook_url + "/webhook",
drop_pending_updates=True,
allowed_updates=["message", "callback_query", "my_chat_member"],
)
else:
# Polling mode: run AI test first, then start
async def _test_ai():
try:
from modules.chat import model, _safe_text
r = await model.generate_content_async("Say hi in one word. JSON: {\"intent\":\"chat\",\"reply\":\"hi\"}", safety_settings=[
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
])
text = _safe_text(r)
if text:
logger.info("AI startup test OK: " + text[:100])
else:
logger.error("AI startup test FAILED: empty response")
except Exception as e:
logger.error("AI startup test EXCEPTION: " + str(e))
loop.run_until_complete(_test_ai())
start_health_server()
logger.info("I-Lang Guard starting (polling mode)")
app.run_polling(
drop_pending_updates=True,
allowed_updates=["message", "callback_query", "my_chat_member"],
bootstrap_retries=10
)
if __name__ == "__main__":
main()