tg-stream / WebStreamer /utils /file_properties.py
vickydmt's picture
Upload folder using huggingface_hub
8db43b6 verified
# This file is a part of TG-FileStreamBot
from __future__ import annotations
import logging
from typing import Any, Optional
from datetime import datetime
from pyrogram import Client
from pyrogram.types import Message
from pyrogram.file_id import FileId
from WebStreamer.bot import StreamBot
from WebStreamer.utils.database import Database
from WebStreamer.vars import Var
# Module-level database handle, built from the configured database URL and
# session name; get_file_ids() below reads and updates file records through it.
db = Database(Var.DATABASE_URL, Var.SESSION_NAME)
async def get_file_ids(client: Client | bool, db_id: str, multi_clients) -> Optional[FileId]:
    """Return the decoded ``FileId`` that ``client`` should use for a stored file.

    The database record for ``db_id`` keeps a ``"file_ids"`` mapping from
    ``str(client.id)`` to that client's own file_id string.  Missing entries
    are created on demand by re-sending the file to ``Var.BIN_CHANNEL`` with
    ``StreamBot`` and reading the resulting message back through the client(s).

    Args:
        client: The client that will stream the file, or a falsy value when
            the caller only wants the per-client ids stored in the database.
        db_id: Identifier of the file record passed to ``db.get_file``.
        multi_clients: Mapping whose values are the available clients; it is
            forwarded to ``update_file_id``.

    Returns:
        A ``FileId`` decoded from the id stored for ``client``, with
        ``file_size``, ``mime_type``, ``file_name`` and ``unique_id`` copied
        onto it from the database record; ``None`` when ``client`` is falsy.
    """
    logging.debug("Starting of get_file_ids")
    file_info = await db.get_file(db_id)

    if "file_ids" not in file_info or not client:
        # Nothing cached yet (or the caller asked only for a refresh):
        # store a file_id for every client in one pass.
        logging.debug("Storing file_id of all clients in DB")
        log_msg = await send_file(StreamBot, file_info["file_id"])
        await db.update_file_ids(db_id, await update_file_id(log_msg.id, multi_clients))
        logging.debug("Stored file_id of all clients in DB")
        if not client:
            return None
        # Re-read the record so it includes the ids that were just written.
        file_info = await db.get_file(db_id)

    file_id_info = file_info.setdefault("file_ids", {})
    client_key = str(client.id)
    if client_key not in file_id_info:
        # This particular client has no entry yet: resolve and store only its id.
        logging.debug("Storing file_id in DB")
        log_msg = await send_file(StreamBot, file_info["file_id"])
        msg = await client.get_messages(Var.BIN_CHANNEL, log_msg.id)
        media = get_media_from_message(msg)
        file_id_info[client_key] = getattr(media, "file_id", "")
        await db.update_file_ids(db_id, file_id_info)
        logging.debug("Stored file_id in DB")

    logging.debug("Middle of get_file_ids")
    file_id = FileId.decode(file_id_info[client_key])
    # Attach the stored metadata to the decoded object for the caller.
    file_id.file_size = file_info["file_size"]
    file_id.mime_type = file_info["mime_type"]
    file_id.file_name = file_info["file_name"]
    file_id.unique_id = file_info["file_unique_id"]
    logging.debug("Ending of get_file_ids")
    return file_id
def get_media_from_message(message: "Message") -> Any:
    """Return the first truthy media attribute found on ``message``.

    The attributes are probed in a fixed order: audio, document, photo,
    sticker, animation, video, voice, video_note.  Attributes that are
    missing or falsy are skipped.

    Returns:
        The media object, or ``None`` when no listed attribute is set.
    """
    media_types = (
        "audio",
        "document",
        "photo",
        "sticker",
        "animation",
        "video",
        "voice",
        "video_note",
    )
    for attr in media_types:
        media = getattr(message, attr, None)
        if media:
            return media
    # Make the "no media" outcome explicit instead of falling off the end.
    return None
def get_media_file_size(m):
    """Return the ``file_size`` of the media carried by message ``m``.

    When the message has no media, or the media has no ``file_size``
    attribute, the literal string ``"None"`` is returned (not the ``None``
    object).
    """
    attachment = get_media_from_message(m)
    size = getattr(attachment, "file_size", "None")
    return size
