jebin2 commited on
Commit
a650e63
·
1 Parent(s): da9494d

credi layer

Browse files
CREDIT_SERVICE_EXAMPLE.py ADDED
@@ -0,0 +1,181 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Example app.py configuration for Credit Service
3
+
4
+ This shows how to configure the new middleware-based credit system.
5
+ """
6
+
7
+ # ============================================================================
8
+ # Credit Service Registration
9
+ # ============================================================================
10
+
11
+ from services.credit_service import CreditServiceConfig
12
+
13
+ # Register credit service with route configurations
14
+ CreditServiceConfig.register(
15
+ route_configs={
16
+ # Synchronous operations - credits confirmed/refunded immediately
17
+ "/gemini/generate-animation-prompt": {
18
+ "cost": 1,
19
+ "type": "sync"
20
+ },
21
+ "/gemini/edit-image": {
22
+ "cost": 1,
23
+ "type": "sync"
24
+ },
25
+ "/gemini/generate-text": {
26
+ "cost": 1,
27
+ "type": "sync"
28
+ },
29
+ "/gemini/analyze-image": {
30
+ "cost": 1,
31
+ "type": "sync"
32
+ },
33
+
34
+ # Asynchronous operations - credits kept reserved until job completes
35
+ "/gemini/generate-video": {
36
+ "cost": 10,
37
+ "type": "async",
38
+ "status_endpoint": "/gemini/job/{job_id}" # Optional metadata
39
+ },
40
+
41
+ # Status check endpoints - no additional cost, but can trigger confirm/refund
42
+ "/gemini/job/{job_id}": {
43
+ "cost": 0,
44
+ "type": "async"
45
+ }
46
+ }
47
+ )
48
+
49
+ # ============================================================================
50
+ # Middleware Chain (IMPORTANT: Order matters!)
51
+ # ============================================================================
52
+
53
+ # 1. AuthMiddleware MUST come first (sets request.state.user)
54
+ app.add_middleware(AuthServiceConfig.get_middleware())
55
+
56
+ # 2. CreditMiddleware comes after auth (needs request.state.user)
57
+ app.add_middleware(CreditServiceConfig.get_middleware())
58
+
59
+ # 3. Other middleware can follow
60
+ # app.add_middleware(...)
61
+
62
+ # ============================================================================
63
+ # How It Works - Examples
64
+ # ============================================================================
65
+
66
+ """
67
+ SYNCHRONOUS OPERATION EXAMPLE:
68
+ ==============================
69
+
70
+ 1. User requests: POST /gemini/analyze-image
71
+ → Middleware: Reserve 1 credit
72
+ → Application: Process image analysis
73
+ → Application: Return 200 + analysis result
74
+ → Middleware: Confirm credit (success)
75
+
76
+ 2. User requests: POST /gemini/analyze-image
77
+ → Middleware: Reserve 1 credit
78
+ → Application: Image validation fails
79
+ → Application: Return 400 + error
80
+ → Middleware: Refund credit (request error)
81
+
82
+
83
+ ASYNCHRONOUS OPERATION EXAMPLE:
84
+ ===============================
85
+
86
+ 1. User requests: POST /gemini/generate-video
87
+ → Middleware: Reserve 10 credits
88
+ → Application: Create job, return job_id
89
+ → Application: Return 200 + {"job_id": "abc123", "status": "queued"}
90
+ → Middleware: Keep credits reserved (job pending)
91
+
92
+ 2. User polls: GET /gemini/job/abc123
93
+ → Middleware: No additional cost (cost=0)
94
+ → Application: Fetch job status
95
+ → Application: Return 200 + {"job_id": "abc123", "status": "processing"}
96
+ → Middleware: Keep credits reserved (still processing)
97
+
98
+ 3. User polls: GET /gemini/job/abc123
99
+ → Middleware: No additional cost
100
+ → Application: Fetch job status
101
+ → Application: Return 200 + {"job_id": "abc123", "status": "completed", "video_url": "..."}
102
+ → Middleware: Confirm 10 credits (job completed)
103
+
104
+ 4. Alternative - Job fails with refundable error:
105
+ GET /gemini/job/abc123
106
+ → Application: Return 200 + {"status": "failed", "error_message": "API_KEY_INVALID"}
107
+ → Middleware: Refund 10 credits (refundable server error)
108
+
109
+ 5. Alternative - Job fails with user error:
110
+ GET /gemini/job/abc123
111
+ → Application: Return 200 + {"status": "failed", "error_message": "safety filter"}
112
+ → Middleware: Confirm 10 credits (non-refundable user error)
113
+
114
+
115
+ TRANSACTION HISTORY:
116
+ ===================
117
+
118
+ All operations are recorded in credit_transactions table:
119
+
120
+ | transaction_id | type | amount | balance_before | balance_after | reason |
121
+ |----------------|---------|--------|----------------|---------------|------------------------------|
122
+ | ctx_abc123 | reserve | -10 | 100 | 90 | POST /gemini/generate-video |
123
+ | cfm_def456 | confirm | 0 | 90 | 90 | Confirmed usage for ctx_abc |
124
+
125
+ User can query their history via: GET /credits/transactions
126
+ """
127
+
128
+ # ============================================================================
129
+ # Payment Integration (Still uses transaction manager directly)
130
+ # ============================================================================
131
+
132
+ """
133
+ In routers/payments.py, use CreditTransactionManager directly:
134
+
135
+ from services.credit_service import CreditTransactionManager
136
+
137
+ async def process_successful_payment(...):
138
+ await CreditTransactionManager.add_credits(
139
+ session=db,
140
+ user=user,
141
+ amount=credits_purchased,
142
+ source="payment",
143
+ reference_type="payment",
144
+ reference_id=transaction_id,
145
+ reason=f"Purchase: {package_id}",
146
+ metadata={"gateway": "razorpay", ...}
147
+ )
148
+ """
149
+
150
+ # ============================================================================
151
+ # Application Layer Changes
152
+ # ============================================================================
153
+
154
+ """
155
+ NO CHANGES NEEDED in application routers!
156
+
157
+ Before (old approach):
158
+ @router.post("/gemini/analyze-image")
159
+ async def analyze_image(
160
+ request: Request,
161
+ user: User = Depends(verify_credits) # ← Remove this
162
+ ):
163
+ # Had to manually handle credits
164
+ pass
165
+
166
+ After (new approach):
167
+ @router.post("/gemini/analyze-image")
168
+ async def analyze_image(request: Request):
169
+ user = request.state.user # Set by AuthMiddleware
170
+ # Credits automatically handled by middleware!
171
+ # Just focus on business logic
172
+ pass
173
+
174
+ Old dependencies to REMOVE:
175
+ - Depends(verify_credits)
176
+ - Depends(verify_video_credits)
177
+ - Manual credit deductions: user.credits -= X
178
+ - Manual refunds in endpoints
179
+
180
+ Everything is now handled automatically by middleware!
181
+ """
add_credit_transactions_table.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Database migration script to add credit_transactions table.
3
+
4
+ Run this script to add the credit transaction tracking table to your database.
5
+
6
+ Usage:
7
+ python add_credit_transactions_table.py
8
+ """
9
+ import sqlite3
10
+ import sys
11
+ from pathlib import Path
12
+
13
+ # Find database file
14
+ DB_FILE = "apigateway_dev.db"
15
+ if not Path(DB_FILE).exists():
16
+ DB_FILE = "apigateway_development.db"
17
+ if not Path(DB_FILE).exists():
18
+ print(f"Error: Database file not found. Looking for 'apigateway_dev.db' or 'apigateway_development.db'")
19
+ sys.exit(1)
20
+
21
+ print(f"Using database: {DB_FILE}")
22
+
23
+ # SQL to create credit_transactions table
24
+ CREATE_TABLE_SQL = """
25
+ CREATE TABLE IF NOT EXISTS credit_transactions (
26
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
27
+ transaction_id VARCHAR(100) UNIQUE NOT NULL,
28
+ user_id INTEGER NOT NULL,
29
+
30
+ -- Transaction details
31
+ transaction_type VARCHAR(20) NOT NULL,
32
+ amount INTEGER NOT NULL,
33
+ balance_before INTEGER NOT NULL,
34
+ balance_after INTEGER NOT NULL,
35
+
36
+ -- Context
37
+ source VARCHAR(50) NOT NULL,
38
+ reference_type VARCHAR(30),
39
+ reference_id VARCHAR(100),
40
+
41
+ -- Request/Response metadata
42
+ request_path VARCHAR(500),
43
+ request_method VARCHAR(10),
44
+ response_status INTEGER,
45
+
46
+ -- Additional details
47
+ reason TEXT,
48
+ metadata JSON,
49
+
50
+ -- Timestamps
51
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
52
+ deleted_at DATETIME,
53
+
54
+ FOREIGN KEY (user_id) REFERENCES users(id)
55
+ );
56
+ """
57
+
58
+ # SQL to create indexes
59
+ CREATE_INDEXES_SQL = [
60
+ "CREATE INDEX IF NOT EXISTS ix_credit_transactions_transaction_id ON credit_transactions(transaction_id);",
61
+ "CREATE INDEX IF NOT EXISTS ix_user_transactions ON credit_transactions(user_id, created_at);",
62
+ "CREATE INDEX IF NOT EXISTS ix_transaction_type ON credit_transactions(transaction_type);",
63
+ "CREATE INDEX IF NOT EXISTS ix_reference ON credit_transactions(reference_type, reference_id);",
64
+ "CREATE INDEX IF NOT EXISTS ix_credit_transactions_created_at ON credit_transactions(created_at);",
65
+ "CREATE INDEX IF NOT EXISTS ix_credit_transactions_deleted_at ON credit_transactions(deleted_at);",
66
+ ]
67
+
68
+ def run_migration():
69
+ """Run the migration to add credit_transactions table."""
70
+ try:
71
+ conn = sqlite3.connect(DB_FILE)
72
+ cursor = conn.cursor()
73
+
74
+ # Check if table already exists
75
+ cursor.execute("""
76
+ SELECT name FROM sqlite_master
77
+ WHERE type='table' AND name='credit_transactions';
78
+ """)
79
+
80
+ if cursor.fetchone():
81
+ print("✅ Table 'credit_transactions' already exists. Skipping creation.")
82
+ else:
83
+ print("Creating 'credit_transactions' table...")
84
+ cursor.execute(CREATE_TABLE_SQL)
85
+ print("✅ Table created successfully!")
86
+
87
+ # Create indexes
88
+ print("\nCreating indexes...")
89
+ for index_sql in CREATE_INDEXES_SQL:
90
+ cursor.execute(index_sql)
91
+ print("✅ Indexes created successfully!")
92
+
93
+ # Commit changes
94
+ conn.commit()
95
+
96
+ # Verify table
97
+ cursor.execute("PRAGMA table_info(credit_transactions);")
98
+ columns = cursor.fetchall()
99
+
100
+ print(f"\nTable schema (columns: {len(columns)}):")
101
+ for col in columns:
102
+ print(f" - {col[1]} ({col[2]})")
103
+
104
+ conn.close()
105
+
106
+ print("\n✅ Migration completed successfully!")
107
+ print("\nNext steps:")
108
+ print(" 1. Update app.py to register credit service with route configs")
109
+ print(" 2. The middleware will automatically track all credit operations")
110
+ print(" 3. Check transaction history via /credits/transactions endpoint")
111
+
112
+ except Exception as e:
113
+ print(f"\n❌ Migration failed: {e}")
114
+ sys.exit(1)
115
+
116
+ if __name__ == "__main__":
117
+ print("="*60)
118
+ print("Credit Transactions Table Migration")
119
+ print("="*60)
120
+ run_migration()
app.py CHANGED
@@ -84,14 +84,36 @@ async def lifespan(app: FastAPI):
84
  logger.info("✅ Auth Service configured")
