File size: 16,729 Bytes
c5f9050
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
import asyncio, json, os, uuid, shutil, base64
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks, UploadFile, Form
from fastapi.responses import FileResponse
from pydantic import BaseModel
from pathlib import Path
from backend.smart_browser_controller import SmartBrowserController  # Updated import
from backend.proxy_manager import SmartProxyManager  # Updated import
from backend.agent import run_agent
from backend.database import db  # Database integration
from backend.telegram_bot import bot_notifier, start_bot  # Telegram integration
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()


# Permissive CORS policy: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # TODO add specific origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory, per-process registries keyed by job id.
tasks = {} # job_id -> asyncio.Task running the agent for that job
ws_subscribers = {} # job_id -> set of WebSocket connections subscribed to job updates
streaming_sessions = {} # job_id -> browser controller that owns the live stream
job_info = {} # job_id -> { format, content_type, extension, prompt }

# Initialize global smart proxy manager
smart_proxy_manager = SmartProxyManager()

# Directory (relative to the working directory) that holds generated output
# files; it is created at import time if missing.
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

class JobRequest(BaseModel):
    # Request body accepted by POST /job.

    # Goal text handed to the agent.
    prompt: str
    # Requested output format; create_job falls back to "txt" for any other value.
    format: str = "txt" # txt | md | json | html | csv | pdf
    # Forwarded to run_agent as its headless argument.
    headless: bool = False
    # When true, the POST /job response also carries a stream_url.
    enable_streaming: bool = False

async def store_job_info(job_id: str, info: dict):
    """Record *info* for *job_id* in the in-memory ``job_info`` registry and log it."""
    job_info.update({job_id: info})
    print(f"πŸ“Š Stored job info for {job_id}: {info}")

@app.post("/job")
async def create_job(req: JobRequest):
    """Create a job, persist it, and start the agent as a background task.

    An unsupported ``req.format`` is replaced by ``"txt"``. The job is stored
    in the database, a Telegram "started" notification is scheduled, and the
    agent coroutine is launched as an asyncio task registered in ``tasks``.

    Returns:
        A dict with ``job_id``, ``format`` and ``proxy_stats``; when streaming
        was requested it also contains ``streaming_enabled`` and ``stream_url``.
    """
    # Validate format
    valid_formats = ("txt", "md", "json", "html", "csv", "pdf")
    if req.format not in valid_formats:
        print(f"⚠️ Invalid format '{req.format}', defaulting to 'txt'")
        req.format = "txt"

    job_id = str(uuid.uuid4())

    # Use smart proxy manager to get the best available proxy
    proxy_info = smart_proxy_manager.get_best_proxy()
    proxy = proxy_info.to_playwright_dict() if proxy_info else None
    proxy_server = proxy.get("server", "None") if proxy else "None"

    print(f"πŸš€ Creating smart job {job_id}")
    print(f"πŸ“‹ Goal: {req.prompt}")
    print(f"🌐 Format: {req.format}")
    print(f"πŸ–₯️ Headless: {req.headless}")
    print(f"πŸ“‘ Streaming: {req.enable_streaming}")
    print(f"πŸ”„ Selected proxy: {proxy_server}")

    # Get initial proxy stats
    proxy_stats = smart_proxy_manager.get_proxy_stats()
    print(f"πŸ“Š Proxy pool stats: {proxy_stats}")

    # Store job in database
    await db.create_job(
        job_id=job_id,
        prompt=req.prompt,
        format=req.format,
        headless=req.headless,
        streaming_enabled=req.enable_streaming,
        proxy_server=proxy_server
    )

    # Send Telegram notification
    asyncio.create_task(bot_notifier.notify_job_started(job_id, req.prompt, req.format))

    # Create the agent task
    coro = run_agent(job_id, req.prompt, req.format, req.headless, proxy, req.enable_streaming)
    task = asyncio.create_task(coro)

    def on_task_done(fut):
        """Drop the finished task from the registry and notify about the outcome."""
        # Without this, ``tasks`` grows forever and /stats reports every job
        # ever created as "active".
        tasks.pop(job_id, None)
        try:
            fut.result()
        except asyncio.CancelledError:
            # CancelledError is not an ``Exception`` subclass on current
            # Python versions; handle it explicitly so it does not escape
            # from the callback.
            print(f"Job {job_id} was cancelled")
        except Exception as e:
            # Job failed
            asyncio.create_task(bot_notifier.notify_job_failed(job_id, str(e)))
        else:
            # Job completed successfully
            download_url = f"/download/{job_id}"
            asyncio.create_task(bot_notifier.notify_job_completed(job_id, req.format, download_url))

    task.add_done_callback(on_task_done)
    tasks[job_id] = task

    response = {
        "job_id": job_id,
        "format": req.format,
        "proxy_stats": proxy_stats
    }

    if req.enable_streaming:
        response["streaming_enabled"] = True
        # NOTE(review): host and port are hard-coded; clients that reach the
        # server under another address need to rewrite this URL.
        response["stream_url"] = f"ws://localhost:8000/stream/{job_id}"

    return response

