# Standard library imports
import asyncio
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

# Third-party imports
import aiohttp
import torch
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

# Configure logging
logger = logging.getLogger(__name__)


# Define data models
class TranslationRequest(BaseModel):
    """Incoming translation request payload."""

    text: str
    source_lang: str
    target_lang: str
    auto_charge: bool = False


# Enums
class JobStatus(Enum):
    """Lifecycle states of a queued translation job."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class ServerStatus(Enum):
    """Health/availability states of a registered translation server."""

    AVAILABLE = "available"
    BUSY = "busy"
    OFFLINE = "offline"
    ERROR = "error"


# Data classes
@dataclass
class TranslationJob:
    """A single translation work item tracked by the queue."""

    job_id: str
    request_id: str
    text: str
    source_lang: str
    target_lang: str
    priority: int = 0
    auto_charge: bool = False
    notification_url: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    assigned_at: Optional[float] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    assigned_server: Optional[str] = None
    status: JobStatus = JobStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ServerInfo:
    """Runtime bookkeeping for one registered translation server."""

    id: str
    url: str
    status: ServerStatus = ServerStatus.OFFLINE
    last_ping: float = 0
    current_jobs: int = 0
    max_concurrent_jobs: int = 1
    response_time: float = 0
    error_count: int = 0
    total_requests: int = 0
    last_error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


# Server Registry Class
class ServerRegistry:
    """Tracks translation servers and their health/availability.

    Guarded by a threading.Lock because registry methods are called from
    both sync endpoint code and async health-check coroutines.
    """

    def __init__(self, health_check_interval: int = 30):
        self.servers: Dict[str, ServerInfo] = {}
        self.health_check_interval = health_check_interval
        self.lock = threading.Lock()
        self.health_monitor_task = None
        self.running = False
def register_server(self, server_id: str, url: str, max_concurrent_jobs: int = 1): """Register a new translation server""" with self.lock: self.servers[server_id] = ServerInfo( id=server_id, url=url, max_concurrent_jobs=max_concurrent_jobs ) logger.info(f"Registered server {server_id} at {url}") def unregister_server(self, server_id: str): """Remove a server from registry""" with self.lock: if server_id in self.servers: del self.servers[server_id] logger.info(f"Unregistered server {server_id}") def get_available_server(self) -> Optional[ServerInfo]: """Get the best available server for processing""" with self.lock: available_servers = [ server for server in self.servers.values() if server.status == ServerStatus.AVAILABLE and server.current_jobs < server.max_concurrent_jobs ] if not available_servers: return None available_servers.sort(key=lambda s: (s.current_jobs, s.response_time)) return available_servers[0] def mark_server_busy(self, server_id: str): """Mark server as busy""" with self.lock: if server_id in self.servers: self.servers[server_id].current_jobs += 1 if self.servers[server_id].current_jobs >= self.servers[server_id].max_concurrent_jobs: self.servers[server_id].status = ServerStatus.BUSY def mark_server_available(self, server_id: str): """Mark server as available""" with self.lock: if server_id in self.servers: self.servers[server_id].current_jobs = max(0, self.servers[server_id].current_jobs - 1) if self.servers[server_id].current_jobs < self.servers[server_id].max_concurrent_jobs: self.servers[server_id].status = ServerStatus.AVAILABLE def get_server_stats(self) -> Dict[str, Any]: """Get statistics about all servers""" with self.lock: stats = { 'total_servers': len(self.servers), 'available_servers': len([s for s in self.servers.values() if s.status == ServerStatus.AVAILABLE]), 'busy_servers': len([s for s in self.servers.values() if s.status == ServerStatus.BUSY]), 'offline_servers': len([s for s in self.servers.values() if s.status == 
ServerStatus.OFFLINE]), 'servers': { server_id: { 'status': server.status.value, 'current_jobs': server.current_jobs, 'max_jobs': server.max_concurrent_jobs, 'response_time': server.response_time, 'total_requests': server.total_requests, 'error_count': server.error_count, 'last_ping': server.last_ping } for server_id, server in self.servers.items() } } return stats async def check_server_health(self, server: ServerInfo) -> bool: """Check if a server is healthy""" try: start_time = time.time() async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session: async with session.get(f"{server.url}/api/health") as response: response_time = time.time() - start_time if response.status == 200: data = await response.json() with self.lock: server.last_ping = time.time() server.response_time = response_time server.error_count = 0 server.last_error = None if data.get('status') == 'healthy': if server.current_jobs < server.max_concurrent_jobs: server.status = ServerStatus.AVAILABLE else: server.status = ServerStatus.BUSY else: server.status = ServerStatus.ERROR return True else: raise Exception(f"HTTP {response.status}") except Exception as e: with self.lock: server.status = ServerStatus.OFFLINE server.error_count += 1 server.last_error = str(e) logger.error(f"Health check failed for server {server.id}: {e}") return False async def health_monitor(self): """Continuously monitor server health""" while self.running: try: servers_to_check = list(self.servers.values()) health_tasks = [ self.check_server_health(server) for server in servers_to_check ] await asyncio.gather(*health_tasks, return_exceptions=True) except Exception as e: logger.error(f"Error in health monitor: {e}") await asyncio.sleep(self.health_check_interval) def start_health_monitoring(self): """Start the health monitoring task""" if not self.running: self.running = True loop = asyncio.get_event_loop() self.health_monitor_task = loop.create_task(self.health_monitor()) logger.info("Started server 
health monitoring") def stop_health_monitoring(self): """Stop the health monitoring task""" self.running = False if self.health_monitor_task: self.health_monitor_task.cancel() logger.info("Stopped server health monitoring") # Translation Queue Class class TranslationQueue: def __init__(self, max_queue_size: int = 1000): self.pending_jobs: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size) self.active_jobs: Dict[str, TranslationJob] = {} self.completed_jobs: Dict[str, TranslationJob] = {} self.failed_jobs: Dict[str, TranslationJob] = {} self.lock = asyncio.Lock() self.processor_task: Optional[asyncio.Task] = None self.running = False self.total_jobs = 0 self.processed_jobs = 0 self.failed_job_count = 0 async def add_job(self, text: str, source_lang: str, target_lang: str, request_id: Optional[str] = None, priority: int = 0, auto_charge: bool = False, notification_url: Optional[str] = None) -> str: """Add a new translation job to the queue""" if not request_id: request_id = str(uuid.uuid4()) job_id = f"job_{int(time.time())}_{str(uuid.uuid4())[:8]}" job = TranslationJob( job_id=job_id, request_id=request_id, text=text, source_lang=source_lang, target_lang=target_lang, priority=priority, auto_charge=auto_charge, notification_url=notification_url ) try: await self.pending_jobs.put(job) async with self.lock: self.total_jobs += 1 logger.info(f"Added job {job_id} to queue (request_id: {request_id})") return job_id except asyncio.QueueFull: logger.error(f"Queue is full, cannot add job {job_id}") raise Exception("Translation queue is full, please try again later") async def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]: """Get the status of a specific job""" async with self.lock: if job_id in self.active_jobs: job = self.active_jobs[job_id] return { "job_id": job_id, "request_id": job.request_id, "status": job.status.value, "assigned_server": job.assigned_server, "created_at": job.created_at, "assigned_at": job.assigned_at, "started_at": job.started_at, 
"processing_time": time.time() - job.started_at if job.started_at else 0, "retry_count": job.retry_count } if job_id in self.completed_jobs: job = self.completed_jobs[job_id] return { "job_id": job_id, "request_id": job.request_id, "status": job.status.value, "assigned_server": job.assigned_server, "created_at": job.created_at, "completed_at": job.completed_at, "processing_time": job.completed_at - job.started_at if job.started_at and job.completed_at else 0, "result": job.result, "retry_count": job.retry_count } if job_id in self.failed_jobs: job = self.failed_jobs[job_id] return { "job_id": job_id, "request_id": job.request_id, "status": job.status.value, "error": job.error, "created_at": job.created_at, "failed_at": job.completed_at, "retry_count": job.retry_count } return None async def get_job_by_request_id(self, request_id: str) -> Optional[Dict[str, Any]]: """Get job status by request_id""" async with self.lock: all_jobs = {**self.active_jobs, **self.completed_jobs, **self.failed_jobs} for job in all_jobs.values(): if job.request_id == request_id: return await self.get_job_status(job.job_id) return None async def cancel_job(self, job_id: str) -> bool: """Cancel a pending or active job""" async with self.lock: if job_id in self.active_jobs: job = self.active_jobs[job_id] if job.status in [JobStatus.PENDING, JobStatus.ASSIGNED]: job.status = JobStatus.CANCELLED job.completed_at = time.time() self.failed_jobs[job_id] = job del self.active_jobs[job_id] if job.assigned_server: server_registry.mark_server_available(job.assigned_server) logger.info(f"Cancelled job {job_id}") return True return False async def get_queue_stats(self) -> Dict[str, Any]: """Get queue statistics""" async with self.lock: pending_count = self.pending_jobs.qsize() active_count = len(self.active_jobs) completed_count = len(self.completed_jobs) failed_count = len(self.failed_jobs) return { "pending_jobs": pending_count, "active_jobs": active_count, "completed_jobs": completed_count, 
"failed_jobs": failed_count, "total_jobs": self.total_jobs, "processed_jobs": self.processed_jobs, "success_rate": (self.processed_jobs / max(1, self.total_jobs)) * 100, "queue_utilization": (pending_count / self.pending_jobs.maxsize) * 100 } async def send_translation_request(self, server_url: str, job: TranslationJob) -> Dict[str, Any]: """Send translation request to a specific server""" try: payload = { "text": job.text, "source_lang": job.source_lang, "target_lang": job.target_lang, "request_id": job.request_id, "auto_charge": job.auto_charge, "notification_url": job.notification_url } timeout = aiohttp.ClientTimeout(total=300) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( f"{server_url}/api/translate/heavy", json=payload, headers={"Content-Type": "application/json"} ) as response: if response.status == 200: result = await response.json() logger.info(f"Successfully submitted job {job.job_id} to server {server_url}") return result else: error_text = await response.text() raise Exception(f"Server returned {response.status}: {error_text}") except Exception as e: logger.error(f"Failed to send job {job.job_id} to server {server_url}: {e}") raise e async def process_queue(self): """Main queue processor - assigns jobs to available servers""" logger.info("Started queue processor") while self.running: try: try: job = await asyncio.wait_for(self.pending_jobs.get(), timeout=1.0) except asyncio.TimeoutError: continue available_server = server_registry.get_available_server() if not available_server: await self.pending_jobs.put(job) logger.warning(f"No available servers for job {job.job_id}, requeueing") await asyncio.sleep(2) continue async with self.lock: job.assigned_server = available_server.id job.assigned_at = time.time() job.status = JobStatus.ASSIGNED self.active_jobs[job.job_id] = job server_registry.mark_server_busy(available_server.id) try: job.status = JobStatus.PROCESSING job.started_at = time.time() result = await 
self.send_translation_request(available_server.url, job) logger.info(f"Job {job.job_id} submitted to server {available_server.id}") except Exception as e: async with self.lock: job.retry_count += 1 job.error = str(e) if job.retry_count < job.max_retries: job.status = JobStatus.PENDING job.assigned_server = None job.assigned_at = None job.started_at = None await self.pending_jobs.put(job) del self.active_jobs[job.job_id] logger.warning(f"Job {job.job_id} failed, retrying ({job.retry_count}/{job.max_retries})") else: job.status = JobStatus.FAILED job.completed_at = time.time() self.failed_jobs[job.job_id] = job self.failed_job_count += 1 del self.active_jobs[job.job_id] logger.error(f"Job {job.job_id} failed permanently after {job.retry_count} retries") server_registry.mark_server_available(available_server.id) except Exception as e: logger.error(f"Error in queue processor: {e}") await asyncio.sleep(1) def start_processing(self): """Start the queue processor""" if not self.running: self.running = True self.processor_task = asyncio.create_task(self.process_queue()) logger.info("Started queue processing") def stop_processing(self): """Stop the queue processor""" self.running = False if self.processor_task: self.processor_task.cancel() logger.info("Stopped queue processing") # Global instances server_registry = ServerRegistry() translation_queue = TranslationQueue() # Configuration LOAD_BALANCER_ENABLED = os.getenv("LOAD_BALANCER_ENABLED", "false").lower() == "true" SERVER_ID = os.getenv("SERVER_ID", f"server_{int(time.time())}") CURRENT_SERVER_URL = os.getenv("CURRENT_SERVER_URL", "http://localhost:7860") PEER_SERVERS = os.getenv("PEER_SERVERS", "").split(",") if os.getenv("PEER_SERVERS") else [] MODEL_NAME = os.getenv("MODEL_NAME", "default_model") # Initialize FastAPI app app = FastAPI(title="Enhanced Translation Service with Load Balancer") # Global storage for translations (you may need to replace this with your actual implementation) translations = {} translator = 
# Helper functions
async def estimate_queue_wait_time() -> int:
    """Estimate wait time in seconds based on queue size and server availability"""
    try:
        queue_stats = await translation_queue.get_queue_stats()
        server_stats = server_registry.get_server_stats()
        pending_jobs = queue_stats['pending_jobs']
        available_servers = server_stats['available_servers']
        if available_servers == 0:
            return 300  # no capacity right now: quote 5 minutes
        # Assume ~30s per job, spread across the available servers; cap at 30 min.
        estimated_seconds = (pending_jobs * 30) // max(1, available_servers)
        return min(estimated_seconds, 1800)
    except Exception:
        # Best-effort estimate; fall back to a neutral 2 minutes on any error.
        return 120


async def send_completion_notification(notification_url: str, request_id: str,
                                       translated_text: str, result: dict,
                                       character_count: int, translation_length: int,
                                       source_lang: str, target_lang: str,
                                       auto_charge: bool):
    """Send completion notification with enhanced data.

    Returns True when the remote endpoint acknowledged with HTTP 200.
    Never raises: notification delivery is best-effort.
    """
    try:
        payload = {
            "request_id": request_id,
            "status": "completed",
            "translated_text": translated_text,
            "processing_time": result['processing_time'],
            "character_count": character_count,
            "translation_length": translation_length,
            "source_lang": source_lang,
            "target_lang": target_lang,
            "from_cache": result.get('from_cache', False),
            "chunks_count": result.get('chunks_count', 1),
            "auto_charge": auto_charge,
            "server_id": SERVER_ID,
            "completed_at": datetime.now().isoformat()
        }

        timeout = aiohttp.ClientTimeout(total=45)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                notification_url,
                json=payload,
                headers={
                    'Content-Type': 'application/json',
                    'User-Agent': 'MLT-Server/2.0'
                }
            ) as response:
                if response.status == 200:
                    logger.info(f"Notification sent successfully for {request_id}")
                    return True
                else:
                    logger.warning(f"Notification failed with status {response.status} for {request_id}")
                    return False
    except Exception as e:
        logger.error(f"Failed to send notification for {request_id}: {e}")
        return False


async def run_enhanced_translation_job(request_id: str, text: str, source_lang: str,
                                       target_lang: str,
                                       notification_url: Optional[str],
                                       auto_charge: bool = False):
    """Enhanced translation job runner with load balancer integration"""
    # FIX: bind start_time before the try block.  The original assigned it
    # inside try and then guarded the except handler with a fragile
    # `'start_time' in locals()` check to avoid an UnboundLocalError.
    start_time = time.time()
    try:
        # Simulate progress updates
        for i in range(1, 10):
            await asyncio.sleep(2)
            if request_id in translations:
                translations[request_id]["progress"] = i * 10
                translations[request_id]["elapsed_time"] = time.time() - start_time

        # Perform actual translation (replace with your actual translation logic)
        result = translator.translate_text(text, source_lang, target_lang)
        translated_text = result['translated_text']
        processing_time = time.time() - start_time

        # Update translation status
        translations[request_id] = {
            "status": "completed",
            "progress": 100,
            "elapsed_time": processing_time,
            "message": "Translation completed successfully",
            "result": translated_text,
            "server_id": SERVER_ID,
            "processing_time": result['processing_time'],
            "from_cache": result.get('from_cache', False)
        }

        # Store in completed translations
        translator.completed_translations[request_id] = {
            'result': result,
            'completed_at': time.time(),
            'character_count': len(text),
            'translation_length': len(translated_text),
            'server_id': SERVER_ID
        }

        # Free up server capacity
        if LOAD_BALANCER_ENABLED:
            server_registry.mark_server_available(SERVER_ID)

        # Send notification if URL provided
        if notification_url:
            await send_completion_notification(
                notification_url, request_id, translated_text, result,
                len(text), len(translated_text), source_lang, target_lang,
                auto_charge
            )

        logger.info(f"Translation job {request_id} completed successfully on server {SERVER_ID}")
    except Exception as e:
        logger.error(f"Error in translation job {request_id}: {e}")
        # Update error status
        if request_id in translations:
            translations[request_id] = {
                "status": "failed",
                "message": f"Translation failed: {str(e)}",
                "server_id": SERVER_ID,
                "elapsed_time": time.time() - start_time
            }
        # Free up server capacity
        if LOAD_BALANCER_ENABLED:
            server_registry.mark_server_available(SERVER_ID)
# Event handlers
@app.on_event("startup")
async def startup_event():
    """Initialize load balancer on startup"""
    if LOAD_BALANCER_ENABLED:
        # Register ourselves first, then any configured peers.
        server_registry.register_server(SERVER_ID, CURRENT_SERVER_URL, max_concurrent_jobs=1)
        for i, peer_url in enumerate(PEER_SERVERS):
            if peer_url.strip():
                peer_id = f"peer_server_{i}"
                server_registry.register_server(peer_id, peer_url.strip(), max_concurrent_jobs=1)
        server_registry.start_health_monitoring()
        translation_queue.start_processing()
        logger.info(f"Load balancer initialized with {len(PEER_SERVERS)} peer servers")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup load balancer on shutdown"""
    if LOAD_BALANCER_ENABLED:
        server_registry.stop_health_monitoring()
        translation_queue.stop_processing()
        logger.info("Load balancer shutdown complete")


