Spaces:
Runtime error
Runtime error
| import asyncio, json, os, uuid, shutil, base64 | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks, UploadFile, Form | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from pathlib import Path | |
| from backend.smart_browser_controller import SmartBrowserController # Updated import | |
| from backend.proxy_manager import SmartProxyManager # Updated import | |
| from backend.agent import run_agent | |
| from backend.database import db # Database integration | |
| from backend.telegram_bot import bot_notifier, start_bot # Telegram integration | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # TODO add specific origins in production | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| tasks = {} # job_id β async.Task | |
| ws_subscribers = {} # job_id β { websocket, β¦ } | |
| streaming_sessions = {} # job_id β browser_controller | |
| job_info = {} # job_id β { format, content_type, extension, prompt } | |
| # Initialize global smart proxy manager | |
| smart_proxy_manager = SmartProxyManager() | |
| OUTPUT_DIR = Path("outputs") | |
| OUTPUT_DIR.mkdir(exist_ok=True) | |
| class JobRequest(BaseModel): | |
| prompt: str | |
| format: str = "txt" # txt | md | json | html | csv | pdf | |
| headless: bool = False | |
| enable_streaming: bool = False | |
| async def store_job_info(job_id: str, info: dict): | |
| """Store job information for later retrieval""" | |
| job_info[job_id] = info | |
| print(f"π Stored job info for {job_id}: {info}") | |
| async def create_job(req: JobRequest): | |
| # Validate format | |
| valid_formats = ["txt", "md", "json", "html", "csv", "pdf"] | |
| if req.format not in valid_formats: | |
| print(f"β οΈ Invalid format '{req.format}', defaulting to 'txt'") | |
| req.format = "txt" | |
| job_id = str(uuid.uuid4()) | |
| # Use smart proxy manager to get the best available proxy | |
| proxy_info = smart_proxy_manager.get_best_proxy() | |
| proxy = proxy_info.to_playwright_dict() if proxy_info else None | |
| proxy_server = proxy.get("server", "None") if proxy else "None" | |
| print(f"π Creating smart job {job_id}") | |
| print(f"π Goal: {req.prompt}") | |
| print(f"π Format: {req.format}") | |
| print(f"π₯οΈ Headless: {req.headless}") | |
| print(f"π‘ Streaming: {req.enable_streaming}") | |
| print(f"π Selected proxy: {proxy_server}") | |
| # Get initial proxy stats | |
| proxy_stats = smart_proxy_manager.get_proxy_stats() | |
| print(f"π Proxy pool stats: {proxy_stats}") | |
| # Store job in database | |
| await db.create_job( | |
| job_id=job_id, | |
| prompt=req.prompt, | |
| format=req.format, | |
| headless=req.headless, | |
| streaming_enabled=req.enable_streaming, | |
| proxy_server=proxy_server | |
| ) | |
| # Send Telegram notification | |
| asyncio.create_task(bot_notifier.notify_job_started(job_id, req.prompt, req.format)) | |
| # Create the agent task | |
| coro = run_agent(job_id, req.prompt, req.format, req.headless, proxy, req.enable_streaming) | |
| task = asyncio.create_task(coro) | |
| # Add callback to notify when done | |
| def on_task_done(fut): | |
| try: | |
| result = fut.result() | |
| # Job completed successfully | |
| download_url = f"/download/{job_id}" | |
| asyncio.create_task(bot_notifier.notify_job_completed(job_id, req.format, download_url)) | |
| except Exception as e: | |
| # Job failed | |
| asyncio.create_task(bot_notifier.notify_job_failed(job_id, str(e))) | |
| task.add_done_callback(on_task_done) | |
| tasks[job_id] = task | |
| response = { | |
| "job_id": job_id, | |
| "format": req.format, | |
| "proxy_stats": proxy_stats | |
| } | |
| if req.enable_streaming: | |
| response["streaming_enabled"] = True | |
| response["stream_url"] = f"ws://localhost:8000/stream/{job_id}" | |
| return response | |
| async def job_ws(ws: WebSocket, job_id: str): | |
| await ws.accept() | |
| ws_subscribers.setdefault(job_id, set()).add(ws) | |
| # Send streaming info if available | |
| if job_id in streaming_sessions: | |
| browser_ctrl = streaming_sessions[job_id] | |
| stream_info = browser_ctrl.get_streaming_info() | |
| await ws.send_text(json.dumps({ | |
| "type": "streaming_info", | |
| "streaming": stream_info | |
| })) | |
| # Send initial proxy stats | |
| proxy_stats = smart_proxy_manager.get_proxy_stats() | |
| await ws.send_text(json.dumps({ | |
| "type": "proxy_stats", | |
| "stats": proxy_stats | |
| })) | |
| try: | |
| while True: | |
| await ws.receive_text() # keep connection alive | |
| except WebSocketDisconnect: | |
| ws_subscribers[job_id].discard(ws) | |
| async def stream_ws(websocket: WebSocket, job_id: str): | |
| """WebSocket endpoint for real-time browser streaming""" | |
| await websocket.accept() | |
| # Wait for streaming session to be available (with timeout) | |
| max_wait = 30 # seconds | |
| wait_time = 0 | |
| while job_id not in streaming_sessions and wait_time < max_wait: | |
| await asyncio.sleep(0.5) | |
| wait_time += 0.5 | |
| if job_id not in streaming_sessions: | |
| await websocket.send_text(json.dumps({ | |
| "type": "error", | |
| "message": "Streaming session not available - job may not have streaming enabled" | |
| })) | |
| await websocket.close() | |
| return | |
| browser_ctrl = streaming_sessions[job_id] | |
| browser_ctrl.add_stream_client(websocket) | |
| # Send initial connection confirmation | |
| await websocket.send_text(json.dumps({ | |
| "type": "connected", | |
| "message": "Connected to browser stream", | |
| "streaming_active": browser_ctrl.streaming_active | |
| })) | |
| try: | |
| while True: | |
| try: | |
| message = await websocket.receive_text() | |
| data = json.loads(message) | |
| if data['type'] == 'mouse': | |
| await browser_ctrl.handle_mouse_event(data) | |
| elif data['type'] == 'keyboard': | |
| await browser_ctrl.handle_keyboard_event(data) | |
| elif data['type'] == 'ping': | |
| await websocket.send_text(json.dumps({"type": "pong"})) | |
| except asyncio.TimeoutError: | |
| await websocket.send_text(json.dumps({"type": "ping"})) | |
| except WebSocketDisconnect: | |
| browser_ctrl.remove_stream_client(websocket) | |
| print(f"Stream client disconnected from job {job_id}") | |
| except Exception as e: | |
| print(f"Error in stream WebSocket: {e}") | |
| browser_ctrl.remove_stream_client(websocket) | |
| async def create_streaming_session(job_id: str): | |
| """Create a streaming session without starting a job""" | |
| if job_id in streaming_sessions: | |
| browser_ctrl = streaming_sessions[job_id] | |
| return browser_ctrl.get_streaming_info() | |
| try: | |
| # Get best available proxy for streaming session | |
| proxy_info = smart_proxy_manager.get_best_proxy() | |
| proxy = proxy_info.to_playwright_dict() if proxy_info else None | |
| print(f"π₯ Creating streaming session with proxy: {proxy.get('server', 'None') if proxy else 'None'}") | |
| # Create smart browser controller with streaming enabled | |
| browser_ctrl = SmartBrowserController(headless=False, proxy=proxy, enable_streaming=True) | |
| await browser_ctrl.__aenter__() | |
| await browser_ctrl.start_streaming(quality=80) | |
| streaming_sessions[job_id] = browser_ctrl | |
| stream_info = browser_ctrl.get_streaming_info() | |
| # Add proxy information to stream info | |
| stream_info["proxy_info"] = { | |
| "current_proxy": proxy.get("server", "None") if proxy else "None", | |
| "proxy_stats": smart_proxy_manager.get_proxy_stats() | |
| } | |
| # Broadcast to connected clients | |
| await broadcast(job_id, { | |
| "type": "streaming_info", | |
| "streaming": stream_info | |
| }) | |
| return stream_info | |
| except Exception as e: | |
| print(f"β Failed to create streaming session: {e}") | |
| return {"enabled": False, "error": str(e)} | |
| async def get_streaming_info(job_id: str): | |
| """Get streaming connection information for a job""" | |
| if job_id in streaming_sessions: | |
| browser_ctrl = streaming_sessions[job_id] | |
| stream_info = browser_ctrl.get_streaming_info() | |
| # Add current proxy stats | |
| stream_info["proxy_stats"] = smart_proxy_manager.get_proxy_stats() | |
| return stream_info | |
| return {"enabled": False, "error": "Streaming not enabled for this job"} | |
| async def cleanup_streaming(job_id: str): | |
| """Clean up streaming session for a job""" | |
| if job_id in streaming_sessions: | |
| browser_ctrl = streaming_sessions[job_id] | |
| try: | |
| await browser_ctrl.__aexit__(None, None, None) | |
| except Exception as e: | |
| print(f"Error cleaning up streaming session: {e}") | |
| finally: | |
| del streaming_sessions[job_id] | |
| return {"message": "Streaming session cleaned up"} | |
| return {"message": "No streaming session found"} | |
| def download(job_id: str): | |
| """Enhanced download endpoint that handles all file formats""" | |
| print(f"π₯ Download request for job {job_id}") | |
| # Get job information | |
| if job_id in job_info: | |
| info = job_info[job_id] | |
| extension = info.get("extension", "output") | |
| content_type = info.get("content_type", "application/octet-stream") | |
| format_name = info.get("format", "unknown") | |
| print(f"π Job info found: {info}") | |
| else: | |
| # Fallback for jobs without stored info | |
| extension = "output" | |
| content_type = "application/octet-stream" | |
| format_name = "unknown" | |
| print(f"β οΈ No job info found for {job_id}, using fallback") | |
| # Try to find the file with proper extension first | |
| file_path = OUTPUT_DIR / f"{job_id}.{extension}" | |
| if not file_path.exists(): | |
| # Fallback: try common extensions | |
| for fallback_ext in ['txt', 'pdf', 'csv', 'json', 'html', 'md', 'output']: | |
| fallback_path = OUTPUT_DIR / f"{job_id}.{fallback_ext}" | |
| if fallback_path.exists(): | |
| file_path = fallback_path | |
| extension = fallback_ext | |
| print(f"π Found file with fallback extension: {file_path}") | |
| break | |
| if not file_path.exists(): | |
| print(f"β File not found: {file_path}") | |
| from fastapi import HTTPException | |
| raise HTTPException(status_code=404, detail="File not found") | |
| # Generate appropriate filename | |
| safe_filename = f"extracted_data_{job_id}.{extension}" | |
| print(f"β Serving file: {file_path}") | |
| print(f"π Content-Type: {content_type}") | |
| print(f"π Filename: {safe_filename}") | |
| # Serve file with proper content type and filename | |
| return FileResponse( | |
| path=file_path, | |
| filename=safe_filename, | |
| media_type=content_type, | |
| headers={ | |
| "Content-Disposition": f"attachment; filename={safe_filename}", | |
| "X-File-Format": format_name, | |
| "X-Original-Extension": extension | |
| } | |
| ) | |
| def get_job_info(job_id: str): | |
| """Get job information including format and status""" | |
| if job_id in job_info: | |
| info = job_info[job_id].copy() | |
| # Add file existence check | |
| extension = info.get("extension", "output") | |
| file_path = OUTPUT_DIR / f"{job_id}.{extension}" | |
| info["file_exists"] = file_path.exists() | |
| info["file_path"] = str(file_path) if file_path.exists() else None | |
| # Add current proxy stats | |
| info["proxy_stats"] = smart_proxy_manager.get_proxy_stats() | |
| return info | |
| else: | |
| return {"error": "Job not found", "job_id": job_id} | |
| def get_proxy_stats(): | |
| """Get current proxy pool statistics""" | |
| stats = smart_proxy_manager.get_proxy_stats() | |
| return { | |
| "proxy_stats": stats, | |
| "timestamp": asyncio.get_event_loop().time() | |
| } | |
| def reload_proxies(): | |
| """Reload proxy list from environment""" | |
| try: | |
| global smart_proxy_manager | |
| smart_proxy_manager = SmartProxyManager() | |
| stats = smart_proxy_manager.get_proxy_stats() | |
| return { | |
| "success": True, | |
| "message": "Proxy list reloaded successfully", | |
| "proxy_stats": stats | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "message": f"Failed to reload proxies: {str(e)}" | |
| } | |
| async def get_all_jobs(limit: int = 50, offset: int = 0): | |
| """Get all jobs from database with pagination""" | |
| jobs = await db.get_all_jobs(limit, offset) | |
| stats = await db.get_job_stats() | |
| return { | |
| "jobs": jobs, | |
| "stats": stats, | |
| "pagination": { | |
| "limit": limit, | |
| "offset": offset | |
| } | |
| } | |
| async def get_job(job_id: str): | |
| """Get detailed job information from database""" | |
| job = await db.get_job(job_id) | |
| if job: | |
| # Add file existence check | |
| extension = job.get("file_extension", "output") | |
| file_path = OUTPUT_DIR / f"{job_id}.{extension}" | |
| job["file_exists"] = file_path.exists() | |
| job["file_path"] = str(file_path) if file_path.exists() else None | |
| return {"job": job} | |
| else: | |
| return {"error": "Job not found", "job_id": job_id} | |
| async def delete_job(job_id: str): | |
| """Delete a job from database and remove output file""" | |
| # Delete from database | |
| success = await db.delete_job(job_id) | |
| # Also delete output file if exists | |
| job = await db.get_job(job_id) | |
| if job: | |
| extension = job.get("file_extension", "output") | |
| file_path = OUTPUT_DIR / f"{job_id}.{extension}" | |
| if file_path.exists(): | |
| file_path.unlink() | |
| if success: | |
| return {"message": f"Job {job_id} deleted successfully"} | |
| else: | |
| return {"error": "Failed to delete job"} | |
| async def get_system_stats(): | |
| """Get overall system statistics from database""" | |
| db_stats = await db.get_job_stats() | |
| proxy_stats = smart_proxy_manager.get_proxy_stats() | |
| return { | |
| "database": db_stats, | |
| "proxy": proxy_stats, | |
| "active_jobs": len(tasks), | |
| "active_streams": len(streaming_sessions) | |
| } | |
| app.mount("/", StaticFiles(directory="frontend", html=True), name="static") | |
| # Helper functions | |
| async def broadcast(job_id: str, msg: dict): | |
| """Broadcast message to all subscribers of a job""" | |
| if job_id in ws_subscribers: | |
| for ws in list(ws_subscribers[job_id]): | |
| try: | |
| await ws.send_text(json.dumps(msg)) | |
| except: | |
| ws_subscribers[job_id].discard(ws) | |
| async def register_streaming_session(job_id: str, browser_ctrl): | |
| """Register streaming session information""" | |
| streaming_sessions[job_id] = browser_ctrl | |
| if browser_ctrl.enable_streaming: | |
| await browser_ctrl.start_streaming(quality=80) | |
| stream_info = browser_ctrl.get_streaming_info() | |
| await broadcast(job_id, { | |
| "type": "streaming_info", | |
| "streaming": stream_info | |
| }) | |
| # Cleanup on shutdown | |
| async def cleanup(): | |
| """Cleanup resources on shutdown""" | |
| print("π§Ή Cleaning up resources...") | |
| # Cleanup streaming sessions | |
| for job_id, browser_ctrl in streaming_sessions.items(): | |
| try: | |
| await browser_ctrl.__aexit__(None, None, None) | |
| print(f"β Cleaned up streaming session: {job_id}") | |
| except Exception as e: | |
| print(f"β Error cleaning up session {job_id}: {e}") | |
| streaming_sessions.clear() | |
| job_info.clear() | |
| # Disconnect database | |
| await db.disconnect() | |
| # Print final proxy stats | |
| final_stats = smart_proxy_manager.get_proxy_stats() | |
| print(f"π Final proxy stats: {final_stats}") | |
| print("β Cleanup completed") | |
| async def startup(): | |
| """Initialize database connection and Telegram bot on startup""" | |
| print("π Starting up BrowserPilot...") | |
| await db.connect() | |
| # Start Telegram bot in background | |
| asyncio.create_task(start_bot()) | |
| print("β Startup completed") | |