"""Translation orchestrator service.

Accepts translation requests over HTTP, queues them, and dispatches them to a
pool of worker services. Two daemon threads run in the background: one polls
worker health, the other drains the job queue. Each dispatched job is handled
in its own daemon thread, which submits the job to the worker and then polls
the worker until the job completes, fails, or times out.

Lock ordering: ``job_lock`` may be held while acquiring ``worker_lock``
(see ``process_job_queue``); never acquire them in the opposite order.
"""
import os
import time
import asyncio
import uuid
import threading
from datetime import datetime
from typing import Dict, List, Optional
from collections import deque

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests

# Configuration
ORCHESTRATOR_VERSION = "2.0.0"
WORKER_HEALTH_CHECK_INTERVAL = 30  # seconds
WORKER_TIMEOUT = 120  # seconds for translation
MAX_JOB_RETRIES = 2  # a failed job is re-queued at most this many times

# SECURITY NOTE: TLS verification for WordPress callbacks has historically been
# disabled (verify=False). That default is preserved for compatibility; set
# NOTIFY_VERIFY_TLS=true to enable certificate verification.
NOTIFY_VERIFY_TLS = os.getenv("NOTIFY_VERIFY_TLS", "false").strip().lower() in (
    "1",
    "true",
    "yes",
)


# Models
class TranslationRequest(BaseModel):
    """Request body accepted by ``POST /api/translate``."""

    request_id: str
    text: str
    source_lang: str
    target_lang: str
    auto_charge: bool = False
    notification_url: Optional[str] = None
    wordpress_user_id: Optional[int] = None


class WorkerConfig(BaseModel):
    """Static configuration of one worker service."""

    url: str
    name: str
    priority: int = 1
    max_concurrent: int = 3


class WorkerStatus:
    """Mutable runtime state tracked for one worker."""

    def __init__(self, config: WorkerConfig):
        self.config = config
        self.available = False  # set by the health checker
        self.active_jobs = 0  # jobs currently assigned to this worker
        self.last_health_check = 0  # time.time() of the last HTTP 200 health reply
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0  # running mean of completed-job durations (s)


