jebin2 commited on
Commit
c3f1513
·
1 Parent(s): e4273eb

drive sync

Browse files
routers/payments.py CHANGED
@@ -14,7 +14,7 @@ import uuid
14
  from datetime import datetime
15
  from typing import List, Optional
16
 
17
- from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
18
  from pydantic import BaseModel
19
  from sqlalchemy import select, desc, func
20
  from sqlalchemy.ext.asyncio import AsyncSession
@@ -22,6 +22,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
22
  from core.database import get_db
23
  from core.models import User, PaymentTransaction
24
  from dependencies import get_current_user
 
25
  from services.razorpay_service import (
26
  RazorpayService,
27
  RazorpayConfigError,
@@ -37,6 +38,7 @@ from services.razorpay_service import (
37
  logger = logging.getLogger(__name__)
38
 
39
  router = APIRouter(prefix="/payments", tags=["payments"])
 
40
 
41
 
42
  # =============================================================================
@@ -314,6 +316,7 @@ async def create_order(
314
  @router.post("/verify", response_model=VerifyPaymentResponse)
315
  async def verify_payment(
316
  request: VerifyPaymentRequest,
 
317
  user: User = Depends(get_current_user),
318
  db: AsyncSession = Depends(get_db)
319
  ):
@@ -403,6 +406,9 @@ async def verify_payment(
403
  signature=request.razorpay_signature
404
  )
405
 
 
 
 
406
  return VerifyPaymentResponse(
407
  success=True,
408
  message="Payment successful! Credits added.",
@@ -422,6 +428,7 @@ async def verify_payment(
422
  @router.post("/webhook/razorpay")
423
  async def razorpay_webhook(
424
  request: Request,
 
425
  db: AsyncSession = Depends(get_db)
426
  ):
427
  """
@@ -493,6 +500,9 @@ async def razorpay_webhook(
493
  source="webhook",
494
  db=db
495
  )
 
 
 
496
 
497
  # Handle payment failed event
498
  elif event == "payment.failed":
 
14
  from datetime import datetime
15
  from typing import List, Optional
16
 
17
+ from fastapi import APIRouter, Depends, HTTPException, Query, Request, status, BackgroundTasks
18
  from pydantic import BaseModel
19
  from sqlalchemy import select, desc, func
20
  from sqlalchemy.ext.asyncio import AsyncSession
 
22
  from core.database import get_db
23
  from core.models import User, PaymentTransaction
24
  from dependencies import get_current_user
25
+ from services.drive_service import DriveService
26
  from services.razorpay_service import (
27
  RazorpayService,
28
  RazorpayConfigError,
 
38
  logger = logging.getLogger(__name__)
39
 
40
  router = APIRouter(prefix="/payments", tags=["payments"])
41
+ drive_service = DriveService()
42
 
43
 
44
  # =============================================================================
 
316
  @router.post("/verify", response_model=VerifyPaymentResponse)
317
  async def verify_payment(
318
  request: VerifyPaymentRequest,
319
+ background_tasks: BackgroundTasks,
320
  user: User = Depends(get_current_user),
321
  db: AsyncSession = Depends(get_db)
322
  ):
 
406
  signature=request.razorpay_signature
407
  )
408
 
409
+ # Sync DB to Drive (Async)
410
+ background_tasks.add_task(drive_service.upload_db)
411
+
412
  return VerifyPaymentResponse(
413
  success=True,
414
  message="Payment successful! Credits added.",
 
428
  @router.post("/webhook/razorpay")
429
  async def razorpay_webhook(
430
  request: Request,
431
+ background_tasks: BackgroundTasks,
432
  db: AsyncSession = Depends(get_db)
433
  ):
434
  """
 
500
  source="webhook",
501
  db=db
502
  )
503
+
504
+ # Sync DB to Drive (Async)
505
+ background_tasks.add_task(drive_service.upload_db)
506
 
507
  # Handle payment failed event
508
  elif event == "payment.failed":
services/gemini_job_worker.py CHANGED
@@ -18,6 +18,8 @@ from services.priority_worker_pool import (
18
  get_interval_for_priority
19
  )
20
  from services.gemini_service import GeminiService
 
 
21
 
22
  logger = logging.getLogger(__name__)
23
 
@@ -40,6 +42,17 @@ def get_priority_for_job_type(job_type: str) -> str:
40
  class GeminiJobProcessor(JobProcessor[GeminiJob]):
41
  """Processes Gemini AI jobs (text, image, video generation) with round-robin API keys."""
42
 
 
 
 
 
 
 
 
 
 
 
 
43
  async def _get_service_with_key(self, session: AsyncSession) -> tuple:
44
  """Get a GeminiService with the least-used API key."""
45
  from services.api_key_manager import get_least_used_key
@@ -182,6 +195,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
182
  job.error_message = None # Clear any previous error
183
  job.completed_at = datetime.utcnow()
184
  success = True
 
 
185
  else:
186
  job.status = "failed"
187
  job.error_message = "No video URL returned"
@@ -283,6 +298,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
283
  # Don't save full base64 image to api_response
284
  job.api_response = {"status": "success", "type": "image_edit"}
285
  job.completed_at = datetime.utcnow()
 
 
286
  return job
287
 
288
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -295,6 +312,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
295
  job.output_data = {"text": result}
296
  job.api_response = {"result": result}
297
  job.completed_at = datetime.utcnow()
 
 
298
  return job
299
 
300
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -308,6 +327,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
308
  job.output_data = {"analysis": result}
309
  job.api_response = {"result": result}
310
  job.completed_at = datetime.utcnow()
 
 
311
  return job
312
 
313
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -321,6 +342,8 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
321
  job.output_data = {"prompt": result}
322
  job.api_response = {"result": result}
323
  job.completed_at = datetime.utcnow()
 
 
324
  return job
325
 
326
 
 
18
  get_interval_for_priority
19
  )
20
  from services.gemini_service import GeminiService
21
+ from services.drive_service import DriveService
22
+ import asyncio
23
 
24
  logger = logging.getLogger(__name__)
25
 
 
42
  class GeminiJobProcessor(JobProcessor[GeminiJob]):
43
  """Processes Gemini AI jobs (text, image, video generation) with round-robin API keys."""
44
 
45
+ def __init__(self):
46
+ self.drive_service = DriveService()
47
+
48
+ async def _sync_db_to_drive(self):
49
+ """Sync database to Google Drive in a separate thread."""
50
+ try:
51
+ # Run synchronous upload_db in a thread to avoid blocking the event loop
52
+ await asyncio.to_thread(self.drive_service.upload_db)
53
+ except Exception as e:
54
+ logger.error(f"Failed to sync DB to Drive after job completion: {e}")
55
+
56
  async def _get_service_with_key(self, session: AsyncSession) -> tuple:
57
  """Get a GeminiService with the least-used API key."""
58
  from services.api_key_manager import get_least_used_key
 
195
  job.error_message = None # Clear any previous error
196
  job.completed_at = datetime.utcnow()
197
  success = True
198
+ # Sync DB on success
199
+ await self._sync_db_to_drive()
200
  else:
201
  job.status = "failed"
202
  job.error_message = "No video URL returned"
 
298
  # Don't save full base64 image to api_response
299
  job.api_response = {"status": "success", "type": "image_edit"}
300
  job.completed_at = datetime.utcnow()
301
+ # Sync DB on success
302
+ await self._sync_db_to_drive()
303
  return job
304
 
305
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
312
  job.output_data = {"text": result}
313
  job.api_response = {"result": result}
314
  job.completed_at = datetime.utcnow()
315
+ # Sync DB on success
316
+ await self._sync_db_to_drive()
317
  return job
318
 
319
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
327
  job.output_data = {"analysis": result}
328
  job.api_response = {"result": result}
329
  job.completed_at = datetime.utcnow()
330
+ # Sync DB on success
331
+ await self._sync_db_to_drive()
332
  return job
333
 
334
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
342
  job.output_data = {"prompt": result}
343
  job.api_response = {"result": result}
344
  job.completed_at = datetime.utcnow()
345
+ # Sync DB on success
346
+ await self._sync_db_to_drive()
347
  return job
348
 
349