danicor committed on
Commit
b47008f
·
verified ·
1 Parent(s): 4be5205

Create app33.py

Browse files
Files changed (1) hide show
  1. app33.py +515 -0
app33.py ADDED
@@ -0,0 +1,515 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ import asyncio
4
+ import uuid
5
+ from datetime import datetime
6
+ from typing import Dict, List, Optional
7
+ from collections import deque
8
+ from fastapi import FastAPI, HTTPException, Request
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ import requests
12
+ import threading
13
+
14
# Configuration
# Version string reported by the FastAPI app metadata and the "/" endpoint.
ORCHESTRATOR_VERSION = "2.0.1"
# Seconds the health-checker thread sleeps between two rounds of worker probes.
WORKER_HEALTH_CHECK_INTERVAL = 30
# Timeout, in seconds, for the HTTP request that submits a job to a worker.
WORKER_TIMEOUT = 120
18
+
19
+ # Models
20
class TranslationRequest(BaseModel):
    """Request body accepted by ``POST /api/translate``."""

    # Caller-chosen identifier; used as the key for queue/status lookups.
    request_id: str
    # Text to translate and the language pair, forwarded to the worker as-is.
    text: str
    source_lang: str
    target_lang: str
    # Forwarded unchanged to the worker.
    auto_charge: bool = False
    # When set, the orchestrator POSTs the final result to this URL.
    notification_url: Optional[str] = None
    # Stored with the job; not otherwise used in this file.
    wordpress_user_id: Optional[int] = None
28
+
29
class WorkerConfig(BaseModel):
    """Static description of one translation worker."""

    # Base URL of the worker; endpoint paths are appended to it.
    url: str
    # Human-readable name used in log output and API responses.
    name: str
    # Higher values are preferred when several workers have capacity.
    priority: int = 1
    # Upper bound on jobs dispatched to this worker at the same time.
    max_concurrent: int = 3
