jebin2 commited on
Commit
ec0e527
·
1 Parent(s): 74b89f0

retry logic

Browse files
check_exceptions.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ try:
3
+ from google.api_core import exceptions
4
+ print("google.api_core.exceptions found")
5
+ print(f"ResourceExhausted: {exceptions.ResourceExhausted}")
6
+ print(f"Unauthenticated: {exceptions.Unauthenticated}")
7
+ print(f"PermissionDenied: {exceptions.PermissionDenied}")
8
+ except ImportError:
9
+ print("google.api_core.exceptions NOT found")
10
+
11
+ try:
12
+ from google import genai
13
+ print("google.genai found")
14
+ except ImportError:
15
+ print("google.genai NOT found")
core/models.py CHANGED
@@ -165,6 +165,7 @@ class GeminiJob(Base):
165
 
166
  input_data = Column(JSON, nullable=True) # Request details (prompt, settings, etc.)
167
  output_data = Column(JSON, nullable=True) # Result (filename, text, etc.)
 
168
  error_message = Column(Text, nullable=True)
169
  created_at = Column(DateTime(timezone=True), server_default=func.now())
170
  started_at = Column(DateTime(timezone=True), nullable=True)
 
165
 
166
  input_data = Column(JSON, nullable=True) # Request details (prompt, settings, etc.)
167
  output_data = Column(JSON, nullable=True) # Result (filename, text, etc.)
168
+ api_response = Column(JSON, nullable=True) # Raw response from third-party API (success or error)
169
  error_message = Column(Text, nullable=True)
170
  created_at = Column(DateTime(timezone=True), server_default=func.now())
171
  started_at = Column(DateTime(timezone=True), nullable=True)
routers/gemini.py CHANGED
@@ -535,23 +535,25 @@ async def delete_job(
535
  refund_amount = 0
536
  message = "Job deleted"
537
 
538
- if job.status == "queued":
539
- # Refund logic: Restore 8 credits (10 - 2)
540
- # Only if it was a video job (cost 10). For others (cost 1), maybe no refund or full?
541
- # Requirement says "restore 8", implying video job context.
542
- # Let's check credits_reserved. If 10, refund 8. If 1, refund 0? Or 1?
543
- # Assuming this logic is specific to the high-cost video jobs.
544
-
545
- if job.credits_reserved >= 10:
 
 
 
 
 
546
  refund_amount = 8
547
  user.credits += refund_amount
 
 
 
548
  message = f"Job deleted. {refund_amount} credits refunded."
549
- elif job.credits_reserved > 0:
550
- # For lower cost jobs, maybe full refund if queued? Or partial?
551
- # User specifically mentioned "restore 8" for the queued state.
552
- # I'll stick to the specific requirement for now, but maybe refund full for 1-credit jobs?
553
- # Let's assume strict "restore 8" applies to the 10-credit video jobs.
554
- pass
555
 
556
  await db.delete(job)
557
  await db.commit()
 
535
  refund_amount = 0
536
  message = "Job deleted"
537
 
538
+ if not job.third_party_id:
539
+ # Job never successfully started on Gemini (Dev error / Pre-execution failure)
540
+ # Refund FULL credits
541
+ if job.credits_reserved > 0 and not job.credits_refunded:
542
+ refund_amount = job.credits_reserved
543
+ user.credits += refund_amount
544
+ job.credits_refunded = True
545
+ message = f"Job deleted. Full {refund_amount} credits refunded (job not started)."
546
+ elif job.status == "queued":
547
+ # Job has third_party_id but is queued? (Unlikely for video, but maybe for others?)
548
+ # Or maybe it was reset to queued?
549
+ # Use existing logic: Refund 8 credits (10 - 2) for video
550
+ if job.credits_reserved >= 10 and not job.credits_refunded:
551
  refund_amount = 8
552
  user.credits += refund_amount
553
+ # Don't mark as fully refunded, as it's partial?
554
+ # Actually credits_refunded is boolean. Maybe we shouldn't set it if partial?
555
+ # But we gave back credits. Let's just update user credits.
556
  message = f"Job deleted. {refund_amount} credits refunded."
 
 
 
 
 
 
557
 
558
  await db.delete(job)
559
  await db.commit()
services/gemini_job_worker.py CHANGED
@@ -51,6 +51,71 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
51
  from services.api_key_manager import record_usage
52
  await record_usage(session, key_index, success, error_message)
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  async def process(self, job: GeminiJob, session: AsyncSession) -> GeminiJob:
55
  """Start processing a new job with round-robin API key."""
56
  key_index, service = await self._get_service_with_key(session)
@@ -80,12 +145,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
80
  job.completed_at = datetime.utcnow()
81
  error_msg = job.error_message
82
  except Exception as e:
83
- logger.error(f"Error processing job {job.job_id}: {e}")
84
- job.status = "failed"
85
- job.error_message = str(e)
86
- job.completed_at = datetime.utcnow()
87
- success = False
88
- error_msg = str(e)
89
 
90
  # Record usage
91
  await self._record_usage(session, key_index, success, error_msg)
@@ -107,6 +169,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
107
 
108
  try:
109
  status_result = await service.check_video_status(job.third_party_id)
 
 
110
 
111
  if status_result.get("done"):
112
  if status_result.get("status") == "completed":
@@ -137,14 +201,14 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
137
  success = True # Status check succeeded even if video not ready
138
 
139
  except Exception as e:
140
- logger.error(f"Error checking video status for {job.job_id}: {e}")
141
- job.retry_count += 1
142
- job.error_message = f"Status check failed: {e}"
143
- config = WorkerConfig.from_env()
144
- interval = get_interval_for_priority(job.priority, config)
145
- job.next_process_at = datetime.utcnow() + timedelta(seconds=interval)
146
- success = False
147
- error_msg = str(e)
148
 
149
  # Record usage
150
  await self._record_usage(session, key_index, success, error_msg)
@@ -198,6 +262,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
198
  number_of_videos=input_data.get("number_of_videos", 1)
199
  )
