File size: 8,185 Bytes
db78256 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
from importlib import import_module
from aiofiles import open as aiopen
from aiofiles.os import path as aiopath
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import PyMongoError
from pymongo.server_api import ServerApi
from ... import LOGGER, qbit_options, rss_dict, user_data
from ...core.config_manager import Config
from ...core.tg_client import TgClient
class DbManager:
def __init__(self):
self._return = True
self._conn = None
self.db = None
async def connect(self):
try:
if self._conn is not None:
await self._conn.close()
self._conn = AsyncIOMotorClient(
Config.DATABASE_URL, server_api=ServerApi("1")
)
self.db = self._conn.wzmlx
self._return = False
except PyMongoError as e:
LOGGER.error(f"Error in DB connection: {e}")
self.db = None
self._return = True
self._conn = None
async def disconnect(self):
self._return = True
if self._conn is not None:
await self._conn.close()
self._conn = None
async def update_deploy_config(self):
if self._return:
return
settings = import_module("config")
config_file = {
key: value.strip() if isinstance(value, str) else value
for key, value in vars(settings).items()
if not key.startswith("__")
}
await self.db.settings.deployConfig.replace_one(
{"_id": TgClient.ID}, config_file, upsert=True
)
async def update_config(self, dict_):
if self._return:
return
await self.db.settings.config.update_one(
{"_id": TgClient.ID}, {"$set": dict_}, upsert=True
)
async def update_aria2(self, key, value):
if self._return:
return
await self.db.settings.aria2c.update_one(
{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
)
async def update_qbittorrent(self, key, value):
if self._return:
return
await self.db.settings.qbittorrent.update_one(
{"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
)
async def save_qbit_settings(self):
if self._return:
return
await self.db.settings.qbittorrent.update_one(
{"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True
)
async def update_private_file(self, path):
if self._return:
return
db_path = path.replace(".", "__")
if await aiopath.exists(path):
async with aiopen(path, "rb+") as pf:
pf_bin = await pf.read()
await self.db.settings.files.update_one(
{"_id": TgClient.ID}, {"$set": {db_path: pf_bin}}, upsert=True
)
if path == "config.py":
await self.update_deploy_config()
else:
await self.db.settings.files.update_one(
{"_id": TgClient.ID}, {"$unset": {db_path: ""}}, upsert=True
)
async def update_nzb_config(self):
if self._return:
return
async with aiopen("sabnzbd/SABnzbd.ini", "rb+") as pf:
nzb_conf = await pf.read()
await self.db.settings.nzb.replace_one(
{"_id": TgClient.ID}, {"SABnzbd__ini": nzb_conf}, upsert=True
)
async def update_user_data(self, user_id):
if self._return:
return
data = user_data.get(user_id, {})
data = data.copy()
for key in ("THUMBNAIL", "RCLONE_CONFIG", "TOKEN_PICKLE", "USER_COOKIE_FILE"):
data.pop(key, None)
pipeline = [
{
"$replaceRoot": {
"newRoot": {
"$mergeObjects": [
data,
{
"$arrayToObject": {
"$filter": {
"input": {"$objectToArray": "$$ROOT"},
"as": "field",
"cond": {
"$in": [
"$$field.k",
[
"THUMBNAIL",
"RCLONE_CONFIG",
"TOKEN_PICKLE",
"USER_COOKIE_FILE",
],
]
},
}
}
},
]
}
}
}
]
await self.db.users[TgClient.ID].update_one(
{"_id": user_id}, pipeline, upsert=True
)
async def update_user_doc(self, user_id, key, path=""):
if self._return:
return
if path:
async with aiopen(path, "rb+") as doc:
doc_bin = await doc.read()
await self.db.users[TgClient.ID].update_one(
{"_id": user_id}, {"$set": {key: doc_bin}}, upsert=True
)
else:
await self.db.users[TgClient.ID].update_one(
{"_id": user_id}, {"$unset": {key: ""}}, upsert=True
)
async def rss_update_all(self):
if self._return:
return
for user_id in list(rss_dict.keys()):
await self.db.rss[TgClient.ID].replace_one(
{"_id": user_id}, rss_dict[user_id], upsert=True
)
async def rss_update(self, user_id):
if self._return:
return
await self.db.rss[TgClient.ID].replace_one(
{"_id": user_id}, rss_dict[user_id], upsert=True
)
async def rss_delete(self, user_id):
if self._return:
return
await self.db.rss[TgClient.ID].delete_one({"_id": user_id})
async def add_incomplete_task(self, cid, link, tag):
if self._return:
return
await self.db.tasks[TgClient.ID].insert_one(
{"_id": link, "cid": cid, "tag": tag}
)
async def get_pm_uids(self):
if self._return:
return
return [doc["_id"] async for doc in self.db.pm_users[TgClient.ID].find({})]
async def set_pm_users(self, user_id):
if self._return:
return
if not bool(await self.db.pm_users[TgClient.ID].find_one({"_id": user_id})):
await self.db.pm_users[TgClient.ID].insert_one({"_id": user_id})
LOGGER.info(f"New PM User Added : {user_id}")
async def rm_pm_user(self, user_id):
if self._return:
return
await self.db.pm_users[TgClient.ID].delete_one({"_id": user_id})
async def rm_complete_task(self, link):
if self._return:
return
await self.db.tasks[TgClient.ID].delete_one({"_id": link})
async def get_incomplete_tasks(self):
notifier_dict = {}
if self._return:
return notifier_dict
if await self.db.tasks[TgClient.ID].find_one():
rows = self.db.tasks[TgClient.ID].find({})
async for row in rows:
if row["cid"] in list(notifier_dict.keys()):
if row["tag"] in list(notifier_dict[row["cid"]]):
notifier_dict[row["cid"]][row["tag"]].append(row["_id"])
else:
notifier_dict[row["cid"]][row["tag"]] = [row["_id"]]
else:
notifier_dict[row["cid"]] = {row["tag"]: [row["_id"]]}
await self.db.tasks[TgClient.ID].drop()
return notifier_dict
async def trunc_table(self, name):
if self._return:
return
await self.db[name][TgClient.ID].drop()
database = DbManager()
|