Spaces:
Build error
Build error
File size: 8,257 Bytes
3e6248e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
"""
E2E Test Configuration
Fixtures for running real server integration tests with authentication.
"""
import os
import pytest
import httpx
import subprocess
import time
import socket
import sqlite3
import uuid
from contextlib import closing
# E2E test configuration
E2E_TEST_PORT = 8001
E2E_TEST_HOST = "127.0.0.1"
E2E_BASE_URL = f"http://{E2E_TEST_HOST}:{E2E_TEST_PORT}"
E2E_DB_FILE = "apigateway_production.db" # Server uses this in development mode
# Use a 32+ char secret to pass library validation
JWT_SECRET = "e2e-test-jwt-secret-key-32-chars-minimum"
def find_free_port():
"""Find an available port."""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
def wait_for_server(url: str, timeout: int = 30) -> bool:
"""Wait for server to be ready."""
start = time.time()
while time.time() - start < timeout:
try:
response = httpx.get(f"{url}/health", timeout=2)
if response.status_code == 200:
return True
except httpx.RequestError:
pass
time.sleep(0.5)
return False
@pytest.fixture(scope="session")
def e2e_env():
"""Set up E2E test environment variables."""
original_env = os.environ.copy()
# Test environment configuration
os.environ["CORS_ORIGINS"] = "http://localhost:3000"
os.environ["JWT_SECRET"] = JWT_SECRET
os.environ["AUTH_SIGN_IN_GOOGLE_CLIENT_ID"] = "test-client-id"
os.environ["ENVIRONMENT"] = "development"
os.environ["RESET_DB"] = "false" # Handle manually to avoid race conditions
os.environ["SKIP_SERVICE_REGISTRATION_CHECK"] = "true"
# Explicitly set DB URL to absolute path to avoid any ambiguity
db_path = "/home/jebin/git/apigateway/apigateway_production.db"
os.environ["DATABASE_URL"] = f"sqlite+aiosqlite:///{db_path}"
yield
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
@pytest.fixture(scope="session")
def live_server(e2e_env):
"""Start a real uvicorn server for E2E tests."""
# Handle DB clean up manually BEFORE server starts
db_path = "/home/jebin/git/apigateway/apigateway_production.db"
# Force delete existing DB
if os.path.exists(db_path):
os.remove(db_path)
# We need to initialize the DB structure since we just deleted it
# We can use python -c to run the init_db code from app context
# This ensures tables exists BEFORE we start the server and try to insert users
# We'll use a simple script to create tables
init_script = """
import asyncio
from core.database import init_db
async def main():
await init_db()
if __name__ == "__main__":
asyncio.run(main())
"""
subprocess.run(
["python", "-c", init_script],
cwd="/home/jebin/git/apigateway",
env=os.environ.copy(),
check=True
)
port = find_free_port()
base_url = f"http://127.0.0.1:{port}"
# Start uvicorn in subprocess
process = subprocess.Popen(
[
"python", "-m", "uvicorn",
"app:app",
"--host", "127.0.0.1",
"--port", str(port),
"--log-level", "warning"
],
cwd="/home/jebin/git/apigateway",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy()
)
# Wait for server to be ready
if not wait_for_server(base_url):
process.terminate()
stdout, stderr = process.communicate(timeout=5)
raise RuntimeError(f"Server failed to start. stderr: {stderr.decode()}")
yield base_url
# Cleanup
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
@pytest.fixture
def api_client(live_server):
"""HTTP client for making real API requests."""
with httpx.Client(base_url=live_server, timeout=30.0) as client:
yield client
@pytest.fixture
def test_user_data():
"""Generate unique test user data."""
unique_id = str(uuid.uuid4())[:8]
return {
"user_id": f"e2e_user_{unique_id}",
"email": f"e2e_test_{unique_id}@example.com",
"google_id": f"google_{unique_id}",
"name": f"E2E Test User {unique_id}",
"credits": 100,
"token_version": 1
}
@pytest.fixture
def create_test_user(live_server, test_user_data):
"""
Create a test user directly in the database.
Returns user data and access token.
"""
# Connect to the database the server is using
db_path = "/home/jebin/git/apigateway/apigateway_production.db"
# Wait for DB and tables to be ready (server creates them on startup)
max_retries = 20
for attempt in range(max_retries):
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if users table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
if cursor.fetchone() is None:
conn.close()
time.sleep(0.5)
continue
# Insert test user using the app's own codebase to ensure compatibility
insert_script = f"""
import asyncio
import os
from sqlalchemy import select
from core.database import async_session_maker
from core.models import User
import datetime
async def create_user():
async with async_session_maker() as db:
# Check if user exists
stmt = select(User).where(User.user_id == '{test_user_data["user_id"]}')
result = await db.execute(stmt)
if result.scalar_one_or_none():
return
user = User(
user_id='{test_user_data["user_id"]}',
email='{test_user_data["email"]}',
google_id='{test_user_data["google_id"]}',
name='{test_user_data["name"]}',
credits={test_user_data["credits"]},
token_version={test_user_data["token_version"]},
is_active=True,
created_at=datetime.datetime.utcnow(),
updated_at=datetime.datetime.utcnow()
)
db.add(user)
await db.commit()
if __name__ == "__main__":
asyncio.run(create_user())
"""
# Run instructions in subprocess
subprocess.run(
["python", "-c", insert_script],
cwd="/home/jebin/git/apigateway",
env=os.environ.copy(),
check=True,
capture_output=True
)
# Verify via sqlite3 just in case
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT user_id FROM users WHERE user_id=?", (test_user_data["user_id"],))
if cursor.fetchone():
conn.close()
break
conn.close()
except (sqlite3.OperationalError, subprocess.CalledProcessError) as e:
if attempt < max_retries - 1:
time.sleep(0.5)
else:
raise RuntimeError(f"Failed to create test user after {max_retries} attempts: {e}")
# Generate a valid JWT token using the library with SAME secret as server
from google_auth_service import JWTService
jwt_service = JWTService(secret_key=JWT_SECRET)
access_token = jwt_service.create_access_token(
user_id=test_user_data["user_id"],
email=test_user_data["email"],
token_version=test_user_data["token_version"]
)
return {
**test_user_data,
"access_token": access_token
}
@pytest.fixture
def authenticated_client(api_client, create_test_user):
"""
HTTP client with valid authentication token.
Uses a real user created in the database.
"""
token = create_test_user["access_token"]
api_client.headers["Authorization"] = f"Bearer {token}"
yield api_client, create_test_user
# Cleanup - remove auth header
if "Authorization" in api_client.headers:
del api_client.headers["Authorization"]
|