85
 
86
  # Register Credit Service configuration
87
- from services.credit_service import register_credit_service
88
- register_credit_service(
89
- route_costs={
90
- "/gemini/generate-animation-prompt": 1,
91
- "/gemini/edit-image": 1,
92
- "/gemini/generate-video": 10,
93
- "/gemini/generate-text": 1,
94
- "/gemini/analyze-image": 1,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  }
96
  )
97
  logger.info("✅ Credit Service configured")
 
84
  logger.info("✅ Auth Service configured")
85
 
86
  # Register Credit Service configuration
87
+ from services.credit_service import CreditServiceConfig
88
+ CreditServiceConfig.register(
89
+ route_configs={
90
+ # Synchronous operations - credits confirmed/refunded immediately
91
+ "/gemini/generate-animation-prompt": {
92
+ "cost": 1,
93
+ "type": "sync"
94
+ },
95
+ "/gemini/edit-image": {
96
+ "cost": 1,
97
+ "type": "sync"
98
+ },
99
+ "/gemini/generate-text": {
100
+ "cost": 1,
101
+ "type": "sync"
102
+ },
103
+ "/gemini/analyze-image": {
104
+ "cost": 1,
105
+ "type": "sync"
106
+ },
107
+ # Asynchronous operations - credits reserved until job completes
108
+ "/gemini/generate-video": {
109
+ "cost": 10,
110
+ "type": "async"
111
+ },
112
+ # Status check endpoints - inspect response for job completion
113
+ "/gemini/job/{job_id}": {
114
+ "cost": 0, # No additional cost for status checks
115
+ "type": "async"
116
+ }
117
  }
118
  )
119
  logger.info("✅ Credit Service configured")
core/models.py CHANGED
@@ -297,3 +297,65 @@ class ApiKeyUsage(Base):
297
 
298
  def __repr__(self):
299
  return f"<ApiKeyUsage(index={self.key_index}, total={self.total_requests}, success={self.success_count}, failed={self.failure_count})>"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
 
298
  def __repr__(self):
299
  return f"<ApiKeyUsage(index={self.key_index}, total={self.total_requests}, success={self.success_count}, failed={self.failure_count})>"
300
+
301
+
302
+ # =============================================================================
303
+ # Credit Transactions
304
+ # =============================================================================
305
+
306
+ class CreditTransaction(Base):
307
+ """
308
+ Complete audit trail for all credit operations.
309
+
310
+ All credit changes (reserve, refund, confirm, purchase) are recorded here.
311
+ Enables balance verification, transaction history, and debugging.
312
+
313
+ Transaction types:
314
+ - reserve: Credits deducted when request arrives (negative amount)
315
+ - refund: Credits returned due to failure (positive amount)
316
+ - confirm: Credits confirmed as used (amount = 0, marks finalization)
317
+ - purchase: Credits added via payment (positive amount)
318
+ - adjustment: Manual admin adjustment (positive or negative)
319
+ """
320
+ __tablename__ = "credit_transactions"
321
+
322
+ # Composite indexes for efficient queries
323
+ __table_args__ = (
324
+ Index('ix_user_transactions', 'user_id', 'created_at'),
325
+ Index('ix_transaction_type', 'transaction_type'),
326
+ Index('ix_reference', 'reference_type', 'reference_id'),
327
+ )
328
+
329
+ id = Column(Integer, primary_key=True, autoincrement=True)
330
+ transaction_id = Column(String(100), unique=True, index=True, nullable=False) # ctx_<uuid>, ref_<uuid>, etc.
331
+ user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) # FK to users.id
332
+
333
+ # Transaction details
334
+ transaction_type = Column(String(20), nullable=False) # reserve, refund, confirm, purchase, adjustment
335
+ amount = Column(Integer, nullable=False) # Positive for credits added, negative for removed
336
+ balance_before = Column(Integer, nullable=False) # User balance before transaction
337
+ balance_after = Column(Integer, nullable=False) # User balance after transaction
338
+
339
+ # Context
340
+ source = Column(String(50), nullable=False) # middleware, payment, job_completion, manual, admin
341
+ reference_type = Column(String(30), nullable=True) # job, payment, request, NULL
342
+ reference_id = Column(String(100), nullable=True) # job_id, transaction_id, request endpoint, etc.
343
+
344
+ # Request/Response metadata (for middleware transactions)
345
+ request_path = Column(String(500), nullable=True)
346
+ request_method = Column(String(10), nullable=True)
347
+ response_status = Column(Integer, nullable=True)
348
+
349
+ # Additional details
350
+ reason = Column(Text, nullable=True) # Human-readable reason
351
+ metadata = Column(JSON, nullable=True) # Additional context (endpoint_type, error_message, etc.)
352
+
353
+ # Timestamps
354
+ created_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
355
+ deleted_at = Column(DateTime(timezone=True), nullable=True, index=True) # Soft delete
356
+
357
+ # Relationship
358
+ user = relationship("User", backref="credit_transactions")
359
+
360
+ def __repr__(self):
361
+ return f"<CreditTransaction(id={self.transaction_id}, type={self.transaction_type}, amount={self.amount}, user={self.user_id})>"
routers/credits.py CHANGED
@@ -122,3 +122,169 @@ async def get_credit_history(
122
  page=page,
123
  limit=limit
124
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  page=page,
123
  limit=limit
124
  )
