jebin2 committed on
Commit
8c4055f
·
1 Parent(s): 0164d71

start instant

Browse files
routers/gemini.py CHANGED
@@ -75,7 +75,7 @@ async def create_job(
75
  input_data: dict
76
  ) -> GeminiJob:
77
  """Create a new job in the queue with auto-assigned priority."""
78
- from services.gemini_job_worker import get_priority_for_job_type
79
 
80
  job_id = f"job_{uuid.uuid4().hex[:16]}"
81
  priority = get_priority_for_job_type(job_type)
@@ -92,6 +92,9 @@ async def create_job(
92
  await db.commit()
93
  await db.refresh(job)
94
 
 
 
 
95
  return job
96
 
97
 
 
75
  input_data: dict
76
  ) -> GeminiJob:
77
  """Create a new job in the queue with auto-assigned priority."""
78
+ from services.gemini_job_worker import get_priority_for_job_type, get_pool
79
 
80
  job_id = f"job_{uuid.uuid4().hex[:16]}"
81
  priority = get_priority_for_job_type(job_type)
 
92
  await db.commit()
93
  await db.refresh(job)
94
 
95
+ # Notify workers immediately so they wake up and process this job
96
+ get_pool().notify_new_job(priority)
97
+
98
  return job
99
 
100
 
services/gemini_job_worker.py CHANGED
@@ -112,20 +112,39 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
112
  if status_result.get("status") == "completed":
113
  video_url = status_result.get("video_url")
114
  if video_url:
115
- filename = await service.download_video(video_url, job.job_id)
116
- job.status = "completed"
117
- job.output_data = {"filename": filename}
118
- success = True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  else:
120
  job.status = "failed"
121
  job.error_message = "No video URL returned"
 
122
  error_msg = job.error_message
123
  else:
124
  job.status = "failed"
125
  job.error_message = status_result.get("error", "Unknown error")
 
126
  error_msg = job.error_message
127
-
128
- job.completed_at = datetime.utcnow()
129
  else:
130
  # Not done - reschedule
131
  job.retry_count += 1
@@ -137,6 +156,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
137
  except Exception as e:
138
  logger.error(f"Error checking video status for {job.job_id}: {e}")
139
  job.retry_count += 1
 
140
  config = WorkerConfig.from_env()
141
  interval = get_interval_for_priority(job.priority, config)
142
  job.next_process_at = datetime.utcnow() + timedelta(seconds=interval)
 
112
  if status_result.get("status") == "completed":
113
  video_url = status_result.get("video_url")
114
  if video_url:
115
+ try:
116
+ filename = await service.download_video(video_url, job.job_id)
117
+ job.status = "completed"
118
+ job.output_data = {"filename": filename}
119
+ job.error_message = None # Clear any previous error
120
+ success = True
121
+ except Exception as download_error:
122
+ # Download failed - track separately
123
+ error_msg = f"Download failed: {download_error}"
124
+ logger.error(f"Error downloading video for {job.job_id}: {download_error}")
125
+
126
+ # Fail job after 5 download attempts (don't retry forever)
127
+ if job.retry_count >= 5:
128
+ job.status = "failed"
129
+ job.error_message = error_msg
130
+ job.completed_at = datetime.utcnow()
131
+ else:
132
+ # Retry
133
+ job.retry_count += 1
134
+ job.error_message = f"Download attempt {job.retry_count} failed: {download_error}"
135
+ config = WorkerConfig.from_env()
136
+ interval = get_interval_for_priority(job.priority, config)
137
+ job.next_process_at = datetime.utcnow() + timedelta(seconds=interval)
138
  else:
139
  job.status = "failed"
140
  job.error_message = "No video URL returned"
141
+ job.completed_at = datetime.utcnow()
142
  error_msg = job.error_message
143
  else:
144
  job.status = "failed"
145
  job.error_message = status_result.get("error", "Unknown error")
146
+ job.completed_at = datetime.utcnow()
147
  error_msg = job.error_message
 
 
148
  else:
149
  # Not done - reschedule
150
  job.retry_count += 1
 
156
  except Exception as e:
157
  logger.error(f"Error checking video status for {job.job_id}: {e}")
158
  job.retry_count += 1
159
+ job.error_message = f"Status check failed: {e}"
160
  config = WorkerConfig.from_env()