200
  job.third_party_id = result.get("gemini_operation_name")
 
201
 
202
  # Schedule first status check
203
  config = WorkerConfig.from_env()
@@ -215,6 +280,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
215
  )
216
  job.status = "completed"
217
  job.output_data = {"image": result}
 
 
218
  job.completed_at = datetime.utcnow()
219
  return job
220
 
@@ -226,6 +293,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
226
  )
227
  job.status = "completed"
228
  job.output_data = {"text": result}
 
229
  job.completed_at = datetime.utcnow()
230
  return job
231
 
@@ -238,6 +306,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
238
  )
239
  job.status = "completed"
240
  job.output_data = {"analysis": result}
 
241
  job.completed_at = datetime.utcnow()
242
  return job
243
 
@@ -250,6 +319,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
250
  )
251
  job.status = "completed"
252
  job.output_data = {"prompt": result}
 
253
  job.completed_at = datetime.utcnow()
254
  return job
255
 
 
51
  from services.api_key_manager import record_usage
52
  await record_usage(session, key_index, success, error_message)
53
 
54
+ def _handle_error(self, job: GeminiJob, error: Exception, reset_to_queued: bool = False) -> tuple[bool, str]:
55
+ """
56
+ Handle job errors with retry logic.
57
+
58
+ Args:
59
+ job: The job object
60
+ error: The exception raised
61
+ reset_to_queued: Whether to reset status to 'queued' on retry (for process())
62
+
63
+ Returns:
64
+ Tuple of (success, error_message)
65
+ success is False (since it's an error)
66
+ error_message is the formatted error string
67
+ """
68
+ error_str = str(error)
69
+ is_retryable = False
70
+ log_msg = ""
71
+
72
+ # Check for Rate Limit (429)
73
+ if "429" in error_str or "ResourceExhausted" in error_str:
74
+ is_retryable = True
75
+ log_msg = f"Rate limit hit for job {job.job_id}"
76
+
77
+ # Check for Auth/Billing errors (401, 403, API key not found, API key not valid, FAILED_PRECONDITION)
78
+ elif "401" in error_str or "403" in error_str or "Unauthenticated" in error_str or "PermissionDenied" in error_str or "API key not found" in error_str or "API key not valid" in error_str or "FAILED_PRECONDITION" in error_str:
79
+ is_retryable = True
80
+ log_msg = f"Auth/Billing error for job {job.job_id}: {error_str}. Rescheduling to try different key."
81
+
82
+ # Check for Server errors (500, 503, 504)
83
+ elif "500" in error_str or "503" in error_str or "504" in error_str or "INTERNAL" in error_str or "UNAVAILABLE" in error_str or "DEADLINE_EXCEEDED" in error_str:
84
+ is_retryable = True
85
+ log_msg = f"Server error for job {job.job_id}: {error_str}"
86
+
87
+ # Try to parse JSON error details if present
88
+ try:
89
+ import json
90
+ import re
91
+ # Look for JSON-like structure in error string
92
+ json_match = re.search(r"(\{.*\})", error_str)
93
+ if json_match:
94
+ job.api_response = json.loads(json_match.group(1))
95
+ else:
96
+ job.api_response = {"error": error_str}
97
+ except Exception:
98
+ job.api_response = {"error": error_str}
99
+
100
+ if is_retryable:
101
+ logger.warning(f"{log_msg}. Rescheduling.")
102
+ job.retry_count += 1
103
+ config = WorkerConfig.from_env()
104
+ # Use a longer delay for these errors (e.g., 30s)
105
+ interval = 30
106
+ job.next_process_at = datetime.utcnow() + timedelta(seconds=interval)
107
+
108
+ if reset_to_queued:
109
+ job.status = "queued"
110
+
111
+ return False, f"Retryable error: {error_str}"
112
+ else:
113
+ logger.error(f"Error processing job {job.job_id}: {error}")
114
+ job.status = "failed"
115
+ job.error_message = str(error)
116
+ job.completed_at = datetime.utcnow()
117
+ return False, str(error)
118
+
119
  async def process(self, job: GeminiJob, session: AsyncSession) -> GeminiJob:
