Spaces:
Build error
Build error
| """ | |
| E2E Test Configuration | |
| Fixtures for running real server integration tests with authentication. | |
| """ | |
| import os | |
| import pytest | |
| import httpx | |
| import subprocess | |
| import time | |
| import socket | |
| import sqlite3 | |
| import uuid | |
| from contextlib import closing | |
# E2E test configuration
# NOTE(review): the code visible in this file obtains its port from
# find_free_port() and spells out the database path literally, so
# E2E_TEST_PORT, E2E_BASE_URL and E2E_DB_FILE are not referenced here --
# confirm whether other modules import them before removing.
E2E_TEST_PORT = 8001
E2E_TEST_HOST = "127.0.0.1"
E2E_BASE_URL = f"http://{E2E_TEST_HOST}:{E2E_TEST_PORT}"
E2E_DB_FILE = "apigateway_production.db"  # Server uses this in development mode
# Use a 32+ char secret to pass library validation
# The same value is exported to the server as the JWT_SECRET environment
# variable and used to sign the test access token.
JWT_SECRET = "e2e-test-jwt-secret-key-32-chars-minimum"
def find_free_port():
    """Return a TCP port number that the OS reports as currently unused.

    A throwaway IPv4 stream socket is bound to port 0 so the kernel picks
    an ephemeral port; the socket is closed before returning, so the port
    is only *probably* still free when the caller binds to it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(('', 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = probe.getsockname()
        return port
    finally:
        probe.close()
def wait_for_server(url: str, timeout: int = 30, poll_interval: float = 0.5) -> bool:
    """Poll ``{url}/health`` until it answers 200 or the timeout elapses.

    Args:
        url: Base URL of the server, without a trailing ``/health``.
        timeout: Maximum number of seconds to keep polling. A value of 0
            or less returns ``False`` without issuing any request.
        poll_interval: Seconds to sleep between attempts.

    Returns:
        ``True`` as soon as a health request returns HTTP 200, ``False``
        if the deadline passes first.
    """
    # A monotonic clock keeps the deadline correct even if the wall clock
    # is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            response = httpx.get(f"{url}/health", timeout=2)
        except httpx.RequestError:
            # Server not accepting connections yet; retry after the sleep.
            pass
        else:
            if response.status_code == 200:
                return True
        time.sleep(poll_interval)
    return False
def e2e_env():
    """Export the environment variables the E2E server needs, then restore.

    Generator: everything before ``yield`` installs the test settings into
    ``os.environ``; everything after puts back the snapshot taken on entry.
    NOTE(review): written in pytest-fixture shape; the decorator is not
    visible in this view -- confirm.
    """
    saved_environ = os.environ.copy()

    # Absolute path so the server and the tests agree on the database file.
    db_path = "/home/jebin/git/apigateway/apigateway_production.db"

    overrides = {
        "CORS_ORIGINS": "http://localhost:3000",
        "JWT_SECRET": JWT_SECRET,
        "AUTH_SIGN_IN_GOOGLE_CLIENT_ID": "test-client-id",
        "ENVIRONMENT": "development",
        # Reset is done by hand elsewhere to avoid race conditions.
        "RESET_DB": "false",
        "SKIP_SERVICE_REGISTRATION_CHECK": "true",
        "DATABASE_URL": f"sqlite+aiosqlite:///{db_path}",
    }
    os.environ.update(overrides)

    yield

    # Drop everything set above (and anything added meanwhile), then
    # reinstate the snapshot.
    os.environ.clear()
    os.environ.update(saved_environ)
def live_server(e2e_env):
    """Start a real uvicorn server for E2E tests and yield its base URL.

    Steps: delete any existing database file, create the schema by running
    ``core.database.init_db`` in a subprocess, launch ``uvicorn app:app``
    on a free loopback port, wait for ``/health`` to answer, yield the
    base URL, and finally stop the server process.

    Args:
        e2e_env: Unused value; listed so the environment set-up runs first
            (NOTE(review): pytest-fixture dependency; the decorator is not
            visible in this view -- confirm).

    Yields:
        The server's base URL, e.g. ``http://127.0.0.1:<port>``.

    Raises:
        RuntimeError: If the server does not become healthy in time.
        subprocess.CalledProcessError: If the schema-initialisation
            subprocess exits with a non-zero status.
    """
    # Function-scope imports keep this block self-contained.
    import sys
    import tempfile

    project_dir = "/home/jebin/git/apigateway"
    db_path = "/home/jebin/git/apigateway/apigateway_production.db"

    # Handle DB clean-up manually BEFORE the server starts. Removing and
    # tolerating "not found" avoids the exists()/remove() race.
    try:
        os.remove(db_path)
    except FileNotFoundError:
        pass

    # The DB was just deleted, so create the tables up front; this ensures
    # they exist before the server starts and before users are inserted.
    init_script = """
