"""Flask API that queues Python snippets and runs them in pooled Camoufox browsers.

A POST to ``/api/s-playwright`` creates a :class:`Job` and starts a daemon
thread that waits for a free :class:`BrowserRoom`, executes the snippet with a
fresh browser context/page, and records the outcome on the job.  Clients poll
``/job/<job_id>`` for the result and fetch screenshots from
``/files/room_<room_id>/<filename>``.
"""
import atexit
import math
import os
import random
import string
import threading
import time
import traceback
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path

from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

BASE_DIR = Path(__file__).parent
PUBLIC_DIR = BASE_DIR / 'public'
PUBLIC_DIR.mkdir(exist_ok=True)

PORT = int(os.environ.get('PORT', 7860))
VIEWPORT_WIDTH = 1920
VIEWPORT_HEIGHT = 1080
MAX_WORKERS = 4
MAX_ROOMS = 100
ROOM_TIMEOUT_MINUTES = 10
SCREENSHOT_EXPIRY_MINUTES = 10
JOB_EXPIRY_MINUTES = 30
SCREENSHOT_BASE_URL = os.environ.get('SCREENSHOT_BASE_URL', 'http://localhost:7860')


class Job:
    """One submitted snippet together with its lifecycle state.

    ``status`` starts as ``'queued'``; ``process_job`` moves it to
    ``'running'`` and finally to ``'completed'`` or ``'failed'``.
    """

    def __init__(self, job_id, code, lang, base_url):
        self.job_id = job_id
        self.code = code
        self.lang = lang
        self.base_url = base_url
        self.status = 'queued'
        self.result = None
        self.error = None
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None
        self.room_id = None
        self.screenshots = []

    def to_dict(self):
        """Return the JSON-serialisable view of the job served by ``/job/<job_id>``."""
        return {
            'job_id': self.job_id,
            'status': self.status,
            'result': self.result,
            'error': self.error,
            'created_at': self.created_at.isoformat(),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'room_id': self.room_id,
            'screenshots': self.screenshots,
        }

    def is_expired(self):
        """Return True once the job is older than ``JOB_EXPIRY_MINUTES`` (measured from creation)."""
        return (datetime.now() - self.created_at) > timedelta(minutes=JOB_EXPIRY_MINUTES)


class BrowserRoom:
    """A pooled browser plus the per-room state reused between jobs.

    The room keeps the cookies captured from its last context and owns a
    screenshot directory ``PUBLIC_DIR/room_<room_id>``, which is created on
    construction.
    """

    def __init__(self, room_id, browser, playwright_ctx):
        self.room_id = room_id
        self.browser = browser
        # Object whose __exit__ is called on shutdown to tear the browser down.
        self.playwright_ctx = playwright_ctx
        self.context = None
        self.page = None
        self.cookies = []
        self.created_at = datetime.now()
        self.last_activity = datetime.now()
        self.is_busy = False
        self.current_job_id = None
        self.lock = threading.Lock()
        self.screenshots_dir = PUBLIC_DIR / f'room_{room_id}'
        self.screenshots_dir.mkdir(exist_ok=True)

    def update_activity(self):
        """Record "now" as the last time this room was used."""
        self.last_activity = datetime.now()

    def is_expired(self):
        """Return True when the room has been idle longer than ``ROOM_TIMEOUT_MINUTES``."""
        return (datetime.now() - self.last_activity) > timedelta(minutes=ROOM_TIMEOUT_MINUTES)

    def cleanup_screenshots(self):
        """Delete ``*.png`` files in this room older than ``SCREENSHOT_EXPIRY_MINUTES``.

        Errors are logged per file so that one unreadable or already-removed
        file does not stop the remaining files from being swept.
        """
        cutoff = time.time() - SCREENSHOT_EXPIRY_MINUTES * 60
        try:
            candidates = list(self.screenshots_dir.glob('*.png'))
        except OSError as e:
            print(f'[ROOM-{self.room_id}] Screenshot cleanup error: {e}')
            return
        for png in candidates:
            try:
                if png.stat().st_mtime < cutoff:
                    png.unlink()
            except OSError as e:
                print(f'[ROOM-{self.room_id}] Screenshot cleanup error: {e}')

    def reset_context(self):
        """Close the current page and context, saving the context's cookies first.

        Every step is best-effort: a failure to close or to read cookies is
        ignored so the room can always be handed to the next job.
        """
        try:
            if self.page:
                try:
                    self.page.close()
                except Exception:
                    pass  # page may already be closed
                self.page = None
            if self.context:
                try:
                    # Keep cookies so the next context in this room can restore them.
                    self.cookies = self.context.cookies()
                except Exception:
                    pass  # keep the previously saved cookies
                try:
                    self.context.close()
                except Exception:
                    pass  # context may already be closed
                self.context = None
        except Exception as e:
            print(f'[ROOM-{self.room_id}] Context reset error: {e}')

    def cleanup(self):
        """Reset the browser context, sweep expired screenshots, and remove the directory if empty."""
        self.reset_context()
        self.cleanup_screenshots()
        try:
            if self.screenshots_dir.exists():
                self.screenshots_dir.rmdir()
        except OSError:
            pass  # directory still holds unexpired screenshots


