jebin2 commited on
Commit
fc638eb
·
1 Parent(s): 8c4055f

credit system

Browse files
core/models.py CHANGED
@@ -125,6 +125,10 @@ class GeminiJob(Base):
125
  created_at = Column(DateTime(timezone=True), server_default=func.now())
126
  started_at = Column(DateTime(timezone=True), nullable=True)
127
  completed_at = Column(DateTime(timezone=True), nullable=True)
 
 
 
 
128
 
129
  def __repr__(self):
130
  return f"<GeminiJob(job_id={self.job_id}, type={self.job_type}, status={self.status}, priority={self.priority})>"
 
125
  created_at = Column(DateTime(timezone=True), server_default=func.now())
126
  started_at = Column(DateTime(timezone=True), nullable=True)
127
  completed_at = Column(DateTime(timezone=True), nullable=True)
128
+
129
+ # Credit tracking for reservation pattern
130
+ credits_reserved = Column(Integer, default=0) # Credits reserved for this job
131
+ credits_refunded = Column(Boolean, default=False) # Whether credits were refunded
132
 
133
  def __repr__(self):
134
  return f"<GeminiJob(job_id={self.job_id}, type={self.job_type}, status={self.status}, priority={self.priority})>"
routers/gemini.py CHANGED
@@ -86,7 +86,8 @@ async def create_job(
86
  job_type=job_type,
87
  status="queued",
88
  priority=priority,
89
- input_data=input_data
 
90
  )
91
  db.add(job)
92
  await db.commit()
 
86
  job_type=job_type,
87
  status="queued",
88
  priority=priority,
89
+ input_data=input_data,
90
+ credits_reserved=1 # Track that 1 credit is reserved for this job
91
  )
92
  db.add(job)
93
  await db.commit()
