Spaces:
Runtime error
Runtime error
| """ | |
| Pytest configuration and fixtures for SCM microservice tests. | |
| """ | |
| import pytest | |
| import asyncio | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import redis.asyncio as redis | |
| import asyncpg | |
| from app.core.config import settings | |
| # Test database and cache names | |
| TEST_DB_NAME = "scm_test_db" | |
| TEST_REDIS_DB = 15 | |
| def event_loop(): | |
| """Create an event loop for the test session""" | |
| loop = asyncio.get_event_loop_policy().new_event_loop() | |
| yield loop | |
| loop.close() | |
| async def test_db(): | |
| """ | |
| Provide a clean test database for each test. | |
| Drops the database after the test completes. | |
| """ | |
| client = AsyncIOMotorClient(settings.MONGODB_URI) | |
| db = client[TEST_DB_NAME] | |
| yield db | |
| # Cleanup: drop the test database | |
| await client.drop_database(TEST_DB_NAME) | |
| client.close() | |
| async def test_redis(): | |
| """ | |
| Provide a clean Redis instance for each test. | |
| Flushes the test database after the test completes. | |
| """ | |
| redis_client = redis.Redis( | |
| host=settings.REDIS_HOST, | |
| port=settings.REDIS_PORT, | |
| password=settings.REDIS_PASSWORD, | |
| db=TEST_REDIS_DB, | |
| decode_responses=True | |
| ) | |
| yield redis_client | |
| # Cleanup: flush the test database | |
| await redis_client.flushdb() | |
| await redis_client.close() | |
| async def test_pg_conn(): | |
| """ | |
| Provide a PostgreSQL connection for each test. | |
| Creates trans schema and merchants_ref table, then cleans up after test. | |
| """ | |
| # Create connection | |
| conn = await asyncpg.connect( | |
| host=settings.POSTGRES_HOST, | |
| port=settings.POSTGRES_PORT, | |
| database=settings.POSTGRES_DB, | |
| user=settings.POSTGRES_USER, | |
| password=settings.POSTGRES_PASSWORD | |
| ) | |
| # Create trans schema if not exists | |
| await conn.execute("CREATE SCHEMA IF NOT EXISTS trans") | |
| # Create merchants_ref table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS trans.merchants_ref ( | |
| merchant_id TEXT PRIMARY KEY, | |
| merchant_code TEXT NOT NULL, | |
| merchant_type TEXT NOT NULL, | |
| parent_merchant_id TEXT, | |
| status TEXT NOT NULL, | |
| city TEXT, | |
| state TEXT, | |
| gst_number TEXT, | |
| created_at TIMESTAMP NOT NULL, | |
| updated_at TIMESTAMP NOT NULL | |
| ) | |
| """) | |
| yield conn | |
| # Cleanup: drop test data | |
| await conn.execute("DROP TABLE IF EXISTS trans.merchants_ref") | |
| await conn.close() | |
| async def test_pg_pool(): | |
| """ | |
| Provide a PostgreSQL connection pool for the test session. | |
| Initializes the pool at the start and closes it at the end. | |
| """ | |
| from app.postgres import PostgreSQLConnectionPool | |
| # Initialize connection pool | |
| try: | |
| await PostgreSQLConnectionPool.initialize() | |
| # Create trans schema and tables | |
| conn = await PostgreSQLConnectionPool.get_connection() | |
| try: | |
| await conn.execute("CREATE SCHEMA IF NOT EXISTS trans") | |
| # Create merchants_ref table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS trans.merchants_ref ( | |
| merchant_id TEXT PRIMARY KEY, | |
| merchant_code TEXT NOT NULL, | |
| merchant_type TEXT NOT NULL, | |
| parent_merchant_id TEXT, | |
| status TEXT NOT NULL, | |
| city TEXT, | |
| state TEXT, | |
| gst_number TEXT, | |
| created_at TIMESTAMP NOT NULL, | |
| updated_at TIMESTAMP NOT NULL | |
| ) | |
| """) | |
| # Create catalogue_ref table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS trans.catalogue_ref ( | |
| catalogue_id TEXT PRIMARY KEY, | |
| catalogue_type TEXT NOT NULL, | |
| catalogue_name TEXT NOT NULL, | |
| sku TEXT, | |
| barcode_number TEXT, | |
| hsn_code TEXT, | |
| gst_rate NUMERIC(5,2), | |
| mrp NUMERIC(12,2), | |
| base_price NUMERIC(12,2), | |
| track_inventory BOOLEAN, | |
| batch_managed BOOLEAN, | |
| status TEXT NOT NULL, | |
| created_at TIMESTAMP NOT NULL | |
| ) | |
| """) | |
| # Create employees_ref table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS trans.employees_ref ( | |
| employee_id TEXT PRIMARY KEY, | |
| employee_code TEXT NOT NULL, | |
| full_name TEXT NOT NULL, | |
| designation TEXT NOT NULL, | |
| department TEXT, | |
| manager_id TEXT, | |
| city TEXT, | |
| state TEXT, | |
| status TEXT NOT NULL, | |
| system_login_enabled BOOLEAN, | |
| created_at TIMESTAMP NOT NULL | |
| ) | |
| """) | |
| finally: | |
| await PostgreSQLConnectionPool.release_connection(conn) | |
| yield PostgreSQLConnectionPool | |
| except Exception as e: | |
| # If PostgreSQL is not available, skip tests that need it | |
| pytest.skip(f"PostgreSQL not available: {e}") | |
| finally: | |
| # Cleanup: close connection pool | |
| await PostgreSQLConnectionPool.close() | |