import asyncio
from core.database import init_db

async def main():
    await init_db()

if __name__ == "__main__":
    asyncio.run(main())
"""
    # sys.executable guarantees the same interpreter (and virtualenv) that
    # is running the tests, rather than whatever "python" is on PATH.
    subprocess.run(
        [sys.executable, "-c", init_script],
        cwd=project_dir,
        env=os.environ.copy(),
        check=True,
    )

    port = find_free_port()
    base_url = f"http://127.0.0.1:{port}"

    # stderr goes to a temp file instead of a pipe: nobody drains a pipe
    # while the tests run, so a chatty server could fill it and block.
    with tempfile.TemporaryFile() as stderr_log:
        process = subprocess.Popen(
            [
                sys.executable, "-m", "uvicorn",
                "app:app",
                "--host", "127.0.0.1",
                "--port", str(port),
                "--log-level", "warning",
            ],
            cwd=project_dir,
            stdout=subprocess.DEVNULL,
            stderr=stderr_log,
            env=os.environ.copy(),
        )

        ready = False
        try:
            ready = wait_for_server(base_url)
            if ready:
                yield base_url
        finally:
            # Always stop the server, whether start-up failed or the tests
            # are done; escalate to kill and reap the child if needed.
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

        if not ready:
            stderr_log.seek(0)
            captured = stderr_log.read().decode(errors="replace")
            raise RuntimeError(f"Server failed to start. stderr: {captured}")
def api_client(live_server):
    """Yield an ``httpx.Client`` bound to the running test server.

    Args:
        live_server: Base URL of the server under test.

    Yields:
        A client with a 30-second timeout; it is closed when the
        generator resumes after the ``yield``.
    """
    with httpx.Client(base_url=live_server, timeout=30.0) as http_session:
        yield http_session
def test_user_data():
    """Build a fresh user record whose identifying fields share a random tag.

    Returns:
        A dict with ``user_id``, ``email``, ``google_id`` and ``name``
        derived from the same 8-character hex tag, plus fixed ``credits``
        (100) and ``token_version`` (1).
    """
    # The first 8 hex digits of a random UUID keep runs from colliding.
    tag = uuid.uuid4().hex[:8]
    record = dict(
        user_id=f"e2e_user_{tag}",
        email=f"e2e_test_{tag}@example.com",
        google_id=f"google_{tag}",
        name=f"E2E Test User {tag}",
        credits=100,
        token_version=1,
    )
    return record
def create_test_user(live_server, test_user_data):
    """Create a test user in the server's database and mint a token for it.

    The user row is inserted by a subprocess that uses the application's
    own ``core.models.User`` and ``core.database.async_session_maker``;
    the insert is then verified with a direct sqlite3 query. The whole
    sequence is retried, because the ``users`` table may not exist yet.

    Args:
        live_server: Unused value; listed so the server is started first
            (NOTE(review): pytest-fixture dependency; the decorator is not
            visible in this view -- confirm).
        test_user_data: Mapping with ``user_id``, ``email``, ``google_id``,
            ``name``, ``credits`` and ``token_version``.

    Returns:
        A new dict containing every entry of ``test_user_data`` plus an
        ``access_token`` signed with ``JWT_SECRET``.

    Raises:
        RuntimeError: If the user could not be created and verified within
            the retry budget.
    """
    # Function-scope imports keep this block self-contained.
    import json
    import sys

    # Connect to the database the server is using.
    project_dir = "/home/jebin/git/apigateway"
    db_path = "/home/jebin/git/apigateway/apigateway_production.db"

    # The user fields travel as a JSON argument rather than being pasted
    # into the script source, so quotes or backslashes in a value cannot
    # break (or alter) the generated code.
    payload = json.dumps({
        field: test_user_data[field]
        for field in (
            "user_id", "email", "google_id", "name", "credits", "token_version",
        )
    })
    insert_script = """
