privateone's picture
Upload 40 files
861d763 verified
from telethon import Button
from telethon import functions, types,utils,helpers,custom
from telethon.events import NewMessage
from telethon.tl.custom.message import Message
from bot import TelegramBot
from bot.config import Telegram
from bot.modules.static import *
from bot.modules.decorators import verify_user
from bot.plugins.upload import upload_chunk
from bot.modules.telegram import get_message, get_file_properties
import json
async def refresh():
    """Reload the whitelist data from ``/app/bot/data.json``.

    The file is parsed as JSON and the parsed value replaces
    ``Telegram.ALLOWED_USER_IDS`` wholesale.
    """
    with open("/app/bot/data.json", "r") as whitelist_file:
        loaded = json.load(whitelist_file)
    Telegram.ALLOWED_USER_IDS = loaded
# Add a user ID to the whitelist
async def add_id(id):
    """Add a user ID to the whitelist and tell that user the outcome.

    The whitelist is re-read from disk first. If the ID is already listed,
    the user is told so and nothing is written. Otherwise the ID is appended
    to the ``"my_array"`` list, the data is saved to ``/app/bot/data.json``,
    reloaded, and the user is notified.

    Args:
        id: Telegram user ID to whitelist; also the chat the status message
            is sent to.
    """
    await refresh()
    whitelist = Telegram.ALLOWED_USER_IDS["my_array"]
    if id in whitelist:
        await TelegramBot.send_message(id, "You are already exists")
        return

    whitelist.append(id)
    # Persist before notifying: if sending the message fails, the addition
    # must not be lost on the next reload from disk.
    with open("/app/bot/data.json", "w") as outfile:
        json.dump(Telegram.ALLOWED_USER_IDS, outfile)
    await refresh()
    await TelegramBot.send_message(id, "Now you are in whitelist !!")
# Remove a user ID from the whitelist (the first matching entry, wherever it sits)
async def del_id(id):
    """Remove a user ID from the whitelist and tell that user.

    The whitelist is re-read from disk first. If the ID is not listed,
    nothing is written and no message is sent. Otherwise its first occurrence
    is removed from the ``"my_array"`` list, the data is saved to
    ``/app/bot/data.json``, reloaded, and the user is notified.

    Args:
        id: Telegram user ID to remove; also the chat the status message is
            sent to.
    """
    await refresh()
    whitelist = Telegram.ALLOWED_USER_IDS["my_array"]
    try:
        # Removes only the first occurrence, like the original search loop.
        whitelist.remove(id)
    except ValueError:
        return

    # Persist before notifying: if sending the message fails, the removal
    # must not be undone by the next reload from disk.
    with open("/app/bot/data.json", "w") as outfile:
        json.dump(Telegram.ALLOWED_USER_IDS, outfile)
    await refresh()
    await TelegramBot.send_message(id, "You are removed from whitelist")
@TelegramBot.on(NewMessage(incoming=True))
#@verify_user(private=True)
async def reply_message(event: NewMessage.Event | Message):
    """Handle a reply to an earlier message by uploading and sending a document.

    The replied-to message is fetched and its text is read as newline-separated
    ``label: value`` lines: the third line supplies a message ID and the fourth
    a code. Those, together with the text of the reply itself, are passed to
    ``upload_chunk``; the file it returns is then sent back to the user as a
    document with the caption ``File Uploaded successfully !``.

    The event is ignored when it is not a reply, when its peer has no
    ``user_id``, or when the replied-to message is missing, has no text, or
    is not in the layout described above.
    """
    incoming = event.message
    reply_header = incoming.reply_to
    if not reply_header:
        return

    # This handler receives every incoming message, and only user peers
    # expose ``user_id``; skip anything else instead of raising.
    user_id = getattr(incoming.peer_id, "user_id", None)
    if user_id is None:
        return

    replied_msg_id = reply_header.reply_to_msg_id
    replied = await TelegramBot.get_messages(user_id, ids=replied_msg_id)
    # The lookup may come back empty, and a message may carry no text.
    if replied is None or not replied.message:
        return

    lines = replied.message.split("\n")
    try:
        message_id = int(lines[2].split(":")[1].strip())
        code = lines[3].split(":")[1].strip()
    except (IndexError, ValueError):
        # The replied-to text is not in the expected layout; nothing to do.
        return

    reply_text = str(incoming.message)
    file = await get_message(message_id=message_id)
    # Only the MIME type of the referenced file is needed here.
    _, _, mime_type = get_file_properties(file)

    result, new_name = await upload_chunk(
        message_id,
        code,
        incoming.id,
        reply_text,
        user_id,
        replied_msg_id,
    )

    # Build document attributes from the name returned by upload_chunk.
    attributes, mime_type = utils.get_attributes(
        new_name,
        mime_type=mime_type,
        attributes=None,
        force_document=True,
        voice_note=None,
        video_note=None,
        supports_streaming=True,
        thumb=None,
    )

    await TelegramBot(functions.messages.SendMediaRequest(
        peer=user_id,
        media=types.InputMediaUploadedDocument(
            file=result,
            mime_type=mime_type,
            attributes=attributes,
        ),
        message='File Uploaded successfully !',
    ))
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/start$'))
#@verify_user(private=True)
async def welcome(event: NewMessage.Event | Message):
    """Reply to ``/start`` with the welcome text and an "Add to Channel" button.

    ``WelcomeText`` is filled in with the sender's first name; the button links
    to the bot's ``startchannel`` URL built from ``Telegram.BOT_USERNAME``.
    """
    greeting = WelcomeText % {'first_name': event.sender.first_name}
    add_to_channel = Button.url(
        'Add to Channel',
        f'https://t.me/{Telegram.BOT_USERNAME}?startchannel&admin=post_messages+edit_messages+delete_messages',
    )
    # One row containing one button.
    keyboard = [[add_to_channel]]
    await event.reply(message=greeting, buttons=keyboard)
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/info$'))
#@verify_user(private=True)
async def user_info(event: Message):
    """Reply to ``/info`` with ``UserInfoText`` formatted with the sender."""
    details = UserInfoText.format(sender=event.sender)
    await event.reply(details)
@TelegramBot.on(NewMessage(chats=Telegram.OWNER_ID, incoming=True, pattern=r'^/log$'))
async def send_log(event: NewMessage.Event | Message):
    """Reply to ``/log`` with the ``event-log.txt`` file.

    The handler is registered only for the ``Telegram.OWNER_ID`` chat.
    """
    log_file = 'event-log.txt'
    await event.reply(file=log_file)
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/in$'))
async def add_user(event: NewMessage.Event | Message):
    """Handle ``/in`` by passing the sender's ID to ``add_id``."""
    sender_id = event.sender.id
    await add_id(sender_id)
@TelegramBot.on(NewMessage(incoming=True, pattern=r'^/out$'))
async def add_user(event: NewMessage.Event | Message):
    """Handle ``/out`` by passing the sender's ID to ``del_id``."""
    # NOTE(review): this function shares its name with the ``/in`` handler in
    # this file, so the module-level name ends up bound to this one; consider
    # renaming it once callers are confirmed.
    sender_id = event.sender.id
    await del_id(sender_id)