# API Endpoints
@app.post("/api/translate/heavy")
async def heavy_translate_enhanced(request: Request):
    """Enhanced heavy translation with load balancer support.

    Order of preference: route to an available peer, fall back to the
    shared queue, otherwise process locally.
    """
    try:
        data = await request.json()

        # Extract parameters
        request_id = data.get("request_id")
        if not request_id:
            request_id = str(uuid.uuid4())
        text = data.get("text")
        source_lang = data.get("source_lang")
        target_lang = data.get("target_lang")
        auto_charge = data.get("auto_charge", False)
        notification_url = data.get("notification_url")

        # Validate required fields
        if not all([text, source_lang, target_lang]):
            raise HTTPException(status_code=400, detail="Missing required fields: text, source_lang, target_lang")

        # Check if load balancer is enabled and this server is busy
        if LOAD_BALANCER_ENABLED:
            local_server = server_registry.servers.get(SERVER_ID)
            at_capacity = (
                local_server is not None
                and local_server.current_jobs >= local_server.max_concurrent_jobs
            )
            if at_capacity:
                # Try to find an available peer server
                available_server = server_registry.get_available_server()
                if available_server and available_server.id != SERVER_ID:
                    # Route to available peer server
                    try:
                        async with aiohttp.ClientSession() as client:
                            async with client.post(
                                f"{available_server.url}/api/translate/heavy",
                                json=data,
                                timeout=aiohttp.ClientTimeout(total=10)
                            ) as response:
                                if response.status == 200:
                                    routed_result = await response.json()
                                    logger.info(f"Routed request {request_id} to server {available_server.id}")
                                    return routed_result
                                logger.warning(f"Failed to route to {available_server.id}: {response.status}")
                    except Exception as e:
                        logger.error(f"Error routing to {available_server.id}: {e}")

                # If routing failed, add to queue
                job_id = await translation_queue.add_job(
                    text=text,
                    source_lang=source_lang,
                    target_lang=target_lang,
                    request_id=request_id,
                    auto_charge=auto_charge,
                    notification_url=notification_url
                )
                return {
                    "success": True,
                    "request_id": request_id,
                    "job_id": job_id,
                    "message": "Server busy, request queued for processing",
                    "processing_mode": "queued"
                }

        # Process locally
        translations[request_id] = {
            "status": "processing",
            "progress": 0,
            "elapsed_time": 0,
            "message": "Translation in progress...",
            "server_id": SERVER_ID
        }

        # Mark server as busy if load balancer is enabled
        if LOAD_BALANCER_ENABLED:
            server_registry.mark_server_busy(SERVER_ID)

        # Start translation task
        asyncio.create_task(
            run_enhanced_translation_job(
                request_id, text, source_lang, target_lang, notification_url, auto_charge
            )
        )

        return {
            "success": True,
            "request_id": request_id,
            "message": "Translation started on current server",
            "processing_mode": "local",
            "server_id": SERVER_ID
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in heavy_translate_enhanced: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/webhook/job-completion")
async def job_completion_webhook(data: dict):
    """Webhook endpoint for receiving job completion notifications from peer servers"""
    try:
        job_id = data.get('job_id')
        request_id = data.get('request_id')
        status = data.get('status')
        result = data.get('result')
        server_id = data.get('server_id')

        if not all([job_id, request_id, status]):
            raise HTTPException(status_code=400, detail="Missing required fields")

        # Update job status in queue
        async with translation_queue.lock:
            if job_id in translation_queue.active_jobs:
                job = translation_queue.active_jobs[job_id]

                if status == 'completed':
                    job.status = JobStatus.COMPLETED
                    job.completed_at = time.time()
                    job.result = result
                    # Move to completed jobs
                    translation_queue.completed_jobs[job_id] = job
                    del translation_queue.active_jobs[job_id]
                    translation_queue.processed_jobs += 1
                    logger.info(f"Job {job_id} completed on server {server_id}")
                elif status == 'failed':
                    job.status = JobStatus.FAILED
                    job.completed_at = time.time()
                    job.error = data.get('error', 'Unknown error')
                    # Move to failed jobs
                    translation_queue.failed_jobs[job_id] = job
                    del translation_queue.active_jobs[job_id]
                    translation_queue.failed_job_count += 1
                    logger.error(f"Job {job_id} failed on server {server_id}")

                # Free up the server
                if job.assigned_server:
                    server_registry.mark_server_available(job.assigned_server)

        return {
            "success": True,
            "message": f"Job {job_id} status updated to {status}"
        }
    except HTTPException:
        # FIX: re-raise HTTPException untouched.  The original's generic
        # `except Exception` caught the 400 raised above and rewrapped it as
        # a 500, unlike every other endpoint in this file.
        raise
    except Exception as e:
        logger.error(f"Error in job completion webhook: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/enhanced-status")
async def enhanced_server_status():
    """Get enhanced server status including load balancer information"""
    try:
        # Stats common to both modes.
        base_stats = {
            "server_id": SERVER_ID,
            "server_url": CURRENT_SERVER_URL,
            "load_balancer_enabled": LOAD_BALANCER_ENABLED,
            "model": MODEL_NAME,
            "device": str(translator.device) if translator else "unknown",
            "gpu_available": torch.cuda.is_available(),
        }

        if LOAD_BALANCER_ENABLED:
            base_stats.update({
                "server_registry": server_registry.get_server_stats(),
                "queue_stats": await translation_queue.get_queue_stats(),
                "peer_servers": len(PEER_SERVERS)
            })
        else:
            # Local server stats only; translator may not be initialized yet.
            base_stats.update({
                "active_sessions": len(translator.translation_sessions) if translator else 0,
                "completed_translations": len(translator.completed_translations) if translator else 0,
                "total_requests": translator.total_requests if translator else 0
            })

        return {
            "success": True,
            **base_stats,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/translate/distributed")
async def distributed_translate(request: TranslationRequest):
    """
    Distributed translation endpoint - routes requests to available servers
    """
    try:
        if not LOAD_BALANCER_ENABLED:
            # Fallback to local processing
            return await translate_text_api(request)

        # Check if this server is available for local processing
        local_server = server_registry.servers.get(SERVER_ID)
        can_process_locally = (
            local_server is not None
            and local_server.status == ServerStatus.AVAILABLE
            and local_server.current_jobs < local_server.max_concurrent_jobs
        )

        if can_process_locally:
            # Process locally if available
            server_registry.mark_server_busy(SERVER_ID)
            try:
                result = perform_translation_internal(
                    request.text, request.source_lang, request.target_lang
                )
                return {
                    "success": True,
                    "processed_by": SERVER_ID,
                    "processing_mode": "local",
                    "translated_text": result['translated_text'],
                    "processing_time": result['processing_time'],
                    "chunks_count": result['chunks_count'],
                    "from_cache": result.get('from_cache', False),
                    "character_count": len(request.text),
                    "translation_length": len(result['translated_text'])
                }
            finally:
                # Always release the capacity slot, even on failure.
                server_registry.mark_server_available(SERVER_ID)

        # Add to distributed queue
        job_id = await translation_queue.add_job(
            text=request.text,
            source_lang=request.source_lang,
            target_lang=request.target_lang,
            auto_charge=request.auto_charge
        )
        return {
            "success": True,
            "processing_mode": "queued",
            "job_id": job_id,
            "message": "Request queued for processing on available server",
            "estimated_wait_time": await estimate_queue_wait_time()
        }
    except Exception as e:
        logger.error(f"Error in distributed translation: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/translate/queue")
async def queue_translate(request: TranslationRequest):
    """
    Force translation through the queue system
    """
    try:
        job_id = await translation_queue.add_job(
            text=request.text,
            source_lang=request.source_lang,
            target_lang=request.target_lang,
            auto_charge=request.auto_charge
        )
        return {
            "success": True,
            "job_id": job_id,
            "message": "Translation request added to queue",
            "estimated_wait_time": await estimate_queue_wait_time()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/job/{job_id}/status")
async def get_job_status(job_id: str):
    """Get status of a queued translation job"""
    try:
        status = await translation_queue.get_job_status(job_id)
        if not status:
            raise HTTPException(status_code=404, detail="Job not found")
        return {"success": True, **status}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/request/{request_id}/status")
async def get_request_status(request_id: str):
    """Get status by request_id (WordPress compatibility)"""
    try:
        status = await translation_queue.get_job_by_request_id(request_id)
        if not status:
            raise HTTPException(status_code=404, detail="Request not found")
        return {"success": True, **status}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/job/{job_id}/cancel")
async def cancel_job(job_id: str):
    """Cancel a queued translation job"""
    try:
        cancelled = await translation_queue.cancel_job(job_id)
        if cancelled:
            return {
                "success": True,
                "message": f"Job {job_id} cancelled successfully"
            }
        raise HTTPException(status_code=404, detail="Job not found or cannot be cancelled")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/load-balancer/status")
async def load_balancer_status():
    """Get load balancer status"""
    try:
        return {
            "success": True,
            "load_balancer_enabled": LOAD_BALANCER_ENABLED,
            "server_registry": server_registry.get_server_stats(),
            "queue_stats": await translation_queue.get_queue_stats(),
            "total_servers": len(server_registry.servers),
            "available_servers": len([s for s in server_registry.servers.values() if s.status == ServerStatus.AVAILABLE])
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/load-balancer/register")
async def register_server(server_data: dict):
    """Register a new server with the load balancer"""
    try:
        server_id = server_data.get("server_id")
        url = server_data.get("url")
        max_concurrent_jobs = server_data.get("max_concurrent_jobs", 1)

        if not all([server_id, url]):
            raise HTTPException(status_code=400, detail="Missing server_id or url")

        server_registry.register_server(server_id, url, max_concurrent_jobs)
        return {
            "success": True,
            "message": f"Server {server_id} registered successfully"
        }
    except HTTPException:
        # FIX: propagate the 400 above unchanged; the generic handler below
        # previously rewrapped it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/load-balancer/unregister")
async def unregister_server(server_data: dict):
    """Unregister a server from the load balancer"""
    try:
        server_id = server_data.get("server_id")
        if not server_id:
            raise HTTPException(status_code=400, detail="Missing server_id")

        server_registry.unregister_server(server_id)
        return {
            "success": True,
            "message": f"Server {server_id} unregistered successfully"
        }
    except HTTPException:
        # FIX: same 400->500 masking defect as register_server above.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
"processing_time": time.time() - start_time, "chunks_count": 1, "from_cache": False } async def translate_text_api(request: TranslationRequest): """Fallback translation API - replace with your actual implementation""" try: result = perform_translation_internal( request.text, request.source_lang, request.target_lang ) return { "success": True, "translated_text": result['translated_text'], "processing_time": result['processing_time'], "chunks_count": result['chunks_count'], "from_cache": result.get('from_cache', False) } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Start the server uvicorn.run( app, host="0.0.0.0", port=7860, log_level="info" )