class JobManager:
    """Thread-safe in-memory registry of jobs keyed by job id."""

    def __init__(self):
        self.jobs = {}
        self.lock = threading.Lock()

    def generate_job_id(self):
        """Return an unused 3-character id drawn from uppercase letters and digits.

        Reads ``self.jobs`` without locking; ``create_job`` calls it while
        holding ``self.lock``.
        """
        chars = string.ascii_uppercase + string.digits
        while True:
            job_id = ''.join(random.choices(chars, k=3))
            if job_id not in self.jobs:
                return job_id

    def create_job(self, code, lang, base_url):
        """Create, register and return a new queued :class:`Job`."""
        with self.lock:
            job_id = self.generate_job_id()
            job = Job(job_id, code, lang, base_url)
            self.jobs[job_id] = job
            return job

    def get_job(self, job_id):
        """Return the job with ``job_id``, or None if it is unknown."""
        with self.lock:
            return self.jobs.get(job_id)

    def update_job(self, job_id, **kwargs):
        """Set each keyword argument as an attribute on the job; unknown ids are ignored."""
        with self.lock:
            job = self.jobs.get(job_id)
            if job is not None:
                for k, v in kwargs.items():
                    setattr(job, k, v)

    def cleanup_expired_jobs(self):
        """Remove every job for which ``Job.is_expired()`` is true."""
        with self.lock:
            expired = [jid for jid, j in self.jobs.items() if j.is_expired()]
            for jid in expired:
                del self.jobs[jid]
                print(f'[JOB-{jid}] Deleted expired job')


