from httpx import AsyncClient from apscheduler.triggers.interval import IntervalTrigger from asyncio import Lock, sleep from datetime import datetime, timedelta from feedparser import parse as feed_parse from functools import partial from io import BytesIO from pyrogram.filters import create from pyrogram.handlers import MessageHandler from time import time from re import compile, I from .. import scheduler, rss_dict, LOGGER from ..core.config_manager import Config from ..helper.ext_utils.bot_utils import new_task, arg_parser, get_size_bytes from ..helper.ext_utils.status_utils import get_readable_file_size from ..helper.ext_utils.db_handler import database from ..helper.ext_utils.exceptions import RssShutdownException from ..helper.ext_utils.help_messages import RSS_HELP_MESSAGE from ..helper.telegram_helper.button_build import ButtonMaker from ..helper.telegram_helper.filters import CustomFilters from ..helper.telegram_helper.message_utils import ( send_message, edit_message, send_rss, send_file, delete_message, ) rss_dict_lock = Lock() handler_dict = {} size_regex = compile(r"(\d+(\.\d+)?\s?(GB|MB|KB|GiB|MiB|KiB))", I) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } async def rss_menu(event): user_id = event.from_user.id buttons = ButtonMaker() buttons.data_button("Subscribe", f"rss sub {user_id}") buttons.data_button("Subscriptions", f"rss list {user_id} 0") buttons.data_button("Get Items", f"rss get {user_id}") buttons.data_button("Edit", f"rss edit {user_id}") buttons.data_button("Pause", f"rss pause {user_id}") buttons.data_button("Resume", f"rss resume {user_id}") buttons.data_button("Unsubscribe", f"rss unsubscribe {user_id}") if await CustomFilters.sudo("", event): buttons.data_button("All Subscriptions", f"rss listall {user_id} 0") buttons.data_button("Pause All", f"rss allpause {user_id}") buttons.data_button("Resume All", f"rss allresume {user_id}") buttons.data_button("Unsubscribe All", f"rss allunsub {user_id}") buttons.data_button("Delete User", f"rss deluser {user_id}") if scheduler.running: buttons.data_button("Shutdown Rss", f"rss shutdown {user_id}") else: buttons.data_button("Start Rss", f"rss start {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) msg = f"Rss Menu | Users: {len(rss_dict)} | Running: {scheduler.running}" return msg, button async def update_rss_menu(query): msg, button = await rss_menu(query) await edit_message(query.message, msg, button) @new_task async def get_rss_menu(_, message): msg, button = await rss_menu(message) await send_message(message, msg, button) @new_task async def rss_sub(_, message, pre_event): user_id = message.from_user.id handler_dict[user_id] = False if username := message.from_user.username: tag = f"@{username}" else: tag = message.from_user.mention msg = "" items = message.text.split("\n") for index, item in enumerate(items, start=1): args = item.split() if len(args) < 2: await send_message( message, f"{item}. Wrong Input format. Read help message before adding new subcription!", ) continue title = args[0].strip() if (user_feeds := rss_dict.get(user_id, False)) and title in user_feeds: await send_message( message, f"This title {title} already subscribed! Choose another title!" ) continue feed_link = args[1].strip() if feed_link.startswith(("-inf", "-exf", "-c")): await send_message( message, f"Wrong input in line {index}! Add Title! Read the example!", ) continue inf_lists = [] exf_lists = [] if len(args) > 2: arg_base = {"-c": None, "-inf": None, "-exf": None, "-stv": None} arg_parser(args[2:], arg_base) cmd = arg_base["-c"] inf = arg_base["-inf"] exf = arg_base["-exf"] stv = arg_base["-stv"] if stv is not None: stv = stv.lower() == "true" if inf is not None: filters_list = inf.split("|") for x in filters_list: y = x.split(" or ") inf_lists.append(y) if exf is not None: filters_list = exf.split("|") for x in filters_list: y = x.split(" or ") exf_lists.append(y) else: inf = None exf = None cmd = None stv = False try: async with AsyncClient( headers=headers, follow_redirects=True, timeout=60, verify=False ) as client: res = await client.get(feed_link) html = res.text rss_d = feed_parse(html) last_title = rss_d.entries[0]["title"] if rss_d.entries[0].get("size"): size = int(rss_d.entries[0]["size"]) elif rss_d.entries[0].get("summary"): summary = rss_d.entries[0]["summary"] matches = size_regex.findall(summary) sizes = [match[0] for match in matches] size = get_size_bytes(sizes[0]) else: size = 0 msg += "Subscribed!" msg += f"\nTitle: {title}\nFeed Url: {feed_link}" msg += f"\nlatest record for {rss_d.feed.title}:" msg += ( f"\nName: {last_title.replace('>', '').replace('<', '')}" ) try: last_link = rss_d.entries[0]["links"][1]["href"] except IndexError: last_link = rss_d.entries[0]["link"] msg += f"\nLink: {last_link}" if size: msg += f"\nSize: {get_readable_file_size(size)}" msg += f"\nCommand: {cmd}" msg += f"\nFilters:-\ninf: {inf}\nexf: {exf}\nsensitive: {stv}" async with rss_dict_lock: if rss_dict.get(user_id, False): rss_dict[user_id][title] = { "link": feed_link, "last_feed": last_link, "last_title": last_title, "inf": inf_lists, "exf": exf_lists, "paused": False, "command": cmd, "sensitive": stv, "tag": tag, } else: rss_dict[user_id] = { title: { "link": feed_link, "last_feed": last_link, "last_title": last_title, "inf": inf_lists, "exf": exf_lists, "paused": False, "command": cmd, "sensitive": stv, "tag": tag, } } LOGGER.info( f"Rss Feed Added: id: {user_id} - title: {title} - link: {feed_link} - c: {cmd} - inf: {inf} - exf: {exf} - stv {stv}" ) except (IndexError, AttributeError) as e: emsg = f"The link: {feed_link} doesn't seem to be a RSS feed or it's region-blocked!" await send_message(message, emsg + "\nError: " + str(e)) except Exception as e: await send_message(message, str(e)) if msg: await database.rss_update(user_id) await send_message(message, msg) is_sudo = await CustomFilters.sudo("", message) if scheduler.state == 2: scheduler.resume() elif is_sudo and not scheduler.running: add_job() scheduler.start() await update_rss_menu(pre_event) async def get_user_id(title): async with rss_dict_lock: return next( ( (True, user_id) for user_id, feed in rss_dict.items() if feed["title"] == title ), (False, False), ) @new_task async def rss_update(_, message, pre_event, state): user_id = message.from_user.id handler_dict[user_id] = False titles = message.text.split() is_sudo = await CustomFilters.sudo("", message) updated = [] for title in titles: title = title.strip() if not (res := rss_dict[user_id].get(title, False)): if is_sudo: res, user_id = await get_user_id(title) if not res: user_id = message.from_user.id await send_message(message, f"{title} not found!") continue istate = rss_dict[user_id][title].get("paused", False) if istate and state == "pause" or not istate and state == "resume": await send_message(message, f"{title} already {state}d!") continue async with rss_dict_lock: updated.append(title) if state == "unsubscribe": del rss_dict[user_id][title] elif state == "pause": rss_dict[user_id][title]["paused"] = True elif state == "resume": rss_dict[user_id][title]["paused"] = False if state == "resume": if scheduler.state == 2: scheduler.resume() elif is_sudo and not scheduler.running: add_job() scheduler.start() if is_sudo and Config.DATABASE_URL and user_id != message.from_user.id: await database.rss_update(user_id) if not rss_dict[user_id]: async with rss_dict_lock: del rss_dict[user_id] await database.rss_delete(user_id) if not rss_dict: await database.trunc_table("rss") if updated: LOGGER.info(f"Rss link with Title(s): {updated} has been {state}d!") await send_message( message, f"Rss links with Title(s): {updated} has been {state}d!", ) if rss_dict.get(user_id): await database.rss_update(user_id) await update_rss_menu(pre_event) async def rss_list(query, start, all_users=False): user_id = query.from_user.id buttons = ButtonMaker() if all_users: list_feed = f"All subscriptions | Page: {int(start / 5)} " async with rss_dict_lock: keysCount = sum(len(v.keys()) for v in rss_dict.values()) index = 0 for titles in rss_dict.values(): for index, (title, data) in enumerate( list(titles.items())[start : 5 + start] ): list_feed += f"\n\nTitle: {title}\n" list_feed += f"Feed Url: {data['link']}\n" list_feed += f"Command: {data['command']}\n" list_feed += f"Inf: {data['inf']}\n" list_feed += f"Exf: {data['exf']}\n" list_feed += f"Sensitive: {data.get('sensitive', False)}\n" list_feed += f"Paused: {data['paused']}\n" list_feed += f"User: {data['tag'].replace('@', '', 1)}" index += 1 if index == 5: break else: list_feed = f"Your subscriptions | Page: {int(start / 5)} " async with rss_dict_lock: keysCount = len(rss_dict.get(user_id, {}).keys()) for title, data in list(rss_dict[user_id].items())[start : 5 + start]: list_feed += f"\n\nTitle: {title}\nFeed Url: {data['link']}\n" list_feed += f"Command: {data['command']}\n" list_feed += f"Inf: {data['inf']}\n" list_feed += f"Exf: {data['exf']}\n" list_feed += ( f"Sensitive: {data.get('sensitive', False)}\n" ) list_feed += f"Paused: {data['paused']}\n" buttons.data_button("Back", f"rss back {user_id}") buttons.data_button("Close", f"rss close {user_id}") if keysCount > 5: for x in range(0, keysCount, 5): buttons.data_button( f"{int(x / 5)}", f"rss list {user_id} {x}", position="footer" ) button = buttons.build_menu(2) if query.message.text.html == list_feed: return await edit_message(query.message, list_feed, button) @new_task async def rss_get(_, message, pre_event): user_id = message.from_user.id handler_dict[user_id] = False args = message.text.split() if len(args) < 2: await send_message( message, f"{args}. Wrong Input format. You should add number of the items you want to get. Read help message before adding new subcription!", ) await update_rss_menu(pre_event) return try: title = args[0] count = int(args[1]) data = rss_dict[user_id].get(title, False) if data and count > 0: try: msg = await send_message( message, f"Getting the last {count} item(s) from {title}" ) async with AsyncClient( headers=headers, follow_redirects=True, timeout=60, verify=False ) as client: res = await client.get(data["link"]) html = res.text rss_d = feed_parse(html) item_info = "" for item_num in range(count): try: link = rss_d.entries[item_num]["links"][1]["href"] except IndexError: link = rss_d.entries[item_num]["link"] item_info += f"Name: {rss_d.entries[item_num]['title'].replace('>', '').replace('<', '')}\n" item_info += f"Link: {link}\n\n" item_info_ecd = item_info.encode() if len(item_info_ecd) > 4000: with BytesIO(item_info_ecd) as out_file: out_file.name = f"rssGet {title} items_no. {count}.txt" await send_file(message, out_file) await delete_message(msg) else: await edit_message(msg, item_info) except IndexError as e: LOGGER.error(str(e)) await edit_message( msg, "Parse depth exceeded. Try again with a lower value." ) except Exception as e: LOGGER.error(str(e)) await edit_message(msg, str(e)) else: await send_message(message, "Enter a valid title. Title not found!") except Exception as e: LOGGER.error(str(e)) await send_message(message, f"Enter a valid value!. {e}") await update_rss_menu(pre_event) @new_task async def rss_edit(_, message, pre_event): user_id = message.from_user.id handler_dict[user_id] = False items = message.text.split("\n") updated = False for item in items: args = item.split() title = args[0].strip() if len(args) < 2: await send_message( message, f"{item}. Wrong Input format. Read help message before editing!", ) continue elif not rss_dict[user_id].get(title, False): await send_message(message, "Enter a valid title. Title not found!") continue updated = True inf_lists = [] exf_lists = [] arg_base = {"-c": None, "-inf": None, "-exf": None, "-stv": None} arg_parser(args[1:], arg_base) cmd = arg_base["-c"] inf = arg_base["-inf"] exf = arg_base["-exf"] stv = arg_base["-stv"] async with rss_dict_lock: if stv is not None: stv = stv.lower() == "true" rss_dict[user_id][title]["sensitive"] = stv if cmd is not None: if cmd.lower() == "none": cmd = None rss_dict[user_id][title]["command"] = cmd if inf is not None: if inf.lower() != "none": filters_list = inf.split("|") for x in filters_list: y = x.split(" or ") inf_lists.append(y) rss_dict[user_id][title]["inf"] = inf_lists if exf is not None: if exf.lower() != "none": filters_list = exf.split("|") for x in filters_list: y = x.split(" or ") exf_lists.append(y) rss_dict[user_id][title]["exf"] = exf_lists if updated: await database.rss_update(user_id) await update_rss_menu(pre_event) @new_task async def rss_delete(_, message, pre_event): handler_dict[message.from_user.id] = False users = message.text.split() for user in users: user = int(user) async with rss_dict_lock: del rss_dict[user] await database.rss_delete(user) await update_rss_menu(pre_event) async def event_handler(client, query, pfunc): user_id = query.from_user.id handler_dict[user_id] = True start_time = time() async def event_filter(_, __, event): user = event.from_user or event.sender_chat return bool( user.id == user_id and event.chat.id == query.message.chat.id and event.text ) handler = client.add_handler(MessageHandler(pfunc, create(event_filter)), group=-1) while handler_dict[user_id]: await sleep(0.5) if time() - start_time > 60: handler_dict[user_id] = False await update_rss_menu(query) client.remove_handler(*handler) @new_task async def rss_listener(client, query): user_id = query.from_user.id message = query.message data = query.data.split() if int(data[2]) != user_id and not await CustomFilters.sudo("", query): await query.answer( text="You don't have permission to use these buttons!", show_alert=True ) elif data[1] == "close": await query.answer() handler_dict[user_id] = False await delete_message(message, message.reply_to_message) elif data[1] == "back": await query.answer() handler_dict[user_id] = False await update_rss_menu(query) elif data[1] == "sub": await query.answer() handler_dict[user_id] = False buttons = ButtonMaker() buttons.data_button("Back", f"rss back {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) await edit_message(message, RSS_HELP_MESSAGE, button) pfunc = partial(rss_sub, pre_event=query) await event_handler(client, query, pfunc) elif data[1] == "list": handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() start = int(data[3]) await rss_list(query, start) elif data[1] == "get": handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() buttons = ButtonMaker() buttons.data_button("Back", f"rss back {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) await edit_message( message, "Send one title with value separated by space get last X items.\nTitle Value\nTimeout: 60 sec.", button, ) pfunc = partial(rss_get, pre_event=query) await event_handler(client, query, pfunc) elif data[1] in ["unsubscribe", "pause", "resume"]: handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() buttons = ButtonMaker() buttons.data_button("Back", f"rss back {user_id}") if data[1] == "pause": buttons.data_button("Pause AllMyFeeds", f"rss uallpause {user_id}") elif data[1] == "resume": buttons.data_button("Resume AllMyFeeds", f"rss uallresume {user_id}") elif data[1] == "unsubscribe": buttons.data_button("Unsub AllMyFeeds", f"rss uallunsub {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) await edit_message( message, f"Send one or more rss titles separated by space to {data[1]}.\nTimeout: 60 sec.", button, ) pfunc = partial(rss_update, pre_event=query, state=data[1]) await event_handler(client, query, pfunc) elif data[1] == "edit": handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() buttons = ButtonMaker() buttons.data_button("Back", f"rss back {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) msg = """Send one or more rss titles with new filters or command separated by new line. Examples: Title1 -c mirror -up remote:path/subdir -exf none -inf 1080 or 720 -stv true Title2 -c none -inf none -stv false Title3 -c mirror -rcf xxx -up xxx -z pswd -stv false Note: Only what you provide will be edited, the rest will be the same like example 2: exf will stay same as it is. Timeout: 60 sec. Argument -c for command and arguments """ await edit_message(message, msg, button) pfunc = partial(rss_edit, pre_event=query) await event_handler(client, query, pfunc) elif data[1].startswith("uall"): handler_dict[user_id] = False if len(rss_dict.get(int(data[2]), {})) == 0: await query.answer(text="No subscriptions!", show_alert=True) return await query.answer() if data[1].endswith("unsub"): async with rss_dict_lock: del rss_dict[int(data[2])] await database.rss_delete(int(data[2])) await update_rss_menu(query) elif data[1].endswith("pause"): async with rss_dict_lock: for title in list(rss_dict[int(data[2])].keys()): rss_dict[int(data[2])][title]["paused"] = True await database.rss_update(int(data[2])) elif data[1].endswith("resume"): async with rss_dict_lock: for title in list(rss_dict[int(data[2])].keys()): rss_dict[int(data[2])][title]["paused"] = False if scheduler.state == 2: scheduler.resume() await database.rss_update(int(data[2])) await update_rss_menu(query) elif data[1].startswith("all"): if len(rss_dict) == 0: await query.answer(text="No subscriptions!", show_alert=True) return await query.answer() if data[1].endswith("unsub"): async with rss_dict_lock: rss_dict.clear() await database.trunc_table("rss") await update_rss_menu(query) elif data[1].endswith("pause"): async with rss_dict_lock: for user in list(rss_dict.keys()): for title in list(rss_dict[user].keys()): rss_dict[int(data[2])][title]["paused"] = True if scheduler.running: scheduler.pause() await database.rss_update_all() elif data[1].endswith("resume"): async with rss_dict_lock: for user in list(rss_dict.keys()): for title in list(rss_dict[user].keys()): rss_dict[int(data[2])][title]["paused"] = False if scheduler.state == 2: scheduler.resume() elif not scheduler.running: add_job() scheduler.start() await database.rss_update_all() elif data[1] == "deluser": if len(rss_dict) == 0: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() buttons = ButtonMaker() buttons.data_button("Back", f"rss back {user_id}") buttons.data_button("Close", f"rss close {user_id}") button = buttons.build_menu(2) msg = "Send one or more user_id separated by space to delete their resources.\nTimeout: 60 sec." await edit_message(message, msg, button) pfunc = partial(rss_delete, pre_event=query) await event_handler(client, query, pfunc) elif data[1] == "listall": if not rss_dict: await query.answer(text="No subscriptions!", show_alert=True) else: await query.answer() start = int(data[3]) await rss_list(query, start, all_users=True) elif data[1] == "shutdown": if scheduler.running: await query.answer() scheduler.shutdown(wait=False) await sleep(0.5) await update_rss_menu(query) else: await query.answer(text="Already Stopped!", show_alert=True) elif data[1] == "start": if not scheduler.running: await query.answer() add_job() scheduler.start() await update_rss_menu(query) else: await query.answer(text="Already Running!", show_alert=True) async def rss_monitor(): chat = Config.RSS_CHAT if not chat: LOGGER.warning("RSS_CHAT not added! Shutting down rss scheduler...") scheduler.shutdown(wait=False) return if len(rss_dict) == 0: scheduler.pause() return all_paused = True rss_topic_id = rss_chat_id = None if isinstance(chat, int): rss_chat_id = chat elif "|" in chat: rss_chat_id, rss_topic_id = list( map( lambda x: int(x) if x.lstrip("-").isdigit() else x, chat.split("|", 1), ) ) elif chat.lstrip("-").isdigit(): rss_chat_id = int(chat) for user, items in list(rss_dict.items()): for title, data in items.items(): try: if data["paused"]: continue tries = 0 while True: try: async with AsyncClient( headers=headers, follow_redirects=True, timeout=60, verify=False, ) as client: res = await client.get(data["link"]) html = res.text break except Exception: tries += 1 if tries > 3: raise continue rss_d = feed_parse(html) try: last_link = rss_d.entries[0]["links"][1]["href"] except IndexError: last_link = rss_d.entries[0]["link"] finally: all_paused = False last_title = rss_d.entries[0]["title"] if data["last_feed"] == last_link or data["last_title"] == last_title: continue feed_count = 0 while True: try: await sleep(10) except Exception: raise RssShutdownException("Rss Monitor Stopped!") try: item_title = rss_d.entries[feed_count]["title"] try: url = rss_d.entries[feed_count]["links"][1]["href"] except IndexError: url = rss_d.entries[feed_count]["link"] if data["last_feed"] == url or data["last_title"] == item_title: break if rss_d.entries[feed_count].get("size"): size = int(rss_d.entries[feed_count]["size"]) elif rss_d.entries[feed_count].get("summary"): summary = rss_d.entries[feed_count]["summary"] matches = size_regex.findall(summary) sizes = [match[0] for match in matches] size = get_size_bytes(sizes[0]) else: size = 0 except IndexError: LOGGER.warning( f"Reached Max index no. {feed_count} for this feed: {title}. Maybe you need to use less RSS_DELAY to not miss some torrents" ) break parse = True for flist in data["inf"]: if ( data.get("sensitive", False) and all(x.lower() not in item_title.lower() for x in flist) ) or ( not data.get("sensitive", False) and all(x not in item_title for x in flist) ): parse = False feed_count += 1 break if not parse: continue for flist in data["exf"]: if ( data.get("sensitive", False) and any(x.lower() in item_title.lower() for x in flist) ) or ( not data.get("sensitive", False) and any(x in item_title for x in flist) ): parse = False feed_count += 1 break if not parse: continue if command := data["command"]: if ( size and Config.RSS_SIZE_LIMIT and Config.RSS_SIZE_LIMIT < size ): feed_count += 1 continue cmd = command.split(maxsplit=1) cmd.insert(1, url) feed_msg = " ".join(cmd) if not feed_msg.startswith("/"): feed_msg = f"/{feed_msg}" else: feed_msg = f"Name: {item_title.replace('>', '').replace('<', '')}" feed_msg += f"\n\nLink: {url}" if size: feed_msg += f"\nSize: {get_readable_file_size(size)}" feed_msg += ( f"\nTag: {data['tag']} {user}" ) await send_rss(feed_msg, rss_chat_id, rss_topic_id) feed_count += 1 async with rss_dict_lock: if user not in rss_dict or not rss_dict[user].get(title, False): continue rss_dict[user][title].update( {"last_feed": last_link, "last_title": last_title} ) await database.rss_update(user) LOGGER.info(f"Feed Name: {title}") LOGGER.info(f"Last item: {last_link}") except RssShutdownException as ex: LOGGER.info(ex) break except Exception as e: LOGGER.error(f"{e} - Feed Name: {title} - Feed Link: {data['link']}") continue if all_paused: scheduler.pause() def add_job(): scheduler.add_job( rss_monitor, trigger=IntervalTrigger(seconds=Config.RSS_DELAY), id="0", name="RSS", misfire_grace_time=15, max_instances=1, next_run_time=datetime.now() + timedelta(seconds=20), replace_existing=True, ) add_job() scheduler.start()