def get_name(media_msg: Message | FileId) -> str:
    """Return a file name for a message or a decoded ``FileId``.

    The ``file_name`` attribute of the message's media (or of the ``FileId``
    itself) is used when it is set.  Otherwise a name of the form
    ``"<media_type>-<YYYY-MM-DD_HH-MM-SS><ext>"`` is generated from the
    current local time, where ``<ext>`` is looked up from the media type and
    is empty for unknown types.

    Args:
        media_msg: A ``Message`` or a ``FileId``.

    Returns:
        The stored file name, or a generated one when none is stored.
    """
    # Start from an empty name so an argument of an unexpected type falls
    # through to the generated name instead of raising UnboundLocalError.
    file_name = ""
    if isinstance(media_msg, Message):
        media = get_media_from_message(media_msg)
        file_name = getattr(media, "file_name", "")
    elif isinstance(media_msg, FileId):
        file_name = getattr(media_msg, "file_name", "")

    if file_name:
        return file_name

    if isinstance(media_msg, Message) and media_msg.media:
        media_type = media_msg.media.value
    elif getattr(media_msg, "file_type", None):
        # getattr: a Message without media has no ``file_type`` attribute,
        # so a direct attribute access here would raise AttributeError.
        media_type = media_msg.file_type.name.lower()
    else:
        media_type = "file"

    formats = {
        "photo": "jpg", "audio": "mp3", "voice": "ogg",
        "video": "mp4", "animation": "mp4", "video_note": "mp4",
        "sticker": "webp",
    }
    ext = formats.get(media_type)
    ext = "." + ext if ext else ""
    date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    return f"{media_type}-{date}{ext}"
def get_file_info(message):
    """Build the dictionary describing the media in ``message``.

    Keys produced: ``user_id``, ``file_id``, ``file_unique_id``,
    ``file_name``, ``file_size`` and ``mime_type``.  Media attributes that
    are missing fall back to ``""``, ``0`` or ``"None/unknown"`` respectively.

    ``user_id`` is the sender's id.  When the message has no ``from_user``
    the id of ``message.chat`` is used instead (``None`` if that is absent
    too), rather than raising ``AttributeError``.
    NOTE(review): confirm that consumers of ``user_id`` accept a chat id here.
    """
    media = get_media_from_message(message)

    sender = getattr(message, "from_user", None)
    if sender is not None:
        user_id = sender.id
    else:
        # No sender on the message: fall back to the chat the message is in.
        user_id = getattr(getattr(message, "chat", None), "id", None)

    return {
        "user_id": user_id,
        "file_id": getattr(media, "file_id", ""),
        "file_unique_id": getattr(media, "file_unique_id", ""),
        "file_name": get_name(message),
        "file_size": getattr(media, "file_size", 0),
        "mime_type": getattr(media, "mime_type", "None/unknown"),
    }
async def update_file_id(msg_id, multi_clients):
    """Collect, for every client, its file_id for message ``msg_id``.

    Each client in ``multi_clients`` fetches message ``msg_id`` from
    ``Var.BIN_CHANNEL``; the ``file_id`` of the media on that message is
    recorded under ``str(client.id)``.  A message without media (or media
    without a ``file_id``) is recorded as ``""``.

    Args:
        msg_id: Id of the message in ``Var.BIN_CHANNEL``.
        multi_clients: Mapping whose values are the clients to query.

    Returns:
        A dict mapping ``str(client.id)`` to that client's file_id string.
    """
    file_ids = {}
    # Only the client objects are needed, so iterate the values directly.
    for client in multi_clients.values():
        log_msg = await client.get_messages(Var.BIN_CHANNEL, msg_id)
        media = get_media_from_message(log_msg)
        file_ids[str(client.id)] = getattr(media, "file_id", "")
    return file_ids
async def send_file(client: Client, file_id: str):
    """Send the cached media ``file_id`` to ``Var.BIN_CHANNEL`` via ``client``.

    Returns whatever ``client.send_cached_media`` returns.
    """
    sent_message = await client.send_cached_media(Var.BIN_CHANNEL, file_id)
    return sent_message