danicor committed on
Commit
101fd96
·
verified ·
1 Parent(s): 524ffec

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +551 -0
app.py ADDED
@@ -0,0 +1,551 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ import asyncio
4
+ import uuid
5
+ from datetime import datetime
6
+ from typing import Dict, List, Optional
7
+ from collections import deque
8
+ from fastapi import FastAPI, HTTPException, Request
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ import requests
12
+ import threading
13
+
14
# Configuration
# Version string reported in the FastAPI metadata, the root endpoint and the
# startup banner.
ORCHESTRATOR_VERSION: str = "2.0.1"
# Seconds the health-checker thread sleeps between sweeps over all workers.
WORKER_HEALTH_CHECK_INTERVAL: int = 30
# Seconds allowed for the HTTP request that submits a job to a worker.
WORKER_TIMEOUT: int = 120
18
+
19
+ # Models
20
class TranslationRequest(BaseModel):
    """JSON body accepted by ``POST /api/translate``."""

    # Caller-supplied identifier; the orchestrator keys its queue, active-job
    # and completed-job lookups on this value.
    request_id: str
    # Text to translate and the language pair, forwarded to the worker as-is.
    text: str
    source_lang: str
    target_lang: str
    # Stored with the job; the orchestrator always sends ``False`` to workers.
    auto_charge: bool = False
    # Optional callback URL; only used when a job finally fails.
    notification_url: Optional[str] = None
    # Stored with the job; not read anywhere else in this file.
    wordpress_user_id: Optional[int] = None
28
+
29
class WorkerConfig(BaseModel):
    """Static description of one translation worker."""

    # Base URL of the worker; endpoint paths are appended to it.
    url: str
    # Human-readable label used in log lines and API responses.
    name: str
    # Higher values are preferred when several workers have free capacity.
    priority: int = 1
    # Upper bound on jobs the orchestrator assigns to this worker at once.
    max_concurrent: int = 3
