Spaces:
Runtime error
Runtime error
File size: 7,389 Bytes
c5f9050 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | """
Telegram Bot Integration for BrowserPilot
- Job completion notifications
- Remote control commands
- Keepalive alerts
"""
import asyncio
import html
import os
from typing import Optional

from telegram import Bot, Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
class TelegramNotifier:
    """Send BrowserPilot status notifications to one Telegram chat.

    The bot token and target chat are read from the ``TELEGRAM_BOT_TOKEN``
    and ``TELEGRAM_CHAT_ID`` environment variables. Until ``initialize()``
    succeeds, ``send_message()`` and every ``notify_*`` helper return
    without doing anything, so callers do not need to check whether
    Telegram is configured.

    All ``notify_*`` messages are sent with HTML parse mode; values supplied
    by callers are HTML-escaped before being embedded, because Telegram
    rejects messages whose text contains stray ``<`` or ``&`` characters.
    """

    def __init__(self):
        self.token = os.getenv("TELEGRAM_BOT_TOKEN")
        self.chat_id = os.getenv("TELEGRAM_CHAT_ID")
        self.bot: Optional[Bot] = None
        # Placeholder attribute; nothing in this class assigns or reads it.
        self.app = None
        self._initialized = False

    @staticmethod
    def _escape(value) -> str:
        """Return ``value`` as text that is safe inside HTML parse-mode messages."""
        # quote=True also escapes ' and ", which matters inside href='...'.
        return html.escape(str(value), quote=True)

    async def initialize(self):
        """Create the bot client and verify the token with a ``getMe`` call.

        Sets ``_initialized`` to True on success. Missing credentials or any
        error from the API leave the notifier disabled; the reason is printed
        and no exception is raised.
        """
        if not self.token or not self.chat_id:
            print("⚠️ Telegram not configured (missing TOKEN or CHAT_ID)")
            return
        try:
            self.bot = Bot(token=self.token)
            # Use the returned user object rather than a Bot property so the
            # username lookup cannot fail independently of the API call.
            me = await self.bot.get_me()
            self._initialized = True
            print(f"✅ Telegram bot initialized: @{me.username}")
        except Exception as e:
            print(f"❌ Telegram init failed: {e}")
            self._initialized = False

    async def send_message(self, message: str, parse_mode: str = "HTML"):
        """Send ``message`` to the configured chat.

        Args:
            message: Text to send. With the default parse mode it must
                already be valid Telegram HTML.
            parse_mode: Telegram parse mode passed through to the API.

        Does nothing when the notifier is not initialized. Send failures are
        printed and swallowed so that a notification problem never breaks
        the caller.
        """
        if not self._initialized:
            return
        try:
            await self.bot.send_message(
                chat_id=self.chat_id,
                text=message,
                parse_mode=parse_mode,
            )
        except Exception as e:
            print(f"❌ Failed to send Telegram message: {e}")

    async def notify_job_started(self, job_id: str, prompt: str, format: str):
        """Notify that a job has started; the prompt is cut to 200 characters."""
        # Truncate before escaping so an entity such as &amp; is never cut in half.
        message = (
            "🚀 <b>Job Started</b>\n\n"
            f"<b>ID:</b> <code>{self._escape(job_id)}</code>\n"
            f"<b>Task:</b> {self._escape(prompt[:200])}\n"
            f"<b>Format:</b> {self._escape(format)}\n\n"
            "⏳ Processing..."
        )
        await self.send_message(message)

    async def notify_job_completed(self, job_id: str, format: str, download_url: str):
        """Notify that a job has completed, with a link to ``download_url``."""
        message = (
            "✅ <b>Job Completed!</b>\n\n"
            f"<b>ID:</b> <code>{self._escape(job_id)}</code>\n"
            f"<b>Format:</b> {self._escape(format)}\n\n"
            f"📥 <a href='{self._escape(download_url)}'>Download Result</a>"
        )
        await self.send_message(message)

    async def notify_job_failed(self, job_id: str, error: str):
        """Notify that a job has failed; the error text is cut to 500 characters."""
        message = (
            "❌ <b>Job Failed</b>\n\n"
            f"<b>ID:</b> <code>{self._escape(job_id)}</code>\n"
            f"<b>Error:</b> {self._escape(error[:500])}"
        )
        await self.send_message(message)

    async def notify_keepalive_failed(self, status_code: int):
        """Notify that the keepalive health check failed with ``status_code``."""
        message = (
            "⚠️ <b>KeepAlive Alert</b>\n\n"
            "🔴 HF Space health check failed!\n"
            f"<b>Status:</b> {self._escape(status_code)}\n\n"
            "The Space might be sleeping or down."
        )
        await self.send_message(message)

    async def notify_keepalive_restored(self):
        """Notify that the keepalive health check passes again after a failure."""
        message = (
            "✅ <b>KeepAlive Restored</b>\n\n"
            "🟢 HF Space is back online!\n\n"
            "Health check passed."
        )
        await self.send_message(message)