class RoomManager:
    """Owns the pool of :class:`BrowserRoom` objects and hands them out to jobs."""

    def __init__(self):
        self.rooms = {}
        self.available_rooms = deque()  # ids of rooms currently free
        self.lock = threading.Lock()
        self.playwright = None
        self._stop = threading.Event()
        self._cleanup_thread = None
        self._job_cleanup_thread = None
        self._next_room_id = 1

    def _create_room(self):
        """Launch a headless Camoufox browser and register it as a new room."""
        # Imported lazily so the module can be loaded without Camoufox installed.
        from camoufox.sync_api import Camoufox

        room_id = self._next_room_id
        self._next_room_id += 1
        print(f'[POOL] Launching browser for room {room_id}')
        cf = Camoufox(headless=True, humanize=True)
        # Entered manually; the matching __exit__ is called in shutdown_all().
        browser = cf.__enter__()
        room = BrowserRoom(room_id, browser, cf)
        self.rooms[room_id] = room
        return room

    def init_pool(self):
        """Create ``MAX_WORKERS`` rooms; a room that fails to launch is logged and skipped."""
        print(f'[POOL] Initializing {MAX_WORKERS} browser rooms...')
        for _ in range(MAX_WORKERS):
            try:
                room = self._create_room()
                self.available_rooms.append(room.room_id)
                print(f'[POOL] Room {room.room_id} ready')
            except Exception as e:
                print(f'[POOL] Failed to create room: {e}')

    def acquire_room(self, job_id, timeout=900):
        """Block until a room is free, mark it busy for ``job_id`` and return it.

        Polls once per second.  Raises ``Exception`` if no room becomes
        available within ``timeout`` seconds.
        """
        # Monotonic clock so wall-clock adjustments cannot distort the timeout.
        start = time.monotonic()
        announced_wait = False
        while True:
            with self.lock:
                while self.available_rooms:
                    room_id = self.available_rooms.popleft()
                    room = self.rooms.get(room_id)
                    if not room:
                        continue  # stale id with no room behind it; try the next one
                    room.is_busy = True
                    room.current_job_id = job_id
                    room.update_activity()
                    print(f'[ROOM-{room_id}] Acquired for job {job_id}. Available: {len(self.available_rooms)}/{MAX_WORKERS}')
                    return room
            if time.monotonic() - start > timeout:
                raise Exception(f'Timeout waiting for available room after {timeout}s')
            if not announced_wait:
                # Log once per wait instead of once per poll.
                print(f'[QUEUE] All {MAX_WORKERS} rooms busy. Waiting...')
                announced_wait = True
            time.sleep(1)

    def release_room(self, room):
        """Mark ``room`` idle and return it to the available queue (without duplicating it)."""
        with self.lock:
            room.is_busy = False
            room.current_job_id = None
            room.update_activity()
            if room.room_id not in self.available_rooms:
                self.available_rooms.append(room.room_id)
            print(f'[ROOM-{room.room_id}] Released. Available: {len(self.available_rooms)}/{MAX_WORKERS}')

    def start_cleanup(self):
        """Start a daemon thread that sweeps expired screenshots of idle rooms every 60 seconds."""
        def _cleanup():
            while not self._stop.is_set():
                try:
                    # Hold the lock so a room cannot be acquired mid-sweep.
                    with self.lock:
                        for room in list(self.rooms.values()):
                            if not room.is_busy:
                                room.cleanup_screenshots()
                except Exception as e:
                    print(f'[CLEANUP] Error: {e}')
                self._stop.wait(60)

        self._cleanup_thread = threading.Thread(target=_cleanup, daemon=True)
        self._cleanup_thread.start()

    def shutdown_all(self):
        """Stop the cleanup thread and tear down every room and its browser."""
        self._stop.set()
        with self.lock:
            for room in self.rooms.values():
                # Handled separately so a failed cleanup cannot leave the browser running.
                try:
                    room.cleanup()
                except Exception as e:
                    print(f'[ROOM-{room.room_id}] Cleanup error during shutdown: {e}')
                try:
                    room.playwright_ctx.__exit__(None, None, None)
                except Exception as e:
                    print(f'[ROOM-{room.room_id}] Browser shutdown error: {e}')
            self.rooms.clear()
            self.available_rooms.clear()


room_manager = RoomManager()
job_manager = JobManager()