161
  interval = get_interval_for_priority(job.priority, config)
162
  job.next_process_at = datetime.utcnow() + timedelta(seconds=interval)
services/gemini_service.py CHANGED
@@ -212,10 +212,13 @@ class GeminiService:
212
  Returns status and video URL if complete.
213
  """
214
  try:
215
- # Get operation status
 
 
 
216
  operation = await asyncio.to_thread(
217
  self.client.operations.get,
218
- name=gemini_operation_name
219
  )
220
 
221
  if not operation.done:
@@ -225,39 +228,36 @@ class GeminiService:
225
  "status": "pending"
226
  }
227
 
 
228
  if operation.error:
 
 
 
229
  return {
230
  "gemini_operation_name": gemini_operation_name,
231
  "done": True,
232
  "status": "failed",
233
- "error": operation.error.message or "Unknown error"
234
- }
235
-
236
- # Extract video URI
237
- generated_videos = getattr(operation.response, 'generated_videos', None)
238
- if not generated_videos:
239
- return {
240
- "gemini_operation_name": gemini_operation_name,
241
- "done": True,
242
- "status": "failed",
243
- "error": "No video URI returned. May be due to safety filters."
244
  }
245
 
246
- video_uri = generated_videos[0].video.uri if generated_videos[0].video else None
247
- if not video_uri:
248
- return {
249
- "gemini_operation_name": gemini_operation_name,
250
- "done": True,
251
- "status": "failed",
252
- "error": "No video URI in response."
253
- }
 
 
 
 
254
 
255
- # Return success with video URL (internal - will be downloaded by router)
256
  return {
257
  "gemini_operation_name": gemini_operation_name,
258
  "done": True,
259
- "status": "completed",
260
- "video_url": f"{video_uri}&key={self.api_key}"
261
  }
262
 
263
  except Exception as error:
@@ -280,7 +280,8 @@ class GeminiService:
280
  filepath = os.path.join(DOWNLOADS_DIR, filename)
281
 
282
  try:
283
- async with httpx.AsyncClient(timeout=120.0) as client:
 
284
  response = await client.get(video_url)
285
  response.raise_for_status()
286
 
 
212
  Returns status and video URL if complete.
213
  """
214
  try:
215
+ # Get operation status using the operation object
216
+ # First, we need to recreate the operation from the name
217
+ from google.genai.types import GenerateVideosOperation
218
+
219
  operation = await asyncio.to_thread(
220
  self.client.operations.get,
221
+ GenerateVideosOperation(name=gemini_operation_name, done=False)
222
  )
223
 
224
  if not operation.done:
 
228
  "status": "pending"
229
  }
230
 
231
+ # Check for error - handle both string and object types
232
  if operation.error:
233
+ error_msg = operation.error
234
+ if hasattr(operation.error, 'message'):
235
+ error_msg = operation.error.message
236
  return {
237
  "gemini_operation_name": gemini_operation_name,
238
  "done": True,
239
  "status": "failed",
240
+ "error": str(error_msg) or "Unknown error"
 
 
 
 
 
 
 
 
 
 
241
  }
242
 
243
+ # Extract video URI from result
244
+ result = operation.result
245
+ if result and hasattr(result, 'generated_videos') and result.generated_videos:
246
+ video = result.generated_videos[0]
247
+ if hasattr(video, 'video') and video.video and hasattr(video.video, 'uri'):
248
+ video_uri = video.video.uri
249
+ return {
250
+ "gemini_operation_name": gemini_operation_name,
251
+ "done": True,
252
+ "status": "completed",
253
+ "video_url": f"{video_uri}&key={self.api_key}"
254
+ }
255
 
 
256
  return {
257
  "gemini_operation_name": gemini_operation_name,
258
  "done": True,
259
+ "status": "failed",
260
+ "error": "No video URI returned. May be due to safety filters."
261
  }
262
 
263
  except Exception as error:
 
280
  filepath = os.path.join(DOWNLOADS_DIR, filename)
281
 
282
  try:
283
+ # follow_redirects=True is required as Gemini returns 302 redirects
284
+ async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
285
  response = await client.get(video_url)
286
  response.raise_for_status()
287
 
