danicor committed on
Commit
e139f66
·
verified ·
1 Parent(s): f29052d

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +1119 -0
app.py ADDED
@@ -0,0 +1,1119 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+ import uuid
4
+ import asyncio
5
+ import aiohttp
6
+ import torch
7
+ import json
8
+ import logging
9
+ import threading
10
+ from datetime import datetime
11
+ from typing import Dict, List, Optional, Any, Callable
12
+ from dataclasses import dataclass, field
13
+ from enum import Enum
14
+ from fastapi import FastAPI, HTTPException, Request
15
+ from pydantic import BaseModel
16
+
17
+ # Configure logging
18
+ logger = logging.getLogger(__name__)
19
+
20
+ # Define data models
21
+ class TranslationRequest(BaseModel):
22
+ text: str
23
+ source_lang: str
24
+ target_lang: str
25
+ auto_charge: bool = False
26
+
27
+ # Enums
28
+ class JobStatus(Enum):
29
+ PENDING = "pending"
30
+ ASSIGNED = "assigned"
31
+ PROCESSING = "processing"
32
+ COMPLETED = "completed"
33
+ FAILED = "failed"
34
+ CANCELLED = "cancelled"
35
+
36
+ class ServerStatus(Enum):
37
+ AVAILABLE = "available"
38
+ BUSY = "busy"
39
+ OFFLINE = "offline"
40
+ ERROR = "error"
41
+
42
+ # Data classes
43
+ @dataclass
44
+ class TranslationJob:
45
+ job_id: str
46
+ request_id: str
47
+ text: str
48
+ source_lang: str
49
+ target_lang: str
50
+ priority: int = 0
51
+ auto_charge: bool = False
52
+ notification_url: Optional[str] = None
53
+ created_at: float = field(default_factory=time.time)
54
+ assigned_at: Optional[float] = None
55
+ started_at: Optional[float] = None
56
+ completed_at: Optional[float] = None
57
+ assigned_server: Optional[str] = None
58
+ status: JobStatus = JobStatus.PENDING
59
+ result: Optional[Dict[str, Any]] = None
60
+ error: Optional[str] = None
61
+ retry_count: int = 0
62
+ max_retries: int = 3
63
+ metadata: Dict[str, Any] = field(default_factory=dict)
64
+
65
+ @dataclass
66
+ class ServerInfo:
67
+ id: str
68
+ url: str
69
+ status: ServerStatus = ServerStatus.OFFLINE
70
+ last_ping: float = 0
71
+ current_jobs: int = 0
72
+ max_concurrent_jobs: int = 1
73
+ response_time: float = 0
74
+ error_count: int = 0
75
+ total_requests: int = 0
76
+ last_error: Optional[str] = None
77
+ metadata: Dict[str, Any] = field(default_factory=dict)
78
+
79
+ # Server Registry Class
80
+ class ServerRegistry:
81
+ def __init__(self, health_check_interval: int = 30):
82
+ self.servers: Dict[str, ServerInfo] = {}
83
+ self.health_check_interval = health_check_interval
84
+ self.lock = threading.Lock()
85
+ self.health_monitor_task = None
86
+ self.running = False
87
+
88
+ def register_server(self, server_id: str, url: str, max_concurrent_jobs: int = 1):
89
+ """Register a new translation server"""
90
+ with self.lock:
91
+ self.servers[server_id] = ServerInfo(
92
+ id=server_id,
93
+ url=url,
94
+ max_concurrent_jobs=max_concurrent_jobs
95
+ )
96
+ logger.info(f"Registered server {server_id} at {url}")
97
+
98
+ def unregister_server(self, server_id: str):
99
+ """Remove a server from registry"""
100
+ with self.lock:
101
+ if server_id in self.servers:
102
+ del self.servers[server_id]
103
+ logger.info(f"Unregistered server {server_id}")
104
+
105
+ def get_available_server(self) -> Optional[ServerInfo]:
106
+ """Get the best available server for processing"""
107
+ with self.lock:
108
+ available_servers = [
109
+ server for server in self.servers.values()
110
+ if server.status == ServerStatus.AVAILABLE and
111
+ server.current_jobs < server.max_concurrent_jobs
112
+ ]
113
+
114
+ if not available_servers:
115
+ return None
116
+
117
+ available_servers.sort(key=lambda s: (s.current_jobs, s.response_time))
118
+ return available_servers[0]
119
+
120
+ def mark_server_busy(self, server_id: str):
121
+ """Mark server as busy"""
122
+ with self.lock:
123
+ if server_id in self.servers:
124
+ self.servers[server_id].current_jobs += 1
125
+ if self.servers[server_id].current_jobs >= self.servers[server_id].max_concurrent_jobs:
126
+ self.servers[server_id].status = ServerStatus.BUSY
127
+
128
+ def mark_server_available(self, server_id: str):
129
+ """Mark server as available"""
130
+ with self.lock:
131
+ if server_id in self.servers:
132
+ self.servers[server_id].current_jobs = max(0, self.servers[server_id].current_jobs - 1)
133
+ if self.servers[server_id].current_jobs < self.servers[server_id].max_concurrent_jobs:
134
+ self.servers[server_id].status = ServerStatus.AVAILABLE
135
+
136
+ def get_server_stats(self) -> Dict[str, Any]:
137
+ """Get statistics about all servers"""
138
+ with self.lock:
139
+ stats = {
140
+ 'total_servers': len(self.servers),
141
+ 'available_servers': len([s for s in self.servers.values() if s.status == ServerStatus.AVAILABLE]),
142
+ 'busy_servers': len([s for s in self.servers.values() if s.status == ServerStatus.BUSY]),
143
+ 'offline_servers': len([s for s in self.servers.values() if s.status == ServerStatus.OFFLINE]),
144
+ 'servers': {
145
+ server_id: {
146
+ 'status': server.status.value,
147
+ 'current_jobs': server.current_jobs,
148
+ 'max_jobs': server.max_concurrent_jobs,
149
+ 'response_time': server.response_time,
150
+ 'total_requests': server.total_requests,
151
+ 'error_count': server.error_count,
152
+ 'last_ping': server.last_ping
153
+ }
154
+ for server_id, server in self.servers.items()
155
+ }
156
+ }
157
+ return stats
158
+
159
+ async def check_server_health(self, server: ServerInfo) -> bool:
160
+ """Check if a server is healthy"""
161
+ try:
162
+ start_time = time.time()
163
+ async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
164
+ async with session.get(f"{server.url}/api/health") as response:
165
+ response_time = time.time() - start_time
166
+
167
+ if response.status == 200:
168
+ data = await response.json()
169
+ with self.lock:
170
+ server.last_ping = time.time()
171
+ server.response_time = response_time
172
+ server.error_count = 0
173
+ server.last_error = None
174
+
175
+ if data.get('status') == 'healthy':
176
+ if server.current_jobs < server.max_concurrent_jobs:
177
+ server.status = ServerStatus.AVAILABLE
178
+ else:
179
+ server.status = ServerStatus.BUSY
180
+ else:
181
+ server.status = ServerStatus.ERROR
182
+
183
+ return True
184
+ else:
185
+ raise Exception(f"HTTP {response.status}")
186
+
187
+ except Exception as e:
188
+ with self.lock:
189
+ server.status = ServerStatus.OFFLINE
190
+ server.error_count += 1
191
+ server.last_error = str(e)
192
+ logger.error(f"Health check failed for server {server.id}: {e}")
193
+ return False
194
+
195
+ async def health_monitor(self):
196
+ """Continuously monitor server health"""
197
+ while self.running:
198
+ try:
199
+ servers_to_check = list(self.servers.values())
200
+
201
+ health_tasks = [
202
+ self.check_server_health(server)
203
+ for server in servers_to_check
204
+ ]
205
+
206
+ await asyncio.gather(*health_tasks, return_exceptions=True)
207
+
208
+ except Exception as e:
209
+ logger.error(f"Error in health monitor: {e}")
210
+
211
+ await asyncio.sleep(self.health_check_interval)
212
+
213
+ def start_health_monitoring(self):
214
+ """Start the health monitoring task"""
215
+ if not self.running:
216
+ self.running = True
217
+ loop = asyncio.get_event_loop()
218
+ self.health_monitor_task = loop.create_task(self.health_monitor())
219
+ logger.info("Started server health monitoring")
220
+
221
+ def stop_health_monitoring(self):
222
+ """Stop the health monitoring task"""
223
+ self.running = False
224
+ if self.health_monitor_task:
225
+ self.health_monitor_task.cancel()
226
+ logger.info("Stopped server health monitoring")
227
+
228
+ # Translation Queue Class
229
+ class TranslationQueue:
230
+ def __init__(self, max_queue_size: int = 1000):
231
+ self.pending_jobs: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
232
+ self.active_jobs: Dict[str, TranslationJob] = {}
233
+ self.completed_jobs: Dict[str, TranslationJob] = {}
234
+ self.failed_jobs: Dict[str, TranslationJob] = {}
235
+
236
+ self.lock = asyncio.Lock()
237
+ self.processor_task: Optional[asyncio.Task] = None
238
+ self.running = False
239
+
240
+ self.total_jobs = 0
241
+ self.processed_jobs = 0
242
+ self.failed_job_count = 0
243
+
244
+ async def add_job(self,
245
+ text: str,
246
+ source_lang: str,
247
+ target_lang: str,
248
+ request_id: Optional[str] = None,
249
+ priority: int = 0,
250
+ auto_charge: bool = False,
251
+ notification_url: Optional[str] = None) -> str:
252
+ """Add a new translation job to the queue"""
253
+
254
+ if not request_id:
255
+ request_id = str(uuid.uuid4())
256
+
257
+ job_id = f"job_{int(time.time())}_{str(uuid.uuid4())[:8]}"
258
+
259
+ job = TranslationJob(
260
+ job_id=job_id,
261
+ request_id=request_id,
262
+ text=text,
263
+ source_lang=source_lang,
264
+ target_lang=target_lang,
265
+ priority=priority,
266
+ auto_charge=auto_charge,
267
+ notification_url=notification_url
268
+ )
269
+
270
+ try:
271
+ await self.pending_jobs.put(job)
272
+
273
+ async with self.lock:
274
+ self.total_jobs += 1
275
+
276
+ logger.info(f"Added job {job_id} to queue (request_id: {request_id})")
277
+ return job_id
278
+
279
+ except asyncio.QueueFull:
280
+ logger.error(f"Queue is full, cannot add job {job_id}")
281
+ raise Exception("Translation queue is full, please try again later")
282
+
283
+ async def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
284
+ """Get the status of a specific job"""
285
+ async with self.lock:
286
+ if job_id in self.active_jobs:
287
+ job = self.active_jobs[job_id]
288
+ return {
289
+ "job_id": job_id,
290
+ "request_id": job.request_id,
291
+ "status": job.status.value,
292
+ "assigned_server": job.assigned_server,
293
+ "created_at": job.created_at,
294
+ "assigned_at": job.assigned_at,
295
+ "started_at": job.started_at,
296
+ "processing_time": time.time() - job.started_at if job.started_at else 0,
297
+ "retry_count": job.retry_count
298
+ }
299
+
300
+ if job_id in self.completed_jobs:
301
+ job = self.completed_jobs[job_id]
302
+ return {
303
+ "job_id": job_id,
304
+ "request_id": job.request_id,
305
+ "status": job.status.value,
306
+ "assigned_server": job.assigned_server,
307
+ "created_at": job.created_at,
308
+ "completed_at": job.completed_at,
309
+ "processing_time": job.completed_at - job.started_at if job.started_at and job.completed_at else 0,
310
+ "result": job.result,
311
+ "retry_count": job.retry_count
312
+ }
313
+
314
+ if job_id in self.failed_jobs:
315
+ job = self.failed_jobs[job_id]
316
+ return {
317
+ "job_id": job_id,
318
+ "request_id": job.request_id,
319
+ "status": job.status.value,
320
+ "error": job.error,
321
+ "created_at": job.created_at,
322
+ "failed_at": job.completed_at,
323
+ "retry_count": job.retry_count
324
+ }
325
+
326
+ return None
327
+
328
+ async def get_job_by_request_id(self, request_id: str) -> Optional[Dict[str, Any]]:
329
+ """Get job status by request_id"""
330
+ async with self.lock:
331
+ all_jobs = {**self.active_jobs, **self.completed_jobs, **self.failed_jobs}
332
+
333
+ for job in all_jobs.values():
334
+ if job.request_id == request_id:
335
+ return await self.get_job_status(job.job_id)
336
+
337
+ return None
338
+
339
+ async def cancel_job(self, job_id: str) -> bool:
340
+ """Cancel a pending or active job"""
341
+ async with self.lock:
342
+ if job_id in self.active_jobs:
343
+ job = self.active_jobs[job_id]
344
+ if job.status in [JobStatus.PENDING, JobStatus.ASSIGNED]:
345
+ job.status = JobStatus.CANCELLED
346
+ job.completed_at = time.time()
347
+
348
+ self.failed_jobs[job_id] = job
349
+ del self.active_jobs[job_id]
350
+
351
+ if job.assigned_server:
352
+ server_registry.mark_server_available(job.assigned_server)
353
+
354
+ logger.info(f"Cancelled job {job_id}")
355
+ return True
356
+
357
+ return False
358
+
359
+ async def get_queue_stats(self) -> Dict[str, Any]:
360
+ """Get queue statistics"""
361
+ async with self.lock:
362
+ pending_count = self.pending_jobs.qsize()
363
+ active_count = len(self.active_jobs)
364
+ completed_count = len(self.completed_jobs)
365
+ failed_count = len(self.failed_jobs)
366
+
367
+ return {
368
+ "pending_jobs": pending_count,
369
+ "active_jobs": active_count,
370
+ "completed_jobs": completed_count,
371
+ "failed_jobs": failed_count,
372
+ "total_jobs": self.total_jobs,
373
+ "processed_jobs": self.processed_jobs,
374
+ "success_rate": (self.processed_jobs / max(1, self.total_jobs)) * 100,
375
+ "queue_utilization": (pending_count / self.pending_jobs.maxsize) * 100
376
+ }
377
+
378
+ async def send_translation_request(self, server_url: str, job: TranslationJob) -> Dict[str, Any]:
379
+ """Send translation request to a specific server"""
380
+ try:
381
+ payload = {
382
+ "text": job.text,
383
+ "source_lang": job.source_lang,
384
+ "target_lang": job.target_lang,
385
+ "request_id": job.request_id,
386
+ "auto_charge": job.auto_charge,
387
+ "notification_url": job.notification_url
388
+ }
389
+
390
+ timeout = aiohttp.ClientTimeout(total=300)
391
+
392
+ async with aiohttp.ClientSession(timeout=timeout) as session:
393
+ async with session.post(
394
+ f"{server_url}/api/translate/heavy",
395
+ json=payload,
396
+ headers={"Content-Type": "application/json"}
397
+ ) as response:
398
+
399
+ if response.status == 200:
400
+ result = await response.json()
401
+ logger.info(f"Successfully submitted job {job.job_id} to server {server_url}")
402
+ return result
403
+ else:
404
+ error_text = await response.text()
405
+ raise Exception(f"Server returned {response.status}: {error_text}")
406
+
407
+ except Exception as e:
408
+ logger.error(f"Failed to send job {job.job_id} to server {server_url}: {e}")
409
+ raise e
410
+
411
+ async def process_queue(self):
412
+ """Main queue processor - assigns jobs to available servers"""
413
+ logger.info("Started queue processor")
414
+
415
+ while self.running:
416
+ try:
417
+ try:
418
+ job = await asyncio.wait_for(self.pending_jobs.get(), timeout=1.0)
419
+ except asyncio.TimeoutError:
420
+ continue
421
+
422
+ available_server = server_registry.get_available_server()
423
+
424
+ if not available_server:
425
+ await self.pending_jobs.put(job)
426
+ logger.warning(f"No available servers for job {job.job_id}, requeueing")
427
+ await asyncio.sleep(2)
428
+ continue
429
+
430
+ async with self.lock:
431
+ job.assigned_server = available_server.id
432
+ job.assigned_at = time.time()
433
+ job.status = JobStatus.ASSIGNED
434
+ self.active_jobs[job.job_id] = job
435
+
436
+ server_registry.mark_server_busy(available_server.id)
437
+
438
+ try:
439
+ job.status = JobStatus.PROCESSING
440
+ job.started_at = time.time()
441
+
442
+ result = await self.send_translation_request(available_server.url, job)
443
+
444
+ logger.info(f"Job {job.job_id} submitted to server {available_server.id}")
445
+
446
+ except Exception as e:
447
+ async with self.lock:
448
+ job.retry_count += 1
449
+ job.error = str(e)
450
+
451
+ if job.retry_count < job.max_retries:
452
+ job.status = JobStatus.PENDING
453
+ job.assigned_server = None
454
+ job.assigned_at = None
455
+ job.started_at = None
456
+
457
+ await self.pending_jobs.put(job)
458
+ del self.active_jobs[job.job_id]
459
+
460
+ logger.warning(f"Job {job.job_id} failed, retrying ({job.retry_count}/{job.max_retries})")
461
+ else:
462
+ job.status = JobStatus.FAILED
463
+ job.completed_at = time.time()
464
+
465
+ self.failed_jobs[job.job_id] = job
466
+ self.failed_job_count += 1
467
+ del self.active_jobs[job.job_id]
468
+
469
+ logger.error(f"Job {job.job_id} failed permanently after {job.retry_count} retries")
470
+
471
+ server_registry.mark_server_available(available_server.id)
472
+
473
+ except Exception as e:
474
+ logger.error(f"Error in queue processor: {e}")
475
+ await asyncio.sleep(1)
476
+
477
+ def start_processing(self):
478
+ """Start the queue processor"""
479
+ if not self.running:
480
+ self.running = True
481
+ self.processor_task = asyncio.create_task(self.process_queue())
482
+ logger.info("Started queue processing")
483
+
484
+ def stop_processing(self):
485
+ """Stop the queue processor"""
486
+ self.running = False
487
+ if self.processor_task:
488
+ self.processor_task.cancel()
489
+ logger.info("Stopped queue processing")
490
+
491
+ # Global instances
492
+ server_registry = ServerRegistry()
493
+ translation_queue = TranslationQueue()
494
+
495
+ # Configuration
496
+ LOAD_BALANCER_ENABLED = os.getenv("LOAD_BALANCER_ENABLED", "false").lower() == "true"
497
+ SERVER_ID = os.getenv("SERVER_ID", f"server_{int(time.time())}")
498
+ CURRENT_SERVER_URL = os.getenv("CURRENT_SERVER_URL", "http://localhost:7860")
499
+ PEER_SERVERS = os.getenv("PEER_SERVERS", "").split(",") if os.getenv("PEER_SERVERS") else []
500
+ MODEL_NAME = os.getenv("MODEL_NAME", "default_model")
501
+
502
+ # Initialize FastAPI app
503
+ app = FastAPI(title="Enhanced Translation Service with Load Balancer")
504
+
505
+ # Global storage for translations (you may need to replace this with your actual implementation)
506
+ translations = {}
507
+ translator = None # This should be your actual translator instance
508
+
509
+ # Helper functions
510
+ async def estimate_queue_wait_time() -> int:
511
+ """Estimate wait time in seconds based on queue size and server availability"""
512
+ try:
513
+ queue_stats = await translation_queue.get_queue_stats()
514
+ server_stats = server_registry.get_server_stats()
515
+
516
+ pending_jobs = queue_stats['pending_jobs']
517
+ available_servers = server_stats['available_servers']
518
+
519
+ if available_servers == 0:
520
+ return 300
521
+
522
+ estimated_seconds = (pending_jobs * 30) // max(1, available_servers)
523
+ return min(estimated_seconds, 1800)
524
+
525
+ except Exception:
526
+ return 120
527
+
528
+ async def send_completion_notification(notification_url: str, request_id: str,
529
+ translated_text: str, result: dict,
530
+ character_count: int, translation_length: int,
531
+ source_lang: str, target_lang: str, auto_charge: bool):
532
+ """Send completion notification with enhanced data"""
533
+ try:
534
+ payload = {
535
+ "request_id": request_id,
536
+ "status": "completed",
537
+ "translated_text": translated_text,
538
+ "processing_time": result['processing_time'],
539
+ "character_count": character_count,
540
+ "translation_length": translation_length,
541
+ "source_lang": source_lang,
542
+ "target_lang": target_lang,
543
+ "from_cache": result.get('from_cache', False),
544
+ "chunks_count": result.get('chunks_count', 1),
545
+ "auto_charge": auto_charge,
546
+ "server_id": SERVER_ID,
547
+ "completed_at": datetime.now().isoformat()
548
+ }
549
+
550
+ timeout = aiohttp.ClientTimeout(total=45)
551
+
552
+ async with aiohttp.ClientSession(timeout=timeout) as session:
553
+ async with session.post(
554
+ notification_url,
555
+ json=payload,
556
+ headers={
557
+ 'Content-Type': 'application/json',
558
+ 'User-Agent': 'MLT-Server/2.0'
559
+ }
560
+ ) as response:
561
+
562
+ if response.status == 200:
563
+ logger.info(f"Notification sent successfully for {request_id}")
564
+ return True
565
+ else:
566
+ logger.warning(f"Notification failed with status {response.status} for {request_id}")
567
+ return False
568
+
569
+ except Exception as e:
570
+ logger.error(f"Failed to send notification for {request_id}: {e}")
571
+ return False
572
+
573
+ async def run_enhanced_translation_job(request_id: str, text: str, source_lang: str,
574
+ target_lang: str, notification_url: Optional[str],
575
+ auto_charge: bool = False):
576
+ """Enhanced translation job runner with load balancer integration"""
577
+ try:
578
+ start_time = time.time()
579
+
580
+ # Simulate progress updates
581
+ for i in range(1, 10):
582
+ await asyncio.sleep(2)
583
+ if request_id in translations:
584
+ translations[request_id]["progress"] = i * 10
585
+ translations[request_id]["elapsed_time"] = time.time() - start_time
586
+
587
+ # Perform actual translation (replace with your actual translation logic)
588
+ result = translator.translate_text(text, source_lang, target_lang)
589
+
590
+ translated_text = result['translated_text']
591
+ processing_time = time.time() - start_time
592
+
593
+ # Update translation status
594
+ translations[request_id] = {
595
+ "status": "completed",
596
+ "progress": 100,
597
+ "elapsed_time": processing_time,
598
+ "message": "Translation completed successfully",
599
+ "result": translated_text,
600
+ "server_id": SERVER_ID,
601
+ "processing_time": result['processing_time'],
602
+ "from_cache": result.get('from_cache', False)
603
+ }
604
+
605
+ # Store in completed translations
606
+ translator.completed_translations[request_id] = {
607
+ 'result': result,
608
+ 'completed_at': time.time(),
609
+ 'character_count': len(text),
610
+ 'translation_length': len(translated_text),
611
+ 'server_id': SERVER_ID
612
+ }
613
+
614
+ # Free up server capacity
615
+ if LOAD_BALANCER_ENABLED:
616
+ server_registry.mark_server_available(SERVER_ID)
617
+
618
+ # Send notification if URL provided
619
+ if notification_url:
620
+ await send_completion_notification(
621
+ notification_url, request_id, translated_text, result,
622
+ len(text), len(translated_text), source_lang, target_lang, auto_charge
623
+ )
624
+
625
+ logger.info(f"Translation job {request_id} completed successfully on server {SERVER_ID}")
626
+
627
+ except Exception as e:
628
+ logger.error(f"Error in translation job {request_id}: {e}")
629
+
630
+ # Update error status
631
+ if request_id in translations:
632
+ translations[request_id] = {
633
+ "status": "failed",
634
+ "message": f"Translation failed: {str(e)}",
635
+ "server_id": SERVER_ID,
636
+ "elapsed_time": time.time() - start_time if 'start_time' in locals() else 0
637
+ }
638
+
639
+ # Free up server capacity
640
+ if LOAD_BALANCER_ENABLED:
641
+ server_registry.mark_server_available(SERVER_ID)
642
+
643
+ # Event handlers
644
+ @app.on_event("startup")
645
+ async def startup_event():
646
+ """Initialize load balancer on startup"""
647
+ if LOAD_BALANCER_ENABLED:
648
+ server_registry.register_server(SERVER_ID, CURRENT_SERVER_URL, max_concurrent_jobs=1)
649
+
650
+ for i, peer_url in enumerate(PEER_SERVERS):
651
+ if peer_url.strip():
652
+ peer_id = f"peer_server_{i}"
653
+ server_registry.register_server(peer_id, peer_url.strip(), max_concurrent_jobs=1)
654
+
655
+ server_registry.start_health_monitoring()
656
+ translation_queue.start_processing()
657
+
658
+ logger.info(f"Load balancer initialized with {len(PEER_SERVERS)} peer servers")
659
+
660
+ @app.on_event("shutdown")
661
+ async def shutdown_event():
662
+ """Cleanup load balancer on shutdown"""
663
+ if LOAD_BALANCER_ENABLED:
664
+ server_registry.stop_health_monitoring()
665
+ translation_queue.stop_processing()
666
+ logger.info("Load balancer shutdown complete")
667
+
668
+ # API Endpoints
669
+ @app.post("/api/translate/heavy")
670
+ async def heavy_translate_enhanced(request: Request):
671
+ """Enhanced heavy translation with load balancer support"""
672
+ try:
673
+ data = await request.json()
674
+
675
+ # Extract parameters
676
+ request_id = data.get("request_id")
677
+ if not request_id:
678
+ request_id = str(uuid.uuid4())
679
+
680
+ text = data.get("text")
681
+ source_lang = data.get("source_lang")
682
+ target_lang = data.get("target_lang")
683
+ auto_charge = data.get("auto_charge", False)
684
+ notification_url = data.get("notification_url")
685
+
686
+ # Validate required fields
687
+ if not all([text, source_lang, target_lang]):
688
+ raise HTTPException(status_code=400, detail="Missing required fields: text, source_lang, target_lang")
689
+
690
+ # Check if load balancer is enabled and this server is busy
691
+ if LOAD_BALANCER_ENABLED:
692
+ local_server = server_registry.servers.get(SERVER_ID)
693
+
694
+ # If this server is at capacity, try to route to another server
695
+ if (local_server and
696
+ local_server.current_jobs >= local_server.max_concurrent_jobs):
697
+
698
+ # Try to find an available peer server
699
+ available_server = server_registry.get_available_server()
700
+
701
+ if available_server and available_server.id != SERVER_ID:
702
+ # Route to available peer server
703
+ try:
704
+ async with aiohttp.ClientSession() as session:
705
+ async with session.post(
706
+ f"{available_server.url}/api/translate/heavy",
707
+ json=data,
708
+ timeout=aiohttp.ClientTimeout(total=10)
709
+ ) as response:
710
+ if response.status == 200:
711
+ result = await response.json()
712
+ logger.info(f"Routed request {request_id} to server {available_server.id}")
713
+ return result
714
+ else:
715
+ logger.warning(f"Failed to route to {available_server.id}: {response.status}")
716
+ except Exception as e:
717
+ logger.error(f"Error routing to {available_server.id}: {e}")
718
+
719
+ # If routing failed, add to queue
720
+ job_id = await translation_queue.add_job(
721
+ text=text,
722
+ source_lang=source_lang,
723
+ target_lang=target_lang,
724
+ request_id=request_id,
725
+ auto_charge=auto_charge,
726
+ notification_url=notification_url
727
+ )
728
+
729
+ return {
730
+ "success": True,
731
+ "request_id": request_id,
732
+ "job_id": job_id,
733
+ "message": "Server busy, request queued for processing",
734
+ "processing_mode": "queued"
735
+ }
736
+
737
+ # Process locally
738
+ translations[request_id] = {
739
+ "status": "processing",
740
+ "progress": 0,
741
+ "elapsed_time": 0,
742
+ "message": "Translation in progress...",
743
+ "server_id": SERVER_ID
744
+ }
745
+
746
+ # Mark server as busy if load balancer is enabled
747
+ if LOAD_BALANCER_ENABLED:
748
+ server_registry.mark_server_busy(SERVER_ID)
749
+
750
+ # Start translation task
751
+ asyncio.create_task(
752
+ run_enhanced_translation_job(
753
+ request_id, text, source_lang, target_lang,
754
+ notification_url, auto_charge
755
+ )
756
+ )
757
+
758
+ return {
759
+ "success": True,
760
+ "request_id": request_id,
761
+ "message": "Translation started on current server",
762
+ "processing_mode": "local",
763
+ "server_id": SERVER_ID
764
+ }
765
+
766
+ except HTTPException:
767
+ raise
768
+ except Exception as e:
769
+ logger.error(f"Error in heavy_translate_enhanced: {e}")
770
+ raise HTTPException(status_code=500, detail=str(e))
771
+
772
+ @app.post("/api/webhook/job-completion")
773
+ async def job_completion_webhook(data: dict):
774
+ """Webhook endpoint for receiving job completion notifications from peer servers"""
775
+ try:
776
+ job_id = data.get('job_id')
777
+ request_id = data.get('request_id')
778
+ status = data.get('status')
779
+ result = data.get('result')
780
+ server_id = data.get('server_id')
781
+
782
+ if not all([job_id, request_id, status]):
783
+ raise HTTPException(status_code=400, detail="Missing required fields")
784
+
785
+ # Update job status in queue
786
+ async with translation_queue.lock:
787
+ if job_id in translation_queue.active_jobs:
788
+ job = translation_queue.active_jobs[job_id]
789
+
790
+ if status == 'completed':
791
+ job.status = JobStatus.COMPLETED
792
+ job.completed_at = time.time()
793
+ job.result = result
794
+
795
+ # Move to completed jobs
796
+ translation_queue.completed_jobs[job_id] = job
797
+ del translation_queue.active_jobs[job_id]
798
+ translation_queue.processed_jobs += 1
799
+
800
+ logger.info(f"Job {job_id} completed on server {server_id}")
801
+
802
+ elif status == 'failed':
803
+ job.status = JobStatus.FAILED
804
+ job.completed_at = time.time()
805
+ job.error = data.get('error', 'Unknown error')
806
+
807
+ # Move to failed jobs
808
+ translation_queue.failed_jobs[job_id] = job
809
+ del translation_queue.active_jobs[job_id]
810
+ translation_queue.failed_job_count += 1
811
+
812
+ logger.error(f"Job {job_id} failed on server {server_id}")
813
+
814
+ # Free up the server
815
+ if job.assigned_server:
816
+ server_registry.mark_server_available(job.assigned_server)
817
+
818
+ return {
819
+ "success": True,
820
+ "message": f"Job {job_id} status updated to {status}"
821
+ }
822
+
823
+ except Exception as e:
824
+ logger.error(f"Error in job completion webhook: {e}")
825
+ raise HTTPException(status_code=500, detail=str(e))
826
+
827
+ @app.get("/api/enhanced-status")
828
+ async def enhanced_server_status():
829
+ """Get enhanced server status including load balancer information"""
830
+ try:
831
+ base_stats = {
832
+ "server_id": SERVER_ID,
833
+ "server_url": CURRENT_SERVER_URL,
834
+ "load_balancer_enabled": LOAD_BALANCER_ENABLED,
835
+ "model": MODEL_NAME,
836
+ "device": str(translator.device) if translator else "unknown",
837
+ "gpu_available": torch.cuda.is_available(),
838
+ }
839
+
840
+ if LOAD_BALANCER_ENABLED:
841
+ server_stats = server_registry.get_server_stats()
842
+ queue_stats = await translation_queue.get_queue_stats()
843
+
844
+ base_stats.update({
845
+ "server_registry": server_stats,
846
+ "queue_stats": queue_stats,
847
+ "peer_servers": len(PEER_SERVERS)
848
+ })
849
+ else:
850
+ # Local server stats only
851
+ base_stats.update({
852
+ "active_sessions": len(translator.translation_sessions) if translator else 0,
853
+ "completed_translations": len(translator.completed_translations) if translator else 0,
854
+ "total_requests": translator.total_requests if translator else 0
855
+ })
856
+
857
+ return {
858
+ "success": True,
859
+ **base_stats,
860
+ "timestamp": datetime.now().isoformat()
861
+ }
862
+
863
+ except Exception as e:
864
+ raise HTTPException(status_code=500, detail=str(e))
865
+
866
@app.post("/api/translate/distributed")
async def distributed_translate(request: TranslationRequest):
    """
    Distributed translation endpoint - routes requests to available servers
    """
    try:
        if not LOAD_BALANCER_ENABLED:
            # Fallback to local processing
            return await translate_text_api(request)

        # Check if this server is available for local processing
        this_server = server_registry.servers.get(SERVER_ID)
        can_run_here = (
            this_server
            and this_server.status == ServerStatus.AVAILABLE
            and this_server.current_jobs < this_server.max_concurrent_jobs
        )

        if not can_run_here:
            # Add to distributed queue
            job_id = await translation_queue.add_job(
                text=request.text,
                source_lang=request.source_lang,
                target_lang=request.target_lang,
                auto_charge=request.auto_charge,
            )
            return {
                "success": True,
                "processing_mode": "queued",
                "job_id": job_id,
                "message": "Request queued for processing on available server",
                "estimated_wait_time": await estimate_queue_wait_time(),
            }

        # Process locally if available; always release the busy flag afterwards.
        server_registry.mark_server_busy(SERVER_ID)
        try:
            outcome = perform_translation_internal(
                request.text,
                request.source_lang,
                request.target_lang,
            )
            return {
                "success": True,
                "processed_by": SERVER_ID,
                "processing_mode": "local",
                "translated_text": outcome['translated_text'],
                "processing_time": outcome['processing_time'],
                "chunks_count": outcome['chunks_count'],
                "from_cache": outcome.get('from_cache', False),
                "character_count": len(request.text),
                "translation_length": len(outcome['translated_text']),
            }
        finally:
            server_registry.mark_server_available(SERVER_ID)

    except Exception as e:
        logger.error(f"Error in distributed translation: {e}")
        raise HTTPException(status_code=500, detail=str(e))
