danicor committed on
Commit
9a2d1dc
·
verified ·
1 Parent(s): f52833b

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +516 -0
app.py ADDED
@@ -0,0 +1,516 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ import asyncio
4
+ import uuid
5
+ from datetime import datetime
6
+ from typing import Dict, List, Optional
7
+ from collections import deque
8
+ from fastapi import FastAPI, HTTPException, Request
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ import requests
12
+ import threading
13
+
14
# Configuration
# Version string reported by the root endpoint and in the FastAPI app metadata.
ORCHESTRATOR_VERSION = "2.0.1"
# Seconds the background health-check thread sleeps between sweeps of all workers.
WORKER_HEALTH_CHECK_INTERVAL = 30
# Timeout, in seconds, for the HTTP request that submits a job to a worker.
WORKER_TIMEOUT = 120
18
+
19
+ # Models
20
class TranslationRequest(BaseModel):
    # Request body accepted by the POST /api/translate endpoint.

    # Caller-supplied job identifier; later used as the key for status lookups.
    request_id: str
    # Text to translate; forwarded unchanged to the selected worker.
    text: str
    # Source and target language identifiers, forwarded unchanged to the worker.
    # NOTE(review): the expected code format is defined by the workers — confirm.
    source_lang: str
    target_lang: str
    # Forwarded to the worker as-is; the orchestrator itself never interprets it.
    auto_charge: bool = False
    # When set, the orchestrator POSTs the completion/failure payload to this URL.
    notification_url: Optional[str] = None
    # Stored with the job data; not read anywhere else in the visible code.
    wordpress_user_id: Optional[int] = None
28
+
29
class WorkerConfig(BaseModel):
    # Static settings for one worker, built from WORKER_<n>_* environment variables.

    # Base URL of the worker; endpoint paths such as /api/health are appended to it.
    url: str
    # Human-readable label used in log messages and the worker listing.
    name: str
    # Scheduling priority: workers with a HIGHER value are chosen first.
    priority: int = 1
    # Maximum number of jobs the orchestrator assigns to this worker at once.
    max_concurrent: int = 3
