|
|
from __future__ import annotations |
|
|
|
|
|
from typing import Optional |
|
|
|
|
|
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection, AsyncIOMotorDatabase |
|
|
|
|
|
from app.core.config import settings |
|
|
|
|
|
# Process-wide Motor client shared by the accessors in this module.
# It is None until connect_to_mongo() assigns it, and close_mongo_connection()
# resets it to None again.
_mongo_client: Optional[AsyncIOMotorClient] = None
|
|
|
|
|
|
|
|
async def connect_to_mongo() -> None:
    """Initialize the MongoDB client once during application startup.

    Does nothing when the module-level client is already set. Otherwise a
    client is built from ``settings.mongo_uri`` (with
    ``serverSelectionTimeoutMS=5000``) and ``server_info()`` is awaited so
    that connectivity problems surface here rather than on first use. The
    client is published to the module-level ``_mongo_client`` only after
    that call succeeds.

    Raises:
        Any exception raised by ``server_info()`` is propagated unchanged;
        the freshly created client is closed first so it is not leaked.
    """
    global _mongo_client

    if _mongo_client is not None:
        return

    client = AsyncIOMotorClient(settings.mongo_uri, serverSelectionTimeoutMS=5000)
    try:
        await client.server_info()
    except BaseException:
        # BaseException (not Exception) so that task cancellation during
        # startup also releases the client. The original error is re-raised
        # untouched, so callers observe the same exception types as before.
        client.close()
        raise

    _mongo_client = client
|
|
|
|
|
|
|
|
async def close_mongo_connection() -> None:
    """Close the shared MongoDB client, if any, and reset the module state.

    Calling this when no client is set is a no-op.
    """
    global _mongo_client

    active_client = _mongo_client
    if active_client is not None:
        active_client.close()
        # Cleared only after close() returns, so a failing close() leaves
        # the reference in place.
        _mongo_client = None
|
|
|
|
|
|
|
|
def get_client() -> AsyncIOMotorClient:
    """Return the shared MongoDB client.

    Raises:
        RuntimeError: If the module-level client has not been set yet.
    """
    client = _mongo_client
    if client is not None:
        return client
    raise RuntimeError("MongoDB client is not initialized. Wait for startup to finish.")
|
|
|
|
|
|
|
|
def get_database() -> AsyncIOMotorDatabase:
    """Return the database named by ``settings.mongo_db`` on the shared client.

    Raises:
        RuntimeError: Propagated from ``get_client()`` when no client is set.
    """
    client = get_client()
    database_name = settings.mongo_db
    return client[database_name]
|
|
|
|
|
|
|
|
def get_autocategory_collection() -> AsyncIOMotorCollection:
    """Return the collection named by ``settings.mongo_collection``.

    Raises:
        RuntimeError: Propagated from ``get_client()`` when no client is set.
    """
    database = get_database()
    collection_name = settings.mongo_collection
    return database[collection_name]
|
|
|
|
|
|
|
|
def get_api_logs_collection() -> AsyncIOMotorCollection:
    """Return the collection named by ``settings.api_logs_collection``.

    Raises:
        RuntimeError: Propagated from ``get_client()`` when no client is set.
    """
    database = get_database()
    collection_name = settings.api_logs_collection
    return database[collection_name]
|
|
|
|
|
|
|
|
def get_subcategory_collection() -> AsyncIOMotorCollection:
    """Return the collection named by ``settings.mongo_subcategory_collection``.

    Raises:
        RuntimeError: Propagated from ``get_client()`` when no client is set.
    """
    database = get_database()
    collection_name = settings.mongo_subcategory_collection
    return database[collection_name]
|
|
|