jebin2 commited on
Commit
260bc17
·
1 Parent(s): b22652d
Files changed (1) hide show
  1. tests/test_models.py +567 -0
tests/test_models.py ADDED
@@ -0,0 +1,567 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Comprehensive Tests for SQLAlchemy Models
3
+
4
+ Tests cover all 8 models:
5
+ 1. User - Authentication, credits, soft delete
6
+ 2. ClientUser - Device tracking, IP mapping
7
+ 3. AuditLog - Event logging
8
+ 4. GeminiJob - Job queue, priority, credit tracking
9
+ 5. PaymentTransaction - Payment processing
10
+ 6. Contact - Support tickets
11
+ 7. RateLimit - Rate limiting
12
+ 8. ApiKeyUsage - API key rotation
13
+
14
+ Tests CRUD operations, relationships, soft deletes, constraints, and indexes.
15
+ """
16
+ import pytest
17
+ from datetime import datetime, timedelta
18
+ from sqlalchemy import select
19
+ from sqlalchemy.ext.asyncio import AsyncSession
20
+
21
+
22
+ # ============================================================================
23
+ # 1. User Model Tests
24
+ # ============================================================================
25
+
26
+ class TestUserModel:
27
+ """Test User model CRUD and features."""
28
+
29
+ @pytest.mark.asyncio
30
+ async def test_create_user(self, db_session):
31
+ """Create a new user."""
32
+ from core.models import User
33
+
34
+ user = User(
35
+ user_id="usr_test_001",
36
+ email="test@example.com",
37
+ google_id="google_123",
38
+ name="Test User",
39
+ credits=50
40
+ )
41
+
42
+ db_session.add(user)
43
+ await db_session.commit()
44
+ await db_session.refresh(user)
45
+
46
+ assert user.id is not None
47
+ assert user.email == "test@example.com"
48
+ assert user.credits == 50
49
+ assert user.token_version == 1 # Default
50
+
51
+ @pytest.mark.asyncio
52
+ async def test_user_unique_email(self, db_session):
53
+ """Email must be unique."""
54
+ from core.models import User
55
+ from sqlalchemy.exc import IntegrityError
56
+
57
+ user1 = User(user_id="usr_001", email="duplicate@example.com")
58
+ db_session.add(user1)
59
+ await db_session.commit()
60
+
61
+ user2 = User(user_id="usr_002", email="duplicate@example.com")
62
+ db_session.add(user2)
63
+
64
+ with pytest.raises(IntegrityError):
65
+ await db_session.commit()
66
+
67
+ @pytest.mark.asyncio
68
+ async def test_user_token_versioning(self, db_session):
69
+ """Test token version increment for logout."""
70
+ from core.models import User
71
+
72
+ user = User(user_id="usr_tv", email="tv@example.com")
73
+ db_session.add(user)
74
+ await db_session.commit()
75
+
76
+ assert user.token_version == 1
77
+
78
+ # Increment version (simulate logout)
79
+ user.token_version += 1
80
+ await db_session.commit()
81
+
82
+ assert user.token_version == 2
83
+
84
+ @pytest.mark.asyncio
85
+ async def test_user_soft_delete(self, db_session):
86
+ """Test soft delete functionality."""
87
+ from core.models import User
88
+
89
+ user = User(user_id="usr_del", email="delete@example.com")
90
+ db_session.add(user)
91
+ await db_session.commit()
92
+
93
+ # Soft delete
94
+ user.deleted_at = datetime.utcnow()
95
+ await db_session.commit()
96
+
97
+ assert user.deleted_at is not None
98
+ assert user.id is not None # Still in database
99
+
100
+
101
+ # ============================================================================
102
+ # 2. ClientUser Model Tests
103
+ # ============================================================================
104
+
105
+ class TestClientUserModel:
106
+ """Test ClientUser model for device tracking."""
107
+
108
+ @pytest.mark.asyncio
109
+ async def test_create_client_user(self, db_session):
110
+ """Create client user mapping."""
111
+ from core.models import User, ClientUser
112
+
113
+ user = User(user_id="usr_cu", email="cu@example.com")
114
+ db_session.add(user)
115
+ await db_session.commit()
116
+
117
+ client_user = ClientUser(
118
+ user_id=user.id,
119
+ client_user_id="temp_client_123",
120
+ ip_address="192.168.1.1",
121
+ device_fingerprint="abc123"
122
+ )
123
+ db_session.add(client_user)
124
+ await db_session.commit()
125
+
126
+ assert client_user.id is not None
127
+ assert client_user.user_id == user.id
128
+
129
+ @pytest.mark.asyncio
130
+ async def test_client_user_anonymous(self, db_session):
131
+ """Client user can exist without server user (anonymous)."""
132
+ from core.models import ClientUser
133
+
134
+ client_user = ClientUser(
135
+ user_id=None, # Anonymous
136
+ client_user_id="anon_123",
137
+ ip_address="10.0.0.1"
138
+ )
139
+ db_session.add(client_user)
140
+ await db_session.commit()
141
+
142
+ assert client_user.id is not None
143
+ assert client_user.user_id is None
144
+
145
+ @pytest.mark.asyncio
146
+ async def test_client_user_relationship(self, db_session):
147
+ """Test relationship to User."""
148
+ from core.models import User, ClientUser
149
+
150
+ user = User(user_id="usr_rel", email="rel@example.com")
151
+ client1 = ClientUser(client_user_id="c1", ip_address="1.1.1.1")
152
+ client2 = ClientUser(client_user_id="c2", ip_address="2.2.2.2")
153
+
154
+ user.client_users.append(client1)
155
+ user.client_users.append(client2)
156
+
157
+ db_session.add(user)
158
+ await db_session.commit()
159
+
160
+ # Query user's client mappings
161
+ result = await db_session.execute(
162
+ select(ClientUser).where(ClientUser.user_id == user.id)
163
+ )
164
+ clients = result.scalars().all()
165
+
166
+ assert len(clients) == 2
167
+
168
+
169
+ # ============================================================================
170
+ # 3. AuditLog Model Tests
171
+ # ============================================================================
172
+
173
+ class TestAuditLogModel:
174
+ """Test AuditLog model."""
175
+
176
+ @pytest.mark.asyncio
177
+ async def test_create_client_audit_log(self, db_session):
178
+ """Create client-side audit log."""
179
+ from core.models import AuditLog
180
+
181
+ log = AuditLog(
182
+ log_type="client",
183
+ client_user_id="temp_123",
184
+ action="page_view",
185
+ status="success",
186
+ details={"page": "/home"}
187
+ )
188
+ db_session.add(log)
189
+ await db_session.commit()
190
+
191
+ assert log.id is not None
192
+ assert log.log_type == "client"
193
+
194
+ @pytest.mark.asyncio
195
+ async def test_create_server_audit_log(self, db_session):
196
+ """Create server-side audit log."""
197
+ from core.models import User, AuditLog
198
+
199
+ user = User(user_id="usr_audit", email="audit@example.com")
200
+ db_session.add(user)
201
+ await db_session.commit()
202
+
203
+ log = AuditLog(
204
+ log_type="server",
205
+ user_id=user.id,
206
+ action="credit_deduction",
207
+ status="success",
208
+ details={"amount": 5}
209
+ )
210
+ db_session.add(log)
211
+ await db_session.commit()
212
+
213
+ assert log.user_id == user.id
214
+
215
+
216
+ # ============================================================================
217
+ # 4. GeminiJob Model Tests
218
+ # ============================================================================
219
+
220
+ class TestGeminiJobModel:
221
+ """Test GeminiJob model."""
222
+
223
+ @pytest.mark.asyncio
224
+ async def test_create_job(self, db_session):
225
+ """Create a Gemini job."""
226
+ from core.models import User, GeminiJob
227
+
228
+ user = User(user_id="usr_job", email="job@example.com")
229
+ db_session.add(user)
230
+ await db_session.commit()
231
+
232
+ job = GeminiJob(
233
+ job_id="job_001",
234
+ user_id=user.id,
235
+ job_type="video",
236
+ status="queued",
237
+ priority="fast",
238
+ credits_reserved=10
239
+ )
240
+ db_session.add(job)
241
+ await db_session.commit()
242
+
243
+ assert job.id is not None
244
+ assert job.status == "queued"
245
+ assert job.credits_reserved == 10
246
+
247
+ @pytest.mark.asyncio
248
+ async def test_job_status_transitions(self, db_session):
249
+ """Test job status lifecycle."""
250
+ from core.models import User, GeminiJob
251
+
252
+ user = User(user_id="usr_status", email="status@example.com")
253
+ db_session.add(user)
254
+ await db_session.commit()
255
+
256
+ job = GeminiJob(
257
+ job_id="job_lifecycle",
258
+ user_id=user.id,
259
+ job_type="video",
260
+ status="queued"
261
+ )
262
+ db_session.add(job)
263
+ await db_session.commit()
264
+
265
+ # Transition to processing
266
+ job.status = "processing"
267
+ job.started_at = datetime.utcnow()
268
+ await db_session.commit()
269
+
270
+ # Transition to completed
271
+ job.status = "completed"
272
+ job.completed_at = datetime.utcnow()
273
+ await db_session.commit()
274
+
275
+ assert job.status == "completed"
276
+ assert job.started_at is not None
277
+ assert job.completed_at is not None
278
+
279
+ @pytest.mark.asyncio
280
+ async def test_job_priority_system(self, db_session):
281
+ """Test job priority tiers."""
282
+ from core.models import User, GeminiJob
283
+
284
+ user = User(user_id="usr_priority", email="priority@example.com")
285
+ db_session.add(user)
286
+ await db_session.commit()
287
+
288
+ fast_job = GeminiJob(job_id="job_fast", user_id=user.id, job_type="video", priority="fast")
289
+ medium_job = GeminiJob(job_id="job_medium", user_id=user.id, job_type="video", priority="medium")
290
+ slow_job = GeminiJob(job_id="job_slow", user_id=user.id, job_type="video", priority="slow")
291
+
292
+ db_session.add_all([fast_job, medium_job, slow_job])
293
+ await db_session.commit()
294
+
295
+ # Query by priority
296
+ result = await db_session.execute(
297
+ select(GeminiJob).where(GeminiJob.priority == "fast")
298
+ )
299
+ jobs = result.scalars().all()
300
+
301
+ assert len(jobs) == 1
302
+ assert jobs[0].job_id == "job_fast"
303
+
304
+
305
+ # ============================================================================
306
+ # 5. PaymentTransaction Model Tests
307
+ # ============================================================================
308
+
309
+ class TestPaymentTransactionModel:
310
+ """Test PaymentTransaction model."""
311
+
312
+ @pytest.mark.asyncio
313
+ async def test_create_payment(self, db_session):
314
+ """Create payment transaction."""
315
+ from core.models import User, PaymentTransaction
316
+
317
+ user = User(user_id="usr_pay", email="pay@example.com")
318
+ db_session.add(user)
319
+ await db_session.commit()
320
+
321
+ payment = PaymentTransaction(
322
+ transaction_id="txn_001",
323
+ user_id=user.id,
324
+ gateway="razorpay",
325
+ package_id="starter",
326
+ credits_amount=100,
327
+ amount_paise=9900,
328
+ status="created"
329
+ )
330
+ db_session.add(payment)
331
+ await db_session.commit()
332
+
333
+ assert payment.id is not None
334
+ assert payment.amount_paise == 9900
335
+
336
+ @pytest.mark.asyncio
337
+ async def test_payment_status_transitions(self, db_session):
338
+ """Test payment status changes."""
339
+ from core.models import User, PaymentTransaction
340
+
341
+ user = User(user_id="usr_paystat", email="paystat@example.com")
342
+ db_session.add(user)
343
+ await db_session.commit()
344
+
345
+ payment = PaymentTransaction(
346
+ transaction_id="txn_002",
347
+ user_id=user.id,
348
+ gateway="razorpay",
349
+ package_id="pro",
350
+ credits_amount=1000,
351
+ amount_paise=49900,
352
+ status="created"
353
+ )
354
+ db_session.add(payment)
355
+ await db_session.commit()
356
+
357
+ # Payment completed
358
+ payment.status = "paid"
359
+ payment.paid_at = datetime.utcnow()
360
+ payment.gateway_payment_id = "pay_abc123"
361
+ await db_session.commit()
362
+
363
+ assert payment.status == "paid"
364
+ assert payment.paid_at is not None
365
+
366
+
367
+ # ============================================================================
368
+ # 6. Contact Model Tests
369
+ # ============================================================================
370
+
371
+ class TestContactModel:
372
+ """Test Contact model."""
373
+
374
+ @pytest.mark.asyncio
375
+ async def test_create_contact(self, db_session):
376
+ """Create contact form submission."""
377
+ from core.models import User, Contact
378
+
379
+ user = User(user_id="usr_contact", email="contact@example.com")
380
+ db_session.add(user)
381
+ await db_session.commit()
382
+
383
+ contact = Contact(
384
+ user_id=user.id,
385
+ email=user.email,
386
+ subject="Help with credits",
387
+ message="I need assistance with my credit balance.",
388
+ ip_address="192.168.1.100"
389
+ )
390
+ db_session.add(contact)
391
+ await db_session.commit()
392
+
393
+ assert contact.id is not None
394
+ assert contact.subject == "Help with credits"
395
+
396
+
397
+ # ============================================================================
398
+ # 7. RateLimit Model Tests
399
+ # ============================================================================
400
+
401
+ class TestRateLimitModel:
402
+ """Test RateLimit model."""
403
+
404
+ @pytest.mark.asyncio
405
+ async def test_create_rate_limit(self, db_session):
406
+ """Create rate limit entry."""
407
+ from core.models import RateLimit
408
+
409
+ now = datetime.utcnow()
410
+ rate_limit = RateLimit(
411
+ identifier="192.168.1.1",
412
+ endpoint="/auth/google",
413
+ attempts=1,
414
+ window_start=now,
415
+ expires_at=now + timedelta(minutes=15)
416
+ )
417
+ db_session.add(rate_limit)
418
+ await db_session.commit()
419
+
420
+ assert rate_limit.id is not None
421
+ assert rate_limit.attempts == 1
422
+
423
+ @pytest.mark.asyncio
424
+ async def test_rate_limit_increment(self, db_session):
425
+ """Increment rate limit attempts."""
426
+ from core.models import RateLimit
427
+
428
+ now = datetime.utcnow()
429
+ rate_limit = RateLimit(
430
+ identifier="10.0.0.1",
431
+ endpoint="/auth/refresh",
432
+ attempts=1,
433
+ window_start=now,
434
+ expires_at=now + timedelta(minutes=15)
435
+ )
436
+ db_session.add(rate_limit)
437
+ await db_session.commit()
438
+
439
+ # Increment attempts
440
+ rate_limit.attempts += 1
441
+ await db_session.commit()
442
+
443
+ assert rate_limit.attempts == 2
444
+
445
+
446
+ # ============================================================================
447
+ # 8. ApiKeyUsage Model Tests
448
+ # ============================================================================
449
+
450
+ class TestApiKeyUsageModel:
451
+ """Test ApiKeyUsage model."""
452
+
453
+ @pytest.mark.asyncio
454
+ async def test_create_api_key_usage(self, db_session):
455
+ """Create API key usage tracking."""
456
+ from core.models import ApiKeyUsage
457
+
458
+ usage = ApiKeyUsage(
459
+ key_index=0,
460
+ total_requests=0,
461
+ success_count=0,
462
+ failure_count=0
463
+ )
464
+ db_session.add(usage)
465
+ await db_session.commit()
466
+
467
+ assert usage.id is not None
468
+ assert usage.key_index == 0
469
+
470
+ @pytest.mark.asyncio
471
+ async def test_api_key_usage_tracking(self, db_session):
472
+ """Track API key usage stats."""
473
+ from core.models import ApiKeyUsage
474
+
475
+ usage = ApiKeyUsage(key_index=1)
476
+ db_session.add(usage)
477
+ await db_session.commit()
478
+
479
+ # Simulate successful request
480
+ usage.total_requests += 1
481
+ usage.success_count += 1
482
+ usage.last_used_at = datetime.utcnow()
483
+ await db_session.commit()
484
+
485
+ assert usage.total_requests == 1
486
+ assert usage.success_count == 1
487
+
488
+ # Simulate failed request
489
+ usage.total_requests += 1
490
+ usage.failure_count += 1
491
+ usage.last_error = "Quota exceeded"
492
+ await db_session.commit()
493
+
494
+ assert usage.total_requests == 2
495
+ assert usage.failure_count == 1
496
+
497
+
498
+ # ============================================================================
499
+ # Relationship Tests
500
+ # ============================================================================
501
+
502
+ class TestModelRelationships:
503
+ """Test relationships between models."""
504
+
505
+ @pytest.mark.asyncio
506
+ async def test_user_jobs_relationship(self, db_session):
507
+ """User can have multiple jobs."""
508
+ from core.models import User, GeminiJob
509
+
510
+ user = User(user_id="usr_jobs", email="jobs@example.com")
511
+ db_session.add(user)
512
+ await db_session.commit()
513
+
514
+ job1 = GeminiJob(job_id="job_fast", user_id=user.id, job_type="video")
515
+ job2 = GeminiJob(job_id="job_medium", user_id=user.id, job_type="image")
516
+
517
+ db_session.add_all([job1, job2])
518
+ await db_session.commit()
519
+
520
+ # Query user's jobs
521
+ result = await db_session.execute(
522
+ select(GeminiJob).where(GeminiJob.user_id == user.id)
523
+ )
524
+ jobs = result.scalars().all()
525
+
526
+ assert len(jobs) == 2
527
+
528
+ @pytest.mark.asyncio
529
+ async def test_user_payments_relationship(self, db_session):
530
+ """User can have multiple payments."""
531
+ from core.models import User, PaymentTransaction
532
+
533
+ user = User(user_id="usr_payments", email="payments@example.com")
534
+ db_session.add(user)
535
+ await db_session.commit()
536
+
537
+ payment1 = PaymentTransaction(
538
+ transaction_id="txn_1",
539
+ user_id=user.id,
540
+ gateway="razorpay",
541
+ package_id="starter",
542
+ credits_amount=100,
543
+ amount_paise=9900
544
+ )
545
+ payment2 = PaymentTransaction(
546
+ transaction_id="txn_2",
547
+ user_id=user.id,
548
+ gateway="razorpay",
549
+ package_id="pro",
550
+ credits_amount=1000,
551
+ amount_paise=49900
552
+ )
553
+
554
+ db_session.add_all([payment1, payment2])
555
+ await db_session.commit()
556
+
557
+ # Query user's payments
558
+ result = await db_session.execute(
559
+ select(PaymentTransaction).where(PaymentTransaction.user_id == user.id)
560
+ )
561
+ payments = result.scalars().all()
562
+
563
+ assert len(payments) == 2
564
+
565
+
566
+ if __name__ == "__main__":
567
+ pytest.main([__file__, "-v"])