def execute_in_room(code_snippet, room, job_id):
    """Run ``code_snippet`` inside ``room`` with a fresh browser context and page.

    The snippet is executed with a namespace exposing ``browser``, ``context``,
    ``page``, ``result`` and a few helpers.  If the snippet defines
    ``return_value``, it is copied to ``result['data']``.

    Returns the ``result`` dict (keys ``screenshots``, ``data``, ``error``);
    exceptions raised by the snippet are captured in ``result['error']``
    together with the traceback rather than propagated.
    """
    result = {'screenshots': [], 'data': None, 'error': None}
    room.reset_context()
    try:
        print(f'[ROOM-{room.room_id}] [JOB-{job_id}] Starting execution')
        context = room.browser.new_context(
            viewport={'width': VIEWPORT_WIDTH, 'height': VIEWPORT_HEIGHT},
            ignore_https_errors=True,
        )
        room.context = context
        if room.cookies:
            try:
                context.add_cookies(room.cookies)
            except Exception:
                pass  # restoring cookies from the previous job is best-effort
        page = context.new_page()
        room.page = page
        namespace = {
            'browser': room.browser,
            'context': context,
            'page': page,
            'room_cookies': room.cookies,
            'public_dir': str(room.screenshots_dir),
            'room_id': room.room_id,
            'job_id': job_id,
            'time': time,
            'result': result,
            'random': random,
            'math': math,
            'print': print,
            'VIEWPORT_WIDTH': VIEWPORT_WIDTH,
            'VIEWPORT_HEIGHT': VIEWPORT_HEIGHT,
        }
        # SECURITY: this executes request-supplied code with the full privileges
        # of the server process (builtins are not restricted).  Only expose this
        # service to trusted callers or run it inside an isolated sandbox.
        exec(code_snippet, namespace)
        if 'return_value' in namespace:
            result['data'] = namespace['return_value']
        print(f'[ROOM-{room.room_id}] [JOB-{job_id}] Execution completed successfully')
    except Exception as e:
        error_msg = str(e) + '\n' + traceback.format_exc()
        result['error'] = error_msg
        print(f'[ROOM-{room.room_id}] [JOB-{job_id}] ERROR: {e}')
    finally:
        room.reset_context()
    return result


def process_job(job):
    """Thread target: wait for a room, run the job's code and record the outcome.

    The job stays ``'queued'`` while waiting for a room and becomes
    ``'running'`` only once one has been acquired.  Any failure, including a
    room-acquisition timeout, marks the job ``'failed'`` with the traceback.
    """
    room = None
    try:
        room = room_manager.acquire_room(job.job_id, timeout=900)
        job_manager.update_job(
            job.job_id,
            status='running',
            started_at=datetime.now(),
            room_id=room.room_id,
        )
        result = execute_in_room(job.code, room, job.job_id)

        # List every PNG currently in the room directory, oldest first.
        screenshot_files = [
            {
                'name': f.name,
                'publicURL': f'{SCREENSHOT_BASE_URL}/files/room_{room.room_id}/{f.name}',
            }
            for f in sorted(room.screenshots_dir.glob('*.png'), key=lambda x: x.stat().st_mtime)
        ]

        if result.get('error'):
            job_manager.update_job(
                job.job_id,
                status='failed',
                error=result['error'],
                completed_at=datetime.now(),
                screenshots=screenshot_files,
            )
        else:
            job_manager.update_job(
                job.job_id,
                status='completed',
                result=result.get('data'),
                completed_at=datetime.now(),
                screenshots=screenshot_files,
            )
    except Exception as e:
        error_msg = str(e) + '\n' + traceback.format_exc()
        job_manager.update_job(job.job_id, status='failed', error=error_msg, completed_at=datetime.now())
    finally:
        if room:
            room.cleanup_screenshots()
            room_manager.release_room(room)


@app.route('/api/s-playwright', methods=['POST'])
def execute_playwright():
    """Queue a Python snippet for execution and return its job id.

    Expects a JSON object with string fields ``code`` and ``lang`` (only
    ``python`` is accepted).  Responds 400 for a malformed or incomplete
    payload and 500 for unexpected errors.
    """
    try:
        # silent=True yields None for a non-JSON body so it is reported as a 400, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'code' not in data or 'lang' not in data:
            return jsonify({'success': False, 'error': 'Missing required fields: code and lang'}), 400
        code = data['code']
        lang = data['lang']
        if not isinstance(code, str) or not isinstance(lang, str):
            return jsonify({'success': False, 'error': 'Fields code and lang must be strings'}), 400
        lang = lang.lower()
        if lang != 'python':
            return jsonify({'success': False, 'error': f'Only Python is supported, got: {lang}'}), 400
        base_url = request.url_root.rstrip('/')
        job = job_manager.create_job(code, lang, base_url)
        t = threading.Thread(target=process_job, args=(job,))
        t.daemon = True
        t.start()
        return jsonify({'success': True, 'job_id': job.job_id, 'status': 'queued', 'check_url': f'/job/{job.job_id}'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e), 'stack': traceback.format_exc()}), 500