@app.websocket("/ws/{job_id}")
async def job_ws(ws: WebSocket, job_id: str):
    """WebSocket endpoint that subscribes a client to updates for one job.

    After accepting the connection, the socket is added to
    ``ws_subscribers[job_id]``. The client first receives a ``streaming_info``
    message (only if a streaming session is already registered for the job)
    and a ``proxy_stats`` message; after that, incoming text is read and
    discarded purely to keep the connection open.

    The socket is always unregistered when the handler exits, including when
    one of the initial sends fails.
    """
    await ws.accept()
    ws_subscribers.setdefault(job_id, set()).add(ws)

    try:
        # Send streaming info if available
        if job_id in streaming_sessions:
            browser_ctrl = streaming_sessions[job_id]
            stream_info = browser_ctrl.get_streaming_info()
            await ws.send_text(json.dumps({
                "type": "streaming_info",
                "streaming": stream_info
            }))

        # Send initial proxy stats
        proxy_stats = smart_proxy_manager.get_proxy_stats()
        await ws.send_text(json.dumps({
            "type": "proxy_stats",
            "stats": proxy_stats
        }))

        while True:
            await ws.receive_text() # keep connection alive
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        subscribers = ws_subscribers.get(job_id)
        if subscribers is not None:
            subscribers.discard(ws)
            # Remove the now-empty set so the registry does not accumulate
            # one entry per job forever.
            if not subscribers:
                ws_subscribers.pop(job_id, None)

