Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| import logging | |
| import math | |
| import os | |
| from contextlib import contextmanager | |
| from typing import Literal | |
| from pymongo import MongoClient | |
| from datetime import datetime | |
| from config import ENABLE_VIP, FREE_DOWNLOAD, MONGO_URI | |
| class PaymentStatus: | |
| PENDING = "pending" | |
| COMPLETED = "completed" | |
| FAILED = "failed" | |
| REFUNDED = "refunded" | |
| def get_database(): | |
| client = MongoClient(MONGO_URI) | |
| try: | |
| # Verify connection | |
| client.admin.command('ping') | |
| except Exception as e: | |
| logging.error("Failed to connect to database: %s", e) | |
| raise | |
| return client[os.getenv("DB_NAME", "ytdl_bot")] | |
| db = get_database() | |
| def get_quality_settings(tgid) -> Literal["high", "medium", "low", "audio", "custom"]: | |
| user = db.users.find_one({"user_id": tgid}) | |
| if user and "settings" in user: | |
| return user["settings"].get("quality", "high") | |
| return "high" | |
| def get_format_settings(tgid) -> Literal["video", "audio", "document"]: | |
| user = db.users.find_one({"user_id": tgid}) | |
| if user and "settings" in user: | |
| return user["settings"].get("format", "video") | |
| return "video" | |
| def set_user_settings(tgid: int, key: str, value: str): | |
| db.users.update_one( | |
| {"user_id": tgid}, | |
| {"$set": {f"settings.{key}": value}}, | |
| upsert=True | |
| ) | |
| def get_free_quota(uid: int): | |
| if not ENABLE_VIP: | |
| return math.inf | |
| user = db.users.find_one({"user_id": uid}) | |
| if user: | |
| return user.get("free", FREE_DOWNLOAD) | |
| return FREE_DOWNLOAD | |
| def get_paid_quota(uid: int): | |
| if ENABLE_VIP: | |
| user = db.users.find_one({"user_id": uid}) | |
| if user: | |
| return user.get("paid", 0) | |
| return 0 | |
| return math.inf | |
| def reset_free_quota(uid: int): | |
| db.users.update_one( | |
| {"user_id": uid}, | |
| {"$set": {"free": 5}} | |
| ) | |
| def add_paid_quota(uid: int, amount: int): | |
| db.users.update_one( | |
| {"user_id": uid}, | |
| {"$inc": {"paid": amount}} | |
| ) | |
| def check_quota(uid: int): | |
| if not ENABLE_VIP: | |
| return | |
| user = db.users.find_one({"user_id": uid}) | |
| if user and (user.get("free", 0) + user.get("paid", 0)) <= 0: | |
| raise Exception("Quota exhausted. Please /buy or wait until free quota is reset") | |
| def use_quota(uid: int): | |
| if not ENABLE_VIP: | |
| return | |
| user = db.users.find_one({"user_id": uid}) | |
| if user: | |
| if user.get("free", 0) > 0: | |
| db.users.update_one( | |
| {"user_id": uid}, | |
| {"$inc": {"free": -1}} | |
| ) | |
| elif user.get("paid", 0) > 0: | |
| db.users.update_one( | |
| {"user_id": uid}, | |
| {"$inc": {"paid": -1}} | |
| ) | |
| else: | |
| raise Exception("Quota exhausted. Please /buy or wait until free quota is reset") | |
| def init_user(uid: int): | |
| db.users.update_one( | |
| {"user_id": uid}, | |
| {"$setOnInsert": { | |
| "user_id": uid, | |
| "free": FREE_DOWNLOAD, | |
| "paid": 0, | |
| "created_at": datetime.utcnow() | |
| }}, | |
| upsert=True | |
| ) | |
| def reset_free(): | |
| db.users.update_many( | |
| {}, | |
| {"$set": {"free": FREE_DOWNLOAD}} | |
| ) | |
| def credit_account(who, total_amount: int, quota: int, transaction, method="stripe"): | |
| user = db.users.find_one_and_update( | |
| {"user_id": who}, | |
| {"$inc": {"paid": quota}}, | |
| return_document=True | |
| ) | |
| if user: | |
| dollar = total_amount / 100 | |
| logging.info("user %d credited with %d tokens, payment:$%.2f", who, user["paid"], dollar) | |
| payment = { | |
| "method": method, | |
| "amount": total_amount, | |
| "status": PaymentStatus.COMPLETED, | |
| "transaction_id": transaction, | |
| "user_id": who, | |
| "created_at": datetime.utcnow() | |
| } | |
| db.payments.insert_one(payment) | |
| return user.get("free", 0), user.get("paid", 0) | |
| return None, None |