# voiceforge-universal/backend/tests/integration/test_e2e_full_flow.py
# creator-o1
# Initial commit: Complete VoiceForge Enterprise Speech AI Platform
# d00203b
import pytest
import pytest_asyncio
import struct
from httpx import AsyncClient, ASGITransport
from app.main import app
# --- Fixtures ---
@pytest.fixture(scope="module")
def anyio_backend():
    """Return the async backend name, ``"asyncio"``.

    The fixture name follows the anyio pytest plugin's ``anyio_backend``
    convention; it is module-scoped, so it is evaluated once per module.
    """
    backend_name = "asyncio"
    return backend_name
@pytest_asyncio.fixture
async def async_client():
    """Yield an ``AsyncClient`` that talks to ``app`` through ``ASGITransport``.

    The client uses ``http://test`` as its base URL and is closed when the
    ``async with`` block exits after the test finishes.
    """
    asgi_transport = ASGITransport(app=app)
    http_client = AsyncClient(transport=asgi_transport, base_url="http://test")
    async with http_client as client:
        yield client
# --- Helpers ---
def create_dummy_wav(size_kb=10):
    """Build an in-memory WAV file whose audio payload is all zero bytes.

    Args:
        size_kb: Payload size in units of 1024 bytes.

    Returns:
        bytes: A 44-byte RIFF/WAVE header followed by ``size_kb * 1024``
        zero bytes.
    """
    payload_size = size_kb * 1024

    # One little-endian, unpadded pack call emits the whole 44-byte header:
    # RIFF descriptor, "fmt " chunk, then the "data" chunk header.
    header = struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF', 36 + payload_size, b'WAVE',  # RIFF size = header remainder + payload
        b'fmt ', 16,                          # fmt chunk body length
        1,                                    # audio format (1 = PCM)
        1,                                    # channel count
        44100,                                # sample rate
        88200,                                # byte rate = 44100 * 1 channel * 2 bytes
        2,                                    # block align
        16,                                   # bits per sample
        b'data', payload_size,
    )

    silence = b'\x00' * payload_size
    return header + silence
# --- Tests ---
@pytest.mark.asyncio
async def test_full_user_journey(async_client):
    """
    Walk a single user through the API end to end:
    1. Register (or confirm the account already exists)
    2. Login
    3. Fetch the profile
    4. Create an API key
    5. Check the health endpoint
    """
    user_email = "e2e_user@example.com"
    user_password = "SecurePass123!"

    # 1. Register
    registration = {
        "email": user_email,
        "password": user_password,
        "full_name": "E2E Tester",
    }
    register_resp = await async_client.post("/api/v1/auth/register", json=registration)
    if register_resp.status_code == 200:
        created_user = register_resp.json()
        assert created_user["email"] == user_email
        assert "id" in created_user
    else:
        # The user may be left over from a previous run; the only other
        # outcome accepted here is the 400 "already registered" rejection.
        assert register_resp.status_code == 400
        assert "already registered" in register_resp.text

    # 2. Login (credentials are sent as form data, not JSON)
    credentials = {"username": user_email, "password": user_password}
    login_resp = await async_client.post("/api/v1/auth/login", data=credentials)
    assert login_resp.status_code == 200
    token_payload = login_resp.json()
    assert "access_token" in token_payload
    access_token = token_payload["access_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    # 3. Get Profile
    profile_resp = await async_client.get("/api/v1/auth/me", headers=auth_headers)
    assert profile_resp.status_code == 200
    assert profile_resp.json()["email"] == user_email

    # 4. Create API Key
    key_resp = await async_client.post(
        "/api/v1/auth/api-keys",
        json={"name": "E2E Key"},
        headers=auth_headers,
    )
    assert key_resp.status_code == 200
    key_payload = key_resp.json()
    assert "key" in key_payload
    assert key_payload["key"].startswith("vf_")

    # The new key is not exercised against any endpoint here; only its
    # successful creation is verified.

    # 5. Check Health
    health_resp = await async_client.get("/health")
    assert health_resp.status_code == 200
    assert health_resp.json()["status"] == "ok"