import os
import time
import asyncio
import uuid
import threading
from datetime import datetime
from typing import Dict, List, Optional
from collections import deque

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests

# Configuration
ORCHESTRATOR_VERSION = "2.0.1"
WORKER_HEALTH_CHECK_INTERVAL = 30  # seconds between health-check sweeps
WORKER_TIMEOUT = 120  # seconds allowed for a worker to accept a job submission
# A failing job is re-queued at most this many times (so it runs at most
# MAX_JOB_RETRIES + 1 times) before it is recorded as failed.
MAX_JOB_RETRIES = 2
# TLS certificate verification for WordPress callbacks. Off by default to keep
# the historical behaviour (verify=False); set NOTIFICATION_VERIFY_SSL=true to
# enable it. With verification off the callback is exposed to MITM.
NOTIFICATION_VERIFY_SSL = os.getenv("NOTIFICATION_VERIFY_SSL", "false").strip().lower() in ("1", "true", "yes")


# Models
class TranslationRequest(BaseModel):
    """Payload accepted by POST /api/translate."""

    request_id: str
    text: str
    source_lang: str
    target_lang: str
    auto_charge: bool = False
    notification_url: Optional[str] = None  # URL POSTed with the final result/failure
    wordpress_user_id: Optional[int] = None


class WorkerConfig(BaseModel):
    """Static configuration of one translation worker."""

    url: str
    name: str
    priority: int = 1  # higher value is preferred by get_available_worker
    max_concurrent: int = 3  # maximum jobs dispatched to this worker at once


class WorkerStatus:
    """Runtime state of one worker; mutated under TranslationOrchestrator.worker_lock."""

    def __init__(self, config: WorkerConfig):
        self.config = config
        self.available = False  # set by the health checker
        self.active_jobs = 0  # jobs currently dispatched to this worker
        self.last_health_check = 0  # epoch seconds of last successful (HTTP 200) probe
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0  # running mean of processing_time (seconds)