import asyncio
import datetime
import json
import sys
from sqlalchemy import select
from core.database import async_session_maker
from core.models import User

data = json.loads(sys.argv[1])

async def create_user():
    async with async_session_maker() as db:
        # Check if user exists
        stmt = select(User).where(User.user_id == data["user_id"])
        result = await db.execute(stmt)
        if result.scalar_one_or_none():
            return
        user = User(
            user_id=data["user_id"],
            email=data["email"],
            google_id=data["google_id"],
            name=data["name"],
            credits=data["credits"],
            token_version=data["token_version"],
            is_active=True,
            created_at=datetime.datetime.utcnow(),
            updated_at=datetime.datetime.utcnow()
        )
        db.add(user)
        await db.commit()

if __name__ == "__main__":
    asyncio.run(create_user())
"""

    # Wait for DB and tables to be ready, then insert and verify.
    max_retries = 20
    for attempt in range(max_retries):
        try:
            # closing() releases the connection even if a query raises.
            with closing(sqlite3.connect(db_path)) as conn:
                users_table = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='users'"
                ).fetchone()
            if users_table is None:
                time.sleep(0.5)
                continue

            # Insert through the app's own code base to ensure compatibility.
            subprocess.run(
                [sys.executable, "-c", insert_script, payload],
                cwd=project_dir,
                env=os.environ.copy(),
                check=True,
                capture_output=True,
            )

            # Verify via sqlite3 just in case.
            with closing(sqlite3.connect(db_path)) as conn:
                created = conn.execute(
                    "SELECT user_id FROM users WHERE user_id=?",
                    (test_user_data["user_id"],),
                ).fetchone()
            if created:
                break
        except (sqlite3.OperationalError, subprocess.CalledProcessError) as e:
            if attempt < max_retries - 1:
                time.sleep(0.5)
            else:
                detail = str(e)
                # Surface the child's stderr, which capture_output hides.
                child_stderr = getattr(e, "stderr", None)
                if child_stderr:
                    detail += f"; stderr: {child_stderr.decode(errors='replace')}"
                raise RuntimeError(
                    f"Failed to create test user after {max_retries} attempts: {detail}"
                ) from e
    else:
        # Loop ran out without ever verifying the row: fail loudly instead
        # of handing back a token for a user that does not exist.
        raise RuntimeError(
            f"Failed to create test user after {max_retries} attempts: "
            "user row was never found in the database"
        )

    # Generate a valid JWT token using the library with the SAME secret as
    # the server.
    from google_auth_service import JWTService
    jwt_service = JWTService(secret_key=JWT_SECRET)
    access_token = jwt_service.create_access_token(
        user_id=test_user_data["user_id"],
        email=test_user_data["email"],
        token_version=test_user_data["token_version"],
    )
    return {
        **test_user_data,
        "access_token": access_token,
    }
def authenticated_client(api_client, create_test_user):
    """Yield the API client with a Bearer token for a real database user.

    Args:
        api_client: Client object exposing a mutable ``headers`` mapping.
        create_test_user: User mapping that includes ``access_token``.

    Yields:
        A ``(client, user)`` tuple; the ``Authorization`` header is set on
        the client for as long as the generator is suspended.
    """
    user = create_test_user
    api_client.headers["Authorization"] = f"Bearer {user['access_token']}"

    yield api_client, user

    # Cleanup - drop the auth header if it is still present.
    api_client.headers.pop("Authorization", None)