File size: 2,790 Bytes
395651c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
import httpx
import time
import pytest
import logging

# Configuration from environment
BASE_URL = os.getenv("TEST_BASE_URL", "http://localhost:8000")
USER_ID = os.getenv("TEST_USER_ID")
SESSION_ID = os.getenv("TEST_SESSION_ID")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@pytest.mark.smoke
@pytest.mark.real_api
@pytest.mark.asyncio
async def test_api_e2e_flow():
    if not USER_ID or not SESSION_ID:
        pytest.fail("TEST_USER_ID and TEST_SESSION_ID must be set")

    auth_headers = {"Authorization": f"Test {USER_ID}"}
    
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30.0) as client:
        # 1. Health check
        print("\n[1/3] Checking API Health...")
        res = await client.get("/")
        assert res.status_code == 200
        assert "running" in res.json()["message"].lower()
        print("   ✅ Health check passed")

        # 2. Submit Solve Request
        print(f"\n[2/3] Submitting solve request for session {SESSION_ID}...")
        payload = {
            "text": "Cho hình chữ nhật ABCD có AB=5, AD=10. Tính diện tích.",
            "request_video": False
        }
        res = await client.post(f"/api/v1/sessions/{SESSION_ID}/solve", json=payload, headers=auth_headers)
        
        if res.status_code != 200:
            print(f"   ❌ FAILED: {res.text}")
            assert res.status_code == 200
            
        data = res.json()
        job_id = data["job_id"]
        assert job_id is not None
        print(f"   ✅ Request accepted. Job ID: {job_id}")

        # 3. Polling Job Status
        print("\n[3/3] Polling job status...")
        max_attempts = 15
        for i in range(max_attempts):
            time.sleep(2) # Simple sleep between polls
            res = await client.get(f"/api/v1/solve/{job_id}", headers=auth_headers)
            assert res.status_code == 200
            job_data = res.json()
            status = job_data["status"]
            print(f"   Attempt {i+1}: Status = {status}")
            
            if status == "success":
                print("   ✅ SUCCESS: API pipeline completed successfully.")
                result = job_data.get("result", {})
                assert "coordinates" in result
                assert "geometry_dsl" in result
                return
            
            if status == "error":
                error_msg = job_data.get("result", {}).get("error", "Unknown error")
                pytest.fail(f"Job failed with error: {error_msg}")
            
            if i == max_attempts - 1:
                pytest.fail("Timeout waiting for job completion")

if __name__ == "__main__":
    # This allows running the script directly if needed
    import asyncio
    asyncio.run(test_api_e2e_flow())