class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to a pool of HTTP workers.

    Background threads:
      * a health-check loop that refreshes ``WorkerStatus.available``;
      * a queue-processor loop that assigns queued jobs to workers;
      * one thread per dispatched job that submits it and polls for completion.

    Locking: ``job_lock`` guards ``job_queue``; writes to worker counters,
    ``active_jobs`` and ``completed_jobs`` happen under ``worker_lock``.
    When both are needed they are ALWAYS taken in the order
    job_lock -> worker_lock, and no network I/O is done while either is held.
    """

    def __init__(self):
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        self.completed_jobs: Dict[str, Dict] = {}
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    def load_worker_configs(self):
        """Populate ``self.workers`` from ``WORKER_<n>_*`` environment variables.

        Reads n = 1, 2, ... until ``WORKER_<n>_URL`` is missing. Optional
        per-worker variables: ``_NAME``, ``_PRIORITY``, ``_MAX_CONCURRENT``.
        When no worker is configured, a single hard-coded fallback is used.

        Raises:
            ValueError: if a PRIORITY / MAX_CONCURRENT variable is not an int.
        """
        # Mode 1 (primary): read workers from the environment.
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
            max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "3"))

            config = WorkerConfig(
                url=url.rstrip('/'),
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )
            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url}")
            worker_index += 1

        # Mode 2: no workers came from the environment, use a hard-coded fallback.
        if workers_loaded == 0:
            print("⚠️ No workers in env, using hardcoded fallback")
            fallback_workers = [
                WorkerConfig(url="https://danicor-w1.hf.space", name="Worker Local 1", priority=1, max_concurrent=1),
            ]
            for i, cfg in enumerate(fallback_workers, start=1):
                self.workers[f"worker_{i}"] = WorkerStatus(cfg)
                print(f"✅ Hardcoded worker: {cfg.name} at {cfg.url}")

    def start_health_checker(self):
        """Start a daemon thread that probes all workers every WORKER_HEALTH_CHECK_INTERVAL s."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """GET ``<worker>/api/health`` for every worker and update availability.

        A worker is available only if it answers HTTP 200 with JSON
        ``{"status": "healthy"}``. The HTTP calls are made WITHOUT holding
        ``worker_lock``; only the short state updates are locked, so a slow
        worker cannot stall job assignment or completion bookkeeping.
        """
        with self.worker_lock:
            workers = list(self.workers.values())

        for worker in workers:
            try:
                health_url = f"{worker.config.url}/api/health"
                response = requests.get(health_url, timeout=10)

                if response.status_code == 200:
                    data = response.json()
                    is_healthy = data.get('status') == 'healthy'
                    with self.worker_lock:
                        was_available = worker.available
                        worker.available = is_healthy
                        worker.last_health_check = time.time()

                    if is_healthy and not was_available:
                        print(f"✅ {worker.config.name} is now available")
                    elif not is_healthy and was_available:
                        print(f"❌ {worker.config.name} became unavailable")
                else:
                    with self.worker_lock:
                        worker.available = False
                    print(f"⚠ {worker.config.name}: HTTP {response.status_code}")

            except Exception as e:
                # Any network/JSON error marks the worker unavailable until the next sweep.
                with self.worker_lock:
                    worker.available = False
                print(f"🚫 {worker.config.name}: {str(e)}")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best available worker with spare capacity, or None.

        Preference: highest ``priority`` first, then fewest ``active_jobs``;
        remaining ties keep ``self.workers`` insertion order.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]
            if not candidates:
                return None
            best_id, _ = min(
                candidates,
                key=lambda item: (-item[1].config.priority, item[1].active_jobs),
            )
            return best_id

    def start_job_processor(self):
        """Start a daemon thread that dispatches queued jobs every 2 seconds."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(2)

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("🔄 Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append *job_data* (must contain ``'request_id'``) to the pending queue."""
        with self.job_lock:
            self._enqueue_locked(job_data)

    def _enqueue_locked(self, job_data: Dict):
        """Append to the queue and log it; the caller must already hold ``job_lock``."""
        self.job_queue.append(job_data)
        print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")

    def process_job_queue(self):
        """Dispatch queued jobs while some worker still has spare capacity.

        Holds ``job_lock`` and takes ``worker_lock`` inside it (via
        get_available_worker / assign_job_to_worker), which fixes the
        global lock order job_lock -> worker_lock.
        """
        if not self.job_queue:
            return

        with self.job_lock:
            while self.job_queue:
                worker_id = self.get_available_worker()
                if not worker_id:
                    return
                job_data = self.job_queue.popleft()
                self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Record *job_data* as active on *worker_id* and start its sender thread.

        Called by process_job_queue while ``job_lock`` is held.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1
            self.active_jobs[request_id] = {
                'worker_id': worker_id,
                'job_data': job_data,
                'start_time': time.time(),
                'status': 'processing',
            }

        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True,
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """POST the job to ``<worker>/api/translate/heavy`` and then poll for its result.

        Any non-200 response or exception is routed to handle_worker_failure.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': job_data.get('auto_charge', False),
                # The worker is not given a callback URL; this orchestrator
                # notifies WordPress itself in handle_job_completion/failure.
                'notification_url': None,
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT,
            )

            if response.status_code == 200:
                print(f"✅ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")

        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll ``<worker>/api/check-translation-status`` every 5 s, up to 60 times.

        Completes the job on ``success`` + ``status == 'completed'``, fails it on
        ``status == 'failed'``, and fails it with a timeout after the last poll.
        Individual polling errors are logged and polling continues.
        """
        worker = self.workers[worker_id]
        max_checks = 60
        check_count = 0

        while check_count < max_checks:
            time.sleep(5)
            check_count += 1

            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15,
                )

                if response.status_code == 200:
                    data = response.json()

                    if data.get('success') and data.get('status') == 'completed':
                        self.handle_job_completion(worker_id, request_id, data)
                        return
                    elif data.get('status') == 'failed':
                        self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                        return

            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful job, release worker capacity and notify WordPress."""
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_completed += 1

            job_info = self.active_jobs.get(request_id, {})
            job_data = job_info.get('job_data', {})
            processing_time = time.time() - job_info.get('start_time', time.time())
            # Incremental running mean over all jobs this worker has completed.
            worker.avg_response_time += (processing_time - worker.avg_response_time) / worker.total_completed

            completion_data = {
                'request_id': request_id,
                'status': 'completed',
                'translated_text': worker_response.get('translated_text', ''),
                'processing_time': processing_time,
                'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
                'translation_length': worker_response.get('translation_length', 0),
                'from_cache': worker_response.get('from_cache', False),
                'worker_name': worker.config.name,
                'completed_at': datetime.now().isoformat(),
                'success': True,
            }

            # Publish the result before removing the active entry so status
            # lookups never see the job in neither place.
            self.completed_jobs[request_id] = completion_data
            self.active_jobs.pop(request_id, None)

        # Outside the lock: the callback may block for up to 30 s.
        notification_url = job_data.get('notification_url')
        if notification_url:
            # Debug log of the callback target.
            print(f"📤 Sending notification to WordPress: {notification_url}")
            self.notify_wordpress(notification_url, completion_data)
        else:
            print(f"⚠️ No notification URL for {request_id}")

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Release worker capacity and either re-queue the job or record it as failed.

        A job is re-queued while its ``job_data['retry_count']`` is below
        MAX_JOB_RETRIES; otherwise a failure record is stored and WordPress is
        notified. Both locks are taken in the global order job_lock ->
        worker_lock so the re-queue cannot deadlock against process_job_queue.
        """
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        failure_data = None
        with self.job_lock, self.worker_lock:
            worker.active_jobs -= 1
            worker.total_failed += 1

            job_info = self.active_jobs.get(request_id, {})
            job_data = job_info.get('job_data', {})
            # The retry counter lives in job_data, not in the active-job record.
            retry_count = job_data.get('retry_count', 0)

            if job_data and retry_count < MAX_JOB_RETRIES:
                print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")
                job_data['retry_count'] = retry_count + 1
                # Queue first, then drop the active entry: no window where the
                # job is in neither place. job_lock keeps the processor out.
                self._enqueue_locked(job_data)
            else:
                failure_data = {
                    'request_id': request_id,
                    'status': 'failed',
                    'error_message': error_message,
                    'failed_at': datetime.now().isoformat(),
                    'success': False,  # mirrors 'success': True on completions
                }
                self.completed_jobs[request_id] = failure_data

            self.active_jobs.pop(request_id, None)

        # Outside the locks: the callback may block for up to 30 s.
        if failure_data is not None:
            notification_url = job_data.get('notification_url')
            if notification_url:
                self.notify_wordpress(notification_url, failure_data)

    def find_job(self, request_id: str):
        """Return a consistent ``(state, record, queue_position)`` view of a job.

        ``state`` is ``'completed'`` (record: stored result/failure dict),
        ``'active'`` (record: active-job dict with 'worker_id'/'start_time'),
        ``'queued'`` (record: job_data, queue_position 1-based) or ``None``.
        Both locks are held so a job moving between queue, active set and
        completed set is never observed as missing, and the deque is never
        iterated while another thread mutates it.
        """
        with self.job_lock, self.worker_lock:
            completed = self.completed_jobs.get(request_id)
            if completed is not None:
                return 'completed', dict(completed), None
            active = self.active_jobs.get(request_id)
            if active is not None:
                return 'active', dict(active), None
            for position, job in enumerate(self.job_queue, start=1):
                if job['request_id'] == request_id:
                    return 'queued', job, position
        return None, None, None

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST *data* as JSON to the WordPress callback URL (best effort).

        Errors are logged, never raised. TLS verification follows
        NOTIFICATION_VERIFY_SSL (off by default).
        """
        try:
            print(f"🔔 Notifying WordPress for request {data['request_id']}")

            response = requests.post(
                notification_url,
                json=data,
                timeout=30,
                verify=NOTIFICATION_VERIFY_SSL,
            )

            if response.status_code == 200:
                print(f"✅ WordPress notified successfully for {data['request_id']}")
                try:
                    response_data = response.json()
                    print(f"📝 WordPress response: {response_data}")
                except ValueError:
                    # Body is not JSON; log it verbatim instead.
                    print(f"📝 WordPress response: {response.text}")
            else:
                print(f"❌ WordPress notification failed: HTTP {response.status_code}")
                print(f"📄 Response: {response.text}")

        except Exception as e:
            print(f"💥 WordPress notification error: {str(e)}")


# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Created at import time: this starts the health-check and queue threads.
orchestrator = TranslationOrchestrator()


@app.get("/")
async def root():
    """Service summary: version, worker availability and queue sizes."""
    available_workers = sum(1 for w in orchestrator.workers.values() if w.available)
    total_workers = len(orchestrator.workers)

    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": {
            "total": total_workers,
            "available": available_workers,
            "unavailable": total_workers - available_workers,
        },
        "queue": {
            "active_jobs": len(orchestrator.active_jobs),
            "queued_jobs": len(orchestrator.job_queue),
            "completed_jobs": len(orchestrator.completed_jobs),
        },
    }


@app.get("/api/health")
async def health_check():
    """Return "healthy" when at least one worker is available, else "degraded"."""
    available_workers = sum(1 for w in orchestrator.workers.values() if w.available)

    return {
        "status": "healthy" if available_workers > 0 else "degraded",
        "timestamp": datetime.now().isoformat(),
        "workers": {
            "total": len(orchestrator.workers),
            "available": available_workers,
        },
        "queue_stats": {
            "active": len(orchestrator.active_jobs),
            "queued": len(orchestrator.job_queue),
        },
    }


@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation job.

    Raises:
        HTTPException(503): when no worker is currently available.
    """
    if not any(w.available for w in orchestrator.workers.values()):
        raise HTTPException(
            status_code=503,
            detail="No translation workers available. Please try again later.",
        )

    job_data = {
        'request_id': request.request_id,
        'text': request.text,
        'source_lang': request.source_lang,
        'target_lang': request.target_lang,
        'auto_charge': request.auto_charge,
        'notification_url': request.notification_url,
        'wordpress_user_id': request.wordpress_user_id,
        'retry_count': 0,
    }

    orchestrator.add_job_to_queue(job_data)

    return {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
        "queue_position": len(orchestrator.job_queue),
        "estimated_wait_time": len(orchestrator.job_queue) * 10,
    }


