|
|
from importlib import import_module |
|
|
from os import getenv |
|
|
|
|
|
|
|
|
class Config: |
|
|
AS_DOCUMENT = False |
|
|
AUTHORIZED_CHATS = "" |
|
|
BASE_URL = "" |
|
|
BASE_URL_PORT = 80 |
|
|
BOT_TOKEN = "" |
|
|
HELPER_TOKENS = "" |
|
|
BOT_MAX_TASKS = 0 |
|
|
BOT_PM = False |
|
|
CMD_SUFFIX = "" |
|
|
DEFAULT_LANG = "en" |
|
|
DATABASE_URL = "" |
|
|
DEFAULT_UPLOAD = "rc" |
|
|
DELETE_LINKS = False |
|
|
DEBRID_LINK_API = "" |
|
|
DISABLE_TORRENTS = False |
|
|
DISABLE_LEECH = False |
|
|
DISABLE_BULK = False |
|
|
DISABLE_MULTI = False |
|
|
DISABLE_SEED = False |
|
|
DISABLE_FF_MODE = False |
|
|
EQUAL_SPLITS = False |
|
|
EXCLUDED_EXTENSIONS = "" |
|
|
FFMPEG_CMDS = {} |
|
|
FILELION_API = "" |
|
|
MEDIA_STORE = True |
|
|
FORCE_SUB_IDS = "" |
|
|
GOFILE_API = "" |
|
|
GOFILE_FOLDER_ID = "" |
|
|
PIXELDRAIN_KEY = "" |
|
|
PROTECTED_API = "" |
|
|
BUZZHEAVIER_API = "" |
|
|
GDRIVE_ID = "" |
|
|
GD_DESP = "Uploaded with WZ Bot" |
|
|
AUTHOR_NAME = "WZML-X" |
|
|
AUTHOR_URL = "https://t.me/WZML_X" |
|
|
INSTADL_API = "" |
|
|
IMDB_TEMPLATE = "" |
|
|
INCOMPLETE_TASK_NOTIFIER = False |
|
|
INDEX_URL = "" |
|
|
IS_TEAM_DRIVE = False |
|
|
JD_EMAIL = "" |
|
|
JD_PASS = "" |
|
|
MEGA_EMAIL = "" |
|
|
MEGA_PASSWORD = "" |
|
|
DIRECT_LIMIT = 0 |
|
|
MEGA_LIMIT = 0 |
|
|
TORRENT_LIMIT = 0 |
|
|
GD_DL_LIMIT = 0 |
|
|
RC_DL_LIMIT = 0 |
|
|
CLONE_LIMIT = 0 |
|
|
JD_LIMIT = 0 |
|
|
NZB_LIMIT = 0 |
|
|
YTDLP_LIMIT = 0 |
|
|
PLAYLIST_LIMIT = 0 |
|
|
LEECH_LIMIT = 0 |
|
|
EXTRACT_LIMIT = 0 |
|
|
ARCHIVE_LIMIT = 0 |
|
|
STORAGE_LIMIT = 0 |
|
|
LEECH_DUMP_CHAT = "" |
|
|
LINKS_LOG_ID = "" |
|
|
MIRROR_LOG_ID = "" |
|
|
CLEAN_LOG_MSG = False |
|
|
LEECH_PREFIX = "" |
|
|
LEECH_CAPTION = "" |
|
|
LEECH_SUFFIX = "" |
|
|
LEECH_FONT = "" |
|
|
LEECH_SPLIT_SIZE = 2097152000 |
|
|
MEDIA_GROUP = False |
|
|
HYBRID_LEECH = True |
|
|
HYPER_THREADS = 0 |
|
|
HYDRA_IP = "" |
|
|
HYDRA_API_KEY = "" |
|
|
NAME_SWAP = "" |
|
|
OWNER_ID = 0 |
|
|
QUEUE_ALL = 0 |
|
|
QUEUE_DOWNLOAD = 0 |
|
|
QUEUE_UPLOAD = 0 |
|
|
RCLONE_FLAGS = "" |
|
|
RCLONE_PATH = "" |
|
|
RCLONE_SERVE_URL = "" |
|
|
SHOW_CLOUD_LINK = True |
|
|
RCLONE_SERVE_USER = "" |
|
|
RCLONE_SERVE_PASS = "" |
|
|
RCLONE_SERVE_PORT = 8080 |
|
|
RSS_CHAT = "" |
|
|
RSS_DELAY = 600 |
|
|
RSS_SIZE_LIMIT = 0 |
|
|
SEARCH_API_LINK = "" |
|
|
SEARCH_LIMIT = 0 |
|
|
SEARCH_PLUGINS = [] |
|
|
SET_COMMANDS = True |
|
|
STATUS_LIMIT = 10 |
|
|
STATUS_UPDATE_INTERVAL = 15 |
|
|
STOP_DUPLICATE = False |
|
|
STREAMWISH_API = "" |
|
|
SUDO_USERS = "" |
|
|
TELEGRAM_API = 0 |
|
|
TELEGRAM_HASH = "" |
|
|
TG_PROXY = None |
|
|
THUMBNAIL_LAYOUT = "" |
|
|
VERIFY_TIMEOUT = 0 |
|
|
LOGIN_PASS = "" |
|
|
TORRENT_TIMEOUT = 0 |
|
|
TIMEZONE = "Asia/Kolkata" |
|
|
USER_MAX_TASKS = 0 |
|
|
USER_TIME_INTERVAL = 0 |
|
|
UPLOAD_PATHS = {} |
|
|
UPSTREAM_REPO = "" |
|
|
UPSTREAM_BRANCH = "master" |
|
|
UPDATE_PKGS = True |
|
|
USENET_SERVERS = [] |
|
|
USER_SESSION_STRING = "" |
|
|
USER_TRANSMISSION = True |
|
|
USE_SERVICE_ACCOUNTS = False |
|
|
WEB_PINCODE = True |
|
|
YT_DLP_OPTIONS = {} |
|
|
YT_DESP = "Uploaded with WZML-X bot" |
|
|
YT_TAGS = ["telegram", "bot", "youtube"] |
|
|
YT_CATEGORY_ID = 22 |
|
|
YT_PRIVACY_STATUS = "unlisted" |
|
|
|
|
|
@classmethod |
|
|
def get(cls, key): |
|
|
return getattr(cls, key) if hasattr(cls, key) else None |
|
|
|
|
|
@classmethod |
|
|
def set(cls, key, value): |
|
|
if hasattr(cls, key): |
|
|
value = cls._convert_env_type(key, value) |
|
|
setattr(cls, key, value) |
|
|
else: |
|
|
raise KeyError(f"{key} is not a valid configuration key.") |
|
|
|
|
|
@classmethod |
|
|
def get_all(cls): |
|
|
return { |
|
|
key: getattr(cls, key) |
|
|
for key in cls.__dict__.keys() |
|
|
if not key.startswith("__") and not callable(getattr(cls, key)) |
|
|
} |
|
|
|
|
|
@classmethod |
|
|
def load(cls): |
|
|
cls.load_config() |
|
|
cls.load_env() |
|
|
|
|
|
@classmethod |
|
|
def load_config(cls): |
|
|
try: |
|
|
settings = import_module("config") |
|
|
except ModuleNotFoundError: |
|
|
return |
|
|
for attr in dir(settings): |
|
|
if hasattr(cls, attr): |
|
|
value = getattr(settings, attr) |
|
|
if not value: |
|
|
continue |
|
|
if isinstance(value, str): |
|
|
value = value.strip() |
|
|
if attr == "DEFAULT_UPLOAD" and value != "gd": |
|
|
value = "rc" |
|
|
elif attr in [ |
|
|
"BASE_URL", |
|
|
"RCLONE_SERVE_URL", |
|
|
"INDEX_URL", |
|
|
"SEARCH_API_LINK", |
|
|
]: |
|
|
if value: |
|
|
value = value.strip("/") |
|
|
elif attr == "USENET_SERVERS": |
|
|
try: |
|
|
if not value[0].get("host"): |
|
|
continue |
|
|
except Exception: |
|
|
continue |
|
|
setattr(cls, attr, value) |
|
|
for key in ["BOT_TOKEN", "OWNER_ID", "TELEGRAM_API", "TELEGRAM_HASH"]: |
|
|
value = getattr(cls, key) |
|
|
if isinstance(value, str): |
|
|
value = value.strip() |
|
|
if not value: |
|
|
raise ValueError(f"{key} variable is missing!") |
|
|
|
|
|
@classmethod |
|
|
def load_env(cls): |
|
|
config_vars = cls.get_all() |
|
|
for key in config_vars: |
|
|
env_value = getenv(key) |
|
|
if env_value is not None: |
|
|
converted_value = cls._convert_env_type(key, env_value) |
|
|
cls.set(key, converted_value) |
|
|
|
|
|
@classmethod |
|
|
def _convert_env_type(cls, key, value): |
|
|
original_value = getattr(cls, key, None) |
|
|
if original_value is None: |
|
|
return value |
|
|
elif isinstance(original_value, bool): |
|
|
if isinstance(value, bool): |
|
|
return value |
|
|
return str(value).lower() in ("true", "1", "yes") |
|
|
elif isinstance(original_value, int): |
|
|
if isinstance(value, int): |
|
|
return value |
|
|
try: |
|
|
return int(value) |
|
|
except (ValueError, TypeError): |
|
|
return original_value |
|
|
elif isinstance(original_value, float): |
|
|
if isinstance(value, float): |
|
|
return value |
|
|
try: |
|
|
return float(value) |
|
|
except (ValueError, TypeError): |
|
|
return original_value |
|
|
return value |
|
|
|
|
|
@classmethod |
|
|
def load_dict(cls, config_dict): |
|
|
for key, value in config_dict.items(): |
|
|
if hasattr(cls, key): |
|
|
if key == "DEFAULT_UPLOAD" and value != "gd": |
|
|
value = "rc" |
|
|
elif key in [ |
|
|
"BASE_URL", |
|
|
"RCLONE_SERVE_URL", |
|
|
"INDEX_URL", |
|
|
"SEARCH_API_LINK", |
|
|
]: |
|
|
if value: |
|
|
value = value.strip("/") |
|
|
elif key == "USENET_SERVERS": |
|
|
try: |
|
|
if not value[0].get("host"): |
|
|
value = [] |
|
|
except Exception: |
|
|
value = [] |
|
|
value = cls._convert_env_type(key, value) |
|
|
setattr(cls, key, value) |
|
|
for key in ["BOT_TOKEN", "OWNER_ID", "TELEGRAM_API", "TELEGRAM_HASH"]: |
|
|
value = getattr(cls, key) |
|
|
if isinstance(value, str): |
|
|
value = value.strip() |
|
|
if not value: |
|
|
raise ValueError(f"{key} variable is missing!") |
|
|
|
|
|
|
|
|
class BinConfig: |
|
|
ARIA2_NAME = "blitzfetcher" |
|
|
QBIT_NAME = "stormtorrent" |
|
|
FFMPEG_NAME = "mediaforge" |
|
|
RCLONE_NAME = "ghostdrive" |
|
|
SABNZBD_NAME = "newsripper" |
|
|
|