Spaces:
No application file
No application file
| import gzip | |
| import pickle | |
| import re | |
| import json | |
| import aiohttp | |
| import cryptography.fernet as fernet | |
| from gino import Gino | |
| db = Gino() | |
| ENCRYPTION_KEY = None | |
| def _generate_key(): | |
| encryption_key = fernet.Fernet.generate_key() | |
| with open('bot_config.py', 'r+') as f: | |
| contents = f.read() | |
| pattern = r"COOKIE_ENCRYPTION_KEY\s*=\s*None" | |
| new_contents = re.sub( | |
| pattern, f'COOKIE_ENCRYPTION_KEY = {encryption_key}', contents) | |
| f.seek(0) | |
| f.write(new_contents) | |
| f.truncate() | |
| print("Here is your cookie encryption key. The key has been saved in bot_config.py:", | |
| encryption_key, "", sep="\n\n") | |
| return encryption_key | |
| def _cookies_save(cookie_jar): | |
| f = fernet.Fernet(ENCRYPTION_KEY) | |
| data = pickle.dumps(cookie_jar._cookies, pickle.HIGHEST_PROTOCOL) | |
| compressed_data = gzip.compress(data) | |
| return f.encrypt(compressed_data) | |
| def _cookies_load(cookie_jar): | |
| f = fernet.Fernet(ENCRYPTION_KEY) | |
| compressed_data = f.decrypt(cookie_jar) | |
| data = gzip.decompress(compressed_data) | |
| cookies = aiohttp.CookieJar() | |
| cookies.clear() | |
| cookies._cookies = pickle.loads(data) | |
| return cookies | |
| class User(db.Model): | |
| __tablename__ = 'usersb' | |
| id = db.Column(db.BigInteger(), primary_key=True) | |
| cookies = db.Column(db.LargeBinary()) | |
| style = db.Column(db.Integer()) | |
| chat = db.Column(db.BigInteger()) | |
| captions = db.Column(db.Boolean()) | |
| replies = db.Column(db.Boolean()) | |
| USERS = {} | |
| async def init(dbstring, encryption_key): | |
| global ENCRYPTION_KEY | |
| global USERS, ENCRYPTION_KEY | |
| ENCRYPTION_KEY = encryption_key or _generate_key() | |
| await db.set_bind(dbstring) | |
| await db.gino.create_all() | |
| all_users = await db.all(User.query) | |
| USERS = {u.id: {'cookies': _cookies_load(u.cookies) if u.cookies else None, | |
| 'style': u.style, 'chat': u.chat, | |
| 'captions': u.captions, | |
| 'replies': u.replies, | |
| 'id': u.id} for u in all_users} | |
| return USERS | |
| async def get_user(userID=None, chatID=None): | |
| user = None | |
| if userID: | |
| user = USERS[userID] if userID in USERS.keys() else None | |
| if chatID and USERS: | |
| for k, v in USERS.items(): | |
| if v['chat'] == chatID: | |
| user = v | |
| break | |
| return user | |
| async def insert_user(userID, cookies=None, chat=None, style=None, keep_cookies=True, captions=True, replies=True): | |
| global USERS | |
| if userID not in USERS.keys(): | |
| USERS[userID] = {'cookies': None, 'Style': None, | |
| 'chat': chat, 'captions': captions, 'replies': replies} | |
| USERS[userID]['cookies'] = cookies | |
| USERS[userID]['chat'] = chat | |
| USERS[userID]['style'] = style | |
| USERS[userID]['id'] = userID | |
| USERS[userID]['captions'] = captions | |
| USERS[userID]['replies'] = replies | |
| user = await User.get(userID) | |
| if not user: | |
| return await User.create(id=userID, cookies=_cookies_save(cookies) if keep_cookies else None, chat=chat, style=style) | |
| await user.update(cookies=_cookies_save(USERS[userID]['cookies']) if keep_cookies else None, | |
| chat=USERS[userID]['chat'], style=USERS[userID]['style'], captions=USERS[userID]['captions'], replies=USERS[userID]['replies']).apply() | |
| return USERS[userID] | |
| async def remove_user(userID): | |
| global USERS | |
| if userID in USERS.keys(): | |
| del USERS[userID] | |
| user = await User.get(userID) | |
| if user: | |
| return await User.delete.where(User.id == userID).gino.status() | |
| return None | |
| async def retrieve_data(userId): | |
| if userId in USERS.keys(): | |
| result = USERS[userId] | |
| if result['cookies']: | |
| cookies = [str(cookie).split(None, 1)[-1] | |
| for cookie in result['cookies']] | |
| result['cookies'] = cookies | |
| return json.dumps(USERS[userId]) | |
| return None | |