Spaces:
Sleeping
Sleeping
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from .config import get_settings | |
| settings = get_settings() | |
class Database:
    """Holder for the application's shared MongoDB client.

    The class carries no behaviour of its own; it only gives the client a
    well-known home so the connection helpers in this module can assign and
    read it.
    """

    # Shared Motor client. Stays ``None`` until ``connect_to_mongo`` assigns
    # one, which is why the annotation is optional. The annotation is quoted
    # so the union syntax is never evaluated at runtime.
    client: "AsyncIOMotorClient | None" = None


# Module-level singleton that the connection helpers read and update.
db = Database()
def _redact_mongo_credentials(url: str) -> str:
    """Return ``url`` with any ``user:password@`` userinfo replaced by ``***@``.

    Only the authority part (between ``://`` and the first ``/``) is
    inspected, so an ``@`` inside the path or query string is left alone.
    URLs without a scheme separator or without userinfo are returned as-is.
    """
    scheme, separator, remainder = url.partition("://")
    if not separator:
        return url
    authority, slash, tail = remainder.partition("/")
    # rpartition: everything before the LAST "@" is userinfo, so a password
    # containing an unescaped "@" is still fully hidden.
    _userinfo, at_sign, hosts = authority.rpartition("@")
    if not at_sign:
        return url
    return f"{scheme}://***@{hosts}{slash}{tail}"


async def connect_to_mongo():
    """Create the shared MongoDB client on application startup.

    Builds an ``AsyncIOMotorClient`` from ``settings.mongodb_url``, stores it
    on ``db.client`` and prints a confirmation line. The printed URL has its
    credentials redacted so usernames/passwords embedded in the connection
    string do not end up in stdout or log collectors; the client itself still
    receives the original, unmodified URL.

    NOTE(review): per the Motor documentation the client does not contact the
    server when it is constructed, so the message means "client created", not
    "server reachable" — confirm whether a startup ping is wanted.
    """
    db.client = AsyncIOMotorClient(settings.mongodb_url)
    # str(): the setting may be a URL object rather than a plain string.
    printable_url = _redact_mongo_credentials(str(settings.mongodb_url))
    print(f"Connected to MongoDB at {printable_url}")
async def close_mongo_connection():
    """Shut down the shared MongoDB client on application shutdown.

    When ``db.client`` is unset (or otherwise falsy) the call does nothing
    and prints nothing. Otherwise the client is closed and a confirmation
    line is printed. ``db.client`` itself is left pointing at the closed
    client.
    """
    active_client = db.client
    if not active_client:
        # Nothing was ever opened, so there is nothing to close.
        return

    active_client.close()
    print("Closed MongoDB connection")
def get_database():
    """Return the handle for the configured application database.

    Returns:
        ``db.client[settings.database_name]``.

    Raises:
        RuntimeError: If no client has been stored on ``db`` yet, i.e. this
            is called before ``connect_to_mongo`` has run.
    """
    client = db.client
    if client is None:
        # Fail with an actionable message instead of the opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise RuntimeError(
            "MongoDB client is not initialised; call connect_to_mongo() first"
        )
    return client[settings.database_name]
def get_collection(collection_name: str):
    """Look up a collection by name in the application's database.

    Args:
        collection_name: Key used to index the object returned by
            ``get_database()``.

    Returns:
        Whatever ``get_database()[collection_name]`` yields.
    """
    database = get_database()
    return database[collection_name]