# Command handlers for bot control
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start by replying with the list of supported commands."""
    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "🤖 <b>BrowserPilot Bot</b>\n\n"
        "Commands:\n"
        "/start - Show this help\n"
        "/status - Check system status\n"
        "/jobs - List recent jobs\n"
        "/ping - Check if bot is alive\n\n"
        "To create a job, send a message with your task.",
        # Without a parse mode the <b> tags would be shown literally.
        parse_mode="HTML",
    )
async def ping_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ping by replying with a short liveness message."""
    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("🟢 Bot is alive!")
async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /status by replying with job, stream and proxy counts."""
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import with backend.main -- confirm).
    from backend.main import smart_proxy_manager, tasks, streaming_sessions

    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    proxy_stats = smart_proxy_manager.get_proxy_stats()
    available = proxy_stats.get("available", 0)
    total = proxy_stats.get("total", 0)
    text = (
        "📊 <b>System Status</b>\n\n"
        f"<b>Active Jobs:</b> {len(tasks)}\n"
        f"<b>Active Streams:</b> {len(streaming_sessions)}\n"
        f"<b>Proxies Available:</b> {available}/{total}\n\n"
        "<b>Uptime:</b> Running"
    )
    # Without a parse mode the <b> tags would be shown literally.
    await message.reply_text(text, parse_mode="HTML")
async def jobs_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /jobs by replying with a summary of up to five recent jobs."""
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import -- confirm).
    from backend.database import db

    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    jobs = await db.get_all_jobs(limit=5)
    if not jobs:
        await message.reply_text("📭 No jobs found.")
        return

    status_icons = {"completed": "✅", "failed": "❌", "running": "🔄"}
    parts = ["📋 <b>Recent Jobs</b>\n\n"]
    for job in jobs[:5]:
        icon = status_icons.get(job.get("status"), "⏳")
        # str() and "or" guard against non-string ids and None prompts;
        # values are truncated first and then escaped so that HTML entities
        # are never cut in half.
        job_id = html.escape(str(job.get("id", "unknown"))[:8])
        prompt = html.escape(str(job.get("prompt") or "No prompt")[:50])
        fmt = html.escape(str(job.get("format", "unknown")))
        status = html.escape(str(job.get("status", "unknown")))
        parts.append(
            f"{icon} <code>{job_id}</code>\n"
            f" {prompt}...\n"
            f" Format: {fmt} | Status: {status}\n\n"
        )
    # Without a parse mode the <b>/<code> tags would be shown literally.
    await message.reply_text("".join(parts), parse_mode="HTML")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Treat a plain text message as a job prompt and create a JSON job for it.

    Replies with the new job id on success, or with the error text if job
    creation raises. Updates that carry no new message (for example edited
    messages or channel posts) are ignored.
    """
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import with backend.main -- confirm).
    from backend.main import create_job, JobRequest

    # Deliberately update.message, not effective_message: an edit of an
    # earlier prompt should not create a second job.
    message = update.message
    if message is None or not message.text:
        return
    prompt = message.text

    req = JobRequest(prompt=prompt, format="json", headless=True, enable_streaming=False)

    # Only job creation is guarded, so a failure while sending the
    # confirmation is not misreported as a job-creation failure.
    try:
        result = await create_job(req)
        job_id = result["job_id"]
    except Exception as e:
        await message.reply_text(f"❌ Failed to create job: {e}")
        return

    await message.reply_text(
        "✅ <b>Job Created!</b>\n\n"
        f"<b>ID:</b> <code>{html.escape(str(job_id))}</code>\n"
        # Truncate first, then escape, so entities are never cut in half.
        f"<b>Task:</b> {html.escape(prompt[:100])}...\n\n"
        "I'll notify you when it's done!",
        # Without a parse mode the <b>/<code> tags would be shown literally.
        parse_mode="HTML",
    )

    # Also notify the configured chat. Reuse the module-level notifier
    # instead of building a new bot client (and issuing a getMe call) for
    # every incoming message; initialize it lazily if that has not happened.
    if not bot_notifier._initialized:
        await bot_notifier.initialize()
    await bot_notifier.notify_job_started(job_id, prompt, "json")
# Global notifier instance. Constructing it only reads the Telegram
# environment variables; start_bot() awaits its initialize() before use.
bot_notifier = TelegramNotifier()
async def start_bot():
    """Initialize the notifier and start long-polling for Telegram updates.

    Returns without starting anything when the notifier could not be
    initialized. Otherwise it builds the application, registers the command
    and text handlers, starts polling inside the already-running event loop
    and returns; polling continues in the background. The application is
    stored on ``bot_notifier.app`` so it can be stopped later.
    """
    await bot_notifier.initialize()
    if not bot_notifier._initialized:
        print("⚠️ Telegram bot not started (missing credentials)")
        return

    application = Application.builder().token(bot_notifier.token).build()
    bot_notifier.app = application

    command_handlers = (
        ("start", start_command),
        ("ping", ping_command),
        ("status", status_command),
        ("jobs", jobs_command),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    print("🤖 Starting Telegram bot polling...")
    # Application has no start_polling(); when running inside an existing
    # event loop the documented sequence is initialize -> start ->
    # updater.start_polling (run_polling() would try to own the loop).
    await application.initialize()
    await application.start()
    await application.updater.start_polling(allowed_updates=Update.ALL_TYPES)
|