@app.websocket("/stream/{job_id}")
async def stream_ws(websocket: WebSocket, job_id: str):
    """WebSocket endpoint for real-time browser streaming.

    Waits up to 30 seconds for a streaming session to be registered for
    *job_id*; if none appears, an ``error`` message is sent and the socket is
    closed. Otherwise the socket is attached to the session's browser
    controller and incoming JSON messages are dispatched by their ``type``:
    ``mouse`` and ``keyboard`` go to the controller, ``ping`` is answered with
    ``pong``. Messages that are not JSON objects are logged and skipped.
    If the client stays silent for ``idle_timeout`` seconds, the server sends
    a ``ping``.
    """
    await websocket.accept()

    # Wait for streaming session to be available (with timeout)
    max_wait = 30  # seconds
    poll_interval = 0.5  # seconds
    wait_time = 0
    while job_id not in streaming_sessions and wait_time < max_wait:
        await asyncio.sleep(poll_interval)
        wait_time += poll_interval

    if job_id not in streaming_sessions:
        await websocket.send_text(json.dumps({
            "type": "error",
            "message": "Streaming session not available - job may not have streaming enabled"
        }))
        await websocket.close()
        return

    browser_ctrl = streaming_sessions[job_id]
    browser_ctrl.add_stream_client(websocket)

    # The original receive call had no timeout, so its TimeoutError branch
    # (the server-side keepalive ping) could never run.
    idle_timeout = 30  # seconds

    try:
        # Send initial connection confirmation
        await websocket.send_text(json.dumps({
            "type": "connected",
            "message": "Connected to browser stream",
            "streaming_active": browser_ctrl.streaming_active
        }))

        while True:
            try:
                message = await asyncio.wait_for(
                    websocket.receive_text(), timeout=idle_timeout
                )
            except asyncio.TimeoutError:
                await websocket.send_text(json.dumps({"type": "ping"}))
                continue

            # One bad message should not tear down the whole stream.
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                print(f"Ignoring malformed stream message for job {job_id}: {e}")
                continue
            if not isinstance(data, dict):
                print(f"Ignoring non-object stream message for job {job_id}")
                continue

            event_type = data.get("type")
            if event_type == 'mouse':
                await browser_ctrl.handle_mouse_event(data)
            elif event_type == 'keyboard':
                await browser_ctrl.handle_keyboard_event(data)
            elif event_type == 'ping':
                await websocket.send_text(json.dumps({"type": "pong"}))

    except WebSocketDisconnect:
        print(f"Stream client disconnected from job {job_id}")
    except Exception as e:
        print(f"Error in stream WebSocket: {e}")
    finally:
        # Single cleanup path for every exit, including a failed first send.
        browser_ctrl.remove_stream_client(websocket)

@app.post("/streaming/create/{job_id}")
async def create_streaming_session(job_id: str):
    """Create a streaming session without starting a job.

    If a session is already registered for *job_id*, its streaming info is
    returned unchanged. Otherwise a non-headless ``SmartBrowserController``
    with streaming enabled is started, registered in ``streaming_sessions``
    and announced to the job's WebSocket subscribers.

    Returns:
        The controller's streaming info, extended with a ``proxy_info`` entry
        for a newly created session, or ``{"enabled": False, "error": ...}``
        when creation fails.
    """
    if job_id in streaming_sessions:
        browser_ctrl = streaming_sessions[job_id]
        return browser_ctrl.get_streaming_info()

    try:
        # Get best available proxy for streaming session
        proxy_info = smart_proxy_manager.get_best_proxy()
        proxy = proxy_info.to_playwright_dict() if proxy_info else None
        proxy_server = proxy.get("server", "None") if proxy else "None"

        print(f"πŸŽ₯ Creating streaming session with proxy: {proxy_server}")

        # Create smart browser controller with streaming enabled
        browser_ctrl = SmartBrowserController(headless=False, proxy=proxy, enable_streaming=True)
        await browser_ctrl.__aenter__()
        try:
            await browser_ctrl.start_streaming(quality=80)
        except Exception:
            # The controller was entered but never registered, so nothing
            # else could ever close it: release it before reporting the error.
            try:
                await browser_ctrl.__aexit__(None, None, None)
            except Exception as cleanup_error:
                print(f"Error cleaning up streaming session: {cleanup_error}")
            raise
        streaming_sessions[job_id] = browser_ctrl

        stream_info = browser_ctrl.get_streaming_info()

        # Add proxy information to stream info
        stream_info["proxy_info"] = {
            "current_proxy": proxy_server,
            "proxy_stats": smart_proxy_manager.get_proxy_stats()
        }

        # Broadcast to connected clients
        await broadcast(job_id, {
            "type": "streaming_info",
            "streaming": stream_info
        })

        return stream_info

    except Exception as e:
        print(f"❌ Failed to create streaming session: {e}")
        return {"enabled": False, "error": str(e)}

