Spaces:
Sleeping
Sleeping
File size: 4,795 Bytes
dae81a0 3e26b63 dae81a0 3e26b63 dae81a0 a2fdd1c dae81a0 a2fdd1c dae81a0 3e26b63 dae81a0 a1bbe71 dae81a0 a2fdd1c dae81a0 3e26b63 dae81a0 3e26b63 dae81a0 a2fdd1c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# Credits by @neomatrix90
import asyncio
import aiohttp
import random
import time
import requests
from random import choice
from datetime import datetime as dt
from hydrogram import filters
from hydrogram.types import Message
from bot import TelegramBot, StartTime, utils
from bot.config import Telegram
# Per-user flag keyed by Telegram user id. When the stored value is truthy,
# the /ping handler edits its placeholder message in place instead of
# sending a separate reply.
PING_DISABLE_NONPREM = {}
# Per-user image preference keyed by Telegram user id. Values written by
# /pingset are dicts of the form {"anime": bool, "hentai": bool}.
ANIME_WAIFU_IS_RANDOM = {}
def waifu_hentai():
    """Return the URL of a random NSFW image from the waifu.pics API.

    A category is picked at random from a fixed list and the matching
    ``/nsfw/<category>`` endpoint is queried synchronously.

    Returns:
        The value of the ``"url"`` field of the JSON response.

    Raises:
        requests.RequestException: on connection failure or timeout.
        KeyError: if the JSON response has no ``"url"`` field.
    """
    nsfw_categories = ["trap", "waifu", "blowjob", "neko"]
    waifu_link = "https"
    waifu_api = "api.waifu.pics"
    waifu_types = "nsfw"
    waifu_category = choice(nsfw_categories)
    waifu_param = f"{waifu_link}://{waifu_api}/{waifu_types}/{waifu_category}"
    # requests waits indefinitely unless a timeout is given; bound the wait
    # so a stalled connection cannot hang the caller forever.
    response = requests.get(waifu_param, timeout=10).json()
    return response["url"]
def waifu_random():
    """Return the URL of a random SFW image from the waifu.pics API.

    A category is picked at random from a fixed list and the matching
    ``/sfw/<category>`` endpoint is queried synchronously.

    Returns:
        The value of the ``"url"`` field of the JSON response.

    Raises:
        requests.RequestException: on connection failure or timeout.
        KeyError: if the JSON response has no ``"url"`` field.
    """
    sfw_categories = ["neko", "waifu", "megumin"]
    waifu_link = "https"
    waifu_api = "api.waifu.pics"
    waifu_types = "sfw"
    waifu_category = choice(sfw_categories)
    waifu_param = f"{waifu_link}://{waifu_api}/{waifu_types}/{waifu_category}"
    # requests waits indefinitely unless a timeout is given; bound the wait
    # so a stalled connection cannot hang the caller forever.
    response = requests.get(waifu_param, timeout=10).json()
    return response["url"]
async def fetch_server_status():
    """Fetch the server status from the health endpoint.

    Returns:
        The ``"message"`` field of the JSON body (``"Unknown"`` if absent)
        when the endpoint answers with HTTP 200, ``"Server Unreachable"``
        for any other status code, or an ``"Error: ..."`` string when the
        request fails. This function never raises; failures are reported
        through the returned text.
    """
    url = "https://kawaiimizo-maakichu.hf.space/health"
    # Without an explicit timeout aiohttp falls back to its multi-minute
    # default, which would stall the ping reply for as long.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get("message", "Unknown")
                return "Server Unreachable"
    except asyncio.TimeoutError:
        # A timeout carries no message, so str(e) would render as "Error: ".
        return "Error: request timed out"
    except Exception as e:
        # Deliberately broad: the status line is best-effort and must not
        # break the ping command.
        return f"Error: {str(e)}"
def get_caption(client, duration: float, server_status: str, uptime: str, is_premium: bool) -> str:
    """Build the text shown in a ping reply.

    Args:
        client: Bot client; only ``client.me.mention`` is read, and only
            for the non-premium layout.
        duration: Measured latency, rendered with an ``ms`` suffix.
        server_status: Status text to display.
        uptime: Human-readable uptime text.
        is_premium: Selects the compact layout when true, otherwise the
            decorated four-line layout that also names the owner.

    Returns:
        The formatted caption string.
    """
    if not is_premium:
        decorated_rows = [
            f"🏓 **Pɪɴɢᴇʀ :** `{duration}ms`",
            f"👨💻 **Sᴇʀᴠᴇʀ:** `{server_status}`",
            f"⌛ **Uᴘᴛɪᴍᴇ :** `{uptime}`",
            f"🤴 **Oᴡɴᴇʀ :** **__{client.me.mention}__**",
        ]
        return "\n".join(decorated_rows)

    # Compact layout: every row, including the last, ends with a newline.
    compact = f"**Pong !!** `{duration}ms`\n"
    compact += f"**Server:** {server_status}\n"
    compact += f"**Uptime** - `{uptime}`\n"
    return compact
