"""
Telegram Bot Integration for BrowserPilot
- Job completion notifications
- Remote control commands
- Keepalive alerts
"""
import os
import asyncio
from typing import Optional
from telegram import Bot, Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
class TelegramNotifier:
def __init__(self):
self.token = os.getenv("TELEGRAM_BOT_TOKEN")
self.chat_id = os.getenv("TELEGRAM_CHAT_ID")
self.bot: Optional[Bot] = None
self.app = None
self._initialized = False
async def initialize(self):
"""Initialize bot"""
if not self.token or not self.chat_id:
print("⚠️ Telegram not configured (missing TOKEN or CHAT_ID)")
return
try:
self.bot = Bot(token=self.token)
await self.bot.get_me()
self._initialized = True
print(f"✅ Telegram bot initialized: @{self.bot.username}")
except Exception as e:
print(f"❌ Telegram init failed: {e}")
self._initialized = False
async def send_message(self, message: str, parse_mode: str = "HTML"):
"""Send message to configured chat"""
if not self._initialized:
return
try:
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode=parse_mode
)
except Exception as e:
print(f"❌ Failed to send Telegram message: {e}")
async def notify_job_started(self, job_id: str, prompt: str, format: str):
"""Notify when a job starts"""
message = (
"🚀 Job Started\n\n"
f"ID: {job_id}\n"
f"Task: {prompt[:200]}\n"
f"Format: {format}\n\n"
"⏳ Processing..."
)
await self.send_message(message)
async def notify_job_completed(self, job_id: str, format: str, download_url: str):
"""Notify when a job completes"""
message = (
"✅ Job Completed!\n\n"
f"ID: {job_id}\n"
f"Format: {format}\n\n"
f"📥 Download Result"
)
await self.send_message(message)
async def notify_job_failed(self, job_id: str, error: str):
"""Notify when a job fails"""
message = (
"❌ Job Failed\n\n"
f"ID: {job_id}\n"
f"Error: {error[:500]}"
)
await self.send_message(message)
async def notify_keepalive_failed(self, status_code: int):
"""Notify when keepalive check fails"""
message = (
"⚠️ KeepAlive Alert\n\n"
"🔴 HF Space health check failed!\n"
f"Status: {status_code}\n\n"
"The Space might be sleeping or down."
)
await self.send_message(message)
async def notify_keepalive_restored(self):
"""Notify when keepalive check succeeds after failure"""
message = (
"✅ KeepAlive Restored\n\n"
"🟢 HF Space is back online!\n\n"
"Health check passed."
)
await self.send_message(message)
# Command handlers for bot control
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /start command"""
await update.message.reply_text(
"🤖 BrowserPilot Bot\n\n"
"Commands:\n"
"/start - Show this help\n"
"/status - Check system status\n"
"/jobs - List recent jobs\n"
"/ping - Check if bot is alive\n\n"
"To create a job, send a message with your task."
)
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /ping command"""
await update.message.reply_text("🟢 Bot is alive!")
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /status command"""
from backend.main import smart_proxy_manager, tasks, streaming_sessions
proxy_stats = smart_proxy_manager.get_proxy_stats()
message = (
"📊 System Status\n\n"
f"Active Jobs: {len(tasks)}\n"
f"Active Streams: {len(streaming_sessions)}\n"
f"Proxies Available: {proxy_stats.get('available', 0)}/{proxy_stats.get('total', 0)}\n\n"
f"Uptime: Running"
)
await update.message.reply_text(message)
async def jobs_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /jobs command"""
from backend.database import db
jobs = await db.get_all_jobs(limit=5)
if not jobs:
await update.message.reply_text("📋 No jobs found.")
return
message = "📋 Recent Jobs\n\n"
for job in jobs[:5]:
status_emoji = {"completed": "✅", "failed": "❌", "running": "🔄"}.get(job.get("status"), "⏳")
message += (
f"{status_emoji} {job.get('id', 'unknown')[:8]}\n"
f" {job.get('prompt', 'No prompt')[:50]}...\n"
f" Format: {job.get('format', 'unknown')} | Status: {job.get('status', 'unknown')}\n\n"
)
await update.message.reply_text(message)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle text messages as job prompts"""
from backend.main import create_job, JobRequest
prompt = update.message.text
if not prompt:
return
# Create a job request
req = JobRequest(prompt=prompt, format="json", headless=True, enable_streaming=False)
# Create the job
try:
result = await create_job(req)
job_id = result["job_id"]
await update.message.reply_text(
f"✅ Job Created!\n\n"
f"ID: {job_id}\n"
f"Task: {prompt[:100]}...\n\n"
"I'll notify you when it's done!"
)
# Also notify via notifier (for consistency)
notifier = TelegramNotifier()
await notifier.initialize()
await notifier.notify_job_started(job_id, prompt, "json")
except Exception as e:
await update.message.reply_text(f"❌ Failed to create job: {str(e)}")
# Global bot instance
bot_notifier = TelegramNotifier()
async def start_bot():
"""Start the Telegram bot"""
await bot_notifier.initialize()
if not bot_notifier._initialized:
print("⚠️ Telegram bot not started (missing credentials)")
return
# Create application
application = Application.builder().token(bot_notifier.token).build()
# Add handlers
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("ping", ping_command))
application.add_handler(CommandHandler("status", status_command))
application.add_handler(CommandHandler("jobs", jobs_command))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Start polling
print("🤖 Starting Telegram bot polling...")
await application.start_polling(allowed_updates=Update.ALL_TYPES)