34
+
35
class WorkerStatus:
    """Mutable runtime bookkeeping kept for a single configured worker."""

    def __init__(self, config: WorkerConfig):
        # Static settings this status record belongs to.
        self.config = config

        # Health: a worker starts out unavailable until a health check succeeds.
        self.available = False
        self.last_health_check = 0

        # Load and lifetime counters, all starting from zero.
        self.active_jobs = 0
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0
44
+
45
class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to remote HTTP workers.

    Workers are read from ``WORKER_<n>_*`` environment variables. Two daemon
    threads are started on construction: one polls every worker's health
    endpoint, the other moves queued jobs onto available workers. Each
    dispatched job gets its own daemon thread that submits the job, polls the
    worker for the result and records completion or failure.

    NOTE(review): ``completed_jobs`` is never pruned, so it grows for the
    lifetime of the process.
    """

    # How many times a failed job is put back on the queue before it is
    # recorded as failed.
    MAX_RETRIES = 2

    def __init__(self):
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        self.completed_jobs: Dict[str, Dict] = {}
        # worker_lock guards the mutable fields of WorkerStatus objects;
        # job_lock guards job_queue.
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    @staticmethod
    def _env_int(key: str, default: int) -> int:
        """Return the integer value of env var *key*, or *default* if unset or invalid."""
        raw = os.getenv(key)
        if raw is None:
            return default
        try:
            return int(raw)
        except ValueError:
            # A malformed setting should not prevent the service from starting.
            print(f"⚠ Ignoring invalid value for {key}: {raw!r} (using {default})")
            return default

    def load_worker_configs(self):
        """Populate ``self.workers`` from WORKER_1_URL, WORKER_2_URL, ... in order.

        Loading stops at the first index whose URL variable is unset or empty.
        """
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                if worker_index == 1:
                    print("❌ No workers configured! Please set WORKER_1_URL environment variable.")
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = self._env_int(f"WORKER_{worker_index}_PRIORITY", 1)
            max_concurrent = self._env_int(f"WORKER_{worker_index}_MAX_CONCURRENT", 3)

            config = WorkerConfig(
                url=url.rstrip('/'),  # Remove trailing slash
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )

            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url} (Priority: {priority}, Max: {max_concurrent})")

            worker_index += 1

        print(f"🎯 Total workers loaded: {workers_loaded}")

        if workers_loaded == 0:
            print("💡 To add workers, set environment variables like:")
            print(" WORKER_1_URL=https://your-worker-space.hf.space")
            print(" WORKER_1_NAME=Primary Worker")
            print(" WORKER_1_PRIORITY=1")
            print(" WORKER_1_MAX_CONCURRENT=3")

    def start_health_checker(self):
        """Start the daemon thread that periodically health-checks all workers."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """Query each worker's ``/api/health`` endpoint and update its availability.

        The HTTP request is made WITHOUT holding ``worker_lock``; the lock is
        taken only to publish the result. Holding it across a request that can
        block for up to 10 seconds per worker would stall job dispatch and
        completion accounting.
        """
        with self.worker_lock:
            workers = list(self.workers.values())

        for worker in workers:
            healthy = False
            checked = False
            problem = None

            try:
                response = requests.get(f"{worker.config.url}/api/health", timeout=10)
                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    checked = True
                else:
                    problem = f"⚠ {worker.config.name}: HTTP {response.status_code}"
            except Exception as e:
                # Best effort: any error just marks the worker unavailable.
                healthy = False
                checked = False
                problem = f"🚫 {worker.config.name}: {str(e)}"

            with self.worker_lock:
                was_available = worker.available
                worker.available = healthy
                if checked:
                    worker.last_health_check = time.time()

            if problem is not None:
                print(problem)
            elif healthy and not was_available:
                print(f"✅ {worker.config.name} is now available")
            elif was_available and not healthy:
                print(f"❌ {worker.config.name} became unavailable")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with spare capacity, or None.

        Higher ``priority`` wins; ties go to the worker with fewer active jobs,
        then to configuration order.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]

            if not candidates:
                return None

            # min() returns the first minimal element, matching a stable sort.
            best_id, _ = min(
                candidates,
                key=lambda item: (-item[1].config.priority, item[1].active_jobs),
            )
            return best_id

    def start_job_processor(self):
        """Start the daemon thread that dispatches queued jobs every 2 seconds."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(2)

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("🔄 Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append *job_data* (must contain ``request_id``) to the pending queue."""
        with self.job_lock:
            self.job_queue.append(job_data)
            print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")

    def process_job_queue(self):
        """Dispatch at most one queued job to an available worker."""
        # Cheap unlocked pre-check; re-checked under the lock below.
        if not self.job_queue:
            return

        with self.job_lock:
            if not self.job_queue:
                return

            worker_id = self.get_available_worker()
            if not worker_id:
                return

            job_data = self.job_queue.popleft()

            self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Mark the job active on *worker_id* and start its submission thread."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1

        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing',
        }

        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True,
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """Submit the job to the worker, then poll it until it finishes.

        Any error is routed to ``handle_worker_failure``, which is idempotent
        per active job, so a failure raised after the job was already settled
        does not corrupt the worker's counters.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': job_data.get('auto_charge', False),
                # The orchestrator notifies the caller itself, so the worker
                # is not given the notification URL.
                'notification_url': None,
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT,
            )

            if response.status_code == 200:
                print(f"✅ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")

        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker every 5 seconds (up to 60 times) for the job result."""
        worker = self.workers[worker_id]
        max_checks = 60

        for _ in range(max_checks):
            time.sleep(5)

            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15,
                )
                data = response.json() if response.status_code == 200 else None
            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")
                continue

            if not isinstance(data, dict):
                continue

            # Terminal handlers run outside the try block so an error inside
            # them is not mistaken for a polling error and silently retried.
            status = data.get('status')
            if data.get('success') and status == 'completed':
                self.handle_job_completion(worker_id, request_id, data)
                return
            if status == 'failed':
                self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                return

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def _release_job(self, request_id: str, job_info: Dict):
        """Drop the active entry for *request_id* if it is still *job_info*.

        The identity check keeps a late cleanup from removing the entry of a
        newer attempt that was dispatched after a retry.
        """
        if self.active_jobs.get(request_id) is job_info:
            del self.active_jobs[request_id]

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful result and notify the caller if requested."""
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        job_info = self.active_jobs.get(request_id)
        if job_info is None:
            # Already settled: do not decrement the worker's counters twice.
            print(f"⚠ Job {request_id} is no longer active; ignoring completion report")
            return

        job_data = job_info.get('job_data', {})
        processing_time = time.time() - job_info.get('start_time', time.time())

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_completed += 1
            # Incremental mean over all completed jobs.
            worker.avg_response_time += (
                (processing_time - worker.avg_response_time) / worker.total_completed
            )

        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat(),
        }

        # Publish the result before removing the active entry, so a status
        # poll never sees the job as missing in between.
        self.completed_jobs[request_id] = completion_data
        self._release_job(request_id, job_info)

        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, completion_data)

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Requeue a failed job, or record it as failed after MAX_RETRIES retries."""
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        job_info = self.active_jobs.get(request_id)
        if job_info is None:
            # Already settled: do not decrement counters or requeue twice.
            print(f"⚠ Job {request_id} is no longer active; ignoring failure report")
            return

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_failed += 1

        job_data = job_info.get('job_data', {})
        # The retry counter lives on the job payload, not on the active-job
        # wrapper; reading it from the wrapper always yields 0 and retries forever.
        retries = job_data.get('retry_count', 0)

        if retries < self.MAX_RETRIES:
            print(f"🔄 Retrying job {request_id} (attempt {retries + 1})")

            job_data['retry_count'] = retries + 1
            self.add_job_to_queue(job_data)
            self._release_job(request_id, job_info)
            return

        failure_data = {
            'request_id': request_id,
            'status': 'failed',
            'error_message': error_message,
            'failed_at': datetime.now().isoformat(),
        }

        self.completed_jobs[request_id] = failure_data
        self._release_job(request_id, job_info)

        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, failure_data)

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST *data* as JSON to *notification_url*; errors are logged, not raised."""
        try:
            # SECURITY NOTE(review): verify=False disables TLS certificate
            # checking, and notification_url comes from the request body, so
            # this call can be pointed at arbitrary hosts. Left unchanged to
            # keep existing deployments working — confirm whether
            # verification can be enabled and the URL restricted.
            response = requests.post(
                notification_url,
                json=data,
                timeout=30,
                verify=False,
            )

            if response.status_code == 200:
                print(f"📤 WordPress notified for {data['request_id']}")
            else:
                print(f"⚠ WordPress notification failed: HTTP {response.status_code}")

        except Exception as e:
            print(f"⚠ WordPress notification error: {str(e)}")
357
+
358
# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION
)

# Allow cross-origin requests from any origin, with any method and any header.
# NOTE(review): FastAPI's CORS documentation says wildcard origins should not
# be combined with allow_credentials=True — confirm whether credentialed
# browser requests are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level singleton shared by all endpoints. Constructing it reads the
# WORKER_<n>_* environment variables and starts the health-check and
# job-processor daemon threads, so importing this module has those side effects.
orchestrator = TranslationOrchestrator()
374
+
375
@app.get("/")
async def root():
    """Return a service summary: version, worker availability and job counts."""
    all_workers = list(orchestrator.workers.values())
    total_workers = len(all_workers)
    available_workers = len([w for w in all_workers if w.available])

    worker_summary = {
        "total": total_workers,
        "available": available_workers,
        "unavailable": total_workers - available_workers,
    }
    queue_summary = {
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "completed_jobs": len(orchestrator.completed_jobs),
    }

    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": worker_summary,
        "queue": queue_summary,
    }
395
+
396
@app.get("/api/health")
async def health_check():
    """Report "healthy" when at least one worker is available, else "degraded"."""
    available_workers = len(
        [w for w in orchestrator.workers.values() if w.available]
    )

    if available_workers > 0:
        overall = "healthy"
    else:
        overall = "degraded"

    worker_counts = {
        "total": len(orchestrator.workers),
        "available": available_workers,
    }
    queue_counts = {
        "active": len(orchestrator.active_jobs),
        "queued": len(orchestrator.job_queue),
    }

    return {
        "status": overall,
        "timestamp": datetime.now().isoformat(),
        "workers": worker_counts,
        "queue_stats": queue_counts,
    }
412
+
413
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation job.

    Raises:
        HTTPException: 503 when no worker is currently marked available.
    """
    no_worker_up = all(not w.available for w in orchestrator.workers.values())
    if no_worker_up:
        raise HTTPException(
            status_code=503,
            detail="No translation workers available. Please try again later.",
        )

    # Copy the request fields into a plain dict; retry_count starts at zero.
    job_data = dict(
        request_id=request.request_id,
        text=request.text,
        source_lang=request.source_lang,
        target_lang=request.target_lang,
        auto_charge=request.auto_charge,
        notification_url=request.notification_url,
        wordpress_user_id=request.wordpress_user_id,
        retry_count=0,
    )
    orchestrator.add_job_to_queue(job_data)

    reply = {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
    }
    reply["queue_position"] = len(orchestrator.job_queue)
    # Rough estimate: ten seconds per job currently in the queue.
    reply["estimated_wait_time"] = len(orchestrator.job_queue) * 10
    return reply
442
+
443
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Report whether a job is completed/failed, processing, queued or unknown.

    Jobs are moved between the containers by background threads, so each
    lookup is a single ``.get()`` rather than ``in`` followed by indexing, and
    the queue is scanned under ``job_lock`` to avoid iterating a deque while
    it is being mutated.
    """
    finished = orchestrator.completed_jobs.get(request_id)
    if finished is not None:
        return {
            "success": True,
            **finished
        }

    job_info = orchestrator.active_jobs.get(request_id)
    if job_info is not None:
        elapsed_time = time.time() - job_info['start_time']

        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": elapsed_time,
            "message": "Translation in progress"
        }

    with orchestrator.job_lock:
        is_queued = any(
            job.get('request_id') == request_id
            for job in orchestrator.job_queue
        )

    if is_queued:
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued"
        }

    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found"
    }
479
+
480
@app.get("/api/workers")
async def list_workers():
    """Return the configuration and live counters of every configured worker."""
    workers_info = [
        {
            "id": worker_id,
            "name": worker.config.name,
            "url": worker.config.url,
            "available": worker.available,
            "active_jobs": worker.active_jobs,
            "max_concurrent": worker.config.max_concurrent,
            "priority": worker.config.priority,
            "total_completed": worker.total_completed,
            "total_failed": worker.total_failed,
            "last_health_check": worker.last_health_check,
        }
        for worker_id, worker in orchestrator.workers.items()
    ]

    return {"success": True, "workers": workers_info}
502
+
503
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly.
    import uvicorn

    # PORT may be overridden through the environment; 7860 is the fallback.
    listen_port = int(os.getenv("PORT", 7860))

    print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
    print(f"📡 Starting on port {listen_port}")

    server_options = {
        "host": "0.0.0.0",
        "port": listen_port,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)