34
+
35
class WorkerStatus:
    """Mutable runtime bookkeeping for a single worker.

    Holds the worker's static ``config`` together with the health flag and
    counters that the orchestrator updates while running.
    """

    def __init__(self, config: WorkerConfig):
        self.config = config

        # Health: a worker counts as down until a health check marks it up.
        self.available = False
        self.last_health_check = 0

        # Load and lifetime counters.
        self.active_jobs = 0
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0
44
+
45
class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to remote HTTP workers.

    Constructing an instance loads the worker list and starts two daemon
    threads: one that periodically probes every worker's health endpoint and
    one that moves queued jobs onto workers with free capacity.  Every
    dispatched job runs in its own daemon thread, which submits the job and
    then polls the worker until it completes, fails or times out.

    State:
        workers:        worker id -> ``WorkerStatus``
        job_queue:      FIFO of job dicts waiting for a worker
        active_jobs:    request id -> ``{'worker_id', 'job_data',
                        'start_time', 'status'}`` for dispatched jobs
        completed_jobs: request id -> final result or failure record

    NOTE(review): ``completed_jobs`` is never pruned, so it grows for the
    lifetime of the process.
    """

    # A failed job is re-queued at most this many times before it is
    # recorded as failed.
    MAX_RETRIES = 2
    # Seconds between two scans of the job queue.
    QUEUE_POLL_INTERVAL = 2
    # Status polling of a dispatched job: seconds between polls and number of
    # polls before the job is treated as timed out.
    STATUS_POLL_INTERVAL = 5
    STATUS_POLL_MAX_CHECKS = 60

    def __init__(self):
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        self.completed_jobs: Dict[str, Dict] = {}
        # Guards the per-worker counters and availability flags.
        self.worker_lock = threading.Lock()
        # Guards the job queue.
        self.job_lock = threading.Lock()

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    @staticmethod
    def _env_int(key: str, default: int) -> int:
        """Read an integer environment variable, falling back to ``default``.

        A missing, blank or non-numeric value yields ``default`` instead of
        raising, so one malformed setting cannot abort start-up.
        """
        raw = os.getenv(key)
        if raw is None or not raw.strip():
            return default
        try:
            return int(raw)
        except ValueError:
            print(f"⚠ Invalid integer for {key}: {raw!r}, using {default}")
            return default

    def load_worker_configs(self):
        """Populate ``self.workers`` from the environment or a fallback list.

        Workers are read from ``WORKER_<n>_URL`` (plus optional ``_NAME``,
        ``_PRIORITY`` and ``_MAX_CONCURRENT``) for n = 1, 2, ... until the
        first missing URL.  If none are configured, three hardcoded workers
        are registered instead.
        """
        # Mode 1: read the workers from the environment (primary method).
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = self._env_int(f"WORKER_{worker_index}_PRIORITY", 1)
            max_concurrent = self._env_int(f"WORKER_{worker_index}_MAX_CONCURRENT", 1)

            config = WorkerConfig(
                url=url.rstrip('/'),
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )

            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url}")
            worker_index += 1

        # Mode 2: if no worker came from the environment, add them manually.
        if workers_loaded == 0:
            print("⚠️ No workers in env, using hardcoded fallback")
            fallback_workers = [
                WorkerConfig(url="https://danicor-w1.hf.space", name="Worker 1", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w2.hf.space", name="Worker 2", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w3.hf.space", name="Worker 3", priority=1, max_concurrent=1),
            ]
            for i, cfg in enumerate(fallback_workers, start=1):
                self.workers[f"worker_{i}"] = WorkerStatus(cfg)
                print(f"✅ Hardcoded worker: {cfg.name} at {cfg.url}")

    def start_health_checker(self):
        """Start the daemon thread that probes all workers periodically."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """Probe ``<url>/api/health`` of every worker and update its status.

        A worker is available only if it answers HTTP 200 with a JSON body
        whose ``status`` is ``'healthy'``.  ``last_health_check`` is refreshed
        only for such a well-formed HTTP 200 answer.
        """
        # Snapshot the workers so that the (possibly slow) HTTP probes run
        # without holding worker_lock; otherwise a hanging worker would block
        # job scheduling for the duration of the probe timeouts.
        with self.worker_lock:
            workers = list(self.workers.values())

        for worker in workers:
            healthy = False
            answered = False
            problem = None

            try:
                response = requests.get(f"{worker.config.url}/api/health", timeout=10)
                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    answered = True
                else:
                    problem = f"⚠ {worker.config.name}: HTTP {response.status_code}"
            except Exception as e:
                healthy = False
                problem = f"🚫 {worker.config.name}: {str(e)}"

            with self.worker_lock:
                was_available = worker.available
                worker.available = healthy
                if answered:
                    worker.last_health_check = time.time()

            if problem is not None:
                print(problem)
            elif healthy and not was_available:
                print(f"✅ {worker.config.name} is now available")
            elif was_available and not healthy:
                print(f"❌ {worker.config.name} became unavailable")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with free capacity, or ``None``.

        Candidates are available workers running fewer than
        ``max_concurrent`` jobs.  The highest priority wins; ties go to the
        worker with the fewest active jobs, then to registration order.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]

            if not candidates:
                return None

            best_id, _ = min(
                candidates,
                key=lambda item: (-item[1].config.priority, item[1].active_jobs),
            )
            return best_id

    def start_job_processor(self):
        """Start the daemon thread that dispatches queued jobs."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(self.QUEUE_POLL_INTERVAL)

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("🔄 Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append ``job_data`` (must contain ``'request_id'``) to the queue."""
        with self.job_lock:
            self.job_queue.append(job_data)
            print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")

    def process_job_queue(self):
        """Dispatch queued jobs for as long as a worker has free capacity.

        Draining the queue in one call (rather than one job per tick) avoids
        leaving workers idle between polls.  The loop ends when the queue is
        empty or no worker can take another job; assigning a job bumps the
        worker's ``active_jobs`` before the next iteration, so capacity
        limits are still respected.
        """
        while True:
            with self.job_lock:
                if not self.job_queue:
                    return

                worker_id = self.get_available_worker()
                if not worker_id:
                    return

                job_data = self.job_queue.popleft()

            self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Reserve a slot on the worker, register the job and start its thread."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1

        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing',
        }

        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True,
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """Submit the job to the worker and, on HTTP 200, monitor it.

        Any error while building or sending the request, and any non-200
        answer, is routed to ``handle_worker_failure``.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': False,  # auto_charge removed: the orchestrator manages it
                'notification_url': None,  # notification removed
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT,
            )
        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))
            return

        if response.status_code != 200:
            self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")
            return

        # Monitoring runs outside the try block above: the monitor reports the
        # job's outcome itself, and catching its errors here would report the
        # same job a second time.
        print(f"✅ Job {request_id} sent to {worker.config.name}")
        self.monitor_worker_job(worker_id, request_id)

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker's status endpoint until the job reaches an outcome.

        Polls every ``STATUS_POLL_INTERVAL`` seconds, at most
        ``STATUS_POLL_MAX_CHECKS`` times.  Completion and reported failure end
        the polling; polling errors and non-200 answers are skipped.  If no
        outcome is seen, the job is failed with a timeout message.
        """
        worker = self.workers[worker_id]

        for _ in range(self.STATUS_POLL_MAX_CHECKS):
            time.sleep(self.STATUS_POLL_INTERVAL)

            # Only the network call and response parsing are guarded; the
            # outcome handlers run outside so an error inside them is not
            # mistaken for a polling error and retried on the next poll.
            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15,
                )
                if response.status_code != 200:
                    continue

                data = response.json()
                succeeded = bool(data.get('success'))
                status = data.get('status')
            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")
                continue

            if succeeded and status == 'completed':
                self.handle_job_completion(worker_id, request_id, data)
                return
            if status == 'failed':
                self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                return

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a finished job and release the worker's slot.

        The result is stored in ``completed_jobs`` before the entry is removed
        from ``active_jobs`` so a status lookup always finds the job.
        """
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})
        processing_time = time.time() - job_info.get('start_time', time.time())

        with self.worker_lock:
            # Never let the counter drop below zero, even if a job is reported twice.
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_completed += 1
            # Incremental mean of the processing time over completed jobs.
            worker.avg_response_time += (
                (processing_time - worker.avg_response_time) / worker.total_completed
            )

        # Store the result so that WordPress can fetch it.
        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat(),
        }

        self.completed_jobs[request_id] = completion_data
        self.active_jobs.pop(request_id, None)

        # Notification to WordPress removed: the result is polled instead.
        print(f"✅ Result stored for {request_id}, waiting for WordPress to fetch")

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Release the worker's slot, then retry the job or record its failure.

        The retry counter lives in the job dict itself (``retry_count``), so
        it survives re-queueing.  After ``MAX_RETRIES`` retries the job is
        stored as failed and, if it carries a ``notification_url``, WordPress
        is notified.
        """
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        with self.worker_lock:
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_failed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})
        # The counter must be read from the job dict: the active-job wrapper
        # has no 'retry_count' key, so reading it there always yields 0 and
        # the job would be retried forever.
        retry_count = job_data.get('retry_count', 0)

        # Without job data (unknown request id) there is nothing to re-queue.
        if job_data and retry_count < self.MAX_RETRIES:
            print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")

            job_data['retry_count'] = retry_count + 1
            # Queue first, then drop the active entry, so the job stays visible.
            self.add_job_to_queue(job_data)
            self.active_jobs.pop(request_id, None)
            return

        failure_data = {
            'request_id': request_id,
            'status': 'failed',
            'error_message': error_message,
            'failed_at': datetime.now().isoformat(),
        }

        self.completed_jobs[request_id] = failure_data
        self.active_jobs.pop(request_id, None)

        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, failure_data)

    @staticmethod
    def _normalize_notification_url(notification_url: str) -> str:
        """Return the URL that is actually called for a notification.

        A leading ``http://localhost`` is replaced by ``WORDPRESS_BASE_URL``
        (default ``https://your-actual-site.com``) and a leading ``http://``
        scheme is upgraded to ``https://``.  Only the start of the URL is
        rewritten, so URLs embedded in the path or query string are kept.
        """
        local_prefix = 'http://localhost'
        if notification_url.startswith(local_prefix):
            # Replace localhost with the actual WordPress URL from environment
            wordpress_url = os.getenv('WORDPRESS_BASE_URL', 'https://your-actual-site.com')
            notification_url = wordpress_url + notification_url[len(local_prefix):]

        # Ensure HTTPS
        insecure_scheme = 'http://'
        if notification_url.startswith(insecure_scheme):
            notification_url = 'https://' + notification_url[len(insecure_scheme):]

        return notification_url

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST ``data`` as JSON to the WordPress callback URL.

        Returns ``True`` only for an HTTP 200 answer; every other answer and
        every error is logged and yields ``False``.

        NOTE(review): ``notification_url`` originates from the client request,
        so this performs a server-side request to a caller-chosen host;
        consider restricting the allowed hosts.
        """
        request_id = data.get('request_id')

        try:
            # Log the notification attempt
            print(f"📤 Attempting to notify WordPress at: {notification_url}")

            notification_url = self._normalize_notification_url(notification_url)

            headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'MLT-Orchestrator/1.0',
            }

            print(f"🔗 Final notification URL: {notification_url}")
            print(f"📦 Payload keys: {list(data.keys())}")

            response = requests.post(
                notification_url,
                json=data,
                headers=headers,
                timeout=30,
                verify=True,  # Keep SSL verification enabled
            )

            print(f"📡 WordPress notification response: {response.status_code}")

            if response.status_code == 200:
                print(f"✅ WordPress notified successfully for {request_id}")
                return True

            print(f"❌ WordPress notification failed: HTTP {response.status_code}")
            print(f"📄 Response: {response.text}")
            return False

        except requests.exceptions.Timeout:
            print(f"⏰ WordPress notification timeout for {request_id}")
            return False
        except requests.exceptions.ConnectionError as e:
            print(f"🔌 WordPress connection error for {request_id}: {str(e)}")
            return False
        except Exception as e:
            print(f"💥 Notification error for {request_id}: {str(e)}")
            return False
392
+
393
# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION,
)

# CORS: every origin, method and header is allowed.  Credentials are NOT
# enabled: a wildcard origin must not be combined with credentialed requests
# (the FastAPI CORS documentation requires explicit origins in that case),
# and none of the endpoints in this file read cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level singleton; constructing it loads the worker list and starts
# the health-checker and job-processor daemon threads.
orchestrator = TranslationOrchestrator()
409
+
410
@app.get("/")
async def root():
    """Return a service summary: version, worker counts and job counts."""
    total_workers = len(orchestrator.workers)
    available_workers = len(
        [worker for worker in orchestrator.workers.values() if worker.available]
    )

    worker_summary = {
        "total": total_workers,
        "available": available_workers,
        "unavailable": total_workers - available_workers,
    }
    queue_summary = {
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "completed_jobs": len(orchestrator.completed_jobs),
    }

    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": worker_summary,
        "queue": queue_summary,
    }
430
+
431
@app.get("/api/health")
async def health_check():
    """Report orchestrator health.

    The status is ``"healthy"`` when at least one worker is available and
    ``"degraded"`` otherwise.
    """
    available_workers = len(
        [worker for worker in orchestrator.workers.values() if worker.available]
    )

    if available_workers > 0:
        status = "healthy"
    else:
        status = "degraded"

    return {
        "status": status,
        "timestamp": datetime.now().isoformat(),
        "workers": {
            "total": len(orchestrator.workers),
            "available": available_workers,
        },
        "queue_stats": {
            "active": len(orchestrator.active_jobs),
            "queued": len(orchestrator.job_queue),
        },
    }
447
+
448
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation request.

    Raises:
        HTTPException: 503 when no worker is currently marked available.

    Returns a confirmation with the queue length after insertion as
    ``queue_position`` and ten seconds per queued job as
    ``estimated_wait_time``.
    """
    no_worker_up = all(not worker.available for worker in orchestrator.workers.values())
    if no_worker_up:
        raise HTTPException(
            status_code=503,
            detail="No translation workers available. Please try again later."
        )

    # Plain-dict copy of the request plus the retry counter used on failure.
    job_data = dict(
        request_id=request.request_id,
        text=request.text,
        source_lang=request.source_lang,
        target_lang=request.target_lang,
        auto_charge=request.auto_charge,
        notification_url=request.notification_url,
        wordpress_user_id=request.wordpress_user_id,
        retry_count=0,
    )

    orchestrator.add_job_to_queue(job_data)

    response = {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
    }
    response["queue_position"] = len(orchestrator.job_queue)
    response["estimated_wait_time"] = len(orchestrator.job_queue) * 10
    return response