926
+
927
@app.post("/api/translate/queue")
async def queue_translate(request: TranslationRequest):
    """
    Force translation through the queue system
    """
    try:
        # Always enqueue, even if a server is free right now.
        new_job_id = await translation_queue.add_job(
            text=request.text,
            source_lang=request.source_lang,
            target_lang=request.target_lang,
            auto_charge=request.auto_charge,
        )
        wait_estimate = await estimate_queue_wait_time()

        return {
            "success": True,
            "job_id": new_job_id,
            "message": "Translation request added to queue",
            "estimated_wait_time": wait_estimate,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
949
+
950
@app.get("/api/job/{job_id}/status")
async def get_job_status(job_id: str):
    """Get status of a queued translation job"""
    try:
        job_info = await translation_queue.get_job_status(job_id)

        if job_info:
            return {"success": True, **job_info}

        # Unknown job id -> 404 (re-raised below so it is not turned into a 500).
        raise HTTPException(status_code=404, detail="Job not found")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
968
+
969
@app.get("/api/request/{request_id}/status")
async def get_request_status(request_id: str):
    """Get status by request_id (WordPress compatibility)"""
    try:
        request_info = await translation_queue.get_job_by_request_id(request_id)

        if request_info:
            return {"success": True, **request_info}

        # Unknown request id -> 404 (re-raised below, not converted to 500).
        raise HTTPException(status_code=404, detail="Request not found")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
987
+
988
@app.post("/api/job/{job_id}/cancel")
async def cancel_job(job_id: str):
    """Cancel a queued translation job"""
    try:
        was_cancelled = await translation_queue.cancel_job(job_id)

        if not was_cancelled:
            # Either the id is unknown or the job is past the cancellable state.
            raise HTTPException(status_code=404, detail="Job not found or cannot be cancelled")

        return {
            "success": True,
            "message": f"Job {job_id} cancelled successfully"
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
1006
+
1007
@app.get("/api/load-balancer/status")
async def load_balancer_status():
    """Get load balancer status"""
    try:
        registry_info = server_registry.get_server_stats()
        queue_info = await translation_queue.get_queue_stats()
        # Count servers currently accepting work.
        available = sum(
            1 for srv in server_registry.servers.values()
            if srv.status == ServerStatus.AVAILABLE
        )

        return {
            "success": True,
            "load_balancer_enabled": LOAD_BALANCER_ENABLED,
            "server_registry": registry_info,
            "queue_stats": queue_info,
            "total_servers": len(server_registry.servers),
            "available_servers": available,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
1025
+
1026
@app.post("/api/load-balancer/register")
async def register_server(server_data: dict):
    """Register a new server with the load balancer.

    Expects a JSON body containing "server_id" and "url", plus an optional
    "max_concurrent_jobs" (defaults to 1).

    Raises:
        HTTPException 400: if server_id or url is missing/empty.
        HTTPException 500: on any unexpected registry failure.
    """
    try:
        server_id = server_data.get("server_id")
        url = server_data.get("url")
        max_concurrent_jobs = server_data.get("max_concurrent_jobs", 1)

        if not all([server_id, url]):
            raise HTTPException(status_code=400, detail="Missing server_id or url")

        server_registry.register_server(server_id, url, max_concurrent_jobs)

        return {
            "success": True,
            "message": f"Server {server_id} registered successfully"
        }

    except HTTPException:
        # BUG FIX: without this re-raise, the 400 above fell into the generic
        # handler below and was returned to the client as a 500. This matches
        # the pattern used by the other endpoints in this file.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
1046
+
1047
@app.post("/api/load-balancer/unregister")
async def unregister_server(server_data: dict):
    """Unregister a server from the load balancer.

    Expects a JSON body containing "server_id".

    Raises:
        HTTPException 400: if server_id is missing/empty.
        HTTPException 500: on any unexpected registry failure.
    """
    try:
        server_id = server_data.get("server_id")

        if not server_id:
            raise HTTPException(status_code=400, detail="Missing server_id")

        server_registry.unregister_server(server_id)

        return {
            "success": True,
            "message": f"Server {server_id} unregistered successfully"
        }

    except HTTPException:
        # BUG FIX: re-raise so the 400 above is not converted into a 500 by
        # the generic handler below (consistent with sibling endpoints).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
1065
+
1066
# Helper functions for internal translation processing
def perform_translation_internal(text: str, source_lang: str, target_lang: str) -> Dict[str, Any]:
    """Internal translation function - replace with your actual implementation.

    Args:
        text: Source text to translate.
        source_lang: Source language code.
        target_lang: Target language code.

    Returns:
        Dict with keys 'translated_text', 'processing_time' (seconds),
        'chunks_count', and 'from_cache'.
    """
    # This is a placeholder - replace with your actual translation logic.
    # BUG FIX: measure elapsed time with a monotonic clock. time.time() is
    # wall-clock and can jump (NTP adjustments, DST), yielding negative or
    # wildly wrong processing_time values.
    start_time = time.perf_counter()

    # Simulate translation processing
    time.sleep(0.1)

    translated_text = f"[TRANSLATED] {text} [{source_lang}->{target_lang}]"

    return {
        "translated_text": translated_text,
        "processing_time": time.perf_counter() - start_time,
        "chunks_count": 1,
        "from_cache": False
    }
1083
+
1084
async def translate_text_api(request: TranslationRequest):
    """Fallback translation API - replace with your actual implementation"""
    try:
        outcome = perform_translation_internal(
            request.text,
            request.source_lang,
            request.target_lang,
        )
        return {
            "success": True,
            "translated_text": outcome['translated_text'],
            "processing_time": outcome['processing_time'],
            "chunks_count": outcome['chunks_count'],
            "from_cache": outcome.get('from_cache', False),
        }
    except Exception as e:
        # Any failure surfaces as a 500 with the error text as detail.
        raise HTTPException(status_code=500, detail=str(e))
1103
+
1104
if __name__ == "__main__":
    import uvicorn

    # Configure logging
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    # Start the server
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")