Spaces:
Running
Running
| """ | |
| Pytest configuration and fixtures for SCM microservice tests. | |
| """ | |
| import pytest | |
| import asyncio | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import redis.asyncio as redis | |
| from app.core.config import settings | |
| # Test database and cache names | |
| TEST_DB_NAME = "scm_test_db" | |
| TEST_REDIS_DB = 15 | |
| def event_loop(): | |
| """Create an event loop for the test session""" | |
| loop = asyncio.get_event_loop_policy().new_event_loop() | |
| yield loop | |
| loop.close() | |
| async def test_db(): | |
| """ | |
| Provide a clean test database for each test. | |
| Drops the database after the test completes. | |
| """ | |
| client = AsyncIOMotorClient(settings.MONGODB_URI) | |
| db = client[TEST_DB_NAME] | |
| yield db | |
| # Cleanup: drop the test database | |
| await client.drop_database(TEST_DB_NAME) | |
| client.close() | |
| async def test_redis(): | |
| """ | |
| Provide a clean Redis instance for each test. | |
| Flushes the test database after the test completes. | |
| """ | |
| redis_client = redis.Redis( | |
| host=settings.REDIS_HOST, | |
| port=settings.REDIS_PORT, | |
| password=settings.REDIS_PASSWORD, | |
| db=TEST_REDIS_DB, | |
| decode_responses=True | |
| ) | |
| yield redis_client | |
| # Cleanup: flush the test database | |
| await redis_client.flushdb() | |
| await redis_client.close() | |