477
+
478
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Report the state of a translation request.

    Looks the id up in the completed jobs, then the active jobs, then the
    queue.  Returns the stored result (completed or failed), a
    ``"processing"`` or ``"queued"`` record, or a ``"not_found"`` record with
    ``success`` set to ``False``.
    """
    # Worker threads add and remove entries concurrently, so each lookup is a
    # single ``get`` rather than a membership test followed by indexing,
    # which could raise KeyError if the entry vanished in between.
    completed = orchestrator.completed_jobs.get(request_id)
    if completed is not None:
        return {
            "success": True,
            **completed
        }

    job_info = orchestrator.active_jobs.get(request_id)
    if job_info is not None:
        elapsed_time = time.time() - job_info['start_time']

        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": elapsed_time,
            "message": "Translation in progress"
        }

    # Scan the queue under its lock: iterating a deque while another thread
    # appends or pops raises RuntimeError.
    with orchestrator.job_lock:
        is_queued = any(
            job.get('request_id') == request_id for job in orchestrator.job_queue
        )

    if is_queued:
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued"
        }

    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found"
    }
514
+
515
@app.get("/api/workers")
async def list_workers():
    """List every registered worker with its configuration and counters."""
    workers_info = [
        {
            "id": worker_id,
            "name": worker.config.name,
            "url": worker.config.url,
            "available": worker.available,
            "active_jobs": worker.active_jobs,
            "max_concurrent": worker.config.max_concurrent,
            "priority": worker.config.priority,
            "total_completed": worker.total_completed,
            "total_failed": worker.total_failed,
            "last_health_check": worker.last_health_check,
        }
        for worker_id, worker in orchestrator.workers.items()
    ]

    return {"success": True, "workers": workers_info}
537
+
538
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    # Listening port comes from the PORT environment variable (default 7860).
    port = int(os.getenv("PORT", 7860))

    for banner_line in (
        f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}",
        f"📡 Starting on port {port}",
    ):
        print(banner_line)

    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")