|
|
import os |
|
|
import time |
|
|
import asyncio |
|
|
import uuid |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Optional, Literal |
|
|
from collections import deque |
|
|
from fastapi import FastAPI, HTTPException, Request |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel |
|
|
import requests |
|
|
import threading |
|
|
|
|
|
|
|
|
# Version string reported by the "/" and FastAPI metadata endpoints.
ORCHESTRATOR_VERSION = "2.0.1"

# Seconds between background sweeps of every worker's /api/health endpoint.
WORKER_HEALTH_CHECK_INTERVAL = 30

# Timeout (seconds) for the initial job-submission POST to a worker.
WORKER_TIMEOUT = 120
|
|
|
|
|
|
|
|
class TranslationRequest(BaseModel):
    """Payload accepted by POST /api/translate.

    Mirrors the job dict the orchestrator queues internally; request_id is
    chosen by the caller and used later to poll /api/status/{request_id}.
    """

    request_id: str
    text: str
    source_lang: str
    target_lang: str
    # Carried in the job dict; workers themselves always receive auto_charge=False.
    auto_charge: bool = False
    # Optional callback URL; currently only fired on permanent job failure.
    notification_url: Optional[str] = None
    wordpress_user_id: Optional[int] = None
|
|
|
|
|
class WorkerConfig(BaseModel):
    """Static configuration for one remote translation worker."""

    # Base URL of the worker service (trailing slash stripped at load time).
    url: str
    name: str
    # Higher value = preferred when several workers are available.
    priority: int = 1
    # Upper bound on jobs dispatched to this worker at the same time.
    max_concurrent: int = 3
|
|
|
|
|
class WorkerStatus:
    """Mutable runtime state tracked for a single worker."""

    def __init__(self, config: WorkerConfig):
        self.config = config
        # Set by the health-checker thread from /api/health probe results.
        self.available: bool = False
        # Jobs currently dispatched to this worker; incremented/decremented
        # under the orchestrator's worker_lock.
        self.active_jobs: int = 0
        # time.time() of the last successful health probe (0 = never probed).
        self.last_health_check: float = 0
        self.total_completed: int = 0
        self.total_failed: int = 0
        # NOTE(review): never written anywhere else in this file — confirm
        # whether this metric is still needed.
        self.avg_response_time: float = 0.0