# Orchestrator Core Class
class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to available workers."""

    def __init__(self):
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        # NOTE: completed_jobs is never pruned and grows for the process lifetime.
        self.completed_jobs: Dict[str, Dict] = {}
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()

        # Load worker configurations from environment
        self.load_worker_configs()

        # Start background tasks
        self.start_health_checker()
        self.start_job_processor()

    @staticmethod
    def _env_int(key: str, default: int) -> int:
        """Read an integer environment variable, falling back on bad input."""
        raw = os.getenv(key)
        if raw is None:
            return default
        try:
            return int(raw)
        except ValueError:
            print(f"⚠ Invalid integer for {key}: {raw!r}; using {default}")
            return default

    def load_worker_configs(self):
        """Load worker configurations from environment variables.

        Reads ``WORKER_<n>_URL`` (required), ``WORKER_<n>_NAME``,
        ``WORKER_<n>_PRIORITY`` and ``WORKER_<n>_MAX_CONCURRENT`` for
        n = 1, 2, ... and stops at the first index without a URL.
        """
        worker_index = 1
        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = self._env_int(f"WORKER_{worker_index}_PRIORITY", 1)
            max_concurrent = self._env_int(
                f"WORKER_{worker_index}_MAX_CONCURRENT", 3
            )

            config = WorkerConfig(
                url=url,
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )
            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            print(f"✓ Loaded worker: {name} at {url}")
            worker_index += 1

        if not self.workers:
            print("⚠ No workers configured. Add WORKER_N_URL environment variables.")

    def start_health_checker(self):
        """Start the daemon thread that periodically health-checks workers."""

        def health_check_loop():
            while True:
                try:
                    self.check_all_workers_health()
                except Exception as e:
                    # Keep the loop alive; a dead checker would freeze
                    # every worker's availability flag.
                    print(f"⚠ Health check loop error: {str(e)}")
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("✓ Health checker started")

    def check_all_workers_health(self):
        """Probe every worker's ``/api/health`` endpoint and update its state.

        The HTTP requests are made without holding ``worker_lock`` so that a
        slow or unreachable worker cannot stall job dispatch and completion
        handling; the lock is taken only to snapshot the worker list and to
        write each result.
        """
        with self.worker_lock:
            workers = list(self.workers.values())

        for worker in workers:
            name = worker.config.name
            healthy = False
            responded = False
            try:
                response = requests.get(
                    f"{worker.config.url}/api/health",
                    timeout=10,
                )
                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    responded = True
                    if healthy:
                        print(f"✓ {name}: Healthy")
                    else:
                        print(f"✗ {name}: Unhealthy")
                else:
                    print(f"✗ {name}: HTTP {response.status_code}")
            except Exception as e:
                # Connection errors, timeouts and malformed bodies all mean
                # "not available".
                healthy = False
                print(f"✗ {name}: {str(e)}")

            with self.worker_lock:
                worker.available = healthy
                if responded:
                    worker.last_health_check = time.time()

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with spare capacity, or None.

        Higher ``priority`` wins; ties are broken by fewer active jobs.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available
                and worker.active_jobs < worker.config.max_concurrent
            ]
            if not candidates:
                return None

            best_id, _ = min(
                candidates,
                key=lambda item: (-item[1].config.priority, item[1].active_jobs),
            )
            return best_id

    def start_job_processor(self):
        """Start the daemon thread that dispatches queued jobs."""

        def process_queue_loop():
            while True:
                try:
                    self.process_job_queue()
                except Exception as e:
                    # Keep the loop alive; a dead processor would strand
                    # every queued job.
                    print(f"⚠ Job processor error: {str(e)}")
                time.sleep(2)  # Check queue every 2 seconds

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("✓ Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append a job to the queue.

        Args:
            job_data: Job description; must contain ``'request_id'``.
        """
        with self.job_lock:
            self.job_queue.append(job_data)
            print(
                f"📝 Job {job_data['request_id']} added to queue. "
                f"Queue size: {len(self.job_queue)}"
            )

    def process_job_queue(self):
        """Dispatch queued jobs for as long as a worker has spare capacity."""
        while True:
            with self.job_lock:
                if not self.job_queue:
                    return

                worker_id = self.get_available_worker()
                if not worker_id:
                    return  # No available workers

                job_data = self.job_queue.popleft()

            # Assign outside the lock to avoid blocking producers
            self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Reserve capacity on a worker and start sending the job to it.

        If the worker was removed after it was selected, the job is put back
        at the front of the queue.
        """
        request_id = job_data['request_id']

        with self.worker_lock:
            worker = self.workers.get(worker_id)
            if worker is not None:
                worker.active_jobs += 1

        if worker is None:
            with self.job_lock:
                self.job_queue.appendleft(job_data)
            return

        print(f"🔄 Assigning job {request_id} to {worker.config.name}")

        # Store job info
        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing',
        }

        # Send to worker in background
        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True,
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """Submit a job to a worker, then monitor it until it finishes.

        Only the submission is inside the ``try``: monitoring performs its own
        failure handling, and wrapping it here could run the failure handler a
        second time for the same job.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            # Prepare request for worker
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': job_data.get('auto_charge', False),
                'notification_url': None,  # Don't let worker notify WordPress directly
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT,
            )
        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))
            return

        if response.status_code == 200:
            print(f"✓ Job {request_id} sent to {worker.config.name}")
            self.monitor_worker_job(worker_id, request_id)
        else:
            self.handle_worker_failure(
                worker_id, request_id, f"HTTP {response.status_code}"
            )

    def monitor_worker_job(
        self,
        worker_id: str,
        request_id: str,
        max_checks: int = 60,
        poll_interval: float = 5,
    ):
        """Poll a worker until the job completes, fails, or times out.

        Args:
            worker_id: Id of the worker processing the job.
            request_id: Id of the job being monitored.
            max_checks: Maximum number of status polls.
            poll_interval: Seconds to sleep before each poll. The defaults
                give roughly 5 minutes (60 * 5 seconds) plus request time.
        """
        worker = self.workers[worker_id]

        for _ in range(max_checks):
            time.sleep(poll_interval)

            outcome = None
            data = None
            try:
                # Check status on worker
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15,
                )
                if response.status_code == 200:
                    data = response.json()
                    if data.get('success') and data.get('status') == 'completed':
                        outcome = 'completed'
                    elif data.get('status') == 'failed':
                        outcome = 'failed'
            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")
                continue

            # Handlers run outside the try so an error inside one cannot be
            # mistaken for a polling error and lead to the job being handled twice.
            if outcome == 'completed':
                self.handle_job_completion(worker_id, request_id, data)
                return
            if outcome == 'failed':
                self.handle_worker_failure(
                    worker_id, request_id, "Worker reported failure"
                )
                return

        # Timeout reached
        self.handle_worker_failure(
            worker_id, request_id, "Timeout waiting for completion"
        )

    def handle_job_completion(
        self, worker_id: str, request_id: str, worker_response: Dict
    ):
        """Record a successful job, update worker stats, and notify WordPress."""
        worker = self.workers[worker_id]

        print(f"✅ Job {request_id} completed by {worker.config.name}")

        # Get job data
        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})

        # Calculate processing time
        processing_time = time.time() - job_info.get('start_time', time.time())

        # Update worker stats
        with self.worker_lock:
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_completed += 1
            # Incremental running mean over all completed jobs.
            worker.avg_response_time += (
                processing_time - worker.avg_response_time
            ) / worker.total_completed

        # Prepare completion data
        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get(
                'character_count', len(job_data.get('text', ''))
            ),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat(),
        }

        # Store in completed jobs before removing from active jobs
        self.completed_jobs[request_id] = completion_data
        self.active_jobs.pop(request_id, None)

        # Notify WordPress if notification URL provided
        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, completion_data)

    def handle_worker_failure(
        self, worker_id: str, request_id: str, error_message: str
    ):
        """Handle a failed attempt: re-queue the job or mark it failed.

        The retry counter lives in the job's own data (``job_data``), not in
        the ``active_jobs`` bookkeeping entry. A job is re-queued while its
        counter is below ``MAX_JOB_RETRIES``; after that it is recorded in
        ``completed_jobs`` as failed and WordPress is notified.
        """
        worker = self.workers[worker_id]

        print(f"❌ Job {request_id} failed on {worker.config.name}: {error_message}")

        # Update worker stats
        with self.worker_lock:
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_failed += 1

        failure_data = None
        # Move the job out of active_jobs and into its next state in one step
        # under job_lock, so the processor cannot re-dispatch it and then have
        # its fresh active_jobs entry removed by this handler.
        with self.job_lock:
            job_info = self.active_jobs.pop(request_id, None) or {}
            job_data = job_info.get('job_data') or {}
            retry_count = job_data.get('retry_count', 0)

            if job_data and retry_count < MAX_JOB_RETRIES:
                print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")
                job_data['retry_count'] = retry_count + 1
                self.job_queue.append(job_data)
                print(
                    f"📝 Job {job_data['request_id']} added to queue. "
                    f"Queue size: {len(self.job_queue)}"
                )
            else:
                # Max retries reached (or job data unavailable), mark as failed
                failure_data = {
                    'request_id': request_id,
                    'status': 'failed',
                    'error_message': error_message,
                    'failed_at': datetime.now().isoformat(),
                }
                self.completed_jobs[request_id] = failure_data

        if failure_data is not None:
            # Notify WordPress of failure (outside the lock: network I/O)
            notification_url = job_data.get('notification_url')
            if notification_url:
                self.notify_wordpress(notification_url, failure_data)

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST a job result to the caller-supplied notification URL.

        Best effort: errors are logged, never raised.

        SECURITY NOTE: ``notification_url`` comes from the request body, so
        this server can be made to POST to arbitrary hosts (SSRF); consider an
        allow-list. TLS verification follows ``NOTIFY_VERIFY_TLS`` (disabled by
        default for backward compatibility).
        """
        try:
            response = requests.post(
                notification_url,
                json=data,
                timeout=30,
                verify=NOTIFY_VERIFY_TLS,
            )

            if response.status_code == 200:
                print(f"📤 WordPress notified successfully for {data['request_id']}")
            else:
                print(f"⚠ WordPress notification failed: HTTP {response.status_code}")

        except Exception as e:
            print(f"⚠ WordPress notification error: {str(e)}")


# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION,
)

# CORS configuration
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins to known front-ends in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize orchestrator (loads workers and starts background threads)
orchestrator = TranslationOrchestrator()


# Root endpoint
@app.get("/")
async def root():
    """API information"""
    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": len(orchestrator.workers),
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "endpoints": {
            "/api/translate": "Submit translation request",
            "/api/status/{request_id}": "Check translation status",
            "/api/workers": "List all workers",
            "/api/health": "Health check",
        },
    }


# Health check endpoint
@app.get("/api/health")
async def health_check():
    """Health check: "healthy" if at least one worker is available."""
    total_workers = len(orchestrator.workers)
    available_workers = sum(1 for w in orchestrator.workers.values() if w.available)

    return {
        "status": "healthy" if available_workers > 0 else "degraded",
        "timestamp": datetime.now().isoformat(),
        "workers": {
            "total": total_workers,
            "available": available_workers,
            "unavailable": total_workers - available_workers,
        },
        "queue": {
            "active_jobs": len(orchestrator.active_jobs),
            "queued_jobs": len(orchestrator.job_queue),
            "completed_jobs": len(orchestrator.completed_jobs),
        },
    }


# Submit translation request
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Submit a translation request.

    Raises:
        HTTPException: 503 when no worker is currently available.
    """
    # Check if any workers available
    if not any(w.available for w in orchestrator.workers.values()):
        raise HTTPException(
            status_code=503,
            detail="No workers available at the moment. Please try again later.",
        )

    # Prepare job data
    job_data = {
        'request_id': request.request_id,
        'text': request.text,
        'source_lang': request.source_lang,
        'target_lang': request.target_lang,
        'auto_charge': request.auto_charge,
        'notification_url': request.notification_url,
        'wordpress_user_id': request.wordpress_user_id,
        'retry_count': 0,
    }

    # Add to queue
    orchestrator.add_job_to_queue(job_data)

    queue_size = len(orchestrator.job_queue)
    return {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
        "queue_position": queue_size,
        "estimated_wait_time": queue_size * 10,  # Rough estimate
    }


