# server2 / app2.py
# danicor's picture
# Rename app.py to app2.py
# f52833b verified
import os
import time
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Optional
from collections import deque
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import threading
# Configuration
ORCHESTRATOR_VERSION = "2.0.0"  # reported in the FastAPI metadata, the root endpoint and the startup banner
WORKER_HEALTH_CHECK_INTERVAL = 30  # seconds the health-check thread sleeps between passes
WORKER_TIMEOUT = 120  # seconds; HTTP timeout of the job-submission POST sent to a worker
# Models
class TranslationRequest(BaseModel):
    """Request body accepted by POST /api/translate."""

    # Caller-chosen identifier; used as the key for queue, active and
    # completed job lookups.
    request_id: str
    # Text to translate and the language pair, forwarded to the worker as-is.
    text: str
    source_lang: str
    target_lang: str
    # Forwarded to the worker unchanged; its meaning is defined by the worker.
    auto_charge: bool = False
    # When set, the orchestrator POSTs the final result (or failure) here.
    notification_url: Optional[str] = None
    # Stored with the job data; not read anywhere else in this file.
    wordpress_user_id: Optional[int] = None
class WorkerConfig(BaseModel):
    """Static description of one translation worker."""

    # Base URL; "/api/health", "/api/translate/heavy" and
    # "/api/check-translation-status" are appended to it.
    url: str
    # Human-readable name used in log lines and API responses.
    name: str
    # Higher values are preferred when choosing a worker for a job.
    priority: int = 1
    # A worker is only offered jobs while its active job count is below this.
    max_concurrent: int = 3
class WorkerStatus:
    """Mutable runtime record for one worker: its config plus live counters."""

    def __init__(self, config: WorkerConfig):
        """Start the worker as unavailable with every counter at zero.

        Args:
            config: Static settings (URL, name, priority, concurrency cap).
        """
        self.config = config

        # Health state; a worker stays unavailable until a health check
        # marks it otherwise.
        self.available, self.last_health_check = False, 0

        # Load and outcome counters.
        self.active_jobs = 0
        self.total_completed = self.total_failed = 0

        # Initialised here; no code in this file updates it.
        self.avg_response_time = 0.0