# The URL variable is required: the view takes ``job_id`` and ``check_url`` is built as /job/<id>.
@app.route('/job/<job_id>', methods=['GET'])
def get_job_status(job_id):
    """Return the job's state as JSON, or 404 if the id is unknown (lookup is case-insensitive)."""
    job = job_manager.get_job(job_id.upper())
    if not job:
        return jsonify({'success': False, 'error': 'Job not found'}), 404
    return jsonify({'success': True, 'job': job.to_dict()})


# Mirrors the publicURL layout produced by process_job: /files/room_<room_id>/<file name>.
@app.route('/files/room_<int:room_id>/<path:filename>')
def serve_file(room_id, filename):
    """Serve ``filename`` from the screenshot directory of room ``room_id``."""
    room_dir = PUBLIC_DIR / f'room_{room_id}'
    return send_from_directory(room_dir, filename)


@app.route('/health', methods=['GET'])
def health():
    """Report room availability and the number of queued/running jobs."""
    with room_manager.lock:
        busy = sum(1 for r in room_manager.rooms.values() if r.is_busy)
        available = len(room_manager.available_rooms)
    with job_manager.lock:
        jobs = list(job_manager.jobs.values())
    return jsonify({
        'status': 'healthy',
        'rooms': {'total': MAX_WORKERS, 'available': available, 'busy': busy},
        'jobs': {
            'total': len(jobs),
            'queued': sum(1 for j in jobs if j.status == 'queued'),
            'running': sum(1 for j in jobs if j.status == 'running'),
        },
        'viewport': {'width': VIEWPORT_WIDTH, 'height': VIEWPORT_HEIGHT},
        'timestamp': int(time.time() * 1000),
    })


@app.route('/', methods=['GET'])
def index():
    """Describe the API endpoints and the current pool configuration."""
    with room_manager.lock:
        available = len(room_manager.available_rooms)
        busy = sum(1 for r in room_manager.rooms.values() if r.is_busy)
    return jsonify({
        'message': 'Camoufox API - Shared Browser Pool',
        'endpoints': {
            'POST /api/s-playwright': 'Execute Python code (returns job_id)',
            'GET /job/:id': 'Check job status',
            'GET /health': 'Health check',
        },
        'configuration': {
            'workers': MAX_WORKERS,
            'viewport': {'width': VIEWPORT_WIDTH, 'height': VIEWPORT_HEIGHT},
            'port': PORT,
            'rooms': {'total': MAX_WORKERS, 'available': available, 'busy': busy},
        }
    })


if __name__ == '__main__':
    print('🚀 Camoufox API - Shared Browser Pool')
    print(f'🌐 Port: {PORT}')
    print(f'🏠 Workers: {MAX_WORKERS} (shared browsers, no memory leak)')
    print(f'📐 Viewport: {VIEWPORT_WIDTH}x{VIEWPORT_HEIGHT}')

    room_manager.init_pool()
    room_manager.start_cleanup()

    def cleanup_jobs_loop():
        """Purge expired jobs every five minutes for the life of the process."""
        while True:
            job_manager.cleanup_expired_jobs()
            time.sleep(300)

    t = threading.Thread(target=cleanup_jobs_loop, daemon=True)
    t.start()

    def on_exit():
        """Close all browsers when the interpreter exits."""
        room_manager.shutdown_all()

    atexit.register(on_exit)
    app.run(host='0.0.0.0', port=PORT, debug=False, threaded=True)