danicor committed on
Commit
09afc43
·
verified ·
1 Parent(s): d36df9c

Delete app.py

Browse files
Files changed (1) hide show
  1. app.py +0 -548
app.py DELETED
@@ -1,548 +0,0 @@
1
- import os
2
- import time
3
- import uuid
4
- import hashlib
5
- from datetime import datetime
6
- from typing import Dict, Optional
7
- from collections import deque
8
- from fastapi import FastAPI, HTTPException
9
- from fastapi.middleware.cors import CORSMiddleware
10
- from pydantic import BaseModel
11
- import requests
12
- import threading
13
-
14
# Configuration
ORCHESTRATOR_VERSION = "2.1.0"
# Seconds between background sweeps of every worker's /api/health endpoint.
WORKER_HEALTH_CHECK_INTERVAL = 30
# Timeout (seconds) for the initial dispatch POST to a worker.
WORKER_TIMEOUT = 120
18
-
19
# Models
class TranslationRequest(BaseModel):
    """Payload accepted by POST /api/translate."""
    request_id: str  # caller-supplied correlation id, echoed in responses
    text: str  # source text to translate
    source_lang: str
    target_lang: str
    auto_charge: bool = False  # forwarded flag; workers always receive False
    notification_url: Optional[str] = None  # WordPress callback endpoint, if any
    wordpress_user_id: Optional[int] = None  # originating WP user, if known
28
-
29
class WorkerConfig(BaseModel):
    """Static configuration for one translation worker endpoint."""
    url: str  # base URL; stored without trailing slash
    name: str  # human-readable label used in log messages
    priority: int = 1  # higher value is preferred by the scheduler
    max_concurrent: int = 3  # cap on simultaneous jobs for this worker
34
-
35
class WorkerStatus:
    """Mutable runtime state the orchestrator tracks per configured worker."""

    def __init__(self, config: WorkerConfig):
        """Wrap *config*; every worker starts unavailable with zeroed counters."""
        self.config = config
        # Liveness / current load (updated by health checker and scheduler).
        self.available = False
        self.active_jobs = 0
        self.last_health_check = 0
        # Lifetime statistics.
        self.total_completed = 0
        self.total_failed = 0
        self.avg_response_time = 0.0
44
-
45
class TranslationOrchestrator:
    """Coordinates translation jobs across a pool of HTTP worker services.

    Responsibilities:
      * load worker endpoints from ``WORKER_<n>_*`` env vars (hardcoded
        fallback when none are set),
      * health-check workers from a daemon thread,
      * queue incoming jobs and dispatch each to the best free worker,
      * poll the worker until the job finishes, retrying failures,
      * keep results in ``completed_jobs`` and a translation cache in
        ``cache``.

    All state is in-memory; a process restart loses queue, results and cache.
    """

    # A job is requeued this many times before being reported as failed.
    MAX_RETRIES = 2

    def __init__(self):
        # worker_id -> WorkerStatus (runtime state per configured worker)
        self.workers: Dict[str, WorkerStatus] = {}
        # FIFO of job dicts waiting for a free worker
        self.job_queue = deque()
        # request_id -> {worker_id, job_data, start_time, status}
        self.active_jobs: Dict[str, Dict] = {}
        # request_id -> completion/failure payload (never evicted)
        self.completed_jobs: Dict[str, Dict] = {}

        # in-memory cache: key -> {translated_text, character_count, created_at}
        # NOTE(review): unbounded — consider eviction if texts are large.
        self.cache: Dict[str, Dict] = {}

        # Locks to protect shared structures
        self.worker_lock = threading.Lock()
        self.job_lock = threading.Lock()
        self.cache_lock = threading.Lock()

        self.load_worker_configs()
        self.start_health_checker()
        self.start_job_processor()

    def make_cache_key(self, text: str, source_lang: str, target_lang: str) -> str:
        """Deterministic cache key for a (source, target, text) triple.

        md5 is used purely as a fast fingerprint here, not for security.
        """
        normalized = f"{source_lang}|{target_lang}|{text}"
        return hashlib.md5(normalized.encode("utf-8")).hexdigest()

    def load_worker_configs(self):
        """Populate ``self.workers`` from WORKER_<n>_* env vars.

        Scans WORKER_1_URL, WORKER_2_URL, ... until the first gap; if no
        workers are configured, falls back to a hardcoded list.
        """
        worker_index = 1
        workers_loaded = 0

        print("🔧 Loading worker configurations...")

        while True:
            url = os.getenv(f"WORKER_{worker_index}_URL")
            if not url:
                break

            name = os.getenv(f"WORKER_{worker_index}_NAME", f"Worker {worker_index}")
            priority = int(os.getenv(f"WORKER_{worker_index}_PRIORITY", "1"))
            max_concurrent = int(os.getenv(f"WORKER_{worker_index}_MAX_CONCURRENT", "1"))

            config = WorkerConfig(
                url=url.rstrip('/'),
                name=name,
                priority=priority,
                max_concurrent=max_concurrent
            )
            self.workers[f"worker_{worker_index}"] = WorkerStatus(config)
            workers_loaded += 1
            print(f"✅ Loaded worker: {name} at {url}")
            worker_index += 1

        if workers_loaded == 0:
            print("⚠️ No workers in env, using hardcoded fallback")
            fallback_workers = [
                WorkerConfig(url="https://danicor-w1.hf.space", name="Worker 1", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w2.hf.space", name="Worker 2", priority=1, max_concurrent=1),
                WorkerConfig(url="https://danicor-w3.hf.space", name="Worker 3", priority=1, max_concurrent=1),
            ]
            for i, cfg in enumerate(fallback_workers, start=1):
                self.workers[f"worker_{i}"] = WorkerStatus(cfg)
                print(f"✅ Hardcoded worker: {cfg.name} at {cfg.url}")

    def start_health_checker(self):
        """Start the daemon thread that periodically health-checks workers."""
        def health_check_loop():
            while True:
                self.check_all_workers_health()
                time.sleep(WORKER_HEALTH_CHECK_INTERVAL)

        threading.Thread(target=health_check_loop, daemon=True).start()
        print("🔍 Health checker started")

    def check_all_workers_health(self):
        """Probe every worker's /api/health and update its availability.

        Fix: the original held ``worker_lock`` for the whole sweep, so a
        single slow probe (up to 10 s) blocked get_available_worker() and
        therefore job scheduling.  HTTP now runs on a snapshot and the
        lock is taken only for the brief state updates.
        """
        with self.worker_lock:
            snapshot = list(self.workers.values())

        for worker in snapshot:
            try:
                response = requests.get(f"{worker.config.url}/api/health", timeout=10)

                if response.status_code == 200:
                    healthy = response.json().get('status') == 'healthy'
                    with self.worker_lock:
                        was_available = worker.available
                        worker.available = healthy
                        worker.last_health_check = time.time()

                    if healthy and not was_available:
                        print(f"✅ {worker.config.name} is now available")
                    elif not healthy and was_available:
                        print(f"❌ {worker.config.name} became unavailable")
                else:
                    with self.worker_lock:
                        worker.available = False
                    print(f"⚠ {worker.config.name}: HTTP {response.status_code}")

            except Exception as e:
                with self.worker_lock:
                    worker.available = False
                print(f"🚫 {worker.config.name}: {str(e)}")

    def get_available_worker(self) -> Optional[str]:
        """Return the id of the best free worker, or None.

        "Best" = highest priority first, then fewest active jobs.
        """
        with self.worker_lock:
            candidates = [
                (worker_id, worker)
                for worker_id, worker in self.workers.items()
                if worker.available and worker.active_jobs < worker.config.max_concurrent
            ]

            if not candidates:
                return None

            candidates.sort(key=lambda x: (-x[1].config.priority, x[1].active_jobs))
            return candidates[0][0]

    def start_job_processor(self):
        """Start the daemon thread that drains the job queue every 2 s."""
        def process_queue_loop():
            while True:
                self.process_job_queue()
                time.sleep(2)

        threading.Thread(target=process_queue_loop, daemon=True).start()
        print("🔄 Job processor started")

    def add_job_to_queue(self, job_data: Dict):
        """Append a job dict (must contain 'request_id') to the queue."""
        with self.job_lock:
            self.job_queue.append(job_data)
            print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")

    def process_job_queue(self):
        """Pop one queued job and hand it to an available worker, if any.

        Lock order is always job_lock -> worker_lock; no path takes them
        in the opposite order, so the nesting cannot deadlock.
        """
        if not self.job_queue:  # cheap unlocked fast-path
            return

        with self.job_lock:
            if not self.job_queue:
                return

            worker_id = self.get_available_worker()
            if not worker_id:
                return

            job_data = self.job_queue.popleft()
            self.assign_job_to_worker(worker_id, job_data)

    def assign_job_to_worker(self, worker_id: str, job_data: Dict):
        """Record the assignment and dispatch the job on its own thread."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        print(f"🚀 Assigning job {request_id} to {worker.config.name}")

        with self.worker_lock:
            worker.active_jobs += 1

        self.active_jobs[request_id] = {
            'worker_id': worker_id,
            'job_data': job_data,
            'start_time': time.time(),
            'status': 'processing'
        }

        threading.Thread(
            target=self.send_to_worker,
            args=(worker_id, job_data),
            daemon=True
        ).start()

    def send_to_worker(self, worker_id: str, job_data: Dict):
        """POST the job to the worker, then monitor it until it finishes."""
        worker = self.workers[worker_id]
        request_id = job_data['request_id']

        try:
            worker_request = {
                'request_id': request_id,
                'text': job_data['text'],
                'source_lang': job_data['source_lang'],
                'target_lang': job_data['target_lang'],
                # charging/notification are handled here, never by workers
                'auto_charge': False,
                'notification_url': None
            }

            response = requests.post(
                f"{worker.config.url}/api/translate/heavy",
                json=worker_request,
                timeout=WORKER_TIMEOUT
            )

            if response.status_code == 200:
                print(f"✅ Job {request_id} sent to {worker.config.name}")
                self.monitor_worker_job(worker_id, request_id)
            else:
                self.handle_worker_failure(worker_id, request_id, f"HTTP {response.status_code}")

        except Exception as e:
            self.handle_worker_failure(worker_id, request_id, str(e))

    def monitor_worker_job(self, worker_id: str, request_id: str):
        """Poll the worker every 5 s (max 60 polls = 5 min) for completion."""
        worker = self.workers[worker_id]
        max_checks = 60

        for _ in range(max_checks):
            time.sleep(5)

            try:
                response = requests.post(
                    f"{worker.config.url}/api/check-translation-status",
                    json={'request_id': request_id},
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()

                    if data.get('success') and data.get('status') == 'completed':
                        self.handle_job_completion(worker_id, request_id, data)
                        return
                    elif data.get('status') == 'failed':
                        self.handle_worker_failure(worker_id, request_id, "Worker reported failure")
                        return

            except Exception as e:
                print(f"⚠ Error checking job {request_id}: {str(e)}")

        self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")

    def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
        """Record a successful result and populate the translation cache."""
        worker = self.workers[worker_id]

        print(f"🎉 Job {request_id} completed by {worker.config.name}")

        with self.worker_lock:
            # max() guards the counter if completion and failure handlers
            # ever race for the same job.
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_completed += 1

        job_info = self.active_jobs.get(request_id, {})
        job_data = job_info.get('job_data', {})

        processing_time = time.time() - job_info.get('start_time', time.time())
        translated_text = worker_response.get('translated_text')

        self.completed_jobs[request_id] = {
            'request_id': request_id,
            'status': 'completed',
            'translated_text': translated_text,
            'processing_time': processing_time,
            'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
            'translation_length': worker_response.get('translation_length', 0),
            'from_cache': worker_response.get('from_cache', False),
            'worker_name': worker.config.name,
            'completed_at': datetime.now().isoformat()
        }

        # store in cache (thread-safe)
        text = job_data.get('text')
        if text is not None:
            cache_key = self.make_cache_key(text, job_data.get('source_lang'), job_data.get('target_lang'))
            with self.cache_lock:
                self.cache[cache_key] = {
                    'translated_text': translated_text,
                    'character_count': len(text),
                    'created_at': datetime.now().isoformat()
                }

        # pop() instead of `in` + `del`: atomic, no race with other handlers.
        self.active_jobs.pop(request_id, None)

        print(f"✅ Result stored for {request_id}, waiting for WordPress to fetch")

    def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
        """Requeue a failed job (up to MAX_RETRIES) or record the failure.

        Fix: the original read retry_count from ``job_info`` (the
        active-jobs wrapper dict), where the key never exists, so the
        check was always 0 and every failing job retried forever.  The
        counter actually lives in ``job_data``.
        """
        worker = self.workers[worker_id]

        print(f"💥 Job {request_id} failed on {worker.config.name}: {error_message}")

        with self.worker_lock:
            worker.active_jobs = max(0, worker.active_jobs - 1)
            worker.total_failed += 1

        job_info = self.active_jobs.pop(request_id, {})
        job_data = job_info.get('job_data', {})
        retry_count = job_data.get('retry_count', 0)

        # Only retry when we still have the original payload to requeue.
        if job_data and retry_count < self.MAX_RETRIES:
            print(f"🔄 Retrying job {request_id} (attempt {retry_count + 1})")
            job_data['retry_count'] = retry_count + 1
            self.add_job_to_queue(job_data)
        else:
            self.completed_jobs[request_id] = {
                'request_id': request_id,
                'status': 'failed',
                'error_message': error_message,
                'failed_at': datetime.now().isoformat()
            }

    def notify_wordpress(self, notification_url: str, data: Dict):
        """POST a completion payload to WordPress; True on HTTP 200.

        Localhost URLs are rewritten to WORDPRESS_BASE_URL.
        NOTE(review): the http->https upgrade is applied within the
        localhost branch as in the original layout — confirm whether it
        was meant to apply to all URLs.
        """
        try:
            print(f"📤 Attempting to notify WordPress at: {notification_url}")
            if notification_url.startswith('http://localhost'):
                wordpress_url = os.getenv('WORDPRESS_BASE_URL', 'https://your-actual-site.com')
                notification_url = notification_url.replace('http://localhost', wordpress_url)
                notification_url = notification_url.replace('http://', 'https://')

            headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'MLT-Orchestrator/1.0'
            }

            response = requests.post(
                notification_url,
                json=data,
                headers=headers,
                timeout=30,
                verify=True
            )

            if response.status_code == 200:
                print(f"✅ WordPress notified successfully for {data['request_id']}")
                return True
            else:
                print(f"❌ WordPress notification failed: HTTP {response.status_code}")
                return False

        except Exception as e:
            print(f"💥 Notification error for {data.get('request_id', '<unknown>')}: {str(e)}")
            return False
380
-
381
# Initialize FastAPI app
app = FastAPI(
    title="Translation Orchestrator",
    description="Main orchestrator for distributed translation system",
    version=ORCHESTRATOR_VERSION
)

# Wide-open CORS so browser clients on any origin can call the API.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True for credentialed requests — confirm whether
# credentials are actually required.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single module-level instance; its background daemon threads (health
# checker, job processor) start as a side effect of construction.
orchestrator = TranslationOrchestrator()
397
-
398
- @app.get("/")
399
- async def root():
400
- available_workers = sum(1 for w in orchestrator.workers.values() if w.available)
401
- total_workers = len(orchestrator.workers)
402
-
403
- return {
404
- "service": "Translation Orchestrator",
405
- "version": ORCHESTRATOR_VERSION,
406
- "status": "running",
407
- "workers": {
408
- "total": total_workers,
409
- "available": available_workers,
410
- "unavailable": total_workers - available_workers
411
- },
412
- "queue": {
413
- "active_jobs": len(orchestrator.active_jobs),
414
- "queued_jobs": len(orchestrator.job_queue),
415
- "completed_jobs": len(orchestrator.completed_jobs)
416
- }
417
- }
418
-
419
- @app.get("/api/health")
420
- async def health_check():
421
- available_workers = sum(1 for w in orchestrator.workers.values() if w.available)
422
-
423
- return {
424
- "status": "healthy" if available_workers > 0 else "degraded",
425
- "timestamp": datetime.now().isoformat(),
426
- "workers": {
427
- "total": len(orchestrator.workers),
428
- "available": available_workers
429
- },
430
- "queue_stats": {
431
- "active": len(orchestrator.active_jobs),
432
- "queued": len(orchestrator.job_queue)
433
- }
434
- }
435
-
436
- @app.post("/api/translate")
437
- async def submit_translation(request: TranslationRequest):
438
- # check cache first
439
- cache_key = orchestrator.make_cache_key(request.text, request.source_lang, request.target_lang)
440
- with orchestrator.cache_lock:
441
- cached = orchestrator.cache.get(cache_key)
442
-
443
- if cached:
444
- return {
445
- "success": True,
446
- "request_id": str(uuid.uuid4()),
447
- "status": "completed",
448
- "translated_text": cached["translated_text"],
449
- "character_count": cached.get("character_count", len(request.text)),
450
- "processing_time": 0,
451
- "from_cache": True,
452
- "message": "Translation served from cache"
453
- }
454
-
455
- if not any(w.available for w in orchestrator.workers.values()):
456
- raise HTTPException(
457
- status_code=503,
458
- detail="No translation workers available. Please try again later."
459
- )
460
-
461
- job_data = {
462
- 'request_id': request.request_id,
463
- 'text': request.text,
464
- 'source_lang': request.source_lang,
465
- 'target_lang': request.target_lang,
466
- 'auto_charge': request.auto_charge,
467
- 'notification_url': request.notification_url,
468
- 'wordpress_user_id': request.wordpress_user_id,
469
- 'retry_count': 0
470
- }
471
-
472
- orchestrator.add_job_to_queue(job_data)
473
-
474
- return {
475
- "success": True,
476
- "request_id": request.request_id,
477
- "status": "queued",
478
- "message": "Translation request queued successfully",
479
- "queue_position": len(orchestrator.job_queue),
480
- "estimated_wait_time": len(orchestrator.job_queue) * 10
481
- }
482
-
483
- @app.get("/api/status/{request_id}")
484
- async def check_status(request_id: str):
485
- if request_id in orchestrator.completed_jobs:
486
- return {
487
- "success": True,
488
- **orchestrator.completed_jobs[request_id]
489
- }
490
-
491
- if request_id in orchestrator.active_jobs:
492
- job_info = orchestrator.active_jobs[request_id]
493
- elapsed_time = time.time() - job_info['start_time']
494
-
495
- return {
496
- "success": True,
497
- "request_id": request_id,
498
- "status": "processing",
499
- "worker_id": job_info['worker_id'],
500
- "elapsed_time": elapsed_time,
501
- "message": "Translation in progress"
502
- }
503
-
504
- for job in orchestrator.job_queue:
505
- if job['request_id'] == request_id:
506
- return {
507
- "success": True,
508
- "request_id": request_id,
509
- "status": "queued",
510
- "message": "Translation request is queued"
511
- }
512
-
513
- return {
514
- "success": False,
515
- "request_id": request_id,
516
- "status": "not_found",
517
- "message": "Translation request not found"
518
- }
519
-
520
- @app.get("/api/workers")
521
- async def list_workers():
522
- workers_info = []
523
-
524
- for worker_id, worker in orchestrator.workers.items():
525
- workers_info.append({
526
- "id": worker_id,
527
- "name": worker.config.name,
528
- "url": worker.config.url,
529
- "available": worker.available,
530
- "active_jobs": worker.active_jobs,
531
- "max_concurrent": worker.config.max_concurrent,
532
- "priority": worker.config.priority,
533
- "total_completed": worker.total_completed,
534
- "total_failed": worker.total_failed,
535
- "last_health_check": worker.last_health_check
536
- })
537
-
538
- return {
539
- "success": True,
540
- "workers": workers_info
541
- }
542
-
543
- if __name__ == "__main__":
544
- import uvicorn
545
- port = int(os.getenv("PORT", 7860))
546
- print(f"🚀 Translation Orchestrator v{ORCHESTRATOR_VERSION}")
547
- print(f"📡 Starting on port {port}")
548
- uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")