# Orchestrator Core Class
class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to remote HTTP workers.

    A job moves through three containers: ``job_queue`` (waiting),
    ``active_jobs`` (sent to a worker) and ``completed_jobs`` (finished or
    permanently failed). Two daemon threads drive the system: one polls
    worker health, the other drains the queue. ``worker_lock`` guards worker
    state and ``job_lock`` guards the queue.

    NOTE(review): ``completed_jobs`` is never pruned, so it grows for the
    lifetime of the process.
    """

    def __init__(self):
        """Create empty state, load workers from the environment and start
        the background health-check and queue-processing threads."""
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        self.completed_jobs: Dict[str, Dict] = {}
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()
        # Load worker configurations from environment
        self.load_worker_configs()
        # Start background tasks
        self.start_health_checker()
        self.start_job_processor()

    def load_worker_configs(self):
        """Load worker configurations from environment variables.

        Reads ``WORKER_<n>_URL`` for n = 1, 2, ... and stops at the first
        missing URL. ``WORKER_<n>_NAME``, ``_PRIORITY`` and
        ``_MAX_CONCURRENT`` are optional.
        """
        worker_index = 1
        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break
            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
            max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "3"))
            config = WorkerConfig(
                url=url,
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )
            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            print(f"βœ“ Loaded worker: {name} at {url}")
            worker_index += 1
        if not self.workers:
            print("⚠ No workers configured. Add WORKER_N_URL environment variables.")

    def start_health_checker(self):
        """Start the daemon thread that periodically checks worker health."""
        def health_check_loop():
            while True:
                # An unexpected error must not end the loop: without this
                # thread, worker availability would never be refreshed.
                try:
                    self.check_all_workers_health()
                except Exception as e:
                    print(f"⚠ Health check loop error: {str(e)}")
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("βœ“ Health checker started")

    def check_all_workers_health(self):
        """Probe ``/api/health`` on every worker and update its availability.

        The HTTP calls run without ``worker_lock`` held so that a slow or
        unreachable worker cannot stall job scheduling; the lock is taken
        only to snapshot the worker list and to write each result.
        """
        with self.worker_lock:
            snapshot = list(self.workers.values())

        for worker in snapshot:
            got_ok_response = False
            try:
                response = requests.get(
                    f"{worker.config.url}/api/health",
                    timeout=10
                )
                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    got_ok_response = True
                    if healthy:
                        message = f"βœ“ {worker.config.name}: Healthy"
                    else:
                        message = f"βœ— {worker.config.name}: Unhealthy"
                else:
                    healthy = False
                    message = f"βœ— {worker.config.name}: HTTP {response.status_code}"
            except Exception as e:
                healthy = False
                got_ok_response = False
                message = f"βœ— {worker.config.name}: {str(e)}"

            with self.worker_lock:
                worker.available = healthy
                # The timestamp records the last parseable HTTP 200 reply.
                if got_ok_response:
                    worker.last_health_check = time.time()
            print(message)

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with spare capacity, or None.

        Candidates are available workers below their ``max_concurrent``
        limit. Higher priority wins; ties go to the worker with fewer active
        jobs, then to the earliest registered.
        """
        with self.worker_lock:
            available_workers = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]
            if not available_workers:
                return None
            # Sort by priority (higher first) then by active jobs (fewer first)
            available_workers.sort(
                key=lambda x: (-x[1].config.priority, x[1].active_jobs)
            )
            return available_workers[0][0]

    def start_job_processor(self):
        """Start the daemon thread that drains the job queue."""
        def process_queue_loop():
            while True:
                # Keep the processor alive on unexpected errors; if this
                # thread dies, queued jobs are never dispatched again.
                try:
                    self.process_job_queue()
                except Exception as e:
                    print(f"⚠ Job processor loop error: {str(e)}")
                time.sleep(2)  # Check queue every 2 seconds

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("βœ“ Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append a job to the queue.

        Args:
            job_data: Job description; must contain ``'request_id'``.
        """
        with self.job_lock:
            self.job_queue.append(job_data)
            print(f"πŸ“ Job {job_data['request_id']} added to queue. Queue size: {len(self.job_queue)}")

    def process_job_queue(self):
        """Dispatch at most one queued job to an available worker."""
        if not self.job_queue:
            return
        with self.job_lock:
            if not self.job_queue:
                return
            worker_id = self.get_available_worker()
            if not worker_id:
                return  # No available workers
            job_data = self.job_queue.popleft()
        # Process outside lock to avoid blocking
        self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Record a job as active on a worker and send it from a new thread.

        If the worker was removed after it was selected, the job is put back
        at the front of the queue instead of being lost.
        """
        request_id = job_data['request_id']
        with self.worker_lock:
            worker = self.workers.get(worker_id)
            if worker is not None:
                worker.active_jobs += 1
        if worker is None:
            print(f"⚠ Worker {worker_id} is no longer registered; re-queueing job {request_id}")
            with self.job_lock:
                self.job_queue.appendleft(job_data)
            return

        print(f"πŸ”„ Assigning job {request_id} to {worker.config.name}")
        # Store job info
        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing'
        }
        # Send to worker in background
        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """Submit a job to a worker, then poll until it finishes.

        Any error is routed to ``handle_worker_failure``, which ignores
        reports for jobs that are no longer active, so a job is never
        failed twice.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']
        try:
            # Prepare request for worker
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': job_data.get('auto_charge', False),
                'notification_url': None  # Don't let worker notify WordPress directly
            }
            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT
            )
            if response.status_code == 200:
                print(f"βœ“ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")
        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker every 5 seconds until the job completes or fails.

        Gives up after 60 polls and reports a timeout failure. Only the HTTP
        call and response parsing are guarded; the completion and failure
        handlers run outside the ``try`` so an error inside them cannot
        cause a handler to run twice.
        """
        worker = self.workers[worker_id]
        max_checks = 60  # Maximum 5 minutes (60 * 5 seconds)
        for _ in range(max_checks):
            time.sleep(5)
            try:
                # Check status on worker
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15
                )
                if response.status_code != 200:
                    continue
                data = response.json()
                succeeded = bool(data.get('success'))
                status = data.get('status')
            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")
                continue

            if succeeded and status == 'completed':
                # Job completed successfully
                self.handle_job_completion(worker_id, request_id, data)
                return
            if status == 'failed':
                self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                return
        # Timeout reached
        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful job, update worker stats and notify the caller.

        A report for a job that is not in ``active_jobs`` is ignored so the
        worker counters are not adjusted twice.
        """
        worker = self.workers[worker_id]
        print(f"βœ… Job {request_id} completed by {worker.config.name}")
        job_info = self.active_jobs.get(request_id)
        if job_info is None:
            print(f"⚠ Job {request_id} is no longer active; ignoring completion report")
            return
        # Update worker stats
        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_completed += 1
        job_data = job_info.get('job_data', {})
        # Calculate processing time
        processing_time = time.time() - job_info.get('start_time', time.time())
        # Prepare completion data
        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat()
        }
        # Publish the result before dropping the active entry so a status
        # lookup never finds the job in neither place.
        self.completed_jobs[request_id] = completion_data
        self.active_jobs.pop(request_id, None)
        # Notify WordPress if notification URL provided
        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, completion_data)

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Handle a failed attempt: retry up to twice, then mark as failed.

        The retry counter lives on the job data itself (set to 0 at
        submission), so it survives re-queueing. A report for a job that is
        no longer active is ignored.
        """
        worker = self.workers[worker_id]
        print(f"❌ Job {request_id} failed on {worker.config.name}: {error_message}")
        job_info = self.active_jobs.get(request_id)
        if job_info is None:
            print(f"⚠ Job {request_id} is no longer active; ignoring failure report")
            return
        # Update worker stats
        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_failed += 1
        job_data = job_info.get('job_data', {})
        retry_count = job_data.get('retry_count', 0)

        # Try to reassign to another worker
        if retry_count < 2:  # Maximum 2 retries
            print(f"πŸ”„ Retrying job {request_id} (attempt {retry_count + 1})")
            job_data['retry_count'] = retry_count + 1
            # Drop the old active entry before re-queueing: the processor
            # thread may pick the job up immediately and register a new
            # entry under the same id, which must not be deleted here.
            self.active_jobs.pop(request_id, None)
            self.add_job_to_queue(job_data)
            return

        # Max retries reached, mark as failed
        failure_data = {
            'request_id': request_id,
            'status': 'failed',
            'error_message': error_message,
            'failed_at': datetime.now().isoformat()
        }
        self.completed_jobs[request_id] = failure_data
        self.active_jobs.pop(request_id, None)
        # Notify WordPress of failure
        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, failure_data)

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST a job result to the caller's notification URL (best effort).

        Errors are logged and never raised.
        """
        try:
            # NOTE(review): verify=False disables TLS certificate checking
            # for this request; confirm that is intended.
            response = requests.post(
                notification_url,
                json=data,
                timeout=30,
                verify=False
            )
            if response.status_code == 200:
                print(f"πŸ“€ WordPress notified successfully for {data['request_id']}")
            else:
                print(f"⚠ WordPress notification failed: HTTP {response.status_code}")
        except Exception as e:
            print(f"⚠ WordPress notification error: {str(e)}")
# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION
)
# CORS configuration: every origin, method and header is allowed.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize orchestrator. Construction happens at import time: it loads the
# worker configuration from the environment and starts the health-check and
# job-processor daemon threads.
orchestrator = TranslationOrchestrator()
# Root endpoint
@app.get("/")
async def root():
    """Return service identity, current load counters and a route summary."""
    route_summary = {
        "/api/translate": "Submit translation request",
        "/api/status/{request_id}": "Check translation status",
        "/api/workers": "List all workers",
        "/api/health": "Health check",
    }
    info = {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
    }
    # Live counters read straight from the orchestrator's containers.
    info["workers"] = len(orchestrator.workers)
    info["active_jobs"] = len(orchestrator.active_jobs)
    info["queued_jobs"] = len(orchestrator.job_queue)
    info["endpoints"] = route_summary
    return info
# Health check endpoint
@app.get("/api/health")
async def health_check():
    """Report overall health: "healthy" when at least one worker is
    available, otherwise "degraded", plus worker and job counts."""
    total = len(orchestrator.workers)
    up = len([w for w in orchestrator.workers.values() if w.available])

    worker_summary = {
        "total": total,
        "available": up,
        "unavailable": total - up,
    }
    queue_summary = {
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "completed_jobs": len(orchestrator.completed_jobs),
    }

    if up > 0:
        state = "healthy"
    else:
        state = "degraded"

    return {
        "status": state,
        "timestamp": datetime.now().isoformat(),
        "workers": worker_summary,
        "queue": queue_summary,
    }
# Submit translation request
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation request.

    Raises:
        HTTPException: 503 when no worker is currently marked available.
    """
    # Refuse early when nothing could process the job.
    worker_ready = False
    for candidate in orchestrator.workers.values():
        if candidate.available:
            worker_ready = True
            break
    if not worker_ready:
        raise HTTPException(
            status_code=503,
            detail="No workers available at the moment. Please try again later."
        )

    # Copy the request fields into the plain dict the orchestrator works
    # with; the retry counter starts at zero.
    job_data = dict(
        request_id=request.request_id,
        text=request.text,
        source_lang=request.source_lang,
        target_lang=request.target_lang,
        auto_charge=request.auto_charge,
        notification_url=request.notification_url,
        wordpress_user_id=request.wordpress_user_id,
        retry_count=0,
    )
    orchestrator.add_job_to_queue(job_data)

    reply = {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
    }
    reply["queue_position"] = len(orchestrator.job_queue)
    # Rough estimate: ten seconds per queued job.
    reply["estimated_wait_time"] = len(orchestrator.job_queue) * 10
    return reply