125
+
126
+
127
+ # =============================================================================
128
+ # New Transaction History Endpoints
129
+ # =============================================================================
130
+
131
+ class CreditTransactionItem(BaseModel):
132
+ """Single credit transaction record."""
133
+ transaction_id: str
134
+ transaction_type: str # reserve, refund, confirm, purchase
135
+ amount: int
136
+ balance_before: int
137
+ balance_after: int
138
+ source: str
139
+ reference_type: Optional[str] = None
140
+ reference_id: Optional[str] = None
141
+ request_path: Optional[str] = None
142
+ request_method: Optional[str] = None
143
+ response_status: Optional[int] = None
144
+ reason: Optional[str] = None
145
+ created_at: str
146
+
147
+
148
+ class TransactionHistoryResponse(BaseModel):
149
+ """Response for transaction history endpoint."""
150
+ user_id: str
151
+ current_balance: int
152
+ transactions: List[CreditTransactionItem]
153
+ total_count: int
154
+ page: int
155
+ limit: int
156
+
157
+
158
+ @router.get("/transactions", response_model=TransactionHistoryResponse)
159
+ async def get_transaction_history(
160
+ request: Request,
161
+ db: AsyncSession = Depends(get_db),
162
+ transaction_type: Optional[str] = Query(None, description="Filter by type (reserve, refund, confirm, purchase)"),
163
+ page: int = Query(1, ge=1, description="Page number"),
164
+ limit: int = Query(50, ge=1, le=100, description="Items per page")
165
+ ):
166
+ """
167
+ Get detailed credit transaction history.
168
+
169
+ Returns complete audit trail of all credit operations including:
170
+ - Reservations (when credits were deducted for API calls)
171
+ - Refunds (when credits were returned due to failures)
172
+ - Confirmations (when credits were confirmed as used)
173
+ - Purchases (when credits were added via payments)
174
+
175
+ Auth is handled by AuthMiddleware - user is in request.state.user
176
+ """
177
+ from core.models import CreditTransaction
178
+ from sqlalchemy import func
179
+
180
+ user = request.state.user
181
+ offset = (page - 1) * limit
182
+
183
+ # Build query
184
+ query = select(CreditTransaction).where(
185
+ CreditTransaction.user_id == user.id
186
+ )
187
+
188
+ if transaction_type:
189
+ query = query.where(CreditTransaction.transaction_type == transaction_type)
190
+
191
+ query = query.order_by(desc(CreditTransaction.created_at)).offset(offset).limit(limit)
192
+
193
+ # Execute query
194
+ result = await db.execute(query)
195
+ transactions = result.scalars().all()
196
+
197
+ # Get total count
198
+ count_query = select(func.count(CreditTransaction.id)).where(
199
+ CreditTransaction.user_id == user.id
200
+ )
201
+ if transaction_type:
202
+ count_query = count_query.where(CreditTransaction.transaction_type == transaction_type)
203
+
204
+ count_result = await db.execute(count_query)
205
+ total_count = count_result.scalar() or 0
206
+
207
+ # Build response
208
+ transaction_items = []
209
+ for tx in transactions:
210
+ transaction_items.append(CreditTransactionItem(
211
+ transaction_id=tx.transaction_id,
212
+ transaction_type=tx.transaction_type,
213
+ amount=tx.amount,
214
+ balance_before=tx.balance_before,
215
+ balance_after=tx.balance_after,
216
+ source=tx.source,
217
+ reference_type=tx.reference_type,
218
+ reference_id=tx.reference_id,
219
+ request_path=tx.request_path,
220
+ request_method=tx.request_method,
221
+ response_status=tx.response_status,
222
+ reason=tx.reason,
223
+ created_at=tx.created_at.isoformat() if tx.created_at else None
224
+ ))
225
+
226
+ return TransactionHistoryResponse(
227
+ user_id=user.user_id,
228
+ current_balance=user.credits,
229
+ transactions=transaction_items,
230
+ total_count=total_count,
231
+ page=page,
232
+ limit=limit
233
+ )
234
+
235
+
236
+ class BalanceVerificationResponse(BaseModel):
237
+ """Response for balance verification endpoint."""
238
+ user_id: str
239
+ current_balance: int
240
+ calculated_balance: int
241
+ is_valid: bool
242
+ discrepancy: int
243
+ last_transaction_at: Optional[str] = None
244
+
245
+
246
+ @router.get("/balance/verify", response_model=BalanceVerificationResponse)
247
+ async def verify_balance(
248
+ request: Request,
249
+ db: AsyncSession = Depends(get_db)
250
+ ):
251
+ """
252
+ Verify user balance against transaction history.
253
+
254
+ Calculates balance from all transactions and compares with stored balance.
255
+ Useful for debugging credit discrepancies.
256
+
257
+ Auth is handled by AuthMiddleware - user is in request.state.user
258
+ """
259
+ from core.models import CreditTransaction
260
+ from sqlalchemy import func
261
+
262
+ user = request.state.user
263
+
264
+ # Calculate balance from transactions
265
+ result = await db.execute(
266
+ select(func.sum(CreditTransaction.amount)).where(
267
+ CreditTransaction.user_id == user.id
268
+ )
269
+ )
270
+ calculated_balance = result.scalar() or 0
271
+
272
+ # Get last transaction time
273
+ last_tx_result = await db.execute(
274
+ select(CreditTransaction).where(
275
+ CreditTransaction.user_id == user.id
276
+ ).order_by(desc(CreditTransaction.created_at)).limit(1)
277
+ )
278
+ last_tx = last_tx_result.scalar_one_or_none()
279
+
280
+ is_valid = (calculated_balance == user.credits)
281
+ discrepancy = user.credits - calculated_balance
282
+
283
+ return BalanceVerificationResponse(
284
+ user_id=user.user_id,
285
+ current_balance=user.credits,
286
+ calculated_balance=calculated_balance,
287
+ is_valid=is_valid,
288
+ discrepancy=discrepancy,
289
+ last_transaction_at=last_tx.created_at.isoformat() if last_tx and last_tx.created_at else None
290
+ )
routers/payments.py CHANGED
@@ -174,7 +174,27 @@ async def process_successful_payment(
174
  credits_added = 0
175
 
176
  if transaction.status != "paid":
177
- # First time processing - add credits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  transaction.status = "paid"
179
  transaction.gateway_payment_id = payment_id
180
  transaction.paid_at = datetime.utcnow()
@@ -182,8 +202,6 @@ async def process_successful_payment(
182
  transaction.razorpay_signature = signature
183
 
184
  update_verified_by(transaction, source)
185
-
186
- user.credits += transaction.credits_amount
187
  credits_added = transaction.credits_amount
188
 
189
  logger.info(
 
174
  credits_added = 0
175
 
176
  if transaction.status != "paid":
177
+ # First time processing - add credits using transaction manager
178
+ from services.credit_service import CreditTransactionManager
179
+
180
+ await CreditTransactionManager.add_credits(
181
+ session=db,
182
+ user=user,
183
+ amount=transaction.credits_amount,
184
+ source="payment",
185
+ reference_type="payment",
186
+ reference_id=transaction.transaction_id,
187
+ reason=f"Purchase: {transaction.package_id}",
188
+ metadata={
189
+ "package_id": transaction.package_id,
190
+ "gateway": "razorpay",
191
+ "gateway_payment_id": payment_id,
192
+ "amount_paise": transaction.amount_paise,
193
+ "verified_by": source,
194
+ "signature": signature
195
+ }
196
+ )
197
+
198
  transaction.status = "paid"
199
  transaction.gateway_payment_id = payment_id
200
  transaction.paid_at = datetime.utcnow()
 
202
  transaction.razorpay_signature = signature
203
 
204
  update_verified_by(transaction, source)
 
 
205
  credits_added = transaction.credits_amount
206
 
207
  logger.info(
services/credit_service/__init__.py CHANGED
@@ -1,75 +1,61 @@
1
  """
2
- Credit Service - Credit validation middleware for API Gateway
3
 
4
- Provides plug-and-play credit management with:
5
- - Per-route cost configuration
6
- - Credit reservation and validation
7
- - Request middleware for credit checks
8
- - Automatic refund on errors
9
 
10
- Usage:
11
- # In app.py startup
12
- from services.credit_service import register_credit_service
 
 
 
 
 
 
13
 
14
- register_credit_service(
15
- route_costs={
16
- "/gemini/generate-animation-prompt": 1,
17
- "/gemini/edit-image": 1,
18
- "/gemini/generate-video": 10,
19
- "/gemini/generate-text": 1,
20
- "/gemini/analyze-image": 1,
21
  }
22
  )
23
 
24
- # In routers
25
- from fastapi import Request
26
-
27
- @router.post("/api/endpoint")
28
- async def endpoint(request: Request):
29
- user = request.state.user # From AuthMiddleware
30
- credits_reserved = request.state.credits_reserved # From CreditMiddleware
31
- return {"credits_remaining": user.credits}
32
  """
33
 
34
  from services.credit_service.config import CreditServiceConfig
35
  from services.credit_service.middleware import CreditMiddleware
 
 
 
 
 
 
 
36
  from services.credit_service.credit_manager import (
37
- reserve_credit,
38
- confirm_credit,
39
- refund_credit,
40
- handle_job_completion,
41
  is_refundable_error,
42
  REFUNDABLE_ERROR_PATTERNS,
43
  NON_REFUNDABLE_ERROR_PATTERNS,
44
  )
45
 
46
-
47
- def register_credit_service(
48
- route_costs: dict = None,
49
- ) -> None:
50
- """
51
- Register the credit service with application configuration.
52
-
53
- Args:
54
- route_costs: Dictionary mapping route paths to credit costs
55
- Example: {"/gemini/generate-video": 10, "/gemini/edit-image": 1}
56
- """
57
- CreditServiceConfig.register(
58
- route_costs=route_costs or {},
59
- )
60
-
61
-
62
  __all__ = [
63
- # Registration
64
- 'register_credit_service',
65
  'CreditServiceConfig',
 
 
66
  'CreditMiddleware',
67
 
68
- # Credit Management
69
- 'reserve_credit',
70
- 'confirm_credit',
71
- 'refund_credit',
72
- 'handle_job_completion',
 
 
 
 
 
73
  'is_refundable_error',
74
  'REFUNDABLE_ERROR_PATTERNS',
75
  'NON_REFUNDABLE_ERROR_PATTERNS',
 
1
  """
2
+ Credit Service - Middleware-based credit management system.
3
 
4
+ Provides automatic credit tracking via middleware with complete audit trail.
5
+ Application layer is completely unaware of credit management.
 
 
 
6
 
7
+ Components:
8
+ - CreditServiceConfig: Route configuration with credit costs and types
9
+ - CreditMiddleware: Request/response-based credit handling
10
+ - CreditTransactionManager: Core credit transaction operations
11
+ - ResponseInspector: Response analysis for credit decisions
12
+ - credit_manager: Legacy refund logic (used by middleware)
13
+
14
+ Usage in app.py:
15
+ from services.credit_service import CreditServiceConfig
16
 
17
+ CreditServiceConfig.register(
18
+ route_configs={
19
+ "/gemini/analyze-image": {"cost": 1, "type": "sync"},
20
+ "/gemini/generate-video": {"cost": 10, "type": "async"},
 
 
 
21
  }
22
  )
23
 
24
+ app.add_middleware(CreditServiceConfig.get_middleware())
 
 
 
 
 
 
 
25
  """
26
 
27
  from services.credit_service.config import CreditServiceConfig
28
  from services.credit_service.middleware import CreditMiddleware
29
+ from services.credit_service.transaction_manager import (
30
+ CreditTransactionManager,
31
+ InsufficientCreditsError,
32
+ TransactionNotFoundError,
33
+ UserNotFoundError
34
+ )
35
+ from services.credit_service.response_inspector import ResponseInspector
36
  from services.credit_service.credit_manager import (
 
 
 
 
37
  is_refundable_error,
38
  REFUNDABLE_ERROR_PATTERNS,
39
  NON_REFUNDABLE_ERROR_PATTERNS,
40
  )
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  __all__ = [
43
+ # Configuration
 
44
  'CreditServiceConfig',
45
+
46
+ # Middleware
47
  'CreditMiddleware',
48
 
49
+ # Transaction Manager
50
+ 'CreditTransactionManager',
51
+ 'InsufficientCreditsError',
52
+ 'TransactionNotFoundError',
53
+ 'UserNotFoundError',
54
+
55
+ # Response Inspector
56
+ 'ResponseInspector',
57
+
58
+ # Refund logic helpers
59
  'is_refundable_error',
60
  'REFUNDABLE_ERROR_PATTERNS',
61
  'NON_REFUNDABLE_ERROR_PATTERNS',
services/credit_service/config.py CHANGED
@@ -1,7 +1,7 @@
1
  """
2
  Credit Service Configuration
3
 
4
- Manages credit cost configuration for API routes.
5
  """
6
 
7
  import logging
@@ -15,25 +15,41 @@ class CreditServiceConfig(BaseService):
15
  """
16
  Configuration for the credit service.
17
 
18
- Controls which routes require credits and how much they cost.
 
19
  """
20
 
21
  SERVICE_NAME = "credit_service"
22
 
23
- # Route cost configuration
24
- _route_costs: Dict[str, int] = {}
25
 
26
  @classmethod
27
  def register(
28
  cls,
29
- route_costs: Dict[str, int] = None,
30
  ) -> None:
31
  """
32
  Register credit service configuration.
33
 
34
  Args:
35
- route_costs: Dictionary mapping route paths to credit costs
36
- Example: {"/gemini/generate-video": 10, "/gemini/edit-image": 1}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
  Raises:
39
  RuntimeError: If service is already registered
@@ -41,15 +57,15 @@ class CreditServiceConfig(BaseService):
41
  if cls._registered:
42
  raise RuntimeError(f"{cls.SERVICE_NAME} is already registered")
43
 
44
- # Store route costs
45
- cls._route_costs = route_costs or {}
46
 
47
  cls._registered = True
48
 
49
  logger.info(f"✅ {cls.SERVICE_NAME} registered successfully")
50
- logger.info(f" Routes with credit costs: {len(cls._route_costs)}")
51
- for route, cost in cls._route_costs.items():
52
- logger.info(f" {route}: {cost} credits")
53
 
54
  @classmethod
55
  def get_middleware(cls):
@@ -57,6 +73,21 @@ class CreditServiceConfig(BaseService):
57
  from services.credit_service.middleware import CreditMiddleware
58
  return CreditMiddleware
59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  @classmethod
61
  def get_cost(cls, path: str) -> int:
62
  """
@@ -68,20 +99,18 @@ class CreditServiceConfig(BaseService):
68
  Returns:
69
  Credit cost (0 if route doesn't require credits)
70
  """
71
- cls.assert_registered()
72
- return cls._route_costs.get(path, 0)
73
 
74
  @classmethod
75
  def requires_credits(cls, path: str) -> bool:
76
  """Check if a URL path requires credits."""
77
- cls.assert_registered()
78
  return cls.get_cost(path) > 0
79
 
80
  @classmethod
81
- def get_all_costs(cls) -> Dict[str, int]:
82
- """Get all route costs."""
83
  cls.assert_registered()
84
- return cls._route_costs.copy()
85
 
86
 
87
  __all__ = ['CreditServiceConfig']
 
1
  """
2
  Credit Service Configuration
3
 
4
+ Manages credit cost configuration for API routes with endpoint type classification.
5
  """
6
 
7
  import logging
 
15
  """
16
  Configuration for the credit service.
17
 
18
+ Controls which routes require credits, how much they cost,
19
+ and how to handle credits based on response (sync vs async).
20
  """
21
 
22
  SERVICE_NAME = "credit_service"
23
 
24
+ # Route configuration: path -> {cost, type, ...}
25
+ _route_configs: Dict[str, dict] = {}
26
 
27
  @classmethod
28
  def register(
29
  cls,
30
+ route_configs: Dict[str, dict] = None,
31
  ) -> None:
32
  """
33
  Register credit service configuration.
34
 
35
  Args:
36
+ route_configs: Dictionary mapping route paths to configuration
37
+ Example: {
38
+ "/gemini/generate-video": {
39
+ "cost": 10,
40
+ "type": "async", # or "sync", "free"
41
+ "status_endpoint": "/gemini/job/{job_id}" # optional
42
+ },
43
+ "/gemini/analyze-image": {
44
+ "cost": 1,
45
+ "type": "sync"
46
+ }
47
+ }
48
+
49
+ Endpoint types:
50
+ - "sync": Synchronous operation, confirm/refund immediately based on response status
51
+ - "async": Asynchronous operation (job), keep reserved until status check
52
+ - "free": No credits required (cost = 0)
53
 
54
  Raises:
55
  RuntimeError: If service is already registered
 
57
  if cls._registered:
58
  raise RuntimeError(f"{cls.SERVICE_NAME} is already registered")
59
 
60
+ # Store route configs
61
+ cls._route_configs = route_configs or {}
62
 
63
  cls._registered = True
64
 
65
  logger.info(f"✅ {cls.SERVICE_NAME} registered successfully")
66
+ logger.info(f" Routes with credit requirements: {len(cls._route_configs)}")
67
+ for route, config in cls._route_configs.items():
68
+ logger.info(f" {route}: {config.get('cost', 0)} credits ({config.get('type', 'free')})")
69
 
70
  @classmethod
71
  def get_middleware(cls):
 
73
  from services.credit_service.middleware import CreditMiddleware
74
  return CreditMiddleware
75
 
76
+ @classmethod
77
+ def get_config(cls, path: str) -> dict:
78
+ """
79
+ Get the full configuration for a given path.
80
+
81
+ Args:
82
+ path: URL path to check
83
+
84
+ Returns:
85
+ Configuration dict with 'cost' and 'type' keys
86
+ Default: {"cost": 0, "type": "free"}
87
+ """
88
+ cls.assert_registered()
89
+ return cls._route_configs.get(path, {"cost": 0, "type": "free"})
90
+
91
  @classmethod
92
  def get_cost(cls, path: str) -> int:
93
  """
 
99
  Returns:
100
  Credit cost (0 if route doesn't require credits)
101
  """
102
+ return cls.get_config(path).get("cost", 0)
 
103
 
104
  @classmethod
105
  def requires_credits(cls, path: str) -> bool:
106
  """Check if a URL path requires credits."""
 
107
  return cls.get_cost(path) > 0
108
 
109
  @classmethod
110
+ def get_all_configs(cls) -> Dict[str, dict]:
111
+ """Get all route configurations."""
112
  cls.assert_registered()
113
+ return cls._route_configs.copy()
114
 
115
 
116
  __all__ = ['CreditServiceConfig']
services/credit_service/middleware.py CHANGED
@@ -1,17 +1,26 @@
1
  """
2
- Credit Middleware - Request credit validation layer
3
 
4
- Intercepts requests to validate and reserve credits for paid endpoints.
 
 
 
5
  """
6
 
7
  import logging
 
8
  from datetime import datetime
9
- from fastapi import Request, HTTPException, status
10
- from fastapi.responses import JSONResponse
11
  from sqlalchemy.ext.asyncio import AsyncSession
12
 
13
  from core.database import async_session_maker
14
  from services.credit_service.config import CreditServiceConfig
 
 
 
 
 
15
  from services.base_service.middleware_chain import (
16
  BaseServiceMiddleware,
17
  get_request_context,
@@ -22,18 +31,15 @@ logger = logging.getLogger(__name__)
22
 
23
  class CreditMiddleware(BaseServiceMiddleware):
24
  """
25
- Credit validation middleware for request validation.
26
 
27
- Flow:
28
- 1. Check if route requires credits based on URL
29
- 2. Get authenticated user from request.state (set by AuthMiddleware)
30
- 3. Check if user has sufficient credits
31
- 4. Reserve credits (deduct from balance)
32
- 5. Attach credit info to request.state
33
- 6. Continue to next middleware/route
34
 
35
- Credits are reserved but tracked - they can be refunded if job fails
36
- with a server-side error.
 
 
37
 
38
  NOTE: This middleware MUST run AFTER AuthMiddleware since it needs
39
  the authenticated user from request.state.user
@@ -43,88 +49,171 @@ class CreditMiddleware(BaseServiceMiddleware):
43
 
44
  async def dispatch(self, request: Request, call_next):
45
  """Process request through credit middleware."""
 
46
  # Skip OPTIONS requests (CORS preflight)
47
  if request.method == "OPTIONS":
48
  return await call_next(request)
49
 
50
- # Initialize request context
51
- ctx = get_request_context(request)
52
-
53
- # Get path from request
54
  path = request.url.path
55
 
56
- # Check if route requires credits
57
- credit_cost = CreditServiceConfig.get_cost(path)
 
 
58
 
59
  if credit_cost == 0:
60
- # Route doesn't require credits, skip
61
- self.log_request(request, f"Route doesn't require credits")
62
- ctx.set_credits(0, 0)
63
- response = await call_next(request)
64
- return response
65
-
66
- # Route requires credits - user MUST be authenticated
67
- # (AuthMiddleware should have already validated this)
68
- user = request.state.user if hasattr(request.state, 'user') else None
69
 
 
 
70
  if not user:
71
- # This shouldn't happen if auth is configured correctly
72
- self.log_error(request, "Credit-required route accessed without authentication")
73
  return JSONResponse(
74
  status_code=status.HTTP_401_UNAUTHORIZED,
75
- content={"detail": "Authentication required for this endpoint"},
76
  )
77
 
78
- # Check if user has sufficient credits
79
- if user.credits < credit_cost:
80
- self.log_request(
81
- request,
82
- f"Insufficient credits: has {user.credits}, needs {credit_cost}"
83
- )
84
- return JSONResponse(
85
- status_code=status.HTTP_402_PAYMENT_REQUIRED,
86
- content={
87
- "detail": f"Insufficient credits. This operation requires {credit_cost} credits. You have {user.credits}.",
88
- "credits_required": credit_cost,
89
- "credits_available": user.credits,
90
- },
91
- )
92
-
93
- # Reserve credits (deduct from user balance)
94
  async with async_session_maker() as db:
95
  try:
96
- # Deduct credits
97
- user.credits -= credit_cost
98
- user.last_used_at = datetime.utcnow()
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
- # Update in database
101
- db.add(user)
102
  await db.commit()
103
- await db.refresh(user)
104
 
105
- # Attach credit info to request state
106
- ctx.set_credits(credit_cost, user.credits)
107
- request.state.credits_reserved = credit_cost
108
- request.state.credits_remaining = user.credits
109
 
110
  self.log_request(
111
  request,
112
- f"Reserved {credit_cost} credits for {user.user_id}, remaining: {user.credits}"
113
  )
114
 
115
- # Continue to next middleware/route
116
- response = await call_next(request)
117
- return response
118
-
 
 
 
 
 
 
119
  except Exception as e:
120
  await db.rollback()
121
- self.log_error(request, f"Error reserving credits: {e}")
122
  return JSONResponse(
123
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
124
- content={"detail": "Failed to reserve credits. Please try again."},
125
  )
126
- finally:
127
- await db.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
 
129
 
130
  __all__ = ['CreditMiddleware']
 
1
  """
2
+ Credit Middleware - Automatic request/response-based credit management.
3
 
4
+ Intercepts requests to reserve credits and inspects responses to automatically
5
+ confirm or refund based on operation success/failure.
6
+
7
+ Application layer is completely unaware of credit management.
8
  """
9
 
10
  import logging
11
+ import json
12
  from datetime import datetime
13
+ from fastapi import Request, status
14
+ from fastapi.responses import JSONResponse, Response
15
  from sqlalchemy.ext.asyncio import AsyncSession
16
 
17
  from core.database import async_session_maker
18
  from services.credit_service.config import CreditServiceConfig
19
+ from services.credit_service.transaction_manager import (
20
+ CreditTransactionManager,
21
+ InsufficientCreditsError
22
+ )
23
+ from services.credit_service.response_inspector import ResponseInspector
24
  from services.base_service.middleware_chain import (
25
  BaseServiceMiddleware,
26
  get_request_context,
 
31
 
32
  class CreditMiddleware(BaseServiceMiddleware):
33
  """
34
+ Credit middleware with automatic request/response-based credit management.
35
 
36
+ Application layer is completely unaware of credits.
37
+ Credits are reserved on request, confirmed/refunded based on response.
 
 
 
 
 
38
 
39
+ Flow:
40
+ 1. REQUEST PHASE: Reserve credits if endpoint requires them
41
+ 2. APPLICATION: Process request (unaware of credits)
42
+ 3. RESPONSE PHASE: Inspect response and confirm/refund credits
43
 
44
  NOTE: This middleware MUST run AFTER AuthMiddleware since it needs
45
  the authenticated user from request.state.user
 
49
 
50
  async def dispatch(self, request: Request, call_next):
51
  """Process request through credit middleware."""
52
+
53
  # Skip OPTIONS requests (CORS preflight)
54
  if request.method == "OPTIONS":
55
  return await call_next(request)
56
 
 
 
 
 
57
  path = request.url.path
58
 
59
+ # Get endpoint configuration
60
+ config = CreditServiceConfig.get_config(path)
61
+ credit_cost = config.get("cost", 0)
62
+ endpoint_type = config.get("type", "free")
63
 
64
  if credit_cost == 0:
65
+ # Free endpoint, no credit handling needed
66
+ return await call_next(request)
 
 
 
 
 
 
 
67
 
68
+ # User must be authenticated (set by AuthMiddleware)
69
+ user = getattr(request.state, 'user', None)
70
  if not user:
 
 
71
  return JSONResponse(
72
  status_code=status.HTTP_401_UNAUTHORIZED,
73
+ content={"detail": "Authentication required for this endpoint"}
74
  )
75
 
76
+ # ===================================================================
77
+ # REQUEST PHASE: Reserve Credits
78
+ # ===================================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  async with async_session_maker() as db:
80
  try:
81
+ # Reserve credits
82
+ transaction = await CreditTransactionManager.reserve_credits(
83
+ session=db,
84
+ user=user,
85
+ amount=credit_cost,
86
+ source="middleware",
87
+ reference_type="request",
88
+ reference_id=f"{request.method}:{path}",
89
+ reason=f"{request.method} {path}",
90
+ metadata={
91
+ "endpoint_type": endpoint_type,
92
+ "cost": credit_cost
93
+ },
94
+ request=request
95
+ )
96
 
 
 
97
  await db.commit()
 
98
 
99
+ # Attach transaction info to request for response phase
100
+ request.state.credit_transaction_id = transaction.transaction_id
101
+ request.state.endpoint_type = endpoint_type
102
+ request.state.credit_cost = credit_cost
103
 
104
  self.log_request(
105
  request,
106
+ f"Reserved {credit_cost} credits ({endpoint_type}), txn: {transaction.transaction_id}"
107
  )
108
 
109
+ except InsufficientCreditsError as e:
110
+ await db.rollback()
111
+ return JSONResponse(
112
+ status_code=status.HTTP_402_PAYMENT_REQUIRED,
113
+ content={
114
+ "detail": f"Insufficient credits. Required: {credit_cost}, Available: {user.credits}",
115
+ "credits_required": credit_cost,
116
+ "credits_available": user.credits
117
+ }
118
+ )
119
  except Exception as e:
120
  await db.rollback()
121
+ logger.error(f"Credit reservation failed: {e}", exc_info=True)
122
  return JSONResponse(
123
  status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
124
+ content={"detail": "Failed to reserve credits"}
125
  )
126
+
127
+ # ===================================================================
128
+ # CALL APPLICATION LAYER (completely unaware of credits)
129
+ # ===================================================================
130
+ response = await call_next(request)
131
+
132
+ # ===================================================================
133
+ # RESPONSE PHASE: Confirm or Refund Based on Response
134
+ # ===================================================================
135
+ transaction_id = getattr(request.state, 'credit_transaction_id', None)
136
+ if not transaction_id:
137
+ # No transaction to finalize (shouldn't happen, but be safe)
138
+ return response
139
+
140
+ # We need to read the response body to inspect it
141
+ # This requires special handling to not consume the response
142
+ response_body = b""
143
+ async for chunk in response.body_iterator:
144
+ response_body += chunk
145
+
146
+ # Parse response for inspection
147
+ response_data = ResponseInspector.parse_response_body(response_body)
148
+ inspector = ResponseInspector()
149
+
150
+ # Determine credit action based on response
151
+ async with async_session_maker() as db:
152
+ try:
153
+ should_confirm = inspector.should_confirm(
154
+ response, endpoint_type, response_data
155
+ )
156
+ should_refund = inspector.should_refund(
157
+ response, endpoint_type, response_data
158
+ )
159
+
160
+ if should_confirm:
161
+ # Operation successful - confirm credits
162
+ await CreditTransactionManager.confirm_credits(
163
+ session=db,
164
+ transaction_id=transaction_id,
165
+ metadata={
166
+ "response_status": response.status_code,
167
+ "endpoint_type": endpoint_type
168
+ }
169
+ )
170
+ await db.commit()
171
+
172
+ self.log_request(
173
+ request,
174
+ f"Credits confirmed for {transaction_id} (success)"
175
+ )
176
+
177
+ elif should_refund:
178
+ # Operation failed - refund credits
179
+ reason = inspector.get_refund_reason(response, response_data)
180
+ await CreditTransactionManager.refund_credits(
181
+ session=db,
182
+ transaction_id=transaction_id,
183
+ reason=reason,
184
+ metadata={
185
+ "response_status": response.status_code,
186
+ "endpoint_type": endpoint_type,
187
+ "error": response_data.get("detail") if response_data else None
188
+ }
189
+ )
190
+ await db.commit()
191
+
192
+ self.log_request(
193
+ request,
194
+ f"Credits refunded for {transaction_id}: {reason}"
195
+ )
196
+
197
+ else:
198
+ # Keep reserved (async operation pending completion)
199
+ self.log_request(
200
+ request,
201
+ f"Credits kept reserved for {transaction_id} (async pending)"
202
+ )
203
+
204
+ except Exception as e:
205
+ logger.error(f"Response phase credit handling failed: {e}", exc_info=True)
206
+ # Don't fail the actual response, just log the error
207
+ # The transaction remains reserved and can be manually resolved
208
+
209
+ # Reconstruct response with original body
210
+ # This is necessary because we consumed the body_iterator above
211
+ return Response(
212
+ content=response_body,
213
+ status_code=response.status_code,
214
+ headers=dict(response.headers),
215
+ media_type=response.media_type
216
+ )
217
 
218
 
219
  __all__ = ['CreditMiddleware']
services/credit_service/response_inspector.py ADDED
@@ -0,0 +1,143 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Response Inspector - Helper for credit middleware to inspect responses.
3
+
4
+ Determines credit actions based on response status and content.
5
+ """
6
+ import json
7
+ import logging
8
+ from typing import Optional
9
+ from fastapi import Response
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ class ResponseInspector:
15
+ """Helper class to inspect responses and determine credit actions."""
16
+
17
+ @staticmethod
18
+ def should_confirm(
19
+ response: Response,
20
+ endpoint_type: str,
21
+ response_data: Optional[dict]
22
+ ) -> bool:
23
+ """
24
+ Determine if credits should be confirmed.
25
+
26
+ Args:
27
+ response: FastAPI Response object
28
+ endpoint_type: Type of endpoint ("sync", "async", "free")
29
+ response_data: Parsed response body (if JSON)
30
+
31
+ Returns:
32
+ True if credits should be confirmed (operation successful)
33
+ """
34
+ if endpoint_type == "sync":
35
+ # Synchronous: confirm on 2xx
36
+ return 200 <= response.status_code < 300
37
+
38
+ if endpoint_type == "async":
39
+ # Asynchronous: check job status in response
40
+ if not response_data:
41
+ return False
42
+
43
+ # For job creation endpoints (POST /gemini/generate-video)
44
+ if "job_id" in response_data and response.status_code < 300:
45
+ # Job created successfully, keep reserved (don't confirm yet)
46
+ return False
47
+
48
+ # For status check endpoints (GET /gemini/job/{id})
49
+ job_status = response_data.get("status")
50
+ if job_status == "completed":
51
+ logger.debug(f"Job completed, confirming credits")
52
+ return True
53
+
54
+ return False
55
+
56
+ @staticmethod
57
+ def should_refund(
58
+ response: Response,
59
+ endpoint_type: str,
60
+ response_data: Optional[dict]
61
+ ) -> bool:
62
+ """
63
+ Determine if credits should be refunded.
64
+
65
+ Args:
66
+ response: FastAPI Response object
67
+ endpoint_type: Type of endpoint ("sync", "async", "free")
68
+ response_data: Parsed response body (if JSON)
69
+
70
+ Returns:
71
+ True if credits should be refunded (operation failed)
72
+ """
73
+ if endpoint_type == "sync":
74
+ # Synchronous: refund on 4xx/5xx
75
+ return response.status_code >= 400
76
+
77
+ if endpoint_type == "async":
78
+ # Asynchronous: check job status
79
+ if not response_data:
80
+ # Failed to parse response, likely an error
81
+ return response.status_code >= 400
82
+
83
+ job_status = response_data.get("status")
84
+ if job_status == "failed":
85
+ # Check if error is refundable
86
+ error_message = response_data.get("error_message", "")
87
+ from services.credit_service.credit_manager import is_refundable_error
88
+ is_refundable = is_refundable_error(error_message)
89
+ logger.debug(
90
+ f"Job failed with error: {error_message[:100]}, "
91
+ f"refundable: {is_refundable}"
92
+ )
93
+ return is_refundable
94
+
95
+ return False
96
+
97
+ @staticmethod
98
+ def get_refund_reason(
99
+ response: Response,
100
+ response_data: Optional[dict]
101
+ ) -> str:
102
+ """
103
+ Get human-readable refund reason.
104
+
105
+ Args:
106
+ response: FastAPI Response object
107
+ response_data: Parsed response body (if JSON)
108
+
109
+ Returns:
110
+ Human-readable reason string
111
+ """
112
+ if response.status_code >= 500:
113
+ return f"Server error: {response.status_code}"
114
+
115
+ if response.status_code >= 400:
116
+ detail = response_data.get("detail") if response_data else None
117
+ return f"Request error: {detail or response.status_code}"
118
+
119
+ if response_data:
120
+ error_message = response_data.get("error_message")
121
+ if error_message:
122
+ return f"Job failed: {error_message[:100]}"
123
+
124
+ return "Unknown error"
125
+
126
+ @staticmethod
127
+ def parse_response_body(response_body: bytes) -> Optional[dict]:
128
+ """
129
+ Safely parse response body as JSON.
130
+
131
+ Args:
132
+ response_body: Raw response bytes
133
+
134
+ Returns:
135
+ Parsed dict or None if not JSON
136
+ """
137
+ try:
138
+ return json.loads(response_body.decode())
139
+ except:
140
+ return None
141
+
142
+
143
+ __all__ = ['ResponseInspector']
services/credit_service/transaction_manager.py ADDED
@@ -0,0 +1,425 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Credit Transaction Manager - Core credit operations service.
3
+
4
+ All credit operations flow through this service:
5
+ - Reserve credits (deduct from balance)
6
+ - Refund credits (add back to balance)
7
+ - Confirm credits (mark as used)
8
+ - Add credits (purchases, bonuses)
9
+
10
+ Provides complete audit trail with request/response context.
11
+ """
12
+ import uuid
13
+ import logging
14
+ from datetime import datetime
15
+ from typing import Optional, List
16
+ from sqlalchemy.ext.asyncio import AsyncSession
17
+ from sqlalchemy import select, desc, func
18
+ from fastapi import Request
19
+
20
+ from core.models import User, CreditTransaction
21
+
22
+ logger = logging.getLogger(__name__)
23
+
24
+
25
+ # =============================================================================
26
+ # Custom Exceptions
27
+ # =============================================================================
28
+
29
+ class InsufficientCreditsError(Exception):
30
+ """Raised when user doesn't have enough credits."""
31
+ pass
32
+
33
+
34
+ class TransactionNotFoundError(Exception):
35
+ """Raised when a transaction cannot be found."""
36
+ pass
37
+
38
+
39
+ class UserNotFoundError(Exception):
40
+ """Raised when a user cannot be found."""
41
+ pass
42
+
43
+
44
+ # =============================================================================
45
+ # Credit Transaction Manager
46
+ # =============================================================================
47
+
48
+ class CreditTransactionManager:
49
+ """
50
+ Centralized credit transaction management.
51
+ All credit operations flow through this service.
52
+ """
53
+
54
+ @staticmethod
55
+ async def reserve_credits(
56
+ session: AsyncSession,
57
+ user: User,
58
+ amount: int,
59
+ source: str,
60
+ reference_type: Optional[str] = None,
61
+ reference_id: Optional[str] = None,
62
+ reason: Optional[str] = None,
63
+ metadata: Optional[dict] = None,
64
+ request: Optional[Request] = None
65
+ ) -> CreditTransaction:
66
+ """
67
+ Reserve credits (deduct from balance).
68
+
69
+ Called by middleware when request arrives.
70
+
71
+ Args:
72
+ session: Database session
73
+ user: User model instance
74
+ amount: Number of credits to reserve
75
+ source: Source of transaction (e.g., "middleware")
76
+ reference_type: Type of reference (e.g., "request", "job")
77
+ reference_id: Reference identifier (e.g., "POST:/gemini/video")
78
+ reason: Human-readable reason
79
+ metadata: Additional context
80
+ request: FastAPI request object (for extracting metadata)
81
+
82
+ Returns:
83
+ CreditTransaction record
84
+
85
+ Raises:
86
+ InsufficientCreditsError: If user doesn't have enough credits
87
+ """
88
+ if user.credits < amount:
89
+ raise InsufficientCreditsError(
90
+ f"User has {user.credits} credits, needs {amount}"
91
+ )
92
+
93
+ # Generate transaction ID
94
+ transaction_id = f"ctx_{uuid.uuid4().hex[:16]}"
95
+
96
+ # Capture request metadata
97
+ request_metadata = {}
98
+ request_path = None
99
+ request_method = None
100
+
101
+ if request:
102
+ request_path = request.url.path
103
+ request_method = request.method
104
+ request_metadata = {
105
+ "path": request_path,
106
+ "method": request_method,
107
+ "ip": request.client.host if request.client else None,
108
+ "user_agent": request.headers.get("user-agent")
109
+ }
110
+
111
+ # Create transaction record
112
+ transaction = CreditTransaction(
113
+ transaction_id=transaction_id,
114
+ user_id=user.id,
115
+ transaction_type="reserve",
116
+ amount=-amount, # Negative for deduction
117
+ balance_before=user.credits,
118
+ balance_after=user.credits - amount,
119
+ source=source,
120
+ reference_type=reference_type,
121
+ reference_id=reference_id,
122
+ request_path=request_path,
123
+ request_method=request_method,
124
+ reason=reason or f"Reserved for {request_path or 'operation'}",
125
+ metadata={**request_metadata, **(metadata or {})}
126
+ )
127
+
128
+ # Update user balance
129
+ user.credits -= amount
130
+ user.last_used_at = datetime.utcnow()
131
+
132
+ session.add(transaction)
133
+ await session.flush() # Don't commit yet - let caller handle
134
+
135
+ logger.info(
136
+ f"Reserved {amount} credits for user {user.user_id}, "
137
+ f"transaction: {transaction_id}, new balance: {user.credits}"
138
+ )
139
+
140
+ return transaction
141
+
142
+ @staticmethod
143
+ async def confirm_credits(
144
+ session: AsyncSession,
145
+ transaction_id: str,
146
+ metadata: Optional[dict] = None
147
+ ) -> CreditTransaction:
148
+ """
149
+ Confirm credits were legitimately used.
150
+
151
+ Called by middleware after successful response.
152
+ Creates a confirmation record (amount = 0).
153
+
154
+ Args:
155
+ session: Database session
156
+ transaction_id: Original reservation transaction ID
157
+ metadata: Additional context (response_status, etc.)
158
+
159
+ Returns:
160
+ CreditTransaction record
161
+
162
+ Raises:
163
+ TransactionNotFoundError: If original transaction not found
164
+ """
165
+ # Find original reservation
166
+ result = await session.execute(
167
+ select(CreditTransaction).where(
168
+ CreditTransaction.transaction_id == transaction_id
169
+ )
170
+ )
171
+ original = result.scalar_one_or_none()
172
+
173
+ if not original:
174
+ raise TransactionNotFoundError(f"Transaction {transaction_id} not found")
175
+
176
+ # Create confirmation transaction
177
+ confirm_tx = CreditTransaction(
178
+ transaction_id=f"cfm_{uuid.uuid4().hex[:16]}",
179
+ user_id=original.user_id,
180
+ transaction_type="confirm",
181
+ amount=0, # No balance change
182
+ balance_before=original.balance_after,
183
+ balance_after=original.balance_after,
184
+ source="middleware",
185
+ reference_type=original.reference_type,
186
+ reference_id=original.reference_id,
187
+ request_path=original.request_path,
188
+ request_method=original.request_method,
189
+ reason=f"Confirmed usage for {original.transaction_id}",
190
+ metadata={
191
+ "original_transaction_id": transaction_id,
192
+ **(metadata or {})
193
+ }
194
+ )
195
+
196
+ session.add(confirm_tx)
197
+ await session.flush()
198
+
199
+ logger.info(f"Confirmed credits for transaction {transaction_id}")
200
+
201
+ return confirm_tx
202
+
203
+ @staticmethod
204
+ async def refund_credits(
205
+ session: AsyncSession,
206
+ transaction_id: str,
207
+ reason: str,
208
+ metadata: Optional[dict] = None
209
+ ) -> CreditTransaction:
210
+ """
211
+ Refund reserved credits.
212
+
213
+ Called by middleware after failure or refundable error.
214
+
215
+ Args:
216
+ session: Database session
217
+ transaction_id: Original reservation transaction ID
218
+ reason: Reason for refund
219
+ metadata: Additional context
220
+
221
+ Returns:
222
+ CreditTransaction record
223
+
224
+ Raises:
225
+ TransactionNotFoundError: If original transaction not found
226
+ UserNotFoundError: If user not found
227
+ """
228
+ # Find original reservation
229
+ result = await session.execute(
230
+ select(CreditTransaction).where(
231
+ CreditTransaction.transaction_id == transaction_id
232
+ )
233
+ )
234
+ original = result.scalar_one_or_none()
235
+
236
+ if not original:
237
+ raise TransactionNotFoundError(f"Transaction {transaction_id} not found")
238
+
239
+ # Get user
240
+ user_result = await session.execute(
241
+ select(User).where(User.id == original.user_id)
242
+ )
243
+ user = user_result.scalar_one_or_none()
244
+
245
+ if not user:
246
+ raise UserNotFoundError(f"User {original.user_id} not found")
247
+
248
+ # Calculate refund amount (reverse of original deduction)
249
+ refund_amount = abs(original.amount)
250
+
251
+ # Create refund transaction
252
+ refund_tx = CreditTransaction(
253
+ transaction_id=f"ref_{uuid.uuid4().hex[:16]}",
254
+ user_id=user.id,
255
+ transaction_type="refund",
256
+ amount=refund_amount, # Positive for addition
257
+ balance_before=user.credits,
258
+ balance_after=user.credits + refund_amount,
259
+ source="middleware",
260
+ reference_type=original.reference_type,
261
+ reference_id=original.reference_id,
262
+ request_path=original.request_path,
263
+ request_method=original.request_method,
264
+ reason=reason,
265
+ metadata={
266
+ "original_transaction_id": transaction_id,
267
+ **(metadata or {})
268
+ }
269
+ )
270
+
271
+ # Update user balance
272
+ user.credits += refund_amount
273
+
274
+ session.add(refund_tx)
275
+ await session.flush()
276
+
277
+ logger.info(
278
+ f"Refunded {refund_amount} credits for transaction {transaction_id}, "
279
+ f"reason: {reason[:100]}, new balance: {user.credits}"
280
+ )
281
+
282
+ return refund_tx
283
+
284
+ @staticmethod
285
+ async def add_credits(
286
+ session: AsyncSession,
287
+ user: User,
288
+ amount: int,
289
+ source: str,
290
+ reference_type: Optional[str] = None,
291
+ reference_id: Optional[str] = None,
292
+ reason: Optional[str] = None,
293
+ metadata: Optional[dict] = None
294
+ ) -> CreditTransaction:
295
+ """
296
+ Add credits (purchase, bonus, etc).
297
+
298
+ Used by payment router only.
299
+
300
+ Args:
301
+ session: Database session
302
+ user: User model instance
303
+ amount: Number of credits to add
304
+ source: Source of transaction (e.g., "payment")
305
+ reference_type: Type of reference (e.g., "payment")
306
+ reference_id: Reference identifier (e.g., transaction_id)
307
+ reason: Human-readable reason
308
+ metadata: Additional context
309
+
310
+ Returns:
311
+ CreditTransaction record
312
+ """
313
+ transaction_id = f"add_{uuid.uuid4().hex[:16]}"
314
+
315
+ transaction = CreditTransaction(
316
+ transaction_id=transaction_id,
317
+ user_id=user.id,
318
+ transaction_type="purchase",
319
+ amount=amount,
320
+ balance_before=user.credits,
321
+ balance_after=user.credits + amount,
322
+ source=source,
323
+ reference_type=reference_type,
324
+ reference_id=reference_id,
325
+ reason=reason,
326
+ metadata=metadata
327
+ )
328
+
329
+ user.credits += amount
330
+
331
+ session.add(transaction)
332
+ await session.flush()
333
+
334
+ logger.info(
335
+ f"Added {amount} credits to user {user.user_id}, "
336
+ f"source: {source}, new balance: {user.credits}"
337
+ )
338
+
339
+ return transaction
340
+
341
+ @staticmethod
342
+ async def get_balance(
343
+ session: AsyncSession,
344
+ user_id: int,
345
+ verify: bool = False
346
+ ) -> int:
347
+ """
348
+ Get current balance, optionally verify against transactions.
349
+
350
+ Args:
351
+ session: Database session
352
+ user_id: User ID
353
+ verify: If True, calculate balance from transactions and compare
354
+
355
+ Returns:
356
+ Current balance
357
+ """
358
+ # Get user
359
+ result = await session.execute(
360
+ select(User).where(User.id == user_id)
361
+ )
362
+ user = result.scalar_one_or_none()
363
+
364
+ if not user:
365
+ raise UserNotFoundError(f"User {user_id} not found")
366
+
367
+ if not verify:
368
+ return user.credits
369
+
370
+ # Calculate balance from transactions
371
+ tx_result = await session.execute(
372
+ select(func.sum(CreditTransaction.amount)).where(
373
+ CreditTransaction.user_id == user_id
374
+ )
375
+ )
376
+ calculated_balance = tx_result.scalar() or 0
377
+
378
+ if calculated_balance != user.credits:
379
+ logger.warning(
380
+ f"Balance mismatch for user {user_id}: "
381
+ f"stored={user.credits}, calculated={calculated_balance}"
382
+ )
383
+
384
+ return user.credits
385
+
386
+ @staticmethod
387
+ async def get_transaction_history(
388
+ session: AsyncSession,
389
+ user_id: int,
390
+ transaction_type: Optional[str] = None,
391
+ limit: int = 50,
392
+ offset: int = 0
393
+ ) -> List[CreditTransaction]:
394
+ """
395
+ Get transaction history with filters.
396
+
397
+ Args:
398
+ session: Database session
399
+ user_id: User ID
400
+ transaction_type: Filter by transaction type
401
+ limit: Maximum number of results
402
+ offset: Offset for pagination
403
+
404
+ Returns:
405
+ List of CreditTransaction records
406
+ """
407
+ query = select(CreditTransaction).where(
408
+ CreditTransaction.user_id == user_id
409
+ )
410
+
411
+ if transaction_type:
412
+ query = query.where(CreditTransaction.transaction_type == transaction_type)
413
+
414
+ query = query.order_by(desc(CreditTransaction.created_at)).offset(offset).limit(limit)
415
+
416
+ result = await session.execute(query)
417
+ return list(result.scalars().all())
418
+
419
+
420
+ __all__ = [
421
+ 'CreditTransactionManager',
422
+ 'InsufficientCreditsError',
423
+ 'TransactionNotFoundError',
424
+ 'UserNotFoundError'
425
+ ]