Spaces:
Paused
Paused
| from telethon import Button | |
| from telethon import functions, types,utils,helpers,custom | |
| from telethon.events import NewMessage | |
| from telethon.tl.custom.message import Message | |
| from bot import TelegramBot | |
| from bot.config import Telegram | |
| from bot.modules.static import * | |
| from bot.modules.decorators import verify_user | |
| from bot.plugins.upload import upload_chunk | |
| from bot.modules.telegram import get_message, get_file_properties | |
| import json | |
async def refresh():
    """Reload the allowed-user data from the JSON file on disk.

    Reads ``/app/bot/data.json`` and stores the decoded object on
    ``Telegram.ALLOWED_USER_IDS``, replacing whatever was held there before.
    """
    with open("/app/bot/data.json", "r") as data_file:
        raw_text = data_file.read()
    Telegram.ALLOWED_USER_IDS = json.loads(raw_text)
| # Add a value | |
async def add_id(id):
    """Add a user id to the whitelist kept in ``/app/bot/data.json``.

    The in-memory data is reloaded first. If ``id`` is already present in
    the ``"my_array"`` list the user is only notified; otherwise the id is
    appended, the file is rewritten, the data is reloaded and the user is
    notified.

    Args:
        id: Identifier of the user to whitelist. It is also used as the
            recipient of the notification message.
    """
    await refresh()
    allowed_ids = Telegram.ALLOWED_USER_IDS["my_array"]

    if id in allowed_ids:
        # Nothing changed, so there is no reason to rewrite the data file.
        await TelegramBot.send_message(id, "You are already exists")
        return

    allowed_ids.append(id)
    # Persist before notifying: if sending the message fails, the new entry
    # must not be lost on the next reload from disk.
    with open("/app/bot/data.json", "w") as outfile:
        json.dump(Telegram.ALLOWED_USER_IDS, outfile)
    await refresh()
    await TelegramBot.send_message(id, "Now you are in whitelist !!")
| #data["my_array"].append("new_value") | |
| # Remove a value (matched by user id, not by index) | |
async def del_id(id):
    """Remove a user id from the whitelist kept in ``/app/bot/data.json``.

    The in-memory data is reloaded first. If ``id`` is found in the
    ``"my_array"`` list, its first occurrence is removed, the file is
    rewritten, the data is reloaded and the user is notified. If the id is
    not present, nothing is written and no message is sent.

    Args:
        id: Identifier of the user to remove. It is also used as the
            recipient of the notification message.
    """
    await refresh()
    allowed_ids = Telegram.ALLOWED_USER_IDS["my_array"]

    try:
        # Removes the first matching entry only.
        allowed_ids.remove(id)
    except ValueError:
        # Not whitelisted: nothing to persist and nobody to notify.
        return

    # Persist before notifying: if sending the message fails, the removal
    # must not be undone by the next reload from disk.
    with open("/app/bot/data.json", "w") as outfile:
        json.dump(Telegram.ALLOWED_USER_IDS, outfile)
    await refresh()
    await TelegramBot.send_message(id, "You are removed from whitelist")
| #@verify_user(private=True) | |
async def reply_message(event: NewMessage.Event | Message):
    """Handle a reply to an earlier message and send back the uploaded file.

    When the incoming message is a reply, the replied-to message is fetched
    and its text is parsed line by line: the third line must carry a message
    id and the fourth a code, each as the text between the first and second
    colon. The referenced file message is looked up, ``upload_chunk`` is
    called with the text of the incoming message, and the result is sent to
    the chat as a document.

    Messages that are not replies are ignored, as are replies whose target
    cannot be fetched or has no text.

    Args:
        event: The incoming message event.

    Raises:
        IndexError: If the replied-to text has fewer than four lines or the
            parsed lines contain no colon.
        ValueError: If the message-id field is not an integer.
    """
    reply_header = event.message.reply_to
    if not reply_header:
        return

    # NOTE(review): ``peer_id.user_id`` presumably only exists for private
    # chats; confirm this handler is never registered for groups/channels.
    chat_id = event.message.peer_id.user_id
    replied_id = reply_header.reply_to_msg_id

    replied = await TelegramBot.get_messages(chat_id, ids=replied_id)
    if replied is None or not replied.message:
        # The replied-to message could not be fetched or has no text,
        # so there is nothing to parse.
        return

    lines = replied.message.split("\n")
    message_id = int(lines[2].split(":")[1].strip())
    code = lines[3].split(":")[1].strip()

    source_message = await get_message(message_id=message_id)
    # Only the MIME type is needed; name and size are not used here.
    _, _, mime_type = get_file_properties(source_message)

    # ``result`` is passed on as the uploaded file below and ``new_name``
    # drives the document attributes.
    result, new_name = await upload_chunk(
        message_id,
        code,
        event.message.id,
        str(event.message.message),
        chat_id,
        replied_id,
    )

    attributes, mime_type = utils.get_attributes(
        new_name,
        mime_type=mime_type,
        attributes=None,
        force_document=True,
        voice_note=None,
        video_note=None,
        supports_streaming=True,
        thumb=None,
    )

    await TelegramBot(functions.messages.SendMediaRequest(
        peer=chat_id,
        media=types.InputMediaUploadedDocument(
            file=result,
            mime_type=mime_type,
            attributes=attributes,
        ),
        message='File Uploaded successfully !',
    ))
| #@verify_user(private=True) | |
async def welcome(event: NewMessage.Event | Message):
    """Reply with the welcome text and an "Add to Channel" URL button.

    ``WelcomeText`` is filled in with the sender's first name; the button
    links to the bot's ``startchannel`` URL built from
    ``Telegram.BOT_USERNAME``.
    """
    greeting = WelcomeText % {'first_name': event.sender.first_name}
    invite_url = f'https://t.me/{Telegram.BOT_USERNAME}?startchannel&admin=post_messages+edit_messages+delete_messages'
    add_button = Button.url('Add to Channel', invite_url)
    # One row holding a single button.
    await event.reply(message=greeting, buttons=[[add_button]])
| #@verify_user(private=True) | |
async def user_info(event: Message):
    """Reply with ``UserInfoText`` formatted with the message's sender."""
    details = UserInfoText.format(sender=event.sender)
    await event.reply(details)
async def send_log(event: NewMessage.Event | Message):
    """Reply to the event with the ``event-log.txt`` file attached."""
    log_path = 'event-log.txt'
    await event.reply(file=log_path)
async def add_user(event: NewMessage.Event | Message):
    """Whitelist the sender of the event by passing their id to ``add_id``."""
    sender_id = event.sender.id
    await add_id(sender_id)
async def del_user(event: NewMessage.Event | Message):
    """Remove the sender of the event from the whitelist via ``del_id``.

    This handler used to be declared as a second ``add_user``, which
    replaced the real ``add_user`` at import time, so the "add" handler
    removed users and no removal handler existed under its own name.

    Args:
        event: The incoming message event whose sender is removed.
    """
    await del_id(event.sender.id)