# apigateway/tests/test_job_lifecycle.py
# Repository page residue kept for reference: "jebin2's picture" (author avatar),
# "job del" (commit message), 74b89f0 (revision).
import asyncio
import os
import sys
from datetime import datetime
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from core.models import User, GeminiJob
from core.database import DATABASE_URL
async def test_lifecycle():
    """Replay the credit reserve/refund rules for job deletion against the database.

    The router's logic is simulated by hand instead of being called through
    the API, because no server is running here:

    1. create a user with 100 credits;
    2. deduct 10 credits and create a queued video job (balance 90);
    3. delete the queued job, refunding 8 credits (balance 98);
    4. deduct 10 credits, create a job that is already processing, and
       delete it without a refund (balance stays 88).

    Rows created here are removed again even when a step fails, and the
    engine is disposed before returning, so a failed run does not leave
    data behind that would break the next run.

    Raises:
        AssertionError: if a credit balance differs from the expected value.
    """
    engine = create_async_engine(DATABASE_URL)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with async_session() as session:
            # Objects added to the session, in creation order, so the cleanup
            # below can delete whatever is still stored (children first).
            created = []
            try:
                # 1. Setup User
                user_id = "test_user_lifecycle"
                user = User(user_id=user_id, email="test_lifecycle@example.com", credits=100)
                session.add(user)
                created.append(user)
                await session.commit()
                print(f"Created user with {user.credits} credits")

                # 2. Simulate Video Job Creation (Cost 10)
                # We simulate the API logic manually since we can't easily call
                # the API here without a running server, but we can verify the
                # logic we implemented in the router.

                # Deduct 10 credits
                user.credits -= 10
                await session.commit()
                print(f"Deducted 10 credits. Remaining: {user.credits}")
                assert user.credits == 90

                # Create Job
                job = GeminiJob(
                    job_id="job_test_video",
                    user_id=user_id,
                    job_type="video",
                    status="queued",
                    credits_reserved=10,
                )
                session.add(job)
                created.append(job)
                await session.commit()
                print("Created queued video job")

                # 3. Simulate Delete Queued Job (Refund 8)
                # Logic from router: only a still-queued job gets a refund.
                if job.status == "queued" and job.credits_reserved >= 10:
                    refund = 8
                    user.credits += refund
                    print(f"Refunded {refund} credits")

                # The job is deleted whether or not a refund was granted.
                await session.delete(job)
                await session.commit()
                print(f"Deleted job. User credits: {user.credits}")
                assert user.credits == 98  # 90 + 8

                # 4. Simulate Processing Job Deletion (No Refund)
                # Deduct 10 again
                user.credits -= 10
                job2 = GeminiJob(
                    job_id="job_test_processing",
                    user_id=user_id,
                    job_type="video",
                    status="processing",
                    credits_reserved=10,
                )
                session.add(job2)
                created.append(job2)
                await session.commit()
                print(f"Created processing job. User credits: {user.credits}")  # 88

                # Delete: same router condition; it is false for a processing
                # job, so no credits are returned.
                if job2.status == "queued" and job2.credits_reserved >= 10:
                    refund = 8
                    user.credits += refund

                await session.delete(job2)
                await session.commit()
                print(f"Deleted processing job. User credits: {user.credits}")
                assert user.credits == 88  # No change
            finally:
                # Cleanup. Roll back first so a failed or half-finished
                # transaction cannot block the deletes.
                await session.rollback()
                # Objects already deleted (or never flushed) are no longer in
                # the session; only the ones still persistent are removed.
                for obj in reversed(created):
                    if obj in session:
                        await session.delete(obj)
                await session.commit()
                print("Test cleanup complete")
    finally:
        # Release the engine's pooled connections before the event loop ends.
        await engine.dispose()
# Run the lifecycle check directly when this file is executed as a script.
if __name__ == "__main__":
    lifecycle_coro = test_lifecycle()
    asyncio.run(lifecycle_coro)