# Check translation status
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Report where a request currently is: completed, processing, queued
    or not found.

    Background threads mutate the job containers concurrently, so each
    lookup is a single ``dict.get`` rather than a membership test followed
    by indexing, and the queue is scanned under ``job_lock`` because
    iterating a deque while another thread changes it raises RuntimeError.
    """
    # Check completed jobs
    finished = orchestrator.completed_jobs.get(request_id)
    if finished is not None:
        return {
            "success": True,
            **finished
        }

    # Check active jobs
    job_info = orchestrator.active_jobs.get(request_id)
    if job_info is not None:
        elapsed_time = time.time() - job_info['start_time']
        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": elapsed_time,
            "message": "Translation in progress"
        }

    # Check queue
    with orchestrator.job_lock:
        is_queued = any(
            job.get('request_id') == request_id
            for job in orchestrator.job_queue
        )
    if is_queued:
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued"
        }

    # Not found
    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found"
    }
# List workers
@app.get("/api/workers")
async def list_workers():
    """Return configuration, availability and counters for every worker."""
    def describe(worker_id, worker):
        # One flat record per worker: static config first, live state after.
        cfg = worker.config
        return {
            "id": worker_id,
            "name": cfg.name,
            "url": cfg.url,
            "available": worker.available,
            "active_jobs": worker.active_jobs,
            "max_concurrent": cfg.max_concurrent,
            "priority": cfg.priority,
            "total_completed": worker.total_completed,
            "total_failed": worker.total_failed,
            "last_health_check": worker.last_health_check,
        }

    return {
        "success": True,
        "workers": [
            describe(worker_id, worker)
            for worker_id, worker in orchestrator.workers.items()
        ],
    }
# Add worker dynamically (admin endpoint)
@app.post("/api/admin/add-worker")
async def add_worker(config: WorkerConfig):
    """Add a new worker (admin only - add authentication in production).

    The worker id is chosen under ``worker_lock``. It starts from
    ``len(workers) + 1`` and skips ids already in use, so adding a worker
    after an earlier removal cannot overwrite an existing one.
    """
    with orchestrator.worker_lock:
        index = len(orchestrator.workers) + 1
        while f"worker_{index}" in orchestrator.workers:
            index += 1
        worker_id = f"worker_{index}"
        orchestrator.workers[worker_id] = WorkerStatus(config)

    # Immediate health check. It makes blocking HTTP calls, so run it in the
    # default executor instead of stalling the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, orchestrator.check_all_workers_health)

    return {
        "success": True,
        "worker_id": worker_id,
        "message": f"Worker {config.name} added successfully"
    }
# Remove worker
@app.delete("/api/admin/remove-worker/{worker_id}")
async def remove_worker(worker_id: str):
    """Remove a worker (admin only - add authentication in production).

    The existence check, the active-job check and the deletion all happen
    under ``worker_lock``, so a job assigned between the check and the
    delete cannot be orphaned.

    Raises:
        HTTPException: 404 if the worker is unknown, 400 if it still has
            active jobs.
    """
    with orchestrator.worker_lock:
        worker = orchestrator.workers.get(worker_id)
        if worker is None:
            raise HTTPException(status_code=404, detail="Worker not found")
        # Check if worker has active jobs
        if worker.active_jobs > 0:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot remove worker with {worker.active_jobs} active jobs"
            )
        del orchestrator.workers[worker_id]

    return {
        "success": True,
        "message": f"Worker {worker_id} removed successfully"
    }
# Force health check
@app.post("/api/admin/health-check")
async def force_health_check():
    """Force an immediate health check of all workers.

    The check makes blocking HTTP calls to every worker, so it runs in the
    default executor and the event loop keeps serving other requests.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, orchestrator.check_all_workers_health)
    return {
        "success": True,
        "message": "Health check completed",
        "timestamp": datetime.now().isoformat()
    }