# Check translation status
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Check status of a translation request"""
    # Check completed jobs
    completed = orchestrator.completed_jobs.get(request_id)
    if completed is not None:
        return {"success": True, **completed}

    # Check active jobs
    job_info = orchestrator.active_jobs.get(request_id)
    if job_info is not None:
        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": time.time() - job_info['start_time'],
            "message": "Translation in progress",
        }

    # Check queue. The deque is mutated by other threads, and iterating it
    # during a mutation raises RuntimeError, so scan it under the job lock.
    with orchestrator.job_lock:
        is_queued = any(
            job['request_id'] == request_id for job in orchestrator.job_queue
        )
    if is_queued:
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued",
        }

    # Not found
    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found",
    }


# List workers
@app.get("/api/workers")
async def list_workers():
    """List all workers and their status"""
    with orchestrator.worker_lock:
        workers_info = [
            {
                "id": worker_id,
                "name": worker.config.name,
                "url": worker.config.url,
                "available": worker.available,
                "active_jobs": worker.active_jobs,
                "max_concurrent": worker.config.max_concurrent,
                "priority": worker.config.priority,
                "total_completed": worker.total_completed,
                "total_failed": worker.total_failed,
                "last_health_check": worker.last_health_check,
            }
            for worker_id, worker in orchestrator.workers.items()
        ]

    return {
        "success": True,
        "workers": workers_info,
    }


# Add worker dynamically (admin endpoint)
@app.post("/api/admin/add-worker")
async def add_worker(config: WorkerConfig):
    """Add a new worker (admin only - add authentication in production)"""
    with orchestrator.worker_lock:
        # len() + 1 alone can collide with an existing id after a removal,
        # so advance to the first unused index.
        index = len(orchestrator.workers) + 1
        while f"worker_{index}" in orchestrator.workers:
            index += 1
        worker_id = f"worker_{index}"
        orchestrator.workers[worker_id] = WorkerStatus(config)

    # Immediate health check, run in a thread pool so the blocking HTTP
    # calls do not stall the event loop. Must stay outside worker_lock:
    # the health check acquires that (non-reentrant) lock itself.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, orchestrator.check_all_workers_health)

    return {
        "success": True,
        "worker_id": worker_id,
        "message": f"Worker {config.name} added successfully",
    }


# Remove worker
@app.delete("/api/admin/remove-worker/{worker_id}")
async def remove_worker(worker_id: str):
    """Remove a worker (admin only - add authentication in production)

    Raises:
        HTTPException: 404 if the worker does not exist; 400 if it still has
            active jobs.
    """
    # Check and delete under one lock so a job cannot be assigned in between.
    with orchestrator.worker_lock:
        worker = orchestrator.workers.get(worker_id)
        if worker is None:
            raise HTTPException(status_code=404, detail="Worker not found")

        # Check if worker has active jobs
        if worker.active_jobs > 0:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot remove worker with {worker.active_jobs} active jobs",
            )

        del orchestrator.workers[worker_id]

    return {
        "success": True,
        "message": f"Worker {worker_id} removed successfully",
    }


# Force health check
@app.post("/api/admin/health-check")
async def force_health_check():
    """Force immediate health check of all workers"""
    # Blocking HTTP calls: run them off the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, orchestrator.check_all_workers_health)

    return {
        "success": True,
        "message": "Health check completed",
        "timestamp": datetime.now().isoformat(),
    }


# Get statistics
@app.get("/api/stats")
async def get_statistics():
    """Get orchestrator statistics"""
    workers = list(orchestrator.workers.values())
    total_completed = sum(w.total_completed for w in workers)
    total_failed = sum(w.total_failed for w in workers)
    total_finished = total_completed + total_failed

    return {
        "success": True,
        "statistics": {
            "total_workers": len(workers),
            "available_workers": sum(1 for w in workers if w.available),
            "active_jobs": len(orchestrator.active_jobs),
            "queued_jobs": len(orchestrator.job_queue),
            "completed_jobs": len(orchestrator.completed_jobs),
            "total_completed": total_completed,
            "total_failed": total_failed,
            "success_rate": (
                (total_completed / total_finished * 100) if total_finished > 0 else 0
            ),
        },
    }


# Main entry point
if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))

    print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
    print(f"📡 Starting on port {port}")
    print(f"👷 Workers configured: {len(orchestrator.workers)}")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info",
    )