@app.get("/streaming/{job_id}")
async def get_streaming_info(job_id: str):
    """Return streaming connection details for *job_id*.

    When a session is registered, the controller's streaming info is returned
    with the current proxy statistics added under ``proxy_stats``; otherwise
    a disabled/error payload is returned.
    """
    if job_id not in streaming_sessions:
        return {"enabled": False, "error": "Streaming not enabled for this job"}

    controller = streaming_sessions[job_id]
    details = controller.get_streaming_info()

    # Attach the proxy pool statistics as of this request.
    details["proxy_stats"] = smart_proxy_manager.get_proxy_stats()
    return details

@app.delete("/streaming/{job_id}")
async def cleanup_streaming(job_id: str):
    """Shut down and unregister the streaming session of *job_id*, if any.

    Errors raised while closing the controller are logged; the session is
    removed from ``streaming_sessions`` either way.
    """
    if job_id not in streaming_sessions:
        return {"message": "No streaming session found"}

    controller = streaming_sessions[job_id]
    try:
        await controller.__aexit__(None, None, None)
    except Exception as exc:
        print(f"Error cleaning up streaming session: {exc}")
    finally:
        del streaming_sessions[job_id]

    return {"message": "Streaming session cleaned up"}

@app.get("/download/{job_id}")
def download(job_id: str):
    """Serve the output file of a job as an attachment.

    The extension, content type and format name come from ``job_info`` when
    the job is known there; otherwise generic fallbacks are used. If
    ``OUTPUT_DIR/<job_id>.<extension>`` does not exist, a list of common
    extensions is tried. When such a fallback file is served, the content
    type is derived from that file's extension rather than from the stored
    job info.

    Raises:
        HTTPException: 404 when no output file is found for *job_id*.
    """
    import mimetypes

    from fastapi import HTTPException

    print(f"πŸ“₯ Download request for job {job_id}")

    # Get job information
    if job_id in job_info:
        info = job_info[job_id]
        extension = info.get("extension", "output")
        content_type = info.get("content_type", "application/octet-stream")
        format_name = info.get("format", "unknown")

        print(f"πŸ“‹ Job info found: {info}")
    else:
        # Fallback for jobs without stored info
        extension = "output"
        content_type = "application/octet-stream"
        format_name = "unknown"
        print(f"⚠️ No job info found for {job_id}, using fallback")

    # Try to find the file with proper extension first
    file_path = OUTPUT_DIR / f"{job_id}.{extension}"

    if not file_path.exists():
        # Fallback: try common extensions
        for fallback_ext in ['txt', 'pdf', 'csv', 'json', 'html', 'md', 'output']:
            fallback_path = OUTPUT_DIR / f"{job_id}.{fallback_ext}"
            if fallback_path.exists():
                file_path = fallback_path
                extension = fallback_ext
                # The stored/default content type described a different
                # extension; re-derive it from the file actually served.
                guessed_type, _ = mimetypes.guess_type(fallback_path.name)
                content_type = guessed_type or "application/octet-stream"
                print(f"πŸ“ Found file with fallback extension: {file_path}")
                break

    if not file_path.exists():
        print(f"❌ File not found: {file_path}")
        raise HTTPException(status_code=404, detail="File not found")

    # Generate appropriate filename
    safe_filename = f"extracted_data_{job_id}.{extension}"

    print(f"βœ… Serving file: {file_path}")
    print(f"πŸ“„ Content-Type: {content_type}")
    print(f"πŸ“Ž Filename: {safe_filename}")

    # Serve file with proper content type and filename
    return FileResponse(
        path=file_path,
        filename=safe_filename,
        media_type=content_type,
        headers={
            "Content-Disposition": f"attachment; filename={safe_filename}",
            "X-File-Format": format_name,
            "X-Original-Extension": extension
        }
    )