async def send_ping_response(client, message: Message, duration: float, server_status: str, uptime: str, is_premium: bool, photo=None):
    """Reply to ``message`` with the ping caption.

    The caption is built by ``get_caption``. When ``photo`` is truthy the
    reply is a photo carrying the caption; otherwise it is a plain text
    reply.
    """
    text = get_caption(client, duration, server_status, uptime, is_premium)
    if not photo:
        await message.reply_text(text)
        return
    await message.reply_photo(photo, caption=text)
@TelegramBot.on_message(filters.command("pingset") & filters.user(Telegram.OWNER_ID) & ~filters.forwarded)
async def pingsetsetting(client, message: Message):
    """Configure or report the sender's ping image mode.

    ``/pingset anime`` and ``/pingset hentai`` select which image source
    ``/ping`` uses, ``/pingset no|off|false`` clears both, and any other
    option gets a usage hint. Without an option the current mode is
    reported.
    """
    user_id = message.from_user.id
    # ``text`` may be None (e.g. a command carried in a media caption), so
    # fall back before lowercasing.
    raw_text = message.text or message.caption or ""
    args = raw_text.lower().split()[1:]
    chat = message.chat
    # NOTE(review): if chat.type is an enum in this hydrogram version, the
    # comparison with the string "private" is always unequal. Confirm the
    # intended chat restriction before changing it.
    if chat.type != "private" and args:
        option = args[0]
        if option == "anime":
            ANIME_WAIFU_IS_RANDOM[user_id] = {"anime": True, "hentai": False}
            await message.reply_text(f"Turned on ping {option}.")
        elif option == "hentai":
            ANIME_WAIFU_IS_RANDOM[user_id] = {"anime": False, "hentai": True}
            await message.reply_text(f"Turned on ping {option}.")
        elif option in ("no", "off", "false"):
            PING_DISABLE_NONPREM[user_id] = False
            ANIME_WAIFU_IS_RANDOM[user_id] = {"anime": False, "hentai": False}
            await message.reply_text("Turned off ping automatic.")
        else:
            # Previously an unrecognised option was ignored without a reply.
            await message.reply_text("Unknown option. Usage: /pingset [anime|hentai|off]")
        return

    # Inspect the stored flags rather than the dict's truthiness: the "off"
    # state is a non-empty dict of False values and must not read as "Anime".
    waifu_pref = ANIME_WAIFU_IS_RANDOM.get(user_id) or {}
    if PING_DISABLE_NONPREM.get(user_id):
        ping_mode = "On"
    elif waifu_pref.get("anime"):
        ping_mode = "Anime"
    elif waifu_pref.get("hentai"):
        ping_mode = "Hentai"
    else:
        ping_mode = "Off"
    await message.reply_text(f"Ping Mode: {ping_mode}")
@TelegramBot.on_message(filters.command("ping") & ~filters.forwarded)
async def custom_ping_handler(client, message: Message):
    """Answer ``/ping`` with latency, server status and uptime.

    A placeholder reply is sent first and the time it takes is used as the
    latency figure. Depending on the sender's stored settings the result is
    either edited into the placeholder, or sent as a new reply (with an
    image when an image mode is enabled) after which the placeholder is
    deleted.
    """
    uptime = utils.get_readable_time((time.time() - StartTime))
    start = dt.now()
    lol = await message.reply_text("**__Pong!!__**")
    # total_seconds() covers the whole interval; ``.microseconds`` alone
    # drops whole seconds and under-reports any latency above one second.
    duration = round((dt.now() - start).total_seconds() * 1000, 3)
    is_premium = client.me.is_premium
    # from_user may be None for some messages; treat those as having no
    # stored settings instead of raising AttributeError.
    sender = message.from_user
    user_id = sender.id if sender else None
    waifu_pref = ANIME_WAIFU_IS_RANDOM.get(user_id)
    server_status = await fetch_server_status()

    if PING_DISABLE_NONPREM.get(user_id):
        await lol.edit_text(get_caption(client, duration, server_status, uptime, is_premium))
        return

    photo = None
    if waifu_pref:
        if waifu_pref.get("anime"):
            fetch_image = waifu_random
        elif waifu_pref.get("hentai"):
            fetch_image = waifu_hentai
        else:
            fetch_image = None
        if fetch_image is not None:
            try:
                # The fetchers use blocking ``requests``; run them in the
                # default executor so the event loop keeps serving updates.
                photo = await asyncio.get_running_loop().run_in_executor(None, fetch_image)
            except (requests.RequestException, KeyError, ValueError):
                # Image lookup is optional: fall back to a text-only reply
                # rather than leaving the placeholder message behind.
                photo = None

    await send_ping_response(client, message, duration, server_status, uptime, is_premium, photo)
    await lol.delete()