services/priority_worker_pool.py CHANGED
@@ -149,7 +149,8 @@ class PriorityWorker(Generic[JobType]):
149
  session_maker: async_sessionmaker,
150
  job_model: type,
151
  job_processor: JobProcessor[JobType],
152
- max_retries: int = 60
 
153
  ):
154
  self.worker_id = worker_id
155
  self.priority = priority
@@ -160,6 +161,7 @@ class PriorityWorker(Generic[JobType]):
160
  self.max_retries = max_retries
161
  self._running = False
162
  self._current_job_id: Optional[str] = None
 
163
 
164
  async def start(self):
165
  """Start the worker polling loop."""
@@ -173,16 +175,44 @@ class PriorityWorker(Generic[JobType]):
173
  logger.info(f"Worker {self.worker_id} ({self.priority}) stopped")
174
 
175
  async def _poll_loop(self):
176
- """Main polling loop."""
 
 
 
 
 
 
 
177
  while self._running:
 
178
  try:
179
- await self._process_one_job()
180
  except Exception as e:
181
  logger.error(f"Worker {self.worker_id}: Error in poll loop: {e}")
182
- await asyncio.sleep(self.poll_interval)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
 
184
- async def _process_one_job(self):
185
- """Find and process one job."""
 
 
 
 
186
  async with self.session_maker() as session:
187
  now = datetime.utcnow()
188
 
@@ -202,18 +232,20 @@ class PriorityWorker(Generic[JobType]):
202
  job = result.scalar_one_or_none()
203
 
204
  if not job:
205
- return
206
 
207
  self._current_job_id = job.job_id
208
 
209
  try:
210
  await self._process_job(session, job)
 
211
  except Exception as e:
212
  logger.error(f"Worker {self.worker_id}: Error processing job {job.job_id}: {e}")
213
  job.status = "failed"
214
  job.error_message = str(e)
215
  job.completed_at = datetime.utcnow()
216
  await session.commit()
 
217
  finally:
218
  self._current_job_id = None
219
 
@@ -326,6 +358,13 @@ class PriorityWorkerPool(Generic[JobType]):
326
  )
327
  self.workers: List[PriorityWorker] = []
328
  self._running = False
 
 
 
 
 
 
 
329
 
330
  async def start(self):
331
  """Start all workers."""
@@ -341,7 +380,8 @@ class PriorityWorkerPool(Generic[JobType]):
341
  session_maker=self.session_maker,
342
  job_model=self.job_model,
343
  job_processor=self.job_processor,
344
- max_retries=self.config.max_retries
 
345
  )
346
  self.workers.append(worker)
347
  await worker.start()
@@ -356,7 +396,8 @@ class PriorityWorkerPool(Generic[JobType]):
356
  session_maker=self.session_maker,
357
  job_model=self.job_model,
358
  job_processor=self.job_processor,
359
- max_retries=self.config.max_retries
 
360
  )
361
  self.workers.append(worker)
362
  await worker.start()
@@ -371,7 +412,8 @@ class PriorityWorkerPool(Generic[JobType]):
371
  session_maker=self.session_maker,
372
  job_model=self.job_model,
373
  job_processor=self.job_processor,
374
- max_retries=self.config.max_retries
 
375
  )
376
  self.workers.append(worker)
377
  await worker.start()
@@ -383,6 +425,18 @@ class PriorityWorkerPool(Generic[JobType]):
383
  f"{self.config.fast_workers} fast, {self.config.medium_workers} medium, {self.config.slow_workers} slow"
384
  )
385
 
 
 
 
 
 
 
 
 
 
 
 
 
386
  async def stop(self):
387
  """Stop all workers."""
388
  self._running = False
 
149
  session_maker: async_sessionmaker,
150
  job_model: type,
151
  job_processor: JobProcessor[JobType],
152
+ max_retries: int = 60,
153
+ wake_event: Optional[asyncio.Event] = None
154
  ):
155
  self.worker_id = worker_id
156
  self.priority = priority
 
161
  self.max_retries = max_retries
162
  self._running = False
163
  self._current_job_id: Optional[str] = None
164
+ self._wake_event = wake_event # Event to wake worker immediately when new jobs arrive
165
 
166
  async def start(self):
167
  """Start the worker polling loop."""
 
175
  logger.info(f"Worker {self.worker_id} ({self.priority}) stopped")
