# Moon-Userbot - telegram userbot # Copyright (C) 2020-present Moon Userbot Organization # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . import asyncio import math import mimetypes import os import time from datetime import datetime from io import BytesIO from urllib.parse import unquote import requests import urllib3 from pyrogram import Client, enums, filters from pyrogram.types import Message from pySmartDL import SmartDL from utils.config import apiflash_key from utils.misc import modules_help, prefix from utils.scripts import format_exc, humanbytes, progress def generate_screenshot(url): api_url = f"https://api.apiflash.com/v1/urltoimage?access_key={apiflash_key}&url={url}&format=png" response = requests.get(api_url) if response.status_code == 200: return BytesIO(response.content) return None http = urllib3.PoolManager() @Client.on_message(filters.command("short", prefix) & filters.me) async def short(_, message: Message): if len(message.command) > 1: link = message.text.split(maxsplit=1)[1] elif message.reply_to_message: link = message.reply_to_message.text else: await message.edit(f"Usage: {prefix}short [url to short]") return r = http.request("GET", "https://clck.ru/--?url=" + link) await message.edit( r.data.decode().replace("https://", "Shortened Url:"), disable_web_page_preview=True, ) @Client.on_message(filters.command("urldl", prefix) & filters.me) async def urldl(client: Client, message: Message): if len(message.command) > 1: message_id = None link = message.text.split(maxsplit=1)[1] elif message.reply_to_message: message_id = message.reply_to_message.id link = message.reply_to_message.text else: await message.edit( f"Usage: {prefix}urldl [url to download]" ) return await message.edit("Trying to download...") ext = "." + link.split(".")[-1] c_time = time.time() resp = requests.head(link, allow_redirects=True, timeout=5) if resp.status_code != 200: return await message.edit("Failed to fetch request header information") content_type = resp.headers.get("Content-Type").split(";")[0] extension = mimetypes.guess_extension(content_type) # Check if the file is an executable binary is_executable = content_type in [ "application/octet-stream", "application/x-msdownload", ] # Get the file extension from the URL url_extension = os.path.splitext(link)[1].lower() try: os.makedirs("downloads") if is_executable: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(url_extension): file_name += url_extension elif extension: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(extension): file_name += extension else: file_name = "downloads/" + link.split("/")[-1] except FileNotFoundError: if is_executable: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(url_extension): file_name += url_extension elif extension: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(extension): file_name += extension else: file_name = "downloads/" + link.split("/")[-1] except FileExistsError: if is_executable: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(url_extension): file_name += url_extension elif extension: file_name = "downloads/" + link.split("/")[-1] if not file_name.endswith(extension): file_name += extension else: file_name = "downloads/" + link.split("/")[-1] downloader = SmartDL(link, file_name, progress_bar=False, timeout=10) start_t = datetime.now() try: downloader.start(blocking=False) except Exception as e: return await message.edit_text(format_exc(e)) while not downloader.isFinished(): total_length = downloader.filesize or None downloaded = downloader.get_dl_size(human=True) u_m = "" now = time.time() diff = now - c_time percentage = downloader.get_progress() * 100 speed = downloader.get_speed(human=True) progress_str = ( "".join(["▰" for _ in range(math.floor(percentage / 5))]) + "".join(["▱" for _ in range(20 - math.floor(percentage / 5))]) + f"\nProgress: {round(percentage, 2)}%" ) eta = downloader.get_eta(human=True) try: m = "Trying to download...\n" m += f"File Name: {unquote(link.split('/')[-1])}\n" m += f"Speed: {speed}\n" m += f"{progress_str}\n" m += f"{downloaded} of {humanbytes(total_length)}\n" m += f"ETA: {eta}" if round(diff % 10.00) == 0 and m != u_m: await message.edit_text(disable_web_page_preview=True, text=m) u_m = m await asyncio.sleep(5) except Exception as e: await message.edit_text(format_exc(e)) if os.path.exists(file_name): end_t = datetime.now() sec = (end_t - start_t).seconds await message.edit_text( f"Downloaded to {file_name} in {sec} seconds" ) ms_ = await message.edit("Starting Upload...") await client.send_document( message.chat.id, file_name, progress=progress, progress_args=(ms_, c_time, "`Uploading...`"), caption=f"File Name: {unquote(link.split('/')[-1])}\n", reply_to_message_id=message_id, ) await message.delete() os.remove(file_name) else: await message.edit("Failed to download") @Client.on_message(filters.command("upload", prefix) & filters.me) async def upload_cmd(_, message: Message): max_size = 512 * 1024 * 1024 max_size_mb = 100 min_file_age = 31 max_file_age = 180 ms_ = await message.edit("`Downloading...`", parse_mode=enums.ParseMode.MARKDOWN) c_time = time.time() try: file_name = await message.download( progress=progress, progress_args=(ms_, c_time, "`Downloading...`") ) except ValueError: try: file_name = await message.reply_to_message.download( progress=progress, progress_args=(ms_, c_time, "`Downloading...`") ) except ValueError: await message.edit("File to upload not found") return if os.path.getsize(file_name) > max_size: await message.edit(f"Files longer than {max_size_mb}MB isn't supported") if os.path.exists(file_name): os.remove(file_name) return await message.edit("Uploading...") with open(file_name, "rb") as f: response = requests.post( "https://x0.at", files={"file": f}, ) if response.ok: file_size_mb = os.path.getsize(file_name) / 1024 / 1024 file_age = int( min_file_age + (max_file_age - min_file_age) * ((1 - (file_size_mb / max_size_mb)) ** 2) ) url = response.text.replace("https://", "") await message.edit( f"Your URL: {url}\nYour file will remain live for {file_age} days", disable_web_page_preview=True, ) else: await message.edit( f"API returned an error!\n" f"{response.text}\n Not allowed" ) print(response.text) if os.path.exists(file_name): os.remove(file_name) @Client.on_message(filters.command(["ws", "webshot"], prefix) & filters.me) async def webshot(client: Client, message: Message): if len(message.command) > 1: url = message.text.split(maxsplit=1)[1] if not url.startswith("https://"): url = "https://" + message.text.split(maxsplit=1)[1] elif message.reply_to_message: url = message.reply_to_message.text if not url.startswith("https://"): url = "https://" + message.text.split(maxsplit=1)[1] else: await message.edit_text( f"Usage: {prefix}webshot/{prefix}ws [url/reply to url]" ) return chat_id = message.chat.id await message.edit("Generating screenshot...") try: screenshot_data = generate_screenshot(url) if screenshot_data: await message.delete() await client.send_photo( chat_id, screenshot_data, caption=f"Screenshot of {url}" ) else: await message.reply_text( "Failed to generate screenshot...\nMake sure url is correct" ) except Exception as e: await message.reply_text(f"An error occurred: {format_exc(e)}") modules_help["url"] = { "short [url]*": "short url", "urldl [url]*": "download url content", "upload [file|reply]*": "upload file to internet", "webshot [link]*": "Screenshot of web page", "ws [reply to link]*": "Screenshot of web page", }