34
+
35
class WorkerStatus:
    """Mutable runtime state tracked for one configured worker."""

    def __init__(self, config: WorkerConfig):
        """Start with the worker marked unavailable and all counters at zero."""
        self.config = config

        # Health: a worker stays unavailable until a health probe succeeds.
        self.available = False
        self.last_health_check = 0

        # Load and lifetime statistics.
        self.active_jobs = 0
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0
44
+
45
class TranslationOrchestrator:
    """Queue translation jobs and dispatch them to remote HTTP workers.

    ``__init__`` starts two daemon threads: one probes every worker's
    ``/api/health`` endpoint, the other drains ``job_queue`` whenever a worker
    has spare capacity. Each dispatched job runs in its own daemon thread that
    submits the job, polls the worker for the result and records the outcome
    in ``completed_jobs``.
    """

    # A job is re-queued at most this many times before it is marked failed.
    MAX_JOB_RETRIES = 2

    def __init__(self):
        """Create empty state, load worker configs and start the background threads."""
        self.workers: Dict[str, WorkerStatus] = {}
        self.job_queue = deque()
        self.active_jobs: Dict[str, Dict] = {}
        self.completed_jobs: Dict[str, Dict] = {}
        # worker_lock guards WorkerStatus fields; job_lock guards job_queue.
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    def load_worker_configs(self):
        """Populate ``self.workers`` from the environment, else from a fallback.

        Reads ``WORKER_<n>_URL`` (plus optional ``_NAME``, ``_PRIORITY`` and
        ``_MAX_CONCURRENT``) for n = 1, 2, ... and stops at the first index
        whose URL is unset or empty.
        """
        # Mode 1: read from the environment (primary method).
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
            max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "3"))

            config = WorkerConfig(
                url=url.rstrip('/'),
                name=name,
                priority=priority,
                max_concurrent=max_concurrent,
            )

            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url}")
            worker_index += 1

        # Mode 2: if the environment defined no worker, add one manually.
        if workers_loaded == 0:
            print("⚠️ No workers in env, using hardcoded fallback")
            fallback_workers = [
                WorkerConfig(url="https://danicor-w1.hf.space", name="Worker Local 1", priority=1, max_concurrent=1),
            ]
            for i, cfg in enumerate(fallback_workers, start=1):
                self.workers[f"worker_{i}"] = WorkerStatus(cfg)
                print(f"✅ Hardcoded worker: {cfg.name} at {cfg.url}")

    def start_health_checker(self):
        """Start the daemon thread that probes all workers periodically."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        thread = threading.Thread(target=health_check_loop, daemon=True)
        thread.start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """Probe every worker's ``/api/health`` and update its availability.

        A worker is available only when the probe returns HTTP 200 with a JSON
        body whose ``status`` is ``"healthy"``. The HTTP requests are made
        without holding ``worker_lock`` so that a slow or unreachable worker
        cannot stall job dispatch; the lock is taken only to write the result.
        """
        with self.worker_lock:
            workers = list(self.workers.values())

        for worker in workers:
            got_200 = False
            healthy = False
            problem = None

            try:
                response = requests.get(f"{worker.config.url}/api/health", timeout=10)
                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    got_200 = True
                else:
                    problem = f"⚠ {worker.config.name}: HTTP {response.status_code}"
            except Exception as e:
                # Connection errors, timeouts and malformed JSON all mean "down".
                healthy = False
                problem = f"🚫 {worker.config.name}: {str(e)}"

            with self.worker_lock:
                was_available = worker.available
                worker.available = healthy
                if got_200:
                    worker.last_health_check = time.time()

            if problem is not None:
                print(problem)
            elif healthy and not was_available:
                print(f"✅ {worker.config.name} is now available")
            elif was_available and not healthy:
                print(f"❌ {worker.config.name} became unavailable")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best worker with spare capacity, or ``None``.

        Candidates are available workers below their ``max_concurrent`` limit.
        The highest priority wins; ties go to the worker with fewer active jobs.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]

            if not candidates:
                return None

            best_id, _ = min(
                candidates,
                key=lambda item: (-item[1].config.priority, item[1].active_jobs),
            )
            return best_id

    def start_job_processor(self):
        """Start the daemon thread that drains the job queue every 2 seconds."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(2)

        thread = threading.Thread(target=process_queue_loop, daemon=True)
        thread.start()
        print("🔄 Job processor started")

    def _enqueue_locked(self, job_data: Dict):
        """Append *job_data* to the queue; the caller must hold ``job_lock``."""
        self.job_queue.append(job_data)
        print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")

    def add_job_to_queue(self, job_data: Dict):
        """Append a job dict (must contain ``'request_id'``) to the queue."""
        with self.job_lock:
            self._enqueue_locked(job_data)

    def process_job_queue(self):
        """Dispatch at most one queued job to an available worker."""
        # Cheap unlocked check first; repeated under the lock below.
        if not self.job_queue:
            return

        with self.job_lock:
            if not self.job_queue:
                return

            worker_id = self.get_available_worker()
            if not worker_id:
                return

            job_data = self.job_queue.popleft()

        self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Mark the job active on *worker_id* and send it from a new daemon thread."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1

        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing'
        }

        thread = threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True
        )
        thread.start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """POST the job to the worker, then poll until it finishes.

        Any non-200 reply or exception is routed to ``handle_worker_failure``.
        """
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                'auto_charge': job_data.get('auto_charge', False),
                # The orchestrator sends the notification itself on completion.
                'notification_url': None
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT
            )

            if response.status_code == 200:
                print(f"✅ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")

        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker every 5 seconds, up to 60 times, for the job result.

        Ends by calling ``handle_job_completion`` or ``handle_worker_failure``;
        running out of polls is reported as a timeout failure.
        """
        worker = self.workers[worker_id]
        max_checks = 60

        for _ in range(max_checks):
            time.sleep(5)

            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()

                    if data.get('success') and data.get('status') == 'completed':
                        self.handle_job_completion(worker_id, request_id, data)
                        return
                    elif data.get('status') == 'failed':
                        self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                        return

            except Exception as e:
                # A failed poll is not fatal; try again on the next round.
                print(f"⚠ Error checking job {request_id}: {str(e)}")

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful result and notify the caller's URL, if any."""
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_completed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})

        processing_time = time.time() - job_info.get('start_time', time.time())

        completion_data = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': worker_response.get('translated_text'),
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat()
        }

        # Publish the result before dropping the active entry so a status
        # lookup always finds the job in one of the two dicts.
        self.completed_jobs[request_id] = completion_data
        self.active_jobs.pop(request_id, None)

        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, completion_data)

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Re-queue a failed job, or mark it failed once retries are exhausted.

        The retry counter lives in the job's own ``job_data`` dict, so it
        survives re-queuing; a job is retried at most ``MAX_JOB_RETRIES``
        times. A failure for a request with no active entry is recorded as
        failed without a retry.
        """
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        with self.worker_lock:
            worker.active_jobs -= 1
            worker.total_failed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})
        retry_count = job_data.get('retry_count', 0)

        if job_data and retry_count < self.MAX_JOB_RETRIES:
            print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")
            job_data['retry_count'] = retry_count + 1

            # Enqueue and drop the active entry under job_lock: the processor
            # pops under the same lock, so it cannot re-dispatch the job (and
            # write a fresh active entry) before the stale one is removed.
            with self.job_lock:
                self._enqueue_locked(job_data)
                self.active_jobs.pop(request_id, None)
            return

        failure_data = {
            'request_id': request_id,
            'status': 'failed',
            'error_message': error_message,
            'failed_at': datetime.now().isoformat()
        }

        self.completed_jobs[request_id] = failure_data
        self.active_jobs.pop(request_id, None)

        notification_url = job_data.get('notification_url')
        if notification_url:
            self.notify_wordpress(notification_url, failure_data)

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST *data* as JSON to *notification_url*; errors are only logged."""
        try:
            # NOTE(review): verify=False disables TLS certificate checks and the
            # URL comes from the request body. Kept as-is to avoid breaking
            # existing callers; confirm whether this is intentional.
            response = requests.post(
                notification_url,
                json=data,
                timeout=30,
                verify=False
            )

            if response.status_code == 200:
                print(f"📤 WordPress notified for {data['request_id']}")
            else:
                print(f"⚠ WordPress notification failed: HTTP {response.status_code}")

        except Exception as e:
            print(f"⚠ WordPress notification error: {str(e)}")
356
+
357
# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION,
)

# NOTE(review): wildcard origins together with allow_credentials=True is a
# permissive CORS policy; confirm it is intended before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single shared orchestrator; constructing it starts its background threads.
orchestrator = TranslationOrchestrator()
373
+
374
@app.get("/")
async def root():
    """Return service identity plus worker and queue counters."""
    all_workers = list(orchestrator.workers.values())
    total_workers = len(all_workers)
    available_workers = len([w for w in all_workers if w.available])

    worker_summary = {
        "total": total_workers,
        "available": available_workers,
        "unavailable": total_workers - available_workers
    }
    queue_summary = {
        "active_jobs": len(orchestrator.active_jobs),
        "queued_jobs": len(orchestrator.job_queue),
        "completed_jobs": len(orchestrator.completed_jobs)
    }

    return {
        "service": "Translation Orchestrator",
        "version": ORCHESTRATOR_VERSION,
        "status": "running",
        "workers": worker_summary,
        "queue": queue_summary
    }
394
+
395
@app.get("/api/health")
async def health_check():
    """Report "healthy" when at least one worker is available, else "degraded"."""
    available_workers = len(
        [w for w in orchestrator.workers.values() if w.available]
    )

    if available_workers > 0:
        overall = "healthy"
    else:
        overall = "degraded"

    return {
        "status": overall,
        "timestamp": datetime.now().isoformat(),
        "workers": {
            "total": len(orchestrator.workers),
            "available": available_workers
        },
        "queue_stats": {
            "active": len(orchestrator.active_jobs),
            "queued": len(orchestrator.job_queue)
        }
    }
411
+
412
@app.post("/api/translate")
async def submit_translation(request: TranslationRequest):
    """Queue a translation job and return its queue position.

    Raises:
        HTTPException: 503 when no worker is currently marked available.
    """
    has_worker = any(w.available for w in orchestrator.workers.values())
    if not has_worker:
        raise HTTPException(
            status_code=503,
            detail="No translation workers available. Please try again later."
        )

    # Fresh jobs start with a zero retry counter.
    job_data = {
        'request_id': request.request_id,
        'text': request.text,
        'source_lang': request.source_lang,
        'target_lang': request.target_lang,
        'auto_charge': request.auto_charge,
        'notification_url': request.notification_url,
        'wordpress_user_id': request.wordpress_user_id,
        'retry_count': 0
    }
    orchestrator.add_job_to_queue(job_data)

    response = {
        "success": True,
        "request_id": request.request_id,
        "status": "queued",
        "message": "Translation request queued successfully",
    }
    response["queue_position"] = len(orchestrator.job_queue)
    # Rough estimate: 10 per job currently in the queue.
    response["estimated_wait_time"] = len(orchestrator.job_queue) * 10
    return response
441
+
442
@app.get("/api/status/{request_id}")
async def check_status(request_id: str):
    """Look a job up among completed, active and queued jobs, in that order.

    Returns the stored result for a finished job, a "processing" or "queued"
    summary for a pending one, and a ``success: False`` / "not_found" payload
    when the id is unknown.
    """
    # Single .get() per dict: background threads may remove an entry between a
    # membership test and a subsequent index, which would raise KeyError.
    finished = orchestrator.completed_jobs.get(request_id)
    if finished is not None:
        return {
            "success": True,
            **finished
        }

    job_info = orchestrator.active_jobs.get(request_id)
    if job_info is not None:
        return {
            "success": True,
            "request_id": request_id,
            "status": "processing",
            "worker_id": job_info['worker_id'],
            "elapsed_time": time.time() - job_info['start_time'],
            "message": "Translation in progress"
        }

    # The queue is appended to and popped from by other threads under
    # job_lock; iterating it without the lock can raise
    # "RuntimeError: deque mutated during iteration".
    with orchestrator.job_lock:
        is_queued = any(
            job['request_id'] == request_id for job in orchestrator.job_queue
        )

    if is_queued:
        return {
            "success": True,
            "request_id": request_id,
            "status": "queued",
            "message": "Translation request is queued"
        }

    return {
        "success": False,
        "request_id": request_id,
        "status": "not_found",
        "message": "Translation request not found"
    }
478
+
479
@app.get("/api/workers")
async def list_workers():
    """Return configuration and runtime counters for every known worker."""
    def describe(worker_id, worker):
        cfg = worker.config
        return {
            "id": worker_id,
            "name": cfg.name,
            "url": cfg.url,
            "available": worker.available,
            "active_jobs": worker.active_jobs,
            "max_concurrent": cfg.max_concurrent,
            "priority": cfg.priority,
            "total_completed": worker.total_completed,
            "total_failed": worker.total_failed,
            "last_health_check": worker.last_health_check
        }

    workers_info = [
        describe(worker_id, worker)
        for worker_id, worker in orchestrator.workers.items()
    ]

    return {
        "success": True,
        "workers": workers_info
    }
501
+
502
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    # Listening port comes from $PORT, defaulting to 7860.
    port = int(os.getenv("PORT", 7860))

    print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
    print(f"📡 Starting on port {port}")

    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")