@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Report a job's state: stored result/failure, processing, queued or not_found."""
    state, record, _position = orchestrator.find_job(request_id)

    if state == 'completed':
        return {
            "success": True,
            **record,
        }

    if state == 'active':
        elapsed_time = time.time() - record['start_time']
        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": record['worker_id'],
            "elapsed_time": elapsed_time,
            "message": "Translation in progress",
        }

    if state == 'queued':
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued",
        }

    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found",
    }


# Status-check endpoint taking the request id in a JSON body.
@app.post("/api/get-translation-result")
async def get_translation_result(request: dict):
    """Look up a job by ``request_id`` from the JSON body.

    Raises:
        HTTPException(400): when the body has no ``request_id``.
    """
    request_id = request.get('request_id')
    if not request_id:
        raise HTTPException(status_code=400, detail="Request ID is required")

    state, record, position = orchestrator.find_job(request_id)

    # Finished jobs; the stored record's own 'status'/'success' take precedence.
    if state == 'completed':
        return {
            "success": True,
            "status": "completed",
            **record,
        }

    # Jobs currently running on a worker.
    if state == 'active':
        elapsed_time = time.time() - record['start_time']
        return {
            "success": True,
            "status": "processing",
            "request_id": request_id,
            "elapsed_time": elapsed_time,
            "message": "Translation in progress",
        }

    # Jobs still waiting in the queue.
    if state == 'queued':
        return {
            "success": True,
            "status": "queued",
            "request_id": request_id,
            "queue_position": position,
            "message": "Translation queued",
        }

    return {
        "success": False,
        "status": "not_found",
        "request_id": request_id,
        "message": "Translation request not found",
    }


@app.get("/api/workers")
async def list_workers():
    """List every configured worker with its configuration and runtime counters."""
    workers_info = []
    for worker_id, worker in orchestrator.workers.items():
        workers_info.append({
            "id": worker_id,
            "name": worker.config.name,
            "url": worker.config.url,
            "available": worker.available,
            "active_jobs": worker.active_jobs,
            "max_concurrent": worker.config.max_concurrent,
            "priority": worker.config.priority,
            "total_completed": worker.total_completed,
            "total_failed": worker.total_failed,
            "avg_response_time": worker.avg_response_time,
            "last_health_check": worker.last_health_check,
        })

    return {
        "success": True,
        "workers": workers_info,
    }


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", 7860))
    print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
    print(f"📡 Starting on port {port}")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info",
    )