jebin2 commited on
Commit
be85b16
Β·
1 Parent(s): bc8ed4e
app.py CHANGED
@@ -39,6 +39,17 @@ async def lifespan(app: FastAPI):
39
  register_db_service_config()
40
  logger.info("βœ… DB Service configured")
41
 
 
 
 
 
 
 
 
 
 
 
 
42
  # Register Auth Service configuration
43
  from services.auth_service import register_auth_service
44
  register_auth_service(
@@ -120,7 +131,9 @@ async def lifespan(app: FastAPI):
120
 
121
  # Shutdown: Upload DB to Drive
122
  logger.info("Shutdown: Uploading database to Google Drive...")
123
- await drive_service.upload_db_async()
 
 
124
  logger.info("Shutting down...")
125
 
126
 
 
39
  register_db_service_config()
40
  logger.info("βœ… DB Service configured")
41
 
42
+ # Initialize Backup Service
43
+ from services.backup_service import initialize_backup_service
44
+ # Note: drive_service is already initialized globally, but re-initialized here as per instruction.
45
+ # The global drive_service is used later for download/upload.
46
+ local_drive_service = DriveService()
47
+ backup_service = initialize_backup_service(
48
+ local_drive_service,
49
+ min_interval_seconds=30 # Minimum 30s between backups
50
+ )
51
+ logger.info("βœ… Backup Service initialized")
52
+
53
  # Register Auth Service configuration
54
  from services.auth_service import register_auth_service
55
  register_auth_service(
 
131
 
132
  # Shutdown: Upload DB to Drive
133
  logger.info("Shutdown: Uploading database to Google Drive...")
134
+ from services.backup_service import get_backup_service
135
+ backup_service = get_backup_service()
136
+ await backup_service.backup_async(force=True) # Force backup on shutdown
137
  logger.info("Shutting down...")
138
 
139
 
routers/auth.py CHANGED
@@ -200,7 +200,9 @@ async def google_auth(
200
  access_token = create_access_token(user.user_id, user.email, user.token_version)
201
 
202
  # Sync DB to Drive (Async)
203
- background_tasks.add_task(drive_service.upload_db_async)
 
 
204
 
205
  return AuthResponse(
206
  success=True,
@@ -336,6 +338,8 @@ async def logout(
336
  await db.commit()
337
 
338
  # Sync DB to Drive (Async)
339
- background_tasks.add_task(drive_service.upload_db_async)
 
 
340
 
341
  return {"success": True, "message": "Logged out successfully. All sessions invalidated."}
 
200
  access_token = create_access_token(user.user_id, user.email, user.token_version)
201
 
202
  # Sync DB to Drive (Async)
203
+ from services.backup_service import get_backup_service
204
+ backup_service = get_backup_service()
205
+ background_tasks.add_task(backup_service.backup_async)
206
 
207
  return AuthResponse(
208
  success=True,
 
338
  await db.commit()
339
 
340
  # Sync DB to Drive (Async)
341
+ from services.backup_service import get_backup_service
342
+ backup_service = get_backup_service()
343
+ background_tasks.add_task(backup_service.backup_async)
344
 
345
  return {"success": True, "message": "Logged out successfully. All sessions invalidated."}
routers/payments.py CHANGED
@@ -410,7 +410,9 @@ async def verify_payment(
410
  )
411
 
412
  # Sync DB to Drive (Async)
413
- background_tasks.add_task(drive_service.upload_db_async)
 
 
414
 
415
  return VerifyPaymentResponse(
416
  success=True,
@@ -505,7 +507,9 @@ async def razorpay_webhook(
505
  )
506
 
507
  # Sync DB to Drive (Async)
508
- background_tasks.add_task(drive_service.upload_db_async)
 
 
509
 
510
  # Handle payment failed event
511
  elif event == "payment.failed":
 
410
  )
411
 
412
  # Sync DB to Drive (Async)
413
+ from services.backup_service import get_backup_service
414
+ backup_service = get_backup_service()
415
+ background_tasks.add_task(backup_service.backup_async)
416
 
417
  return VerifyPaymentResponse(
418
  success=True,
 
507
  )
508
 
509
  # Sync DB to Drive (Async)
510
+ from services.backup_service import get_backup_service
511
+ backup_service = get_backup_service()
512
+ background_tasks.add_task(backup_service.backup_async)
513
 
514
  # Handle payment failed event
515
  elif event == "payment.failed":
services/backup_service/USAGE.md ADDED
@@ -0,0 +1,54 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Example usage in routes and app.py
2
+
3
+ """
4
+ In app.py (startup):
5
+
6
+ from services.drive_service import DriveService
7
+ from services.backup_service import initialize_backup_service
8
+
9
+ drive_service = DriveService()
10
+ backup_service = initialize_backup_service(
11
+ drive_service,
12
+ min_interval_seconds=30 # 30 seconds between backups
13
+ )
14
+
15
+ In routes (e.g., payments.py):
16
+
17
+ from fastapi import BackgroundTasks
18
+ from services.backup_service import get_backup_service
19
+
20
+ @router.post("/verify")
21
+ async def verify_payment(background_tasks: BackgroundTasks, ...):
22
+ # ... payment logic ...
23
+
24
+ # Trigger backup in background (non-blocking)
25
+ backup_service = get_backup_service()
26
+ background_tasks.add_task(backup_service.backup_async)
27
+
28
+ return {"success": True}
29
+
30
+ In app.py (shutdown):
31
+
32
+ from services.backup_service import get_backup_service
33
+
34
+ @app.on_event("shutdown")
35
+ async def shutdown():
36
+ backup_service = get_backup_service()
37
+ # Force backup on shutdown (ignore debouncing)
38
+ await backup_service.backup_async(force=True)
39
+
40
+ How it works:
41
+
42
+ 1. User A pays β†’ backup starts in background
43
+ 2. User B pays (1s later) β†’ sees backup in progress, skips
44
+ 3. User C pays (2s later) β†’ sees backup in progress, skips
45
+ 4. Backup completes (5s)
46
+ 5. User D pays (10s after first) β†’ only 10s elapsed, skips (debounce)
47
+ 6. User E pays (35s after first) β†’ 35s > 30s, new backup starts!
48
+
49
+ Benefits:
50
+ - βœ… No concurrent uploads
51
+ - βœ… Response returns immediately
52
+ - βœ… Minimal Drive API usage
53
+ - βœ… No wasted resources
54
+ """
services/backup_service/__init__.py ADDED
@@ -0,0 +1,169 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ DB Backup Service - Intelligent database backup to Google Drive
3
+
4
+ Features:
5
+ - Async lock to prevent concurrent uploads
6
+ - Debouncing to skip frequent uploads
7
+ - Non-blocking background execution
8
+ - Force option for critical backups (e.g., shutdown)
9
+ """
10
+
11
+ import asyncio
12
+ import logging
13
+ from datetime import datetime, timedelta
14
+ from typing import Optional
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
+ class BackupService:
20
+ """
21
+ Intelligent database backup service.
22
+
23
+ Prevents concurrent uploads and excessive backup frequency using:
24
+ - Async lock (one upload at a time)
25
+ - Debouncing (minimum interval between uploads)
26
+ - Non-blocking skip (don't wait if upload in progress)
27
+ """
28
+
29
+ def __init__(
30
+ self,
31
+ drive_service,
32
+ min_interval_seconds: int = 30
33
+ ):
34
+ """
35
+ Initialize backup service.
36
+
37
+ Args:
38
+ drive_service: DriveService instance for actual uploads
39
+ min_interval_seconds: Minimum seconds between uploads (debouncing)
40
+ """
41
+ self.drive_service = drive_service
42
+ self.min_interval = timedelta(seconds=min_interval_seconds)
43
+
44
+ # Async lock to prevent concurrent uploads
45
+ self._upload_lock = asyncio.Lock()
46
+
47
+ # Track last upload time for debouncing
48
+ self._last_upload_time: Optional[datetime] = None
49
+
50
+ logger.info(f"BackupService initialized (min interval: {min_interval_seconds}s)")
51
+
52
+ async def backup_async(self, force: bool = False) -> bool:
53
+ """
54
+ Backup database to Google Drive (async, non-blocking).
55
+
56
+ Args:
57
+ force: If True, skip debouncing and upload immediately
58
+ (useful for critical backups like shutdown)
59
+
60
+ Returns:
61
+ True if backup was performed, False if skipped
62
+ """
63
+ # Non-blocking check: if upload already in progress, skip
64
+ if self._upload_lock.locked():
65
+ logger.info("πŸ“¦ Backup already in progress, skipping this request")
66
+ return False
67
+
68
+ # Debouncing check: skip if uploaded recently (unless forced)
69
+ if not force and self._last_upload_time:
70
+ time_since_last = datetime.utcnow() - self._last_upload_time
71
+ if time_since_last < self.min_interval:
72
+ logger.info(
73
+ f"πŸ“¦ Backup skipped - last backup was {time_since_last.seconds}s ago "
74
+ f"(min interval: {self.min_interval.seconds}s)"
75
+ )
76
+ return False
77
+
78
+ # Try to acquire lock (should succeed since we checked above)
79
+ async with self._upload_lock:
80
+ try:
81
+ # Perform actual upload using DriveService
82
+ loop = asyncio.get_event_loop()
83
+ await loop.run_in_executor(
84
+ None,
85
+ self.drive_service.upload_db
86
+ )
87
+
88
+ # Update last upload time
89
+ self._last_upload_time = datetime.utcnow()
90
+
91
+ logger.info("βœ… Database backup completed successfully")
92
+ return True
93
+
94
+ except Exception as e:
95
+ logger.error(f"❌ Database backup failed: {e}")
96
+ # Don't update last_upload_time on failure
97
+ # This allows retry on next request
98
+ return False
99
+
100
+ def backup_sync(self, force: bool = False) -> bool:
101
+ """
102
+ Synchronous backup (for non-async contexts).
103
+
104
+ Args:
105
+ force: If True, skip debouncing
106
+
107
+ Returns:
108
+ True if backup was performed, False if skipped
109
+ """
110
+ # Check if we should skip (debouncing)
111
+ if not force and self._last_upload_time:
112
+ time_since_last = datetime.utcnow() - self._last_upload_time
113
+ if time_since_last < self.min_interval:
114
+ logger.info(
115
+ f"πŸ“¦ Backup skipped - last backup was {time_since_last.seconds}s ago"
116
+ )
117
+ return False
118
+
119
+ try:
120
+ # Perform upload
121
+ self.drive_service.upload_db()
122
+ self._last_upload_time = datetime.utcnow()
123
+ logger.info("βœ… Database backup completed successfully (sync)")
124
+ return True
125
+ except Exception as e:
126
+ logger.error(f"❌ Database backup failed (sync): {e}")
127
+ return False
128
+
129
+ def get_stats(self) -> dict:
130
+ """Get backup statistics."""
131
+ return {
132
+ "last_backup_time": self._last_upload_time.isoformat() if self._last_upload_time else None,
133
+ "upload_in_progress": self._upload_lock.locked(),
134
+ "min_interval_seconds": self.min_interval.seconds,
135
+ }
136
+
137
+
138
+ # Global instance (initialized in app startup)
139
+ _backup_service: Optional[BackupService] = None
140
+
141
+
142
+ def get_backup_service() -> BackupService:
143
+ """Get the global backup service instance."""
144
+ if _backup_service is None:
145
+ raise RuntimeError("BackupService not initialized. Call initialize_backup_service() first.")
146
+ return _backup_service
147
+
148
+
149
+ def initialize_backup_service(drive_service, min_interval_seconds: int = 30) -> BackupService:
150
+ """
151
+ Initialize the global backup service instance.
152
+
153
+ Args:
154
+ drive_service: DriveService instance
155
+ min_interval_seconds: Minimum seconds between backups
156
+
157
+ Returns:
158
+ BackupService instance
159
+ """
160
+ global _backup_service
161
+ _backup_service = BackupService(drive_service, min_interval_seconds)
162
+ return _backup_service
163
+
164
+
165
+ __all__ = [
166
+ 'BackupService',
167
+ 'get_backup_service',
168
+ 'initialize_backup_service',
169
+ ]
services/gemini_service/job_processor.py CHANGED
@@ -188,7 +188,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
188
  job.completed_at = datetime.utcnow()
189
  success = True
190
  # Sync DB on success
191
- await self.drive_service.upload_db_async()
 
 
192
  else:
193
  job.status = "failed"
194
  job.error_message = "No video URL returned"
@@ -291,7 +293,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
291
  job.api_response = {"status": "success", "type": "image_edit"}
292
  job.completed_at = datetime.utcnow()
293
  # Sync DB on success
294
- await self.drive_service.upload_db_async()
 
 
295
  return job
296
 
297
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -305,7 +309,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
305
  job.api_response = {"result": result}
306
  job.completed_at = datetime.utcnow()
307
  # Sync DB on success
308
- await self.drive_service.upload_db_async()
 
 
309
  return job
310
 
311
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -320,7 +326,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
320
  job.api_response = {"result": result}
321
  job.completed_at = datetime.utcnow()
322
  # Sync DB on success
323
- await self.drive_service.upload_db_async()
 
 
324
  return job
325
 
326
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
@@ -335,7 +343,9 @@ class GeminiJobProcessor(JobProcessor[GeminiJob]):
335
  job.api_response = {"result": result}
336
  job.completed_at = datetime.utcnow()
337
  # Sync DB on success
338
- await self.drive_service.upload_db_async()
 
 
339
  return job
340
 
341
 
 
188
  job.completed_at = datetime.utcnow()
189
  success = True
190
  # Sync DB on success
191
+ from services.backup_service import get_backup_service
192
+ backup_service = get_backup_service()
193
+ await backup_service.backup_async()
194
  else:
195
  job.status = "failed"
196
  job.error_message = "No video URL returned"
 
293
  job.api_response = {"status": "success", "type": "image_edit"}
294
  job.completed_at = datetime.utcnow()
295
  # Sync DB on success
296
+ from services.backup_service import get_backup_service
297
+ backup_service = get_backup_service()
298
+ await backup_service.backup_async()
299
  return job
300
 
301
  async def _process_text(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
309
  job.api_response = {"result": result}
310
  job.completed_at = datetime.utcnow()
311
  # Sync DB on success
312
+ from services.backup_service import get_backup_service
313
+ backup_service = get_backup_service()
314
+ await backup_service.backup_async()
315
  return job
316
 
317
  async def _process_analyze(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
326
  job.api_response = {"result": result}
327
  job.completed_at = datetime.utcnow()
328
  # Sync DB on success
329
+ from services.backup_service import get_backup_service
330
+ backup_service = get_backup_service()
331
+ await backup_service.backup_async()
332
  return job
333
 
334
  async def _process_animation_prompt(self, job: GeminiJob, service: GeminiService, input_data: dict) -> GeminiJob:
 
343
  job.api_response = {"result": result}
344
  job.completed_at = datetime.utcnow()
345
  # Sync DB on success
346
+ from services.backup_service import get_backup_service
347
+ backup_service = get_backup_service()
348
+ await backup_service.backup_async()
349
  return job
350
 
351