@app.get("/job/{job_id}/info")
def get_job_info(job_id: str):
    """Return the in-memory info stored for *job_id*.

    The result is a copy of the stored dict extended with ``file_exists``,
    ``file_path`` (``None`` when the output file is missing) and the current
    ``proxy_stats``. Unknown jobs yield an error payload.
    """
    if job_id not in job_info:
        return {"error": "Job not found", "job_id": job_id}

    details = job_info[job_id].copy()

    # Check whether the expected output file is on disk.
    suffix = details.get("extension", "output")
    output_file = OUTPUT_DIR / f"{job_id}.{suffix}"
    present = output_file.exists()
    details["file_exists"] = present
    details["file_path"] = str(output_file) if present else None

    # Include the proxy pool statistics as of this request.
    details["proxy_stats"] = smart_proxy_manager.get_proxy_stats()

    return details

@app.get("/proxy/stats")
def get_proxy_stats():
    """Get current proxy pool statistics.

    Returns:
        A dict with ``proxy_stats`` and ``timestamp`` (seconds since the Unix
        epoch at the time of the request).
    """
    # A plain ``def`` endpoint runs in a worker thread, where
    # ``asyncio.get_event_loop()`` raises RuntimeError because no loop is set.
    # The loop clock is also monotonic, not a wall-clock timestamp.
    import time

    stats = smart_proxy_manager.get_proxy_stats()
    return {
        "proxy_stats": stats,
        "timestamp": time.time()
    }

@app.post("/proxy/reload")
def reload_proxies():
    """Replace the global proxy manager with a newly constructed one.

    Returns a success payload with the new manager's statistics, or a failure
    payload carrying the error text if construction or the stats call raises.
    """
    global smart_proxy_manager

    try:
        smart_proxy_manager = SmartProxyManager()
        refreshed_stats = smart_proxy_manager.get_proxy_stats()
    except Exception as exc:
        return {
            "success": False,
            "message": f"Failed to reload proxies: {str(exc)}"
        }

    return {
        "success": True,
        "message": "Proxy list reloaded successfully",
        "proxy_stats": refreshed_stats
    }

@app.get("/jobs")
async def get_all_jobs(limit: int = 50, offset: int = 0):
    """Return one page of jobs from the database together with job statistics.

    ``limit`` and ``offset`` are passed to the database layer unchanged and
    echoed back under ``pagination``.
    """
    page = await db.get_all_jobs(limit, offset)
    summary = await db.get_job_stats()

    pagination = {
        "limit": limit,
        "offset": offset
    }
    return {"jobs": page, "stats": summary, "pagination": pagination}

@app.get("/job/{job_id}")
async def get_job(job_id: str):
    """Return the database record of *job_id* with output-file details added.

    The record gains ``file_exists`` and ``file_path`` (``None`` when the
    output file is missing). A falsy lookup result yields an error payload.
    """
    record = await db.get_job(job_id)
    if not record:
        return {"error": "Job not found", "job_id": job_id}

    # Check whether the expected output file is on disk.
    suffix = record.get("file_extension", "output")
    output_file = OUTPUT_DIR / f"{job_id}.{suffix}"
    present = output_file.exists()
    record["file_exists"] = present
    record["file_path"] = str(output_file) if present else None

    return {"job": record}

@app.delete("/job/{job_id}")
async def delete_job(job_id: str):
    """Delete a job from the database and remove its output file.

    The job record is read before it is deleted, because its
    ``file_extension`` is needed to locate the output file. The file is only
    removed once the database deletion has succeeded.

    Returns:
        A ``message`` payload on success, an ``error`` payload otherwise.
    """
    # Look the job up first: once the row is gone, the extension of its
    # output file can no longer be determined.
    job = await db.get_job(job_id)

    # Delete from database
    success = await db.delete_job(job_id)

    if not success:
        return {"error": "Failed to delete job"}

    # Also delete output file if exists
    if job:
        extension = job.get("file_extension") or "output"
        file_path = OUTPUT_DIR / f"{job_id}.{extension}"
        try:
            file_path.unlink()
        except FileNotFoundError:
            # No output was produced, or it has already been removed.
            pass

    return {"message": f"Job {job_id} deleted successfully"}

