jebin2 commited on
Commit
d218e46
·
1 Parent(s): c3f1513
app.py CHANGED
@@ -63,7 +63,7 @@ async def lifespan(app: FastAPI):
63
 
64
  # Shutdown: Upload DB to Drive
65
  logger.info("Shutdown: Uploading database to Google Drive...")
66
- drive_service.upload_db()
67
  logger.info("Shutting down...")
68
 
69
 
 
63
 
64
  # Shutdown: Upload DB to Drive
65
  logger.info("Shutdown: Uploading database to Google Drive...")
66
+ await drive_service.upload_db_async()
67
  logger.info("Shutting down...")
68
 
69
 
routers/auth.py CHANGED
@@ -205,7 +205,7 @@ async def google_auth(
205
  access_token = create_access_token(user.user_id, user.email, user.token_version)
206
 
207
  # Sync DB to Drive (Async)
208
- background_tasks.add_task(drive_service.upload_db)
209
 
210
  return AuthResponse(
211
  success=True,
@@ -341,6 +341,6 @@ async def logout(
341
  await db.commit()
342
 
343
  # Sync DB to Drive (Async)
344
- background_tasks.add_task(drive_service.upload_db)
345
 
346
  return {"success": True, "message": "Logged out successfully. All sessions invalidated."}
 
205
  access_token = create_access_token(user.user_id, user.email, user.token_version)
206
 
207
  # Sync DB to Drive (Async)
208
+ background_tasks.add_task(drive_service.upload_db_async)
209
 
210
  return AuthResponse(
211
  success=True,
 
341
  await db.commit()
342
 
343
  # Sync DB to Drive (Async)
344
+ background_tasks.add_task(drive_service.upload_db_async)
345
 
346
  return {"success": True, "message": "Logged out successfully. All sessions invalidated."}
routers/payments.py CHANGED
@@ -407,7 +407,7 @@ async def verify_payment(
407
  )
408
 
409
  # Sync DB to Drive (Async)
410
- background_tasks.add_task(drive_service.upload_db)
411
 
412
  return VerifyPaymentResponse(
413
  success=True,
@@ -502,7 +502,7 @@ async def razorpay_webhook(
502
  )
503
 
504
  # Sync DB to Drive (Async)
505
- background_tasks.add_task(drive_service.upload_db)
506
 
507
  # Handle payment failed event
508
  elif event == "payment.failed":
 
407
  )
408
 
409
  # Sync DB to Drive (Async)
410
+ background_tasks.add_task(drive_service.upload_db_async)
411
 
412
  return VerifyPaymentResponse(
413
  success=True,
 
502
  )
503
 
504
  # Sync DB to Drive (Async)
505
+ background_tasks.add_task(drive_service.upload_db_async)
506
 
507
  # Handle payment failed event
508
  elif event == "payment.failed":
services/drive_service.py CHANGED
@@ -1,6 +1,7 @@
1
  import os
2
  import logging
3
  import io
 
4
  from dotenv import load_dotenv
5
 
6
  load_dotenv()
@@ -132,6 +133,10 @@ class DriveService:
132
  logger.error(f"Unexpected error uploading DB: {e}")
133
  return False
134
 
 
 
 
 
135
  def download_db(self):
136
  """Download the database file from Google Drive."""
137
  if not self.service and not self.authenticate():
 
1
  import os
2
  import logging
3
  import io
4
+ import asyncio
5
  from dotenv import load_dotenv
6
 
7
  load_dotenv()
 
133
  logger.error(f"Unexpected error uploading DB: {e}")
134
  return False
135
 
136
+ async def upload_db_async(self):
137
+ """Async wrapper for upload_db to run in a separate thread."""
138
+ return await asyncio.to_thread(self.upload_db)
139
+
140
  def download_db(self):
141
  """Download the database file from Google Drive."""
142
  if not self.service and not self.authenticate():
services/gemini_job_worker.py CHANGED
@@ -45,14 +45,6 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
45
  def __init__(self):
46
  self.drive_service = DriveService()
47
 
48
- async def _sync_db_to_drive(self):
49
- """Sync database to Google Drive in a separate thread."""
50
- try:
51
- # Run synchronous upload_db in a thread to avoid blocking the event loop
52
- await asyncio.to_thread(self.drive_service.upload_db)
53
- except Exception as e:
54
- logger.error(f"Failed to sync DB to Drive after job completion: {e}")
55
-
56
  async def _get_service_with_key(self, session: AsyncSession) -> tuple:
57
  """Get a GeminiService with the least-used API key."""
58
  from services.api_key_manager import get_least_used_key
@@ -196,7 +188,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
196
  job.completed_at = datetime.utcnow()
197
  success = True
198
  # Sync DB on success
199
- await self._sync_db_to_drive()
200
  else:
201
  job.status = "failed"
202
  job.error_message = "No video URL returned"
@@ -299,7 +291,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
299
  job.api_response = {"status": "success", "type": "image_edit"}
300
  job.completed_at = datetime.utcnow()
301
  # Sync DB on success
302
- await self._sync_db_to_drive()
303
  return job
304
 
305
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -313,7 +305,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
313
  job.api_response = {"result": result}
314
  job.completed_at = datetime.utcnow()
315
  # Sync DB on success
316
- await self._sync_db_to_drive()
317
  return job
318
 
319
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -328,7 +320,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
328
  job.api_response = {"result": result}
329
  job.completed_at = datetime.utcnow()
330
  # Sync DB on success
331
- await self._sync_db_to_drive()
332
  return job
333
 
334
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -343,7 +335,7 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
343
  job.api_response = {"result": result}
344
  job.completed_at = datetime.utcnow()
345
  # Sync DB on success
346
- await self._sync_db_to_drive()
347
  return job
348
 
349
 
 
45
  def __init__(self):
46
  self.drive_service = DriveService()
47
 
 
 
 
 
 
 
 
 
48
  async def _get_service_with_key(self, session: AsyncSession) -> tuple:
49
  """Get a GeminiService with the least-used API key."""
50
  from services.api_key_manager import get_least_used_key
 
188
  job.completed_at = datetime.utcnow()
189
  success = True
190
  # Sync DB on success
191
+ await self.drive_service.upload_db_async()
192
  else:
193
  job.status = "failed"
194
  job.error_message = "No video URL returned"
 
291
  job.api_response = {"status": "success", "type": "image_edit"}
292
  job.completed_at = datetime.utcnow()
293
  # Sync DB on success
294
+ await self.drive_service.upload_db_async()
295
  return job
296
 
297
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
305
  job.api_response = {"result": result}
306
  job.completed_at = datetime.utcnow()
307
  # Sync DB on success
308
+ await self.drive_service.upload_db_async()
309
  return job
310
 
311
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
320
  job.api_response = {"result": result}
321
  job.completed_at = datetime.utcnow()
322
  # Sync DB on success
323
+ await self.drive_service.upload_db_async()
324
  return job
325
 
326
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
335
  job.api_response = {"result": result}
336
  job.completed_at = datetime.utcnow()
337
  # Sync DB on success
338
+ await self.drive_service.upload_db_async()
339
  return job
340
 
341