|
|
|
|
|
class TranslationOrchestrator:
    """Coordinates translation jobs across a pool of remote HTTP workers.

    Responsibilities:
      * Load worker endpoints from WORKER_<n>_* environment variables,
        falling back to a hardcoded list when none are configured.
      * Periodically probe each worker's /api/health endpoint.
      * Keep two FIFO queues; texts under 1000 characters are dispatched
        ahead of longer ones.
      * Dispatch jobs to the highest-priority, least-loaded available
        worker, poll that worker until the job finishes, and store results
        in completed_jobs for clients to fetch via /api/status.
      * Retry a failed job up to 2 times before recording the failure and
        (when a callback URL was supplied) notifying WordPress.
    """

    def __init__(self):
        # worker_id -> WorkerStatus. Populated once here; the dict itself is
        # never mutated afterwards (only WorkerStatus attributes change).
        self.workers: Dict[str, WorkerStatus] = {}

        # Short texts (< 1000 chars) go to the high-priority queue.
        self.high_priority_queue = deque()
        self.normal_priority_queue = deque()

        # request_id -> {'worker_id', 'job_data', 'start_time', 'status'}
        self.active_jobs: Dict[str, Dict] = {}
        # request_id -> completion/failure payload kept for client polling.
        self.completed_jobs: Dict[str, Dict] = {}

        self.worker_lock = threading.Lock()  # guards per-worker counters
        self.job_lock = threading.Lock()     # guards the two queues

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    def load_worker_configs(self):
        """Read WORKER_<n>_URL/_NAME/_PRIORITY/_MAX_CONCURRENT env vars.

        Scanning stops at the first missing WORKER_<n>_URL. When no worker
        is configured at all, a hardcoded fallback list is installed.
        """
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
            max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "1"))

            config = WorkerConfig(
                url=url.rstrip('/'),
                name=name,
                priority=priority,
                max_concurrent=max_concurrent
            )

            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url}")
            worker_index += 1

        if workers_loaded == 0:
            print("⚠️ No workers in env, using hardcoded fallback")
            fallback_workers = [
                WorkerConfig(url="https://danicor-w1.hf.space", name="Worker 1", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w2.hf.space", name="Worker 2", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w3.hf.space", name="Worker 3", priority=1, max_concurrent=1),
            ]
            for i, cfg in enumerate(fallback_workers, start=1):
                self.workers[f"worker_{i}"] = WorkerStatus(cfg)
                print(f"✅ Hardcoded worker: {cfg.name} at {cfg.url}")

    def start_health_checker(self):
        """Start a daemon thread that sweeps worker health every
        WORKER_HEALTH_CHECK_INTERVAL seconds."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """Probe every worker's /api/health endpoint and update availability.

        FIX: the probes deliberately run WITHOUT holding worker_lock.
        self.workers is never mutated after __init__, and the previous
        implementation held the lock across up to 10 s of network I/O per
        worker, which stalled get_available_worker() — and therefore all
        job dispatch — for the duration of the sweep.
        """
        for worker_id, worker in list(self.workers.items()):
            try:
                health_url = f"{worker.config.url}/api/health"
                response = requests.get(health_url, timeout=10)

                if response.status_code == 200:
                    data = response.json()
                    was_available = worker.available
                    # A worker is usable only when it self-reports healthy.
                    worker.available = data.get('status') == 'healthy'
                    worker.last_health_check = time.time()

                    # Log availability transitions only, not every sweep.
                    if worker.available and not was_available:
                        print(f"✅ {worker.config.name} is now available")
                    elif not worker.available and was_available:
                        print(f"❌ {worker.config.name} became unavailable")
                else:
                    worker.available = False
                    print(f"⚠ {worker.config.name}: HTTP {response.status_code}")

            except Exception as e:
                # Any network error marks the worker down until next sweep.
                worker.available = False
                print(f"🚫 {worker.config.name}: {str(e)}")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with spare capacity, or None.

        "Best" = highest config.priority; ties broken by fewest active jobs.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]

            if not candidates:
                return None

            candidates.sort(key=lambda x: (-x[1].config.priority, x[1].active_jobs))
            return candidates[0][0]

    def start_job_processor(self):
        """Start a daemon thread that attempts to dispatch one queued job
        every 2 seconds."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(2)

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("🔄 Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Enqueue a job, routing texts under 1000 chars to the fast lane."""
        with self.job_lock:
            text_length = len(job_data.get('text', ''))

            if text_length < 1000:
                job_data['priority'] = 'high'
                self.high_priority_queue.append(job_data)
                queue_type = "HIGH priority"
            else:
                job_data['priority'] = 'normal'
                self.normal_priority_queue.append(job_data)
                queue_type = "NORMAL priority"

            print(f"📥 Job {job_data['request_id']} ({text_length} chars) queued in {queue_type}. "
                  f"Queue sizes - High: {len(self.high_priority_queue)}, Normal: {len(self.normal_priority_queue)}")

    def process_job_queue(self):
        """Dispatch at most one queued job per call, short texts first."""
        with self.job_lock:
            for queue, emoji, label in (
                (self.high_priority_queue, "⚡", "HIGH"),
                (self.normal_priority_queue, "📄", "NORMAL"),
            ):
                if not queue:
                    continue
                worker_id = self.get_available_worker()
                if not worker_id:
                    # No capacity right now; the next 2 s tick retries.
                    continue
                job_data = queue.popleft()
                print(f"{emoji} Processing {label} priority job {job_data['request_id']}")
                threading.Thread(
                    target=self.assign_job_to_worker,
                    args=(worker_id, job_data),
                    daemon=True
                ).start()
                return

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Reserve a slot on the worker, register the job as active, and
        hand it to a sender thread."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1

        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing'
        }

        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """POST the job to the worker's heavy-translation endpoint, then
        block in monitor_worker_job() until it finishes or times out."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                # The orchestrator owns charging/notification; workers get
                # these switched off.
                'auto_charge': False,
                'notification_url': None
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT
            )

            if response.status_code == 200:
                print(f"✅ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")

        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker every 5 s (max 60 polls ≈ 5 min) until the job
        completes or fails; on timeout the job is treated as failed."""
        worker = self.workers[worker_id]
        max_checks = 60
        check_count = 0

        while check_count < max_checks:
            time.sleep(5)
            check_count += 1

            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()

                    if data.get('success') and data.get('status') == 'completed':
                        self.handle_job_completion(worker_id, request_id, data)
                        return
                    elif data.get('status') == 'failed':
                        self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                        return

            except Exception as e:
                # Transient poll errors are logged and retried next cycle.
                print(f"⚠ Error checking job {request_id}: {str(e)}")

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful result so clients can fetch it via /api/status."""
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_completed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})

        processing_time = time.time() - job_info.get('start_time', time.time())

        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat()
        }

        self.completed_jobs[request_id] = completion_data

        if request_id in self.active_jobs:
            del self.active_jobs[request_id]

        # Success path deliberately pushes no notification; WordPress polls
        # /api/status/{request_id} for completed results.
        print(f"✅ Result stored for {request_id}, waiting for WordPress to fetch")

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Requeue a failed job (up to 2 retries), else record the failure.

        On permanent failure the payload is stored in completed_jobs and,
        when the job carries a notification_url, WordPress is notified.
        """
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_failed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})

        # BUG FIX: the retry counter lives on job_data (which is what gets
        # requeued), not on the active_jobs bookkeeping entry. The old code
        # read job_info['retry_count'], a key that never exists there, so
        # the guard was always 0 < 2 and failed jobs retried forever.
        retry_count = job_data.get('retry_count', 0)

        if retry_count < 2:
            print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")

            job_data['retry_count'] = retry_count + 1
            self.add_job_to_queue(job_data)

            if request_id in self.active_jobs:
                del self.active_jobs[request_id]
        else:
            failure_data = {
                'request_id': request_id,
                'status': 'failed',
                'error_message': error_message,
                'failed_at': datetime.now().isoformat()
            }

            self.completed_jobs[request_id] = failure_data

            if request_id in self.active_jobs:
                del self.active_jobs[request_id]

            notification_url = job_data.get('notification_url')
            if notification_url:
                self.notify_wordpress(notification_url, failure_data)

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST a completion/failure payload to a WordPress callback URL.

        Rewrites localhost URLs to the public WORDPRESS_BASE_URL and
        upgrades http:// to https:// before sending.

        Returns:
            bool: True when WordPress answered HTTP 200, False otherwise.
        """
        try:
            print(f"📤 Attempting to notify WordPress at: {notification_url}")

            if notification_url.startswith('http://localhost'):
                # NOTE(review): the default here is a placeholder domain —
                # confirm WORDPRESS_BASE_URL is set in production.
                wordpress_url = os.getenv('WORDPRESS_BASE_URL', 'https://your-actual-site.com')
                notification_url = notification_url.replace('http://localhost', wordpress_url)

            # Force TLS even when the caller supplied a plain-http URL.
            notification_url = notification_url.replace('http://', 'https://')

            headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'MLT-Orchestrator/1.0'
            }

            print(f"🔗 Final notification URL: {notification_url}")
            print(f"📦 Payload keys: {list(data.keys())}")

            response = requests.post(
                notification_url,
                json=data,
                headers=headers,
                timeout=30,
                verify=True
            )

            print(f"📡 WordPress notification response: {response.status_code}")

            if response.status_code == 200:
                print(f"✅ WordPress notified successfully for {data['request_id']}")
                return True
            else:
                print(f"❌ WordPress notification failed: HTTP {response.status_code}")
                print(f"📄 Response: {response.text}")
                return False

        except requests.exceptions.Timeout:
            print(f"⏰ WordPress notification timeout for {data['request_id']}")
            return False
        except requests.exceptions.ConnectionError as e:
            print(f"🔌 WordPress connection error for {data['request_id']}: {str(e)}")
            return False
        except Exception as e:
            print(f"💥 Notification error for {data['request_id']}: {str(e)}")
            return False
|
|
|
|
|
|
|
|
# FastAPI application exposing the orchestrator's HTTP API.
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive (and browsers reject credentialed wildcard CORS) — confirm
# whether credentials are actually required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level singleton: constructing it loads worker configuration and
# starts the background health-checker and job-processor daemon threads.
orchestrator = TranslationOrchestrator()
|
|
|
|
|
@app.get("/")
async def root():
    """Service summary: version, worker availability, and queue statistics."""
    total = len(orchestrator.workers)
    up = sum(1 for w in orchestrator.workers.values() if w.available)

    high_q = len(orchestrator.high_priority_queue)
    normal_q = len(orchestrator.normal_priority_queue)

    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": {
            "total": total,
            "available": up,
            "unavailable": total - up,
        },
        "queue": {
            "active_jobs": len(orchestrator.active_jobs),
            "high_priority_queued": high_q,
            "normal_priority_queued": normal_q,
            "total_queued": high_q + normal_q,
            "completed_jobs": len(orchestrator.completed_jobs),
        },
    }
|
|
|
|
|
@app.get("/api/health")
async def health_check():
    """Health endpoint: 'healthy' when at least one worker is available."""
    up = sum(1 for w in orchestrator.workers.values() if w.available)
    high_q = len(orchestrator.high_priority_queue)
    normal_q = len(orchestrator.normal_priority_queue)

    return {
        "status": "healthy" if up > 0 else "degraded",
        "timestamp": datetime.now().isoformat(),
        "workers": {
            "total": len(orchestrator.workers),
            "available": up,
        },
        "queue_stats": {
            "active": len(orchestrator.active_jobs),
            "high_priority": high_q,
            "normal_priority": normal_q,
            "total_queued": high_q + normal_q,
        },
    }
|
|
|
|
|
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation job and return immediately with queue info.

    Raises:
        HTTPException: 503 when no worker is currently available.
    """
    if not any(w.available for w in orchestrator.workers.values()):
        raise HTTPException(
            status_code=503,
            detail="No translation workers available. Please try again later."
        )

    job_data = {
        'request_id': request.request_id,
        'text': request.text,
        'source_lang': request.source_lang,
        'target_lang': request.target_lang,
        'auto_charge': request.auto_charge,
        'notification_url': request.notification_url,
        'wordpress_user_id': request.wordpress_user_id,
        'retry_count': 0
    }

    orchestrator.add_job_to_queue(job_data)

    # BUG FIX: orchestrator.job_queue no longer exists — the queue was split
    # into high/normal priority deques, so the old reference raised
    # AttributeError on every submission. Report the combined depth instead.
    total_queued = (len(orchestrator.high_priority_queue)
                    + len(orchestrator.normal_priority_queue))

    return {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
        "queue_position": total_queued,
        # Rough estimate: ~10 s per job ahead in the queue.
        "estimated_wait_time": total_queued * 10
    }
