jebin2 commited on
Commit
c8b0c66
·
1 Parent(s): ce64132

worker test

Browse files
Files changed (1) hide show
  1. tests/test_worker_pool.py +599 -0
tests/test_worker_pool.py CHANGED
@@ -1364,5 +1364,604 @@ class TestErrorPatternCoverage:
1364
  assert result in [True, False]
1365
 
1366
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1367
  if __name__ == "__main__":
1368
  pytest.main([__file__, "-v"])
 
 
1364
  assert result in [True, False]
1365
 
1366
 
1367
+ # =============================================================================
1368
+ # 15. INTEGRATION TESTS - Real Database
1369
+ # =============================================================================
1370
+
1371
+ import os
1372
+ import tempfile
1373
+ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
1374
+ from sqlalchemy import Column, Integer, String, DateTime, JSON, Text, Boolean, select, update
1375
+ from sqlalchemy.orm import declarative_base
1376
+
1377
+ # Create a test-specific Base for integration tests
1378
+ IntegrationBase = declarative_base()
1379
+
1380
+
1381
+ class TestJobModel(IntegrationBase):
1382
+ """Real job model for integration tests."""
1383
+ __tablename__ = "test_jobs"
1384
+
1385
+ id = Column(Integer, primary_key=True, autoincrement=True)
1386
+ job_id = Column(String(100), unique=True, index=True, nullable=False)
1387
+ user_id = Column(String(50), index=True, nullable=False)
1388
+ job_type = Column(String(20), index=True, nullable=False)
1389
+ status = Column(String(20), default="queued", index=True)
1390
+ priority = Column(String(10), default="fast", index=True)
1391
+ next_process_at = Column(DateTime, nullable=True, index=True)
1392
+ retry_count = Column(Integer, default=0)
1393
+ third_party_id = Column(String(255), nullable=True)
1394
+ input_data = Column(JSON, nullable=True)
1395
+ output_data = Column(JSON, nullable=True)
1396
+ error_message = Column(Text, nullable=True)
1397
+ created_at = Column(DateTime, nullable=False)
1398
+ started_at = Column(DateTime, nullable=True)
1399
+ completed_at = Column(DateTime, nullable=True)
1400
+ credits_reserved = Column(Integer, default=0)
1401
+ credits_refunded = Column(Boolean, default=False)
1402
+
1403
+
1404
+ class TestUserModel(IntegrationBase):
1405
+ """Real user model for integration tests."""
1406
+ __tablename__ = "test_users"
1407
+
1408
+ id = Column(Integer, primary_key=True, autoincrement=True)
1409
+ user_id = Column(String(50), unique=True, index=True, nullable=False)
1410
+ email = Column(String(255), unique=True, nullable=False)
1411
+ credits = Column(Integer, default=100)
1412
+
1413
+
1414
+ @pytest.fixture
1415
+ async def integration_db():
1416
+ """Create a real SQLite database for integration tests."""
1417
+ # Use a temp file for the test database
1418
+ fd, db_path = tempfile.mkstemp(suffix=".db")
1419
+ os.close(fd)
1420
+
1421
+ db_url = f"sqlite+aiosqlite:///{db_path}"
1422
+ engine = create_async_engine(db_url, echo=False)
1423
+
1424
+ # Create tables
1425
+ async with engine.begin() as conn:
1426
+ await conn.run_sync(IntegrationBase.metadata.create_all)
1427
+
1428
+ session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
1429
+
1430
+ yield {
1431
+ "engine": engine,
1432
+ "session_maker": session_maker,
1433
+ "db_url": db_url,
1434
+ "db_path": db_path
1435
+ }
1436
+
1437
+ # Cleanup
1438
+ await engine.dispose()
1439
+ if os.path.exists(db_path):
1440
+ os.remove(db_path)
1441
+
1442
+
1443
+ class TestIntegrationRealDatabase:
1444
+ """Integration tests using real SQLite database."""
1445
+
1446
+ @pytest.mark.asyncio
1447
+ async def test_create_and_query_job(self, integration_db):
1448
+ """Test creating and querying a job in real database."""
1449
+ session_maker = integration_db["session_maker"]
1450
+
1451
+ async with session_maker() as session:
1452
+ # Create a job
1453
+ job = TestJobModel(
1454
+ job_id="int-test-1",
1455
+ user_id="user-123",
1456
+ job_type="text",
1457
+ status="queued",
1458
+ priority="fast",
1459
+ created_at=datetime.utcnow()
1460
+ )
1461
+ session.add(job)
1462
+ await session.commit()
1463
+
1464
+ # Query it back
1465
+ result = await session.execute(
1466
+ select(TestJobModel).where(TestJobModel.job_id == "int-test-1")
1467
+ )
1468
+ fetched = result.scalar_one_or_none()
1469
+
1470
+ assert fetched is not None
1471
+ assert fetched.job_id == "int-test-1"
1472
+ assert fetched.status == "queued"
1473
+
1474
+ @pytest.mark.asyncio
1475
+ async def test_atomic_update_with_where_clause(self, integration_db):
1476
+ """Test atomic UPDATE with WHERE clause (real DB concurrency test)."""
1477
+ session_maker = integration_db["session_maker"]
1478
+
1479
+ # Create a job
1480
+ async with session_maker() as session:
1481
+ job = TestJobModel(
1482
+ job_id="atomic-test-1",
1483
+ user_id="user-123",
1484
+ job_type="text",
1485
+ status="queued",
1486
+ priority="fast",
1487
+ created_at=datetime.utcnow()
1488
+ )
1489
+ session.add(job)
1490
+ await session.commit()
1491
+
1492
+ # Simulate Worker 1 claiming the job
1493
+ async with session_maker() as session1:
1494
+ stmt = (
1495
+ update(TestJobModel)
1496
+ .where(
1497
+ TestJobModel.job_id == "atomic-test-1",
1498
+ TestJobModel.status == "queued"
1499
+ )
1500
+ .values(status="processing", started_at=datetime.utcnow())
1501
+ )
1502
+ result1 = await session1.execute(stmt)
1503
+ await session1.commit()
1504
+
1505
+ # Worker 1 should succeed
1506
+ assert result1.rowcount == 1
1507
+
1508
+ # Simulate Worker 2 trying to claim the same job
1509
+ async with session_maker() as session2:
1510
+ stmt = (
1511
+ update(TestJobModel)
1512
+ .where(
1513
+ TestJobModel.job_id == "atomic-test-1",
1514
+ TestJobModel.status == "queued" # This won't match!
1515
+ )
1516
+ .values(status="processing", started_at=datetime.utcnow())
1517
+ )
1518
+ result2 = await session2.execute(stmt)
1519
+ await session2.commit()
1520
+
1521
+ # Worker 2 should fail (rowcount = 0)
1522
+ assert result2.rowcount == 0, "Second worker should not be able to claim!"
1523
+
1524
+ @pytest.mark.asyncio
1525
+ async def test_job_ordering_by_created_at(self, integration_db):
1526
+ """Test jobs are queried in FIFO order by created_at."""
1527
+ session_maker = integration_db["session_maker"]
1528
+
1529
+ # Create jobs in non-chronological order
1530
+ async with session_maker() as session:
1531
+ job3 = TestJobModel(
1532
+ job_id="order-3", user_id="user", job_type="text",
1533
+ status="queued", priority="fast",
1534
+ created_at=datetime(2024, 1, 1, 12, 0, 0) # Latest
1535
+ )
1536
+ job1 = TestJobModel(
1537
+ job_id="order-1", user_id="user", job_type="text",
1538
+ status="queued", priority="fast",
1539
+ created_at=datetime(2024, 1, 1, 10, 0, 0) # Oldest
1540
+ )
1541
+ job2 = TestJobModel(
1542
+ job_id="order-2", user_id="user", job_type="text",
1543
+ status="queued", priority="fast",
1544
+ created_at=datetime(2024, 1, 1, 11, 0, 0) # Middle
1545
+ )
1546
+ session.add_all([job3, job1, job2])
1547
+ await session.commit()
1548
+
1549
+ # Query with ORDER BY created_at
1550
+ async with session_maker() as session:
1551
+ result = await session.execute(
1552
+ select(TestJobModel)
1553
+ .where(TestJobModel.status == "queued")
1554
+ .order_by(TestJobModel.created_at)
1555
+ .limit(3)
1556
+ )
1557
+ jobs = result.scalars().all()
1558
+
1559
+ assert len(jobs) == 3
1560
+ assert jobs[0].job_id == "order-1", "Oldest job should be first"
1561
+ assert jobs[1].job_id == "order-2"
1562
+ assert jobs[2].job_id == "order-3"
1563
+
1564
+ @pytest.mark.asyncio
1565
+ async def test_priority_filtering(self, integration_db):
1566
+ """Test that priority filter works correctly in real DB."""
1567
+ session_maker = integration_db["session_maker"]
1568
+
1569
+ # Create jobs with different priorities
1570
+ async with session_maker() as session:
1571
+ fast_job = TestJobModel(
1572
+ job_id="prio-fast", user_id="user", job_type="text",
1573
+ status="queued", priority="fast",
1574
+ created_at=datetime.utcnow()
1575
+ )
1576
+ medium_job = TestJobModel(
1577
+ job_id="prio-medium", user_id="user", job_type="image",
1578
+ status="queued", priority="medium",
1579
+ created_at=datetime.utcnow()
1580
+ )
1581
+ slow_job = TestJobModel(
1582
+ job_id="prio-slow", user_id="user", job_type="video",
1583
+ status="queued", priority="slow",
1584
+ created_at=datetime.utcnow()
1585
+ )
1586
+ session.add_all([fast_job, medium_job, slow_job])
1587
+ await session.commit()
1588
+
1589
+ # Query only fast priority
1590
+ async with session_maker() as session:
1591
+ result = await session.execute(
1592
+ select(TestJobModel).where(TestJobModel.priority == "fast")
1593
+ )
1594
+ fast_jobs = result.scalars().all()
1595
+
1596
+ assert len(fast_jobs) == 1
1597
+ assert fast_jobs[0].job_id == "prio-fast"
1598
+
1599
+
1600
+ class TestIntegrationConcurrency:
1601
+ """Test concurrent access patterns."""
1602
+
1603
+ @pytest.mark.asyncio
1604
+ async def test_multiple_workers_different_jobs(self, integration_db):
1605
+ """Multiple workers should process different jobs concurrently."""
1606
+ session_maker = integration_db["session_maker"]
1607
+
1608
+ # Create multiple jobs
1609
+ async with session_maker() as session:
1610
+ for i in range(5):
1611
+ job = TestJobModel(
1612
+ job_id=f"concurrent-{i}",
1613
+ user_id="user",
1614
+ job_type="text",
1615
+ status="queued",
1616
+ priority="fast",
1617
+ created_at=datetime.utcnow() + timedelta(seconds=i)
1618
+ )
1619
+ session.add(job)
1620
+ await session.commit()
1621
+
1622
+ # Simulate 3 workers claiming jobs concurrently
1623
+ claimed_jobs = []
1624
+
1625
+ async def worker_claim(worker_id):
1626
+ async with session_maker() as session:
1627
+ # SELECT the oldest unclaimed job
1628
+ result = await session.execute(
1629
+ select(TestJobModel)
1630
+ .where(TestJobModel.status == "queued")
1631
+ .order_by(TestJobModel.created_at)
1632
+ .limit(1)
1633
+ )
1634
+ job = result.scalar_one_or_none()
1635
+
1636
+ if job:
1637
+ # Try to claim it atomically
1638
+ stmt = (
1639
+ update(TestJobModel)
1640
+ .where(
1641
+ TestJobModel.job_id == job.job_id,
1642
+ TestJobModel.status == "queued"
1643
+ )
1644
+ .values(status="processing")
1645
+ )
1646
+ claim_result = await session.execute(stmt)
1647
+ await session.commit()
1648
+
1649
+ if claim_result.rowcount == 1:
1650
+ claimed_jobs.append((worker_id, job.job_id))
1651
+
1652
+ # Run workers concurrently
1653
+ await asyncio.gather(
1654
+ worker_claim(0),
1655
+ worker_claim(1),
1656
+ worker_claim(2)
1657
+ )
1658
+
1659
+ # Each worker should have claimed a different job (or failed gracefully)
1660
+ claimed_job_ids = [job_id for _, job_id in claimed_jobs]
1661
+ # No duplicates
1662
+ assert len(claimed_job_ids) == len(set(claimed_job_ids)), "Same job claimed by multiple workers!"
1663
+
1664
+ @pytest.mark.asyncio
1665
+ async def test_next_process_at_prevents_duplicate_status_checks(self, integration_db):
1666
+ """next_process_at should prevent multiple workers from checking same job."""
1667
+ session_maker = integration_db["session_maker"]
1668
+
1669
+ # Create a processing job with future next_process_at
1670
+ async with session_maker() as session:
1671
+ job = TestJobModel(
1672
+ job_id="processing-job",
1673
+ user_id="user",
1674
+ job_type="video",
1675
+ status="processing",
1676
+ priority="slow",
1677
+ next_process_at=datetime.utcnow() + timedelta(minutes=5), # Future
1678
+ created_at=datetime.utcnow()
1679
+ )
1680
+ session.add(job)
1681
+ await session.commit()
1682
+
1683
+ # Query should NOT return this job (next_process_at is in future)
1684
+ async with session_maker() as session:
1685
+ now = datetime.utcnow()
1686
+ result = await session.execute(
1687
+ select(TestJobModel).where(
1688
+ TestJobModel.status == "processing",
1689
+ TestJobModel.next_process_at <= now
1690
+ )
1691
+ )
1692
+ jobs = result.scalars().all()
1693
+
1694
+ assert len(jobs) == 0, "Job with future next_process_at should not be selected"
1695
+
1696
+
1697
+ class TestIntegrationEndToEnd:
1698
+ """End-to-end job lifecycle tests."""
1699
+
1700
+ @pytest.mark.asyncio
1701
+ async def test_job_lifecycle_queued_to_completed(self, integration_db):
1702
+ """Test full job lifecycle: queued → processing → completed."""
1703
+ session_maker = integration_db["session_maker"]
1704
+
1705
+ # 1. Create job
1706
+ async with session_maker() as session:
1707
+ job = TestJobModel(
1708
+ job_id="lifecycle-1",
1709
+ user_id="user-123",
1710
+ job_type="text",
1711
+ status="queued",
1712
+ priority="fast",
1713
+ input_data={"prompt": "Hello"},
1714
+ created_at=datetime.utcnow()
1715
+ )
1716
+ session.add(job)
1717
+ await session.commit()
1718
+
1719
+ # 2. Worker claims job (queued → processing)
1720
+ async with session_maker() as session:
1721
+ stmt = (
1722
+ update(TestJobModel)
1723
+ .where(
1724
+ TestJobModel.job_id == "lifecycle-1",
1725
+ TestJobModel.status == "queued"
1726
+ )
1727
+ .values(status="processing", started_at=datetime.utcnow())
1728
+ )
1729
+ result = await session.execute(stmt)
1730
+ await session.commit()
1731
+ assert result.rowcount == 1
1732
+
1733
+ # 3. Worker completes job (processing → completed)
1734
+ async with session_maker() as session:
1735
+ stmt = (
1736
+ update(TestJobModel)
1737
+ .where(TestJobModel.job_id == "lifecycle-1")
1738
+ .values(
1739
+ status="completed",
1740
+ output_data={"response": "Hello back!"},
1741
+ completed_at=datetime.utcnow()
1742
+ )
1743
+ )
1744
+ await session.execute(stmt)
1745
+ await session.commit()
1746
+
1747
+ # 4. Verify final state
1748
+ async with session_maker() as session:
1749
+ result = await session.execute(
1750
+ select(TestJobModel).where(TestJobModel.job_id == "lifecycle-1")
1751
+ )
1752
+ job = result.scalar_one()
1753
+
1754
+ assert job.status == "completed"
1755
+ assert job.output_data == {"response": "Hello back!"}
1756
+ assert job.started_at is not None
1757
+ assert job.completed_at is not None
1758
+
1759
+ @pytest.mark.asyncio
1760
+ async def test_job_lifecycle_queued_to_failed(self, integration_db):
1761
+ """Test job failure lifecycle: queued → processing → failed."""
1762
+ session_maker = integration_db["session_maker"]
1763
+
1764
+ # 1. Create job
1765
+ async with session_maker() as session:
1766
+ job = TestJobModel(
1767
+ job_id="lifecycle-fail",
1768
+ user_id="user-123",
1769
+ job_type="text",
1770
+ status="queued",
1771
+ priority="fast",
1772
+ created_at=datetime.utcnow()
1773
+ )
1774
+ session.add(job)
1775
+ await session.commit()
1776
+
1777
+ # 2. Start processing
1778
+ async with session_maker() as session:
1779
+ stmt = (
1780
+ update(TestJobModel)
1781
+ .where(TestJobModel.job_id == "lifecycle-fail")
1782
+ .values(status="processing", started_at=datetime.utcnow())
1783
+ )
1784
+ await session.execute(stmt)
1785
+ await session.commit()
1786
+
1787
+ # 3. Job fails
1788
+ async with session_maker() as session:
1789
+ stmt = (
1790
+ update(TestJobModel)
1791
+ .where(TestJobModel.job_id == "lifecycle-fail")
1792
+ .values(
1793
+ status="failed",
1794
+ error_message="Content blocked by safety filter",
1795
+ completed_at=datetime.utcnow()
1796
+ )
1797
+ )
1798
+ await session.execute(stmt)
1799
+ await session.commit()
1800
+
1801
+ # 4. Verify
1802
+ async with session_maker() as session:
1803
+ result = await session.execute(
1804
+ select(TestJobModel).where(TestJobModel.job_id == "lifecycle-fail")
1805
+ )
1806
+ job = result.scalar_one()
1807
+
1808
+ assert job.status == "failed"
1809
+ assert "safety" in job.error_message.lower()
1810
+
1811
+
1812
+ class TestIntegrationCreditSystem:
1813
+ """Integration tests for credit system with real database."""
1814
+
1815
+ @pytest.mark.asyncio
1816
+ async def test_credit_reservation_and_refund(self, integration_db):
1817
+ """Test credit reserve and refund with real database."""
1818
+ session_maker = integration_db["session_maker"]
1819
+
1820
+ # Create user with credits
1821
+ async with session_maker() as session:
1822
+ user = TestUserModel(
1823
+ user_id="credit-user",
1824
+ email="test@example.com",
1825
+ credits=100
1826
+ )
1827
+ session.add(user)
1828
+ await session.commit()
1829
+
1830
+ # Reserve credits
1831
+ async with session_maker() as session:
1832
+ result = await session.execute(
1833
+ select(TestUserModel).where(TestUserModel.user_id == "credit-user")
1834
+ )
1835
+ user = result.scalar_one()
1836
+
1837
+ # Deduct 10 credits
1838
+ user.credits -= 10
1839
+ await session.commit()
1840
+
1841
+ # Verify deduction
1842
+ async with session_maker() as session:
1843
+ result = await session.execute(
1844
+ select(TestUserModel).where(TestUserModel.user_id == "credit-user")
1845
+ )
1846
+ user = result.scalar_one()
1847
+ assert user.credits == 90
1848
+
1849
+ # Refund credits
1850
+ async with session_maker() as session:
1851
+ result = await session.execute(
1852
+ select(TestUserModel).where(TestUserModel.user_id == "credit-user")
1853
+ )
1854
+ user = result.scalar_one()
1855
+ user.credits += 10
1856
+ await session.commit()
1857
+
1858
+ # Verify refund
1859
+ async with session_maker() as session:
1860
+ result = await session.execute(
1861
+ select(TestUserModel).where(TestUserModel.user_id == "credit-user")
1862
+ )
1863
+ user = result.scalar_one()
1864
+ assert user.credits == 100
1865
+
1866
+
1867
+ class TestIntegrationOrphanedJobs:
1868
+ """Test orphaned job handling (simulating server crash)."""
1869
+
1870
+ @pytest.mark.asyncio
1871
+ async def test_find_orphaned_processing_jobs(self, integration_db):
1872
+ """Find jobs stuck in processing state (simulating crash recovery)."""
1873
+ session_maker = integration_db["session_maker"]
1874
+
1875
+ # Create jobs in various states
1876
+ async with session_maker() as session:
1877
+ # This should NOT be found (queued)
1878
+ job1 = TestJobModel(
1879
+ job_id="orphan-queued", user_id="user", job_type="text",
1880
+ status="queued", priority="fast",
1881
+ credits_reserved=5, created_at=datetime.utcnow()
1882
+ )
1883
+ # This SHOULD be found (processing with credits)
1884
+ job2 = TestJobModel(
1885
+ job_id="orphan-processing", user_id="user", job_type="video",
1886
+ status="processing", priority="slow",
1887
+ credits_reserved=10, credits_refunded=False,
1888
+ created_at=datetime.utcnow()
1889
+ )
1890
+ # This should NOT be found (already refunded)
1891
+ job3 = TestJobModel(
1892
+ job_id="orphan-refunded", user_id="user", job_type="video",
1893
+ status="processing", priority="slow",
1894
+ credits_reserved=0, credits_refunded=True,
1895
+ created_at=datetime.utcnow()
1896
+ )
1897
+ session.add_all([job1, job2, job3])
1898
+ await session.commit()
1899
+
1900
+ # Query for orphaned jobs (as refund_orphaned_jobs does)
1901
+ async with session_maker() as session:
1902
+ result = await session.execute(
1903
+ select(TestJobModel).where(
1904
+ TestJobModel.status == "processing",
1905
+ TestJobModel.credits_reserved > 0,
1906
+ TestJobModel.credits_refunded == False
1907
+ )
1908
+ )
1909
+ orphaned = result.scalars().all()
1910
+
1911
+ assert len(orphaned) == 1
1912
+ assert orphaned[0].job_id == "orphan-processing"
1913
+
1914
+ @pytest.mark.asyncio
1915
+ async def test_mark_orphaned_job_as_failed(self, integration_db):
1916
+ """Orphaned jobs should be marked as failed during recovery."""
1917
+ session_maker = integration_db["session_maker"]
1918
+
1919
+ # Create orphaned job
1920
+ async with session_maker() as session:
1921
+ job = TestJobModel(
1922
+ job_id="crash-recovery",
1923
+ user_id="user",
1924
+ job_type="video",
1925
+ status="processing",
1926
+ priority="slow",
1927
+ credits_reserved=5,
1928
+ credits_refunded=False,
1929
+ created_at=datetime.utcnow()
1930
+ )
1931
+ session.add(job)
1932
+ await session.commit()
1933
+
1934
+ # Simulate crash recovery - mark as failed
1935
+ async with session_maker() as session:
1936
+ stmt = (
1937
+ update(TestJobModel)
1938
+ .where(
1939
+ TestJobModel.job_id == "crash-recovery",
1940
+ TestJobModel.status == "processing"
1941
+ )
1942
+ .values(
1943
+ status="failed",
1944
+ error_message="Server shutdown during processing. Credits refunded.",
1945
+ credits_refunded=True,
1946
+ credits_reserved=0
1947
+ )
1948
+ )
1949
+ await session.execute(stmt)
1950
+ await session.commit()
1951
+
1952
+ # Verify
1953
+ async with session_maker() as session:
1954
+ result = await session.execute(
1955
+ select(TestJobModel).where(TestJobModel.job_id == "crash-recovery")
1956
+ )
1957
+ job = result.scalar_one()
1958
+
1959
+ assert job.status == "failed"
1960
+ assert job.credits_refunded == True
1961
+ assert job.credits_reserved == 0
1962
+ assert "shutdown" in job.error_message.lower()
1963
+
1964
+
1965
  if __name__ == "__main__":
1966
  pytest.main([__file__, "-v"])
1967
+