services/credit_service.py ADDED
@@ -0,0 +1,257 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Credit Service - Manages credit reservation, confirmation, and refunding.
3
+
4
+ Implements the Credit Reservation Pattern:
5
+ 1. Reserve credits when job is created (deduct from user, track in job)
6
+ 2. Confirm credits only on successful completion
7
+ 3. Refund credits on refundable errors (server-side issues)
8
+ 4. Keep credits on non-refundable errors (user-caused issues)
9
+ """
10
+ import logging
11
+ from typing import Optional
12
+ from sqlalchemy.ext.asyncio import AsyncSession
13
+ from sqlalchemy import select
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
+ # =============================================================================
19
+ # Error Categories for Refund Decisions
20
+ # =============================================================================
21
+
22
+ # Refundable errors - User gets credits back (server/API issues)
23
+ REFUNDABLE_ERROR_PATTERNS = [
24
+ "API_KEY_INVALID",
25
+ "QUOTA_EXCEEDED",
26
+ "INTERNAL_ERROR",
27
+ "CONNECTION_FAILED",
28
+ "SERVER_SHUTDOWN",
29
+ "TIMEOUT",
30
+ "Server Authentication Error",
31
+ "Network error",
32
+ "Connection refused",
33
+ "Connection reset",
34
+ "Service unavailable",
35
+ "503",
36
+ "500",
37
+ "429", # Rate limit (our quota, not user's fault)
38
+ ]
39
+
40
+ # Non-refundable error patterns - User's input/content issue
41
+ NON_REFUNDABLE_ERROR_PATTERNS = [
42
+ "safety",
43
+ "blocked",
44
+ "SAFETY_FILTER",
45
+ "INVALID_INPUT",
46
+ "Invalid image",
47
+ "Bad request",
48
+ "400",
49
+ "cancelled",
50
+ "User cancelled",
51
+ ]
52
+
53
+
54
+ def is_refundable_error(error_message: Optional[str]) -> bool:
55
+ """
56
+ Determine if an error should result in a credit refund.
57
+
58
+ Args:
59
+ error_message: The error message from the failed job
60
+
61
+ Returns:
62
+ True if the error is refundable (server/API issue)
63
+ False if non-refundable (user's fault) or no error message
64
+ """
65
+ if not error_message:
66
+ return False
67
+
68
+ error_lower = error_message.lower()
69
+
70
+ # Check for REFUNDABLE patterns FIRST (specific server errors take precedence)
71
+ # This ensures API_KEY_INVALID is caught before generic "400" matcher
72
+ for pattern in REFUNDABLE_ERROR_PATTERNS:
73
+ if pattern.lower() in error_lower:
74
+ logger.debug(f"Error matched refundable pattern '{pattern}': {error_message[:100]}")
75
+ return True
76
+
77
+ # Check for non-refundable patterns (user-caused issues)
78
+ for pattern in NON_REFUNDABLE_ERROR_PATTERNS:
79
+ if pattern.lower() in error_lower:
80
+ logger.debug(f"Error matched non-refundable pattern '{pattern}': {error_message[:100]}")
81
+ return False
82
+
83
+ # Default: Max retries exceeded is refundable (we consumed API resources trying)
84
+ if "max retries" in error_lower:
85
+ return True
86
+
87
+ # Default: Unknown errors are NOT refundable to prevent abuse
88
+ # If it's an unknown error, it's more likely user-caused
89
+ logger.debug(f"Unknown error (not refundable): {error_message[:100]}")
90
+ return False
91
+
92
+
93
+ async def reserve_credit(session: AsyncSession, user, amount: int = 1) -> bool:
94
+ """
95
+ Reserve credits for a job (deduct from user's balance).
96
+
97
+ The credits are deducted but tracked in the job's credits_reserved field.
98
+ If the job fails with a refundable error, they can be restored.
99
+
100
+ Args:
101
+ session: Database session
102
+ user: User model instance
103
+ amount: Number of credits to reserve (default: 1)
104
+
105
+ Returns:
106
+ True if credits were successfully reserved
107
+ False if user has insufficient credits
108
+ """
109
+ if user.credits < amount:
110
+ logger.warning(f"User {user.user_id} has insufficient credits ({user.credits}) to reserve {amount}")
111
+ return False
112
+
113
+ user.credits -= amount
114
+ logger.info(f"Reserved {amount} credit(s) for user {user.user_id}. Remaining: {user.credits}")
115
+ # Note: Don't commit here - let caller handle transaction
116
+ return True
117
+
118
+
119
+ async def confirm_credit(session: AsyncSession, job) -> None:
120
+ """
121
+ Confirm that credits were legitimately used for a completed job.
122
+
123
+ This is called when a job completes successfully. The credits stay
124
+ deducted (they were already deducted during reservation).
125
+
126
+ Args:
127
+ session: Database session
128
+ job: GeminiJob model instance
129
+ """
130
+ if job.credits_reserved > 0:
131
+ # Credits were used - clear the reservation tracking
132
+ credits_used = job.credits_reserved
133
+ job.credits_reserved = 0
134
+ logger.info(f"Confirmed {credits_used} credit(s) used for job {job.job_id}")
135
+ # Note: Don't commit here - let caller handle transaction
136
+
137
+
138
+ async def refund_credit(session: AsyncSession, job, reason: str) -> bool:
139
+ """
140
+ Refund reserved credits back to the user.
141
+
142
+ Called when a job fails due to a refundable error (server-side issue).
143
+
144
+ Args:
145
+ session: Database session
146
+ job: GeminiJob model instance
147
+ reason: Reason for the refund (for logging)
148
+
149
+ Returns:
150
+ True if credits were refunded
151
+ False if no credits to refund or already refunded
152
+ """
153
+ if job.credits_reserved <= 0:
154
+ logger.debug(f"Job {job.job_id} has no credits to refund")
155
+ return False
156
+
157
+ if job.credits_refunded:
158
+ logger.warning(f"Job {job.job_id} was already refunded")
159
+ return False
160
+
161
+ # Get the user to restore credits
162
+ from core.models import User
163
+
164
+ result = await session.execute(
165
+ select(User).where(User.user_id == job.user_id)
166
+ )
167
+ user = result.scalar_one_or_none()
168
+
169
+ if not user:
170
+ logger.error(f"Cannot refund job {job.job_id}: User {job.user_id} not found")
171
+ return False
172
+
173
+ # Restore credits
174
+ credits_to_refund = job.credits_reserved
175
+ user.credits += credits_to_refund
176
+ job.credits_reserved = 0
177
+ job.credits_refunded = True
178
+
179
+ logger.info(
180
+ f"Refunded {credits_to_refund} credit(s) to user {user.user_id} for job {job.job_id}. "
181
+ f"Reason: {reason[:100]}. New balance: {user.credits}"
182
+ )
183
+
184
+ # Note: Don't commit here - let caller handle transaction
185
+ return True
186
+
187
+
188
+ async def handle_job_completion(session: AsyncSession, job) -> None:
189
+ """
190
+ Handle credit finalization when a job completes or fails.
191
+
192
+ This is the main entry point called by the job worker.
193
+
194
+ Args:
195
+ session: Database session
196
+ job: GeminiJob model instance with final status
197
+ """
198
+ if job.status == "completed":
199
+ # Success - confirm credits were used
200
+ await confirm_credit(session, job)
201
+
202
+ elif job.status == "failed":
203
+ # Failure - check if refundable
204
+ if is_refundable_error(job.error_message):
205
+ await refund_credit(session, job, job.error_message or "Unknown error")
206
+ else:
207
+ # Non-refundable - confirm credits were used (user's fault)
208
+ await confirm_credit(session, job)
209
+ logger.info(f"Job {job.job_id} failed with non-refundable error, credits kept")
210
+
211
+ elif job.status == "cancelled":
212
+ # Cancelled jobs get refunds only if they were never started
213
+ if job.started_at is None:
214
+ await refund_credit(session, job, "Job cancelled before processing")
215
+ else:
216
+ # Was processing - keep credits (API may have been consumed)
217
+ await confirm_credit(session, job)
218
+ logger.info(f"Job {job.job_id} cancelled during processing, credits kept")
219
+
220
+
221
+ async def refund_orphaned_jobs(session: AsyncSession) -> int:
222
+ """
223
+ Refund credits for jobs that were abandoned due to server shutdown.
224
+
225
+ Called during graceful shutdown to ensure no credits are lost.
226
+
227
+ Args:
228
+ session: Database session
229
+
230
+ Returns:
231
+ Number of jobs that were refunded
232
+ """
233
+ from core.models import GeminiJob
234
+
235
+ # Find jobs that are still processing with reserved credits
236
+ result = await session.execute(
237
+ select(GeminiJob).where(
238
+ GeminiJob.status == "processing",
239
+ GeminiJob.credits_reserved > 0,
240
+ GeminiJob.credits_refunded == False
241
+ )
242
+ )
243
+ orphaned_jobs = result.scalars().all()
244
+
245
+ refund_count = 0
246
+ for job in orphaned_jobs:
247
+ if await refund_credit(session, job, "SERVER_SHUTDOWN: Job orphaned during server shutdown"):
248
+ # Mark job as failed
249
+ job.status = "failed"
250
+ job.error_message = "Server shutdown during processing. Credits refunded."
251
+ refund_count += 1
252
+
253
+ if refund_count > 0:
254
+ await session.commit()
255
+ logger.info(f"Refunded {refund_count} orphaned job(s) during shutdown")
256
+
257
+ return refund_count
services/priority_worker_pool.py CHANGED
@@ -319,7 +319,26 @@ class PriorityWorker(Generic[JobType]):
319
  job.error_message = f"Max retries ({self.max_retries}) exceeded"
320
  job.completed_at = datetime.utcnow()
321
 
 
 
 
 
322
  await session.commit()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
 
324
 
325
  class PriorityWorkerPool(Generic[JobType]):
@@ -438,11 +457,28 @@ class PriorityWorkerPool(Generic[JobType]):
438
  logger.debug(f"Notified {priority} workers of new job")
439
 
440
  async def stop(self):
441
- """Stop all workers."""
442
  self._running = False
 
 
 
 
443
  for worker in self.workers:
444
  await worker.stop()
445
  logger.info("PriorityWorkerPool stopped")
 
 
 
 
 
 
 
 
 
 
 
 
 
446
 
447
 
448
  # Convenience functions for priority mapping
 
319
  job.error_message = f"Max retries ({self.max_retries}) exceeded"
320
  job.completed_at = datetime.utcnow()
321
 
322
+ # Handle credit finalization for jobs with reserved credits
323
+ if job.status in ("completed", "failed", "cancelled"):
324
+ await self._handle_job_credits(session, job)
325
+
326
  await session.commit()
327
+
328
+ async def _handle_job_credits(self, session: AsyncSession, job: JobType):
329
+ """Handle credit finalization when job reaches terminal state."""
330
+ # Check if job has credits_reserved attribute (credit-enabled jobs)
331
+ if not hasattr(job, 'credits_reserved') or job.credits_reserved <= 0:
332
+ return
333
+
334
+ try:
335
+ from services.credit_service import handle_job_completion
336
+ await handle_job_completion(session, job)
337
+ except ImportError:
338
+ # Credit service not available - skip
339
+ logger.debug(f"Credit service not available for job {job.job_id}")
340
+ except Exception as e:
341
+ logger.error(f"Error handling credits for job {job.job_id}: {e}")
342
 
343
 
344
  class PriorityWorkerPool(Generic[JobType]):
 
457
  logger.debug(f"Notified {priority} workers of new job")
458
 
459
  async def stop(self):
460
+ """Stop all workers and refund orphaned jobs."""
461
  self._running = False
462
+
463
+ # Refund credits for any jobs that were processing when server stopped
464
+ await self._refund_orphaned_jobs()
465
+
466
  for worker in self.workers:
467
  await worker.stop()
468
  logger.info("PriorityWorkerPool stopped")
469
+
470
+ async def _refund_orphaned_jobs(self):
471
+ """Refund credits for jobs abandoned during shutdown."""
472
+ try:
473
+ from services.credit_service import refund_orphaned_jobs
474
+ async with self.session_maker() as session:
475
+ refund_count = await refund_orphaned_jobs(session)
476
+ if refund_count > 0:
477
+ logger.info(f"Shutdown: Refunded {refund_count} orphaned job(s)")
478
+ except ImportError:
479
+ logger.debug("Credit service not available for orphaned job refunds")
480
+ except Exception as e:
481
+ logger.error(f"Error refunding orphaned jobs during shutdown: {e}")
482
 
483
 
484
  # Convenience functions for priority mapping