danicor commited on
Commit
91658f8
·
verified ·
1 Parent(s): 5e9d853

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +51 -18
app.py CHANGED
@@ -3,7 +3,7 @@ import time
3
  import asyncio
4
  import uuid
5
  from datetime import datetime
6
- from typing import Dict, List, Optional
7
  from collections import deque
8
  from fastapi import FastAPI, HTTPException, Request
9
  from fastapi.middleware.cors import CORSMiddleware
@@ -45,7 +45,9 @@ class WorkerStatus:
45
  class TranslationOrchestrator:
46
  def __init__(self):
47
  self.workers: Dict[str, WorkerStatus] = {}
48
- self.job_queue = deque()
 
 
49
  self.active_jobs: Dict[str, Dict] = {}
50
  self.completed_jobs: Dict[str, Dict] = {}
51
  self.worker_lock = threading.Lock()
@@ -165,24 +167,51 @@ class TranslationOrchestrator:
165
 
166
  def add_job_to_queue(self, job_data: Dict):
167
  with self.job_lock:
168
- self.job_queue.append(job_data)
169
- print(f"📥 Job {job_data['request_id']} queued. Queue size: {len(self.job_queue)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
170
 
171
  def process_job_queue(self):
172
- if not self.job_queue:
173
- return
174
-
175
  with self.job_lock:
176
- if not self.job_queue:
177
- return
 
 
 
 
 
 
 
 
 
 
178
 
179
- worker_id = self.get_available_worker()
180
- if not worker_id:
181
- return
182
-
183
- job_data = self.job_queue.popleft()
184
-
185
- self.assign_job_to_worker(worker_id, job_data)
 
 
 
 
 
 
186
 
187
  def assign_job_to_worker(self, worker_id: str, job_data: Dict):
188
  worker = self.workers[worker_id]
@@ -423,7 +452,9 @@ async def root():
423
  },
424
  "queue": {
425
  "active_jobs": len(orchestrator.active_jobs),
426
- "queued_jobs": len(orchestrator.job_queue),
 
 
427
  "completed_jobs": len(orchestrator.completed_jobs)
428
  }
429
  }
@@ -441,7 +472,9 @@ async def health_check():
441
  },
442
  "queue_stats": {
443
  "active": len(orchestrator.active_jobs),
444
- "queued": len(orchestrator.job_queue)
 
 
445
  }
446
  }
447
 
 
3
  import asyncio
4
  import uuid
5
  from datetime import datetime
6
+ from typing import Dict, List, Optional, Literal
7
  from collections import deque
8
  from fastapi import FastAPI, HTTPException, Request
9
  from fastapi.middleware.cors import CORSMiddleware
 
45
  class TranslationOrchestrator:
46
  def __init__(self):
47
  self.workers: Dict[str, WorkerStatus] = {}
48
+ # تغییر به priority queues
49
+ self.high_priority_queue = deque() # متن‌های کوتاه
50
+ self.normal_priority_queue = deque() # متن‌های بلند
51
  self.active_jobs: Dict[str, Dict] = {}
52
  self.completed_jobs: Dict[str, Dict] = {}
53
  self.worker_lock = threading.Lock()
 
167
 
168
  def add_job_to_queue(self, job_data: Dict):
169
  with self.job_lock:
170
+ text_length = len(job_data.get('text', ''))
171
+
172
+ # تعیین priority بر اساس طول متن
173
+ if text_length < 1000:
174
+ job_data['priority'] = 'high'
175
+ self.high_priority_queue.append(job_data)
176
+ queue_type = "HIGH priority"
177
+ else:
178
+ job_data['priority'] = 'normal'
179
+ self.normal_priority_queue.append(job_data)
180
+ queue_type = "NORMAL priority"
181
+
182
+ total_queue_size = len(self.high_priority_queue) + len(self.normal_priority_queue)
183
+ print(f"📥 Job {job_data['request_id']} ({text_length} chars) queued in {queue_type}. "
184
+ f"Queue sizes - High: {len(self.high_priority_queue)}, Normal: {len(self.normal_priority_queue)}")
185
 
186
  def process_job_queue(self):
187
+ """پردازش صف با اولویت به متن‌های کوتاه"""
 
 
188
  with self.job_lock:
189
+ # ابتدا high priority را چک می‌کنیم
190
+ if self.high_priority_queue:
191
+ worker_id = self.get_available_worker()
192
+ if worker_id:
193
+ job_data = self.high_priority_queue.popleft()
194
+ print(f"⚡ Processing HIGH priority job {job_data['request_id']}")
195
+ threading.Thread(
196
+ target=self.assign_job_to_worker,
197
+ args=(worker_id, job_data),
198
+ daemon=True
199
+ ).start()
200
+ return
201
 
202
+ # سپس normal priority
203
+ if self.normal_priority_queue:
204
+ worker_id = self.get_available_worker()
205
+ if worker_id:
206
+ job_data = self.normal_priority_queue.popleft()
207
+ print(f"📄 Processing NORMAL priority job {job_data['request_id']}")
208
+ threading.Thread(
209
+ target=self.assign_job_to_worker,
210
+ args=(worker_id, job_data),
211
+ daemon=True
212
+ ).start()
213
+ return
214
+
215
 
216
  def assign_job_to_worker(self, worker_id: str, job_data: Dict):
217
  worker = self.workers[worker_id]
 
452
  },
453
  "queue": {
454
  "active_jobs": len(orchestrator.active_jobs),
455
+ "high_priority_queued": len(orchestrator.high_priority_queue),
456
+ "normal_priority_queued": len(orchestrator.normal_priority_queue),
457
+ "total_queued": len(orchestrator.high_priority_queue) + len(orchestrator.normal_priority_queue),
458
  "completed_jobs": len(orchestrator.completed_jobs)
459
  }
460
  }
 
472
  },
473
  "queue_stats": {
474
  "active": len(orchestrator.active_jobs),
475
+ "high_priority": len(orchestrator.high_priority_queue),
476
+ "normal_priority": len(orchestrator.normal_priority_queue),
477
+ "total_queued": len(orchestrator.high_priority_queue) + len(orchestrator.normal_priority_queue)
478
  }
479
  }
480