120
  """Start processing a new job with round-robin API key."""
121
  key_index, service = await self._get_service_with_key(session)
 
145
  job.completed_at = datetime.utcnow()
146
  error_msg = job.error_message
147
  except Exception as e:
148
+ # Use helper for error handling
149
+ # reset_to_queued=True because if we fail to start, we want to try starting again from scratch
150
+ success, error_msg = self._handle_error(job, e, reset_to_queued=True)
 
 
 
151
 
152
  # Record usage
153
  await self._record_usage(session, key_index, success, error_msg)
 
169
 
170
  try:
171
  status_result = await service.check_video_status(job.third_party_id)
172
+ # Save raw response
173
+ job.api_response = status_result
174
 
175
  if status_result.get("done"):
176
  if status_result.get("status") == "completed":
 
201
  success = True # Status check succeeded even if video not ready
202
 
203
  except Exception as e:
204
+ # Use helper for error handling
205
+ # reset_to_queued=False because we want to continue checking status, not restart
206
+ success, error_msg = self._handle_error(job, e, reset_to_queued=False)
207
+
208
+ # Record usage
209
+ await self._record_usage(session, key_index, success, error_msg)
210
+
211
+ return job
212
 
213
  # Record usage
214
  await self._record_usage(session, key_index, success, error_msg)
 
262
  number_of_videos=input_data.get("number_of_videos", 1)
263
  )
264
  job.third_party_id = result.get("gemini_operation_name")
265
+ job.api_response = result
266
 
267
  # Schedule first status check
268
  config = WorkerConfig.from_env()
 
280
  )
281
  job.status = "completed"
282
  job.output_data = {"image": result}
283
+ # Don't save full base64 image to api_response
284
+ job.api_response = {"status": "success", "type": "image_edit"}
285
  job.completed_at = datetime.utcnow()
286
  return job
287
 
 
293
  )
294
  job.status = "completed"
295
  job.output_data = {"text": result}
296
+ job.api_response = {"result": result}
297
  job.completed_at = datetime.utcnow()
298
  return job