# Get statistics
@app.get("/api/stats")
async def get_statistics():
    """Aggregate per-worker counters into service-wide statistics.

    ``success_rate`` is a percentage of finished attempts and is 0 when no
    attempt has finished yet.
    """
    total_completed = 0
    total_failed = 0
    available = 0
    for w in orchestrator.workers.values():
        total_completed += w.total_completed
        total_failed += w.total_failed
        if w.available:
            available += 1

    finished = total_completed + total_failed
    if finished > 0:
        success_rate = total_completed / finished * 100
    else:
        success_rate = 0

    stats = {
        "total_workers": len(orchestrator.workers),
        "available_workers": available,
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "completed_jobs": len(orchestrator.completed_jobs),
        "total_completed": total_completed,
        "total_failed": total_failed,
        "success_rate": success_rate,
    }
    return {"success": True, "statistics": stats}
# Main entry point
if __name__ == "__main__":
    # Script entry point: print a short banner, then serve the app with
    # uvicorn on all interfaces. The port comes from $PORT (default 7860).
    import uvicorn

    port = int(os.getenv("PORT", 7860))
    banner = (
        f"πŸš€ Translation Orchestrator v{ORCHESTRATOR_VERSION}",
        f"πŸ“‘ Starting on port {port}",
        f"πŸ‘· Workers configured: {len(orchestrator.workers)}",
    )
    for banner_line in banner:
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")