Spaces:
Sleeping
Sleeping
File size: 5,563 Bytes
5718d27 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
log = logging.getLogger("qwen2api.db")
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# JSON ๆไปถๅญๅจ๏ผๆฌๅฐๅผๅ / Docker ๅทๆจกๅผ๏ผ
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
class AsyncJsonDB:
"""ๅธฆๅผๆญฅ่ฏปๅ้็ JSON ๆไปถๅญๅจ๏ผ้ฒๆญขๅนถๅๆๅใ"""
def __init__(self, path: str | Path, default_data: Any = None):
self.path = Path(path)
self.default_data = default_data if default_data is not None else []
self._lock = asyncio.Lock()
self._data: Any = None
self._init_file()
def _init_file(self):
if not self.path.exists():
self.path.parent.mkdir(parents=True, exist_ok=True)
self.path.write_text(json.dumps(self.default_data, indent=2, ensure_ascii=False), encoding="utf-8")
async def load(self) -> Any:
async with self._lock:
if not self.path.exists():
self._data = self.default_data
return self._data
try:
content = self.path.read_text(encoding="utf-8")
self._data = json.loads(content)
except Exception as e:
log.error(f"Failed to load JSON from {self.path}: {e}")
self._data = self.default_data
return self._data
async def save(self, data: Any):
async with self._lock:
self._data = data
try:
self.path.write_text(json.dumps(self._data, indent=2, ensure_ascii=False), encoding="utf-8")
except Exception as e:
log.error(f"Failed to save JSON to {self.path}: {e}")
async def get(self) -> Any:
if self._data is None:
return await self.load()
return self._data
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# MongoDB Atlas ๆไน
ๅ๏ผHugging Face ็ญๆ ็ถๆ็ฏๅข๏ผ
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
class AsyncMongoDB:
"""MongoDB Atlas ๆไน
ๅๅญๅจ๏ผๆฅๅฃไธ AsyncJsonDB ๅฎๅ
จๅ
ผๅฎนใ
ๆฏไธชๅฎไพๅฏนๅบไธไธช MongoDB collection๏ผ
ๅฐๆดไธช list ไฝไธบๅไธชๆๆกฃๅญๅจ๏ผ_id="data", value=<list>๏ผ๏ผ
ไฟ่ฏๆๅฐๆนๅจๅณๅฏๆฟๆข AsyncJsonDBใ
"""
def __init__(self, collection_name: str, db, default_data: Any = None):
self._collection = db[collection_name]
self.default_data = default_data if default_data is not None else []
self._lock = asyncio.Lock()
self._data: Any = None
async def load(self) -> Any:
async with self._lock:
try:
doc = await self._collection.find_one({"_id": "data"})
self._data = doc["value"] if doc else self.default_data
except Exception as e:
log.error(f"MongoDB load failed for {self._collection.name}: {e}")
self._data = self.default_data
return self._data
async def save(self, data: Any):
async with self._lock:
self._data = data
try:
await self._collection.replace_one(
{"_id": "data"},
{"_id": "data", "value": data},
upsert=True,
)
except Exception as e:
log.error(f"MongoDB save failed for {self._collection.name}: {e}")
async def get(self) -> Any:
if self._data is None:
return await self.load()
return self._data
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# ๅทฅๅๅฝๆฐ๏ผๆ นๆฎ้
็ฝฎ่ชๅจ้ๆฉๅญๅจๅ็ซฏ
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
_mongo_client = None
_mongo_db = None
async def init_mongo(uri: str, db_name: str, timeout_ms: int = 5000):
"""ๅๅงๅๅ
จๅฑ MongoDB ่ฟๆฅ๏ผๅจ lifespan ๅฏๅจๆถ่ฐ็จไธๆฌกใ"""
global _mongo_client, _mongo_db
from motor.motor_asyncio import AsyncIOMotorClient
_mongo_client = AsyncIOMotorClient(
uri,
serverSelectionTimeoutMS=timeout_ms,
connectTimeoutMS=timeout_ms,
)
_mongo_db = _mongo_client[db_name]
# ้ช่ฏ่ฟๆฅ
await _mongo_client.admin.command("ping")
log.info(f"MongoDB Atlas ๅทฒ่ฟๆฅ โ db={db_name}")
async def close_mongo():
"""ๅ
ณ้ญ MongoDB ่ฟๆฅ๏ผๅจ lifespan ๅ
ณ้ญๆถ่ฐ็จใ"""
global _mongo_client, _mongo_db
if _mongo_client:
_mongo_client.close()
_mongo_client = None
_mongo_db = None
log.info("MongoDB ่ฟๆฅๅทฒๅ
ณ้ญ")
def get_mongo_db():
"""่ฟๅๅฝๅ MongoDB database ๅฎไพ๏ผๅฏ่ฝไธบ None๏ผใ"""
return _mongo_db
def create_db(collection_name: str, file_path: str, default_data: Any = None):
"""ๅทฅๅๅฝๆฐ๏ผๆ MongoDB ๆถ่ฟๅ AsyncMongoDB๏ผๅฆๅๅ้ๅฐ AsyncJsonDBใ"""
if _mongo_db is not None:
return AsyncMongoDB(collection_name, _mongo_db, default_data)
return AsyncJsonDB(file_path, default_data)
|