176
 
177
  async def _poll_loop(self):
178
+ """Main polling loop with optimized scheduling.
179
+
180
+ Optimizations:
181
+ - When no jobs are found, sleep for poll_interval before checking again
182
+ - When a job is processed, immediately check for the next job (no waiting)
183
+ - This ensures first job starts immediately when queue was empty
184
+ - This ensures next job starts immediately after current job finishes
185
+ """
186
  while self._running:
187
+ job_found = False
188
  try:
189
+ job_found = await self._process_one_job()
190
  except Exception as e:
191
  logger.error(f"Worker {self.worker_id}: Error in poll loop: {e}")
192
+
193
+ # Only sleep if no job was found - otherwise immediately look for next job
194
+ if not job_found:
195
+ # Wait on event with timeout - allows immediate wake-up when new job arrives
196
+ if self._wake_event:
197
+ try:
198
+ # Wait for event or timeout (whichever comes first)
199
+ await asyncio.wait_for(
200
+ self._wake_event.wait(),
201
+ timeout=self.poll_interval
202
+ )
203
+ # Clear event after waking (we'll check for jobs)
204
+ self._wake_event.clear()
205
+ except asyncio.TimeoutError:
206
+ pass # Normal timeout, check for jobs
207
+ else:
208
+ await asyncio.sleep(self.poll_interval)
209
 
210
+ async def _process_one_job(self) -> bool:
211
+ """Find and process one job.
212
+
213
+ Returns:
214
+ True if a job was found and processed, False if no jobs available
215
+ """
216
  async with self.session_maker() as session:
217
  now = datetime.utcnow()
218
 
 
232
  job = result.scalar_one_or_none()
233
 
234
  if not job:
235
+ return False
236
 
237
  self._current_job_id = job.job_id
238
 
239
  try:
240
  await self._process_job(session, job)
241
+ return True
242
  except Exception as e:
243
  logger.error(f"Worker {self.worker_id}: Error processing job {job.job_id}: {e}")
244
  job.status = "failed"
245
  job.error_message = str(e)
246
  job.completed_at = datetime.utcnow()
247
  await session.commit()
248
+ return True # Job was found, even though it failed
249
  finally:
250
  self._current_job_id = None
251
 
 
358
  )
359
  self.workers: List[PriorityWorker] = []
360
  self._running = False
361
+
362
+ # Wake events for each priority tier - allows immediate job notification
363
+ self._wake_events: dict[str, asyncio.Event] = {
364
+ "fast": asyncio.Event(),
365
+ "medium": asyncio.Event(),
366
+ "slow": asyncio.Event()
367
+ }
368
 
369
  async def start(self):
370
  """Start all workers."""
 
380
  session_maker=self.session_maker,
381
  job_model=self.job_model,
382
  job_processor=self.job_processor,
383
+ max_retries=self.config.max_retries,
384
+ wake_event=self._wake_events["fast"]
385
  )
386
  self.workers.append(worker)
387
  await worker.start()
 
396
  session_maker=self.session_maker,
397
  job_model=self.job_model,
398
  job_processor=self.job_processor,
399
+ max_retries=self.config.max_retries,
400
+ wake_event=self._wake_events["medium"]
401
  )
402
  self.workers.append(worker)
403
  await worker.start()
 
412
  session_maker=self.session_maker,
413
  job_model=self.job_model,
414
  job_processor=self.job_processor,
415
+ max_retries=self.config.max_retries,
416
+ wake_event=self._wake_events["slow"]
417
  )
418
  self.workers.append(worker)
419
  await worker.start()
 
425
  f"{self.config.fast_workers} fast, {self.config.medium_workers} medium, {self.config.slow_workers} slow"
426
  )
427
 
428
+ def notify_new_job(self, priority: str):
429
+ """
430
+ Wake sleeping workers of the specified priority tier.
431
+ Call this when a new job is created to start processing immediately.
432
+
433
+ Args:
434
+ priority: Priority tier ("fast", "medium", or "slow")
435
+ """
436
+ if priority in self._wake_events:
437
+ self._wake_events[priority].set()
438
+ logger.debug(f"Notified {priority} workers of new job")
439
+
440
  async def stop(self):
441
  """Stop all workers."""
442
  self._running = False