import asyncio import json import logging from pathlib import Path from typing import Any, Dict, List, Optional log = logging.getLogger("qwen2api.db") # ────────────────────────────────────────────── # JSON 文件存储(本地开发 / Docker 卷模式) # ────────────────────────────────────────────── class AsyncJsonDB: """带异步读写锁的 JSON 文件存储,防止并发损坏。""" def __init__(self, path: str | Path, default_data: Any = None): self.path = Path(path) self.default_data = default_data if default_data is not None else [] self._lock = asyncio.Lock() self._data: Any = None self._init_file() def _init_file(self): if not self.path.exists(): self.path.parent.mkdir(parents=True, exist_ok=True) self.path.write_text(json.dumps(self.default_data, indent=2, ensure_ascii=False), encoding="utf-8") async def load(self) -> Any: async with self._lock: if not self.path.exists(): self._data = self.default_data return self._data try: content = self.path.read_text(encoding="utf-8") self._data = json.loads(content) except Exception as e: log.error(f"Failed to load JSON from {self.path}: {e}") self._data = self.default_data return self._data async def save(self, data: Any): async with self._lock: self._data = data try: self.path.write_text(json.dumps(self._data, indent=2, ensure_ascii=False), encoding="utf-8") except Exception as e: log.error(f"Failed to save JSON to {self.path}: {e}") async def get(self) -> Any: if self._data is None: return await self.load() return self._data # ────────────────────────────────────────────── # MongoDB Atlas 持久化(Hugging Face 等无状态环境) # ────────────────────────────────────────────── class AsyncMongoDB: """MongoDB Atlas 持久化存储,接口与 AsyncJsonDB 完全兼容。 每个实例对应一个 MongoDB collection, 将整个 list 作为单个文档存储(_id="data", value=), 保证最小改动即可替换 AsyncJsonDB。 """ def __init__(self, collection_name: str, db, default_data: Any = None): self._collection = db[collection_name] self.default_data = default_data if default_data is not None else [] self._lock = asyncio.Lock() self._data: Any = None async def load(self) -> Any: async with self._lock: try: doc = await self._collection.find_one({"_id": "data"}) self._data = doc["value"] if doc else self.default_data except Exception as e: log.error(f"MongoDB load failed for {self._collection.name}: {e}") self._data = self.default_data return self._data async def save(self, data: Any): async with self._lock: self._data = data try: await self._collection.replace_one( {"_id": "data"}, {"_id": "data", "value": data}, upsert=True, ) except Exception as e: log.error(f"MongoDB save failed for {self._collection.name}: {e}") async def get(self) -> Any: if self._data is None: return await self.load() return self._data # ────────────────────────────────────────────── # 工厂函数:根据配置自动选择存储后端 # ────────────────────────────────────────────── _mongo_client = None _mongo_db = None async def init_mongo(uri: str, db_name: str, timeout_ms: int = 5000): """初始化全局 MongoDB 连接,在 lifespan 启动时调用一次。""" global _mongo_client, _mongo_db from motor.motor_asyncio import AsyncIOMotorClient _mongo_client = AsyncIOMotorClient( uri, serverSelectionTimeoutMS=timeout_ms, connectTimeoutMS=timeout_ms, ) _mongo_db = _mongo_client[db_name] # 验证连接 await _mongo_client.admin.command("ping") log.info(f"MongoDB Atlas 已连接 → db={db_name}") async def close_mongo(): """关闭 MongoDB 连接,在 lifespan 关闭时调用。""" global _mongo_client, _mongo_db if _mongo_client: _mongo_client.close() _mongo_client = None _mongo_db = None log.info("MongoDB 连接已关闭") def get_mongo_db(): """返回当前 MongoDB database 实例(可能为 None)。""" return _mongo_db def create_db(collection_name: str, file_path: str, default_data: Any = None): """工厂函数:有 MongoDB 时返回 AsyncMongoDB,否则回退到 AsyncJsonDB。""" if _mongo_db is not None: return AsyncMongoDB(collection_name, _mongo_db, default_data) return AsyncJsonDB(file_path, default_data)