|
|
|
|
|
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Report a request's state: completed/failed, processing, queued, or not found."""
    # Finished (or permanently failed) jobs are stored in completed_jobs.
    if request_id in orchestrator.completed_jobs:
        return {
            "success": True,
            **orchestrator.completed_jobs[request_id]
        }

    if request_id in orchestrator.active_jobs:
        job_info = orchestrator.active_jobs[request_id]
        elapsed_time = time.time() - job_info['start_time']

        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": elapsed_time,
            "message": "Translation in progress"
        }

    # BUG FIX: orchestrator.job_queue was removed when the queue split into
    # high/normal priority deques; the old reference raised AttributeError
    # whenever a job was still waiting. Snapshot both deques instead.
    pending = list(orchestrator.high_priority_queue) + list(orchestrator.normal_priority_queue)
    for job in pending:
        if job['request_id'] == request_id:
            return {
                "success": True,
                "request_id": request_id,
                "status": "queued",
                "message": "Translation request is queued"
            }

    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found"
    }
|
|
|
|
|
@app.get("/api/workers")
async def list_workers():
    """Expose configuration, availability, and counters for every worker."""
    workers_info = [
        {
            "id": wid,
            "name": w.config.name,
            "url": w.config.url,
            "available": w.available,
            "active_jobs": w.active_jobs,
            "max_concurrent": w.config.max_concurrent,
            "priority": w.config.priority,
            "total_completed": w.total_completed,
            "total_failed": w.total_failed,
            "last_health_check": w.last_health_check,
        }
        for wid, w in orchestrator.workers.items()
    ]

    return {"success": True, "workers": workers_info}
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Default 7860; override with the PORT environment variable.
    port = int(os.getenv("PORT", 7860))

    print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
    print(f"📡 Starting on port {port}")

    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")