299
 
 
306
  )
307
  job.status = "completed"
308
  job.output_data = {"analysis": result}
309
+ job.api_response = {"result": result}
310
  job.completed_at = datetime.utcnow()
311
  return job
312
 
 
319
  )
320
  job.status = "completed"
321
  job.output_data = {"prompt": result}
322
+ job.api_response = {"result": result}
323
  job.completed_at = datetime.utcnow()
324
  return job
325
 
services/gemini_service.py CHANGED
@@ -33,6 +33,7 @@ os.makedirs(DOWNLOADS_DIR, exist_ok=True)
33
 
34
  # Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
35
  MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"
 
36
 
37
  # Sample video URL for mock mode (a public test video)
38
  MOCK_VIDEO_URL = "https://video.twimg.com/amplify_video/1994083297756848128/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"
@@ -116,7 +117,7 @@ class GeminiService:
116
  # Mock mode for testing
117
  if MOCK_MODE:
118
  logger.info("[MOCK MODE] Generating animation prompt")
119
- await asyncio.sleep(0.5) # Simulate API delay
120
  return "A gentle breeze rustles through the scene as soft light dances across the surface. The camera slowly zooms in with a subtle parallax effect, creating depth and movement."
121
 
122
  default_prompt = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
@@ -348,7 +349,7 @@ class GeminiService:
348
  # Mock mode for testing
349
  if MOCK_MODE:
350
  logger.info(f"[MOCK MODE] Generating text for prompt: {prompt[:50]}...")
351
- await asyncio.sleep(0.5) # Simulate API delay
352
  return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."
353
 
354
  model_name = model or MODELS["text_generation"]
@@ -377,7 +378,7 @@ class GeminiService:
377
  # Mock mode for testing
378
  if MOCK_MODE:
379
  logger.info(f"[MOCK MODE] Analyzing image with prompt: {prompt[:50]}...")
380
- await asyncio.sleep(0.5) # Simulate API delay
381
  return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."
382
 
383
  async with get_text_semaphore():
 
33
 
34
  # Mock mode for local testing (set GEMINI_MOCK_MODE=true to skip real API calls)
35
  MOCK_MODE = os.getenv("GEMINI_MOCK_MODE", "false").lower() == "true"
36
+ MOCK_MODE_SLEEP_TIME = os.getenv("GEMINI_MOCK_MODE_SLEEP_TIME", "0.5")
37
 
38
  # Sample video URL for mock mode (a public test video)
39
  MOCK_VIDEO_URL = "https://video.twimg.com/amplify_video/1994083297756848128/vid/avc1/576x576/ue31qU0xts8L9tXD.mp4?tag=21"
 
117
  # Mock mode for testing
118
  if MOCK_MODE:
119
  logger.info("[MOCK MODE] Generating animation prompt")
120
+ await asyncio.sleep(GEMINI_MOCK_MODE_SLEEP_TIME) # Simulate API delay
121
  return "A gentle breeze rustles through the scene as soft light dances across the surface. The camera slowly zooms in with a subtle parallax effect, creating depth and movement."
122
 
123
  default_prompt = custom_prompt or "Describe how this image could be subtly animated with cinematic movement."
 
349
  # Mock mode for testing
350
  if MOCK_MODE:
351
  logger.info(f"[MOCK MODE] Generating text for prompt: {prompt[:50]}...")
352
+ await asyncio.sleep(MOCK_MODE_SLEEP_TIME) # Simulate API delay
353
  return f"This is a mock response for your prompt: '{prompt[:100]}...'. In production, this would be generated by Gemini AI."
354
 
355
  model_name = model or MODELS["text_generation"]
 
378
  # Mock mode for testing
379
  if MOCK_MODE:
380
  logger.info(f"[MOCK MODE] Analyzing image with prompt: {prompt[:50]}...")
381
+ await asyncio.sleep(MOCK_MODE_SLEEP_TIME) # Simulate API delay
382
  return f"Mock analysis result: The image appears to show a scene that matches your query '{prompt[:50]}...'. This is placeholder content for testing."
383
 
384
  async with get_text_semaphore():