@app.get("/stats")
async def get_system_stats():
    """Return database, proxy, task and streaming-session statistics."""
    overview = {"database": await db.get_job_stats()}
    overview["proxy"] = smart_proxy_manager.get_proxy_stats()

    # Sizes of the in-memory task and streaming-session registries.
    overview["active_jobs"] = len(tasks)
    overview["active_streams"] = len(streaming_sessions)
    return overview

# Serve the contents of the "frontend" directory at the site root (html=True).
# NOTE(review): this catch-all mount presumably has to stay below the API
# routes declared above so that it does not shadow them — confirm.
app.mount("/", StaticFiles(directory="frontend", html=True), name="static")

# Helper functions
async def broadcast(job_id: str, msg: dict):
    """Broadcast *msg* as JSON to every WebSocket subscribed to *job_id*.

    Delivery is best-effort: a subscriber whose send fails is dropped from the
    job's subscriber set, and a message that cannot be serialised is logged
    and skipped without touching the subscribers. Jobs without subscribers
    are a no-op.
    """
    subscribers = ws_subscribers.get(job_id)
    if not subscribers:
        return

    # Serialise once for all subscribers. Inside the per-client ``try`` a
    # serialisation error used to evict every (perfectly healthy) subscriber.
    try:
        payload = json.dumps(msg)
    except (TypeError, ValueError) as exc:
        print(f"Failed to serialize broadcast message for job {job_id}: {exc}")
        return

    # Iterate over a snapshot: the set is mutated below and may also change
    # while this coroutine is suspended in ``send_text``.
    for ws in list(subscribers):
        try:
            await ws.send_text(payload)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation is not swallowed.
            subscribers.discard(ws)

async def register_streaming_session(job_id: str, browser_ctrl):
    """Register *browser_ctrl* as the streaming session of *job_id*.

    Streaming is started when the controller has ``enable_streaming`` set, and
    the controller's streaming info is then broadcast to the job's subscribers.
    """
    streaming_sessions.update({job_id: browser_ctrl})

    should_stream = browser_ctrl.enable_streaming
    if should_stream:
        await browser_ctrl.start_streaming(quality=80)

    announcement = {
        "type": "streaming_info",
        "streaming": browser_ctrl.get_streaming_info()
    }
    await broadcast(job_id, announcement)

# Cleanup on shutdown
@app.on_event("shutdown")
async def cleanup():
    """Cleanup resources on shutdown.

    Closes every registered streaming session (failures are logged and do not
    stop the remaining cleanup), clears the in-memory registries, disconnects
    the database and prints the final proxy statistics.
    """
    print("🧹 Cleaning up resources...")

    # Cleanup streaming sessions. Iterate over a snapshot: each ``await``
    # yields to the event loop, and a concurrent change to the dict would make
    # iteration over the live view raise RuntimeError.
    for job_id, browser_ctrl in list(streaming_sessions.items()):
        try:
            await browser_ctrl.__aexit__(None, None, None)
            print(f"βœ… Cleaned up streaming session: {job_id}")
        except Exception as e:
            print(f"❌ Error cleaning up session {job_id}: {e}")

    streaming_sessions.clear()
    job_info.clear()

    # Disconnect database
    await db.disconnect()

    # Print final proxy stats
    final_stats = smart_proxy_manager.get_proxy_stats()
    print(f"πŸ“Š Final proxy stats: {final_stats}")

    print("βœ… Cleanup completed")

@app.on_event("startup")
async def startup():
    """Initialize database connection and Telegram bot on startup."""
    print("πŸš€ Starting up BrowserPilot...")
    await db.connect()

    # Start Telegram bot in background. The event loop only keeps weak
    # references to tasks, so the task is stored on ``app.state`` to keep it
    # from being garbage-collected while it runs.
    app.state.telegram_bot_task = asyncio.create_task(start_bot())

    print("βœ… Startup completed")