danicor commited on
Commit
d36df9c
Β·
verified Β·
1 Parent(s): 24af620

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +548 -0
app.py ADDED
@@ -0,0 +1,548 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ import uuid
4
+ import hashlib
5
+ from datetime import datetime
6
+ from typing import Dict, Optional
7
+ from collections import deque
8
+ from fastapi import FastAPI, HTTPException
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ import requests
12
+ import threading
13
+
14
+ # Configuration
15
+ ORCHESTRATOR_VERSION = "2.1.0"
16
+ WORKER_HEALTH_CHECK_INTERVAL = 30
17
+ WORKER_TIMEOUT = 120
18
+
19
+ # Models
20
+ class TranslationRequest(BaseModel):
21
+ request_id: str
22
+ text: str
23
+ source_lang: str
24
+ target_lang: str
25
+ auto_charge: bool = False
26
+ notification_url: Optional[str] = None
27
+ wordpress_user_id: Optional[int] = None
28
+
29
+ class WorkerConfig(BaseModel):
30
+ url: str
31
+ name: str
32
+ priority: int = 1
33
+ max_concurrent: int = 3
34
+
35
+ class WorkerStatus:
36
+ def __init__(self, config: WorkerConfig):
37
+ self.config = config
38
+ self.available = False
39
+ self.active_jobs = 0
40
+ self.last_health_check = 0
41
+ self.total_completed = 0
42
+ self.total_failed = 0
43
+ self.avg_response_time = 0.0
44
+
45
+ class TranslationOrchestrator:
46
+ def __init__(self):
47
+ self.workers: Dict[str, WorkerStatus] = {}
48
+ self.job_queue = deque()
49
+ self.active_jobs: Dict[str, Dict] = {}
50
+ self.completed_jobs: Dict[str, Dict] = {}
51
+
52
+ # in-memory cache: key -> {translated_text, character_count, created_at}
53
+ self.cache: Dict[str, Dict] = {}
54
+
55
+ # Locks to protect shared structures
56
+ self.worker_lock = threading.Lock()
57
+ self.job_lock = threading.Lock()
58
+ self.cache_lock = threading.Lock()
59
+
60
+ self.load_worker_configs()
61
+ self.start_health_checker()
62
+ self.start_job_processor()
63
+
64
+ def make_cache_key(self, text: str, source_lang: str, target_lang: str) -> str:
65
+ normalized = f"{source_lang}|{target_lang}|{text}"
66
+ return hashlib.md5(normalized.encode("utf-8")).hexdigest()
67
+
68
+ def load_worker_configs(self):
69
+ worker_index = 1
70
+ workers_loaded = 0
71
+
72
+ print("πŸ”§ Loading worker configurations...")
73
+
74
+ while True:
75
+ url_key = f"WORKER_{worker_index}_URL"
76
+ name_key = f"WORKER_{worker_index}_NAME"
77
+
78
+ url = os.getenv(url_key)
79
+ if not url:
80
+ break
81
+
82
+ name = os.getenv(name_key, f"Worker {worker_index}")
83
+ priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
84
+ max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "1"))
85
+
86
+ worker_id = f"worker_{worker_index}"
87
+ config = WorkerConfig(
88
+ url=url.rstrip('/'),
89
+ name=name,
90
+ priority=priority,
91
+ max_concurrent=max_concurrent
92
+ )
93
+
94
+ self.workers[worker_id] = WorkerStatus(config)
95
+ workers_loaded += 1
96
+ print(f"βœ… Loaded worker: {name} at {url}")
97
+ worker_index += 1
98
+
99
+ if workers_loaded == 0:
100
+ print("⚠️ No workers in env, using hardcoded fallback")
101
+ fallback_workers = [
102
+ WorkerConfig(url="https://danicor-w1.hf.space", name="Worker 1", priority=1, max_concurrent=1),
103
+ WorkerConfig(url="https://danicor-w2.hf.space", name="Worker 2", priority=1, max_concurrent=1),
104
+ WorkerConfig(url="https://danicor-w3.hf.space", name="Worker 3", priority=1, max_concurrent=1),
105
+ ]
106
+ for i, cfg in enumerate(fallback_workers, start=1):
107
+ self.workers[f"worker_{i}"] = WorkerStatus(cfg)
108
+ print(f"βœ… Hardcoded worker: {cfg.name} at {cfg.url}")
109
+
110
+ def start_health_checker(self):
111
+ def health_check_loop():
112
+ while True:
113
+ self.check_all_workers_health()
114
+ time.sleep(WORKER_HEALTH_CHECK_INTERVAL)
115
+
116
+ thread = threading.Thread(target=health_check_loop, daemon=True)
117
+ thread.start()
118
+ print("πŸ” Health checker started")
119
+
120
+ def check_all_workers_health(self):
121
+ with self.worker_lock:
122
+ for worker_id, worker in self.workers.items():
123
+ try:
124
+ health_url = f"{worker.config.url}/api/health"
125
+ response = requests.get(health_url, timeout=10)
126
+
127
+ if response.status_code == 200:
128
+ data = response.json()
129
+ was_available = worker.available
130
+ worker.available = data.get('status') == 'healthy'
131
+ worker.last_health_check = time.time()
132
+
133
+ if worker.available and not was_available:
134
+ print(f"βœ… {worker.config.name} is now available")
135
+ elif not worker.available and was_available:
136
+ print(f"❌ {worker.config.name} became unavailable")
137
+ else:
138
+ worker.available = False
139
+ print(f"⚠ {worker.config.name}: HTTP {response.status_code}")
140
+
141
+ except Exception as e:
142
+ worker.available = False
143
+ print(f"🚫 {worker.config.name}: {str(e)}")
144
+
145
+ def get_available_worker(self) -> Optional[str]:
146
+ with self.worker_lock:
147
+ available_workers = [
148
+ (worker_id, worker)
149
+ for worker_id, worker in self.workers.items()
150
+ if worker.available and worker.active_jobs < worker.config.max_concurrent
151
+ ]
152
+
153
+ if not available_workers:
154
+ return None
155
+
156
+ available_workers.sort(key=lambda x: (-x[1].config.priority, x[1].active_jobs))
157
+ return available_workers[0][0]
158
+
159
+ def start_job_processor(self):
160
+ def process_queue_loop():
161
+ while True:
162
+ self.process_job_queue()
163
+ time.sleep(2)
164
+
165
+ thread = threading.Thread(target=process_queue_loop, daemon=True)
166
+ thread.start()
167
+ print("πŸ”„ Job processor started")
168
+
169
+ def add_job_to_queue(self, job_data: Dict):
170
+ with self.job_lock:
171
+ self.job_queue.append(job_data)
172
+ print(f"πŸ“₯ Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")
173
+
174
+ def process_job_queue(self):
175
+ if not self.job_queue:
176
+ return
177
+
178
+ with self.job_lock:
179
+ if not self.job_queue:
180
+ return
181
+
182
+ worker_id = self.get_available_worker()
183
+ if not worker_id:
184
+ return
185
+
186
+ job_data = self.job_queue.popleft()
187
+
188
+ self.assign_job_to_worker(worker_id, job_data)
189
+
190
+ def assign_job_to_worker(self, worker_id: str, job_data: Dict):
191
+ worker = self.workers[worker_id]
192
+ request_id = job_data['request_id']
193
+
194
+ print(f"πŸš€ Assigning job {request_id} to {worker.config.name}")
195
+
196
+ with self.worker_lock:
197
+ worker.active_jobs += 1
198
+
199
+ self.active_jobs[request_id] = {
200
+ 'worker_id': worker_id,
201
+ 'job_data': job_data,
202
+ 'start_time': time.time(),
203
+ 'status': 'processing'
204
+ }
205
+
206
+ thread = threading.Thread(
207
+ target=self.send_to_worker,
208
+ args=(worker_id, job_data),
209
+ daemon=True
210
+ )
211
+ thread.start()
212
+
213
+ def send_to_worker(self, worker_id: str, job_data: Dict):
214
+ worker = self.workers[worker_id]
215
+ request_id = job_data['request_id']
216
+
217
+ try:
218
+ worker_request = {
219
+ 'request_id': request_id,
220
+ 'text': job_data['text'],
221
+ 'source_lang': job_data['source_lang'],
222
+ 'target_lang': job_data['target_lang'],
223
+ 'auto_charge': False,
224
+ 'notification_url': None
225
+ }
226
+
227
+ response = requests.post(
228
+ f"{worker.config.url}/api/translate/heavy",
229
+ json=worker_request,
230
+ timeout=WORKER_TIMEOUT
231
+ )
232
+
233
+ if response.status_code == 200:
234
+ print(f"βœ… Job {request_id} sent to {worker.config.name}")
235
+ self.monitor_worker_job(worker_id, request_id)
236
+ else:
237
+ self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")
238
+
239
+ except Exception as e:
240
+ self.handle_worker_failure(worker_id, request_id, str(e))
241
+
242
+ def monitor_worker_job(self, worker_id: str, request_id: str):
243
+ worker = self.workers[worker_id]
244
+ max_checks = 60
245
+ check_count = 0
246
+
247
+ while check_count < max_checks:
248
+ time.sleep(5)
249
+ check_count += 1
250
+
251
+ try:
252
+ response = requests.post(
253
+ f"{worker.config.url}/api/check-translation-status",
254
+ json={'request_id': request_id},
255
+ timeout=15
256
+ )
257
+
258
+ if response.status_code == 200:
259
+ data = response.json()
260
+
261
+ if data.get('success') and data.get('status') == 'completed':
262
+ self.handle_job_completion(worker_id, request_id, data)
263
+ return
264
+ elif data.get('status') == 'failed':
265
+ self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
266
+ return
267
+
268
+ except Exception as e:
269
+ print(f"⚠ Error checking job {request_id}: {str(e)}")
270
+
271
+ self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")
272
+
273
+ def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
274
+ worker = self.workers[worker_id]
275
+
276
+ print(f"πŸŽ‰ Job {request_id} completed by {worker.config.name}")
277
+
278
+ with self.worker_lock:
279
+ worker.active_jobs -= 1
280
+ worker.total_completed += 1
281
+
282
+ job_info = self.active_jobs.get(request_id, {})
283
+ job_data = job_info.get('job_data', {})
284
+
285
+ processing_time = time.time() - job_info.get('start_time', time.time())
286
+ translated_text = worker_response.get('translated_text')
287
+
288
+ completion_data = {
289
+ 'request_id': request_id,
290
+ 'status': 'completed',
291
+ 'translated_text': translated_text,
292
+ 'processing_time': processing_time,
293
+ 'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
294
+ 'translation_length': worker_response.get('translation_length', 0),
295
+ 'from_cache': worker_response.get('from_cache', False),
296
+ 'worker_name': worker.config.name,
297
+ 'completed_at': datetime.now().isoformat()
298
+ }
299
+
300
+ self.completed_jobs[request_id] = completion_data
301
+
302
+ # store in cache (thread-safe)
303
+ text = job_data.get('text')
304
+ source_lang = job_data.get('source_lang')
305
+ target_lang = job_data.get('target_lang')
306
+ if text is not None:
307
+ cache_key = self.make_cache_key(text, source_lang, target_lang)
308
+ with self.cache_lock:
309
+ self.cache[cache_key] = {
310
+ 'translated_text': translated_text,
311
+ 'character_count': len(text),
312
+ 'created_at': datetime.now().isoformat()
313
+ }
314
+
315
+ if request_id in self.active_jobs:
316
+ del self.active_jobs[request_id]
317
+
318
+ print(f"βœ… Result stored for {request_id}, waiting for WordPress to fetch")
319
+
320
+ def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
321
+ worker = self.workers[worker_id]
322
+
323
+ print(f"πŸ’₯ Job {request_id} failed on {worker.config.name}: {error_message}")
324
+
325
+ with self.worker_lock:
326
+ worker.active_jobs -= 1
327
+ worker.total_failed += 1
328
+
329
+ job_info = self.active_jobs.get(request_id, {})
330
+ job_data = job_info.get('job_data', {})
331
+
332
+ if job_info.get('retry_count', 0) < 2:
333
+ print(f"πŸ”„ Retrying job {request_id} (attempt {job_info.get('retry_count', 0) + 1})")
334
+ job_data['retry_count'] = job_info.get('retry_count', 0) + 1
335
+ self.add_job_to_queue(job_data)
336
+ if request_id in self.active_jobs:
337
+ del self.active_jobs[request_id]
338
+ else:
339
+ failure_data = {
340
+ 'request_id': request_id,
341
+ 'status': 'failed',
342
+ 'error_message': error_message,
343
+ 'failed_at': datetime.now().isoformat()
344
+ }
345
+ self.completed_jobs[request_id] = failure_data
346
+ if request_id in self.active_jobs:
347
+ del self.active_jobs[request_id]
348
+
349
+ def notify_wordpress(self, notification_url: str, data: Dict):
350
+ try:
351
+ print(f"πŸ“€ Attempting to notify WordPress at: {notification_url}")
352
+ if notification_url.startswith('http://localhost'):
353
+ wordpress_url = os.getenv('WORDPRESS_BASE_URL', 'https://your-actual-site.com')
354
+ notification_url = notification_url.replace('http://localhost', wordpress_url)
355
+ notification_url = notification_url.replace('http://', 'https://')
356
+
357
+ headers = {
358
+ 'Content-Type': 'application/json',
359
+ 'User-Agent': 'MLT-Orchestrator/1.0'
360
+ }
361
+
362
+ response = requests.post(
363
+ notification_url,
364
+ json=data,
365
+ headers=headers,
366
+ timeout=30,
367
+ verify=True
368
+ )
369
+
370
+ if response.status_code == 200:
371
+ print(f"βœ… WordPress notified successfully for {data['request_id']}")
372
+ return True
373
+ else:
374
+ print(f"❌ WordPress notification failed: HTTP {response.status_code}")
375
+ return False
376
+
377
+ except Exception as e:
378
+ print(f"πŸ’₯ Notification error for {data.get('request_id', '<unknown>')}: {str(e)}")
379
+ return False
380
+
381
+ # Initialize FastAPI app
382
+ app = FastAPI(
383
+ title="Translation Orchestrator",
384
+ description="Main orchestrator for distributed translation system",
385
+ version=ORCHESTRATOR_VERSION
386
+ )
387
+
388
+ app.add_middleware(
389
+ CORSMiddleware,
390
+ allow_origins=["*"],
391
+ allow_credentials=True,
392
+ allow_methods=["*"],
393
+ allow_headers=["*"],
394
+ )
395
+
396
+ orchestrator = TranslationOrchestrator()
397
+
398
+ @app.get("/")
399
+ async def root():
400
+ available_workers = sum(1 for w in orchestrator.workers.values() if w.available)
401
+ total_workers = len(orchestrator.workers)
402
+
403
+ return {
404
+ "service": "Translation Orchestrator",
405
+ "version": ORCHESTRATOR_VERSION,
406
+ "status": "running",
407
+ "workers": {
408
+ "total": total_workers,
409
+ "available": available_workers,
410
+ "unavailable": total_workers - available_workers
411
+ },
412
+ "queue": {
413
+ "active_jobs": len(orchestrator.active_jobs),
414
+ "queued_jobs": len(orchestrator.job_queue),
415
+ "completed_jobs": len(orchestrator.completed_jobs)
416
+ }
417
+ }
418
+
419
+ @app.get("/api/health")
420
+ async def health_check():
421
+ available_workers = sum(1 for w in orchestrator.workers.values() if w.available)
422
+
423
+ return {
424
+ "status": "healthy" if available_workers > 0 else "degraded",
425
+ "timestamp": datetime.now().isoformat(),
426
+ "workers": {
427
+ "total": len(orchestrator.workers),
428
+ "available": available_workers
429
+ },
430
+ "queue_stats": {
431
+ "active": len(orchestrator.active_jobs),
432
+ "queued": len(orchestrator.job_queue)
433
+ }
434
+ }
435
+
436
+ @app.post("/api/translate")
437
+ async def submit_translation(request: TranslationRequest):
438
+ # check cache first
439
+ cache_key = orchestrator.make_cache_key(request.text, request.source_lang, request.target_lang)
440
+ with orchestrator.cache_lock:
441
+ cached = orchestrator.cache.get(cache_key)
442
+
443
+ if cached:
444
+ return {
445
+ "success": True,
446
+ "request_id": str(uuid.uuid4()),
447
+ "status": "completed",
448
+ "translated_text": cached["translated_text"],
449
+ "character_count": cached.get("character_count", len(request.text)),
450
+ "processing_time": 0,
451
+ "from_cache": True,
452
+ "message": "Translation served from cache"
453
+ }
454
+
455
+ if not any(w.available for w in orchestrator.workers.values()):
456
+ raise HTTPException(
457
+ status_code=503,
458
+ detail="No translation workers available. Please try again later."
459
+ )
460
+
461
+ job_data = {
462
+ 'request_id': request.request_id,
463
+ 'text': request.text,
464
+ 'source_lang': request.source_lang,
465
+ 'target_lang': request.target_lang,
466
+ 'auto_charge': request.auto_charge,
467
+ 'notification_url': request.notification_url,
468
+ 'wordpress_user_id': request.wordpress_user_id,
469
+ 'retry_count': 0
470
+ }
471
+
472
+ orchestrator.add_job_to_queue(job_data)
473
+
474
+ return {
475
+ "success": True,
476
+ "request_id": request.request_id,
477
+ "status": "queued",
478
+ "message": "Translation request queued successfully",
479
+ "queue_position": len(orchestrator.job_queue),
480
+ "estimated_wait_time": len(orchestrator.job_queue) * 10
481
+ }
482
+
483
+ @app.get("/api/status/{request_id}")
484
+ async def check_status(request_id: str):
485
+ if request_id in orchestrator.completed_jobs:
486
+ return {
487
+ "success": True,
488
+ **orchestrator.completed_jobs[request_id]
489
+ }
490
+
491
+ if request_id in orchestrator.active_jobs:
492
+ job_info = orchestrator.active_jobs[request_id]
493
+ elapsed_time = time.time() - job_info['start_time']
494
+
495
+ return {
496
+ "success": True,
497
+ "request_id": request_id,
498
+ "status": "processing",
499
+ "worker_id": job_info['worker_id'],
500
+ "elapsed_time": elapsed_time,
501
+ "message": "Translation in progress"
502
+ }
503
+
504
+ for job in orchestrator.job_queue:
505
+ if job['request_id'] == request_id:
506
+ return {
507
+ "success": True,
508
+ "request_id": request_id,
509
+ "status": "queued",
510
+ "message": "Translation request is queued"
511
+ }
512
+
513
+ return {
514
+ "success": False,
515
+ "request_id": request_id,
516
+ "status": "not_found",
517
+ "message": "Translation request not found"
518
+ }
519
+
520
+ @app.get("/api/workers")
521
+ async def list_workers():
522
+ workers_info = []
523
+
524
+ for worker_id, worker in orchestrator.workers.items():
525
+ workers_info.append({
526
+ "id": worker_id,
527
+ "name": worker.config.name,
528
+ "url": worker.config.url,
529
+ "available": worker.available,
530
+ "active_jobs": worker.active_jobs,
531
+ "max_concurrent": worker.config.max_concurrent,
532
+ "priority": worker.config.priority,
533
+ "total_completed": worker.total_completed,
534
+ "total_failed": worker.total_failed,
535
+ "last_health_check": worker.last_health_check
536
+ })
537
+
538
+ return {
539
+ "success": True,
540
+ "workers": workers_info
541
+ }
542
+
543
+ if __name__ == "__main__":
544
+ import uvicorn
545
+ port = int(os.getenv("PORT", 7860))
546
+ print(f"πŸš€ Translation Orchestrator v{ORCHESTRATOR_VERSION}")
547
+ print(f"πŸ“‘ Starting on port {port}")
548
+ uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")