danicor commited on
Commit
8e8747f
·
verified ·
1 Parent(s): 760f6be

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +53 -58
app.py CHANGED
@@ -12,7 +12,7 @@ import requests
12
  import threading
13
 
14
  # Configuration
15
- ORCHESTRATOR_VERSION = "1.0.0"
16
  WORKER_HEALTH_CHECK_INTERVAL = 30 # seconds
17
  WORKER_TIMEOUT = 120 # seconds for translation
18
 
@@ -61,7 +61,6 @@ class TranslationOrchestrator:
61
 
62
  def load_worker_configs(self):
63
  """Load worker configurations from environment variables"""
64
- # Format: WORKER_1_URL=https://..., WORKER_1_NAME=Worker1, etc.
65
  worker_index = 1
66
  while True:
67
  url_key = f"WORKER_{worker_index}_URL"
@@ -91,17 +90,16 @@ class TranslationOrchestrator:
91
  if not self.workers:
92
  print("⚠ No workers configured. Add WORKER_N_URL environment variables.")
93
 
94
-
95
  def start_health_checker(self):
96
- """Start background thread for health checking"""
97
- def health_check_loop():
98
- while True:
99
- self.check_all_workers_health()
100
- time.sleep(WORKER_HEALTH_CHECK_INTERVAL)
101
-
102
- thread = threading.Thread(target=health_check_loop, daemon=True)
103
- thread.start()
104
- print("✓ Health checker started")
105
 
106
  def check_all_workers_health(self):
107
  """Check health of all workers"""
@@ -276,52 +274,49 @@ class TranslationOrchestrator:
276
  # Timeout reached
277
  self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")
278
 
279
-
280
-
281
-
282
  def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
283
- """Handle successful job completion"""
284
- worker = self.workers[worker_id]
285
-
286
- print(f"✅ Job {request_id} completed by {worker.config.name}")
287
-
288
- # Update worker stats
289
- with self.worker_lock:
290
- worker.active_jobs -= 1
291
- worker.total_completed += 1
292
-
293
- # Get job data
294
- job_info = self.active_jobs.get(request_id, {})
295
- job_data = job_info.get('job_data', {})
296
-
297
- # Calculate processing time
298
- processing_time = time.time() - job_info.get('start_time', time.time())
299
-
300
- # Prepare completion data
301
- completion_data = {
302
- 'request_id': request_id,
303
- 'status': 'completed',
304
- 'translated_text': worker_response.get('translated_text'),
305
- 'processing_time': processing_time,
306
- 'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
307
- 'translation_length': worker_response.get('translation_length', 0),
308
- 'from_cache': worker_response.get('from_cache', False),
309
- 'worker_name': worker.config.name,
310
- 'completed_at': datetime.now().isoformat()
311
- }
312
-
313
- # Store in completed jobs
314
- self.completed_jobs[request_id] = completion_data
315
-
316
- # Remove from active jobs
317
- if request_id in self.active_jobs:
318
- del self.active_jobs[request_id]
319
-
320
- # Notify WordPress if notification URL provided
321
- notification_url = job_data.get('notification_url')
322
- if notification_url:
323
- self.notify_wordpress(notification_url, completion_data)
324
-
325
  def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
326
  """Handle job failure"""
327
  worker = self.workers[worker_id]
@@ -642,4 +637,4 @@ if __name__ == "__main__":
642
  host="0.0.0.0",
643
  port=port,
644
  log_level="info"
645
- )
 
12
  import threading
13
 
14
  # Configuration
15
+ ORCHESTRATOR_VERSION = "2.0.0"
16
  WORKER_HEALTH_CHECK_INTERVAL = 30 # seconds
17
  WORKER_TIMEOUT = 120 # seconds for translation
18
 
 
61
 
62
  def load_worker_configs(self):
63
  """Load worker configurations from environment variables"""
 
64
  worker_index = 1
65
  while True:
66
  url_key = f"WORKER_{worker_index}_URL"
 
90
  if not self.workers:
91
  print("⚠ No workers configured. Add WORKER_N_URL environment variables.")
92
 
 
93
  def start_health_checker(self):
94
+ """Start background thread for health checking"""
95
+ def health_check_loop():
96
+ while True:
97
+ self.check_all_workers_health()
98
+ time.sleep(WORKER_HEALTH_CHECK_INTERVAL)
99
+
100
+ thread = threading.Thread(target=health_check_loop, daemon=True)
101
+ thread.start()
102
+ print("✓ Health checker started")
103
 
104
  def check_all_workers_health(self):
105
  """Check health of all workers"""
 
274
  # Timeout reached
275
  self.handle_worker_failure(worker_id, request_id, "Timeout waiting for completion")
276
 
 
 
 
277
  def handle_job_completion(self, worker_id: str, request_id: str, worker_response: Dict):
278
+ """Handle successful job completion"""
279
+ worker = self.workers[worker_id]
280
+
281
+ print(f"✅ Job {request_id} completed by {worker.config.name}")
282
+
283
+ # Update worker stats
284
+ with self.worker_lock:
285
+ worker.active_jobs -= 1
286
+ worker.total_completed += 1
287
+
288
+ # Get job data
289
+ job_info = self.active_jobs.get(request_id, {})
290
+ job_data = job_info.get('job_data', {})
291
+
292
+ # Calculate processing time
293
+ processing_time = time.time() - job_info.get('start_time', time.time())
294
+
295
+ # Prepare completion data
296
+ completion_data = {
297
+ 'request_id': request_id,
298
+ 'status': 'completed',
299
+ 'translated_text': worker_response.get('translated_text'),
300
+ 'processing_time': processing_time,
301
+ 'character_count': worker_response.get('character_count', len(job_data.get('text', ''))),
302
+ 'translation_length': worker_response.get('translation_length', 0),
303
+ 'from_cache': worker_response.get('from_cache', False),
304
+ 'worker_name': worker.config.name,
305
+ 'completed_at': datetime.now().isoformat()
306
+ }
307
+
308
+ # Store in completed jobs
309
+ self.completed_jobs[request_id] = completion_data
310
+
311
+ # Remove from active jobs
312
+ if request_id in self.active_jobs:
313
+ del self.active_jobs[request_id]
314
+
315
+ # Notify WordPress if notification URL provided
316
+ notification_url = job_data.get('notification_url')
317
+ if notification_url:
318
+ self.notify_wordpress(notification_url, completion_data)
319
+
320
  def handle_worker_failure(self, worker_id: str, request_id: str, error_message: str):
321
  """Handle job failure"""
322
  worker = self.workers[worker_id]
 
637
  host="0.0.0.0",
638
  port=port,
639
  log_level="info"
640
+ )