| import os |
| import math |
| import logging |
| import socket |
| import requests |
| from contextlib import asynccontextmanager |
| from pyrogram import Client |
| from pyrogram.session import Session, Auth |
| from pyrogram import raw |
| from pyrogram.file_id import FileId |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.responses import StreamingResponse |
| import uvicorn |
|
|
| |
| |
| |
| def force_ipv4(): |
| old_getaddrinfo = socket.getaddrinfo |
| def new_getaddrinfo(*args, **kwargs): |
| responses = old_getaddrinfo(*args, **kwargs) |
| return [response for response in responses if response[0] == socket.AF_INET] |
| socket.getaddrinfo = new_getaddrinfo |
|
|
| force_ipv4() |
|
|
| |
| def check_telegram_api(): |
| print("π‘ TEST: Checking connection to Telegram API via HTTP...") |
| try: |
| |
| |
| r = requests.get(f"https://api.telegram.org/bot{os.environ.get('BOT_TOKEN')}/getMe", timeout=10) |
| if r.status_code == 200: |
| print("β
HTTP TEST PASSED: Network is fine. Bot Token is valid.") |
| return True |
| else: |
| print(f"β οΈ HTTP TEST FAILED: Status {r.status_code} - {r.text}") |
| return False |
| except Exception as e: |
| print(f"β HTTP TEST FAILED: Could not reach Telegram. The IP is likely blocked. Error: {e}") |
| return False |
|
|
| |
| try: |
| API_ID = int(os.environ.get("API_ID")) |
| API_HASH = os.environ.get("API_HASH") |
| BOT_TOKEN = os.environ.get("BOT_TOKEN") |
| STORAGE_CHANNEL = int(os.environ.get("STORAGE_CHANNEL")) |
| except ValueError: |
| print("β ERROR: Env vars are missing or invalid!") |
| exit(1) |
|
|
| |
| client = Client( |
| "worker_bot", |
| api_id=API_ID, |
| api_hash=API_HASH, |
| bot_token=BOT_TOKEN, |
| in_memory=True, |
| no_updates=True, |
| ipv6=False, |
| workdir="/tmp" |
| ) |
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| |
| network_ok = check_telegram_api() |
| |
| if network_ok: |
| print("β³ Connecting via Pyrogram (MTProto)...") |
| try: |
| await client.start() |
| me = await client.get_me() |
| print(f"β
Worker Started as: {me.first_name} (@{me.username})") |
| except Exception as e: |
| print(f"β PYROGRAM ERROR: HTTP worked, but MTProto failed. {e}") |
| else: |
| print("π ABORTING: Cannot reach Telegram API.") |
|
|
| yield |
| |
| try: |
| if client.is_connected: await client.stop() |
| except: pass |
|
|
| app = FastAPI(lifespan=lifespan) |
|
|
| |
| class ByteStreamer: |
| def __init__(self, client: Client): self.client = client |
| async def yield_file(self, file_id, offset, first_part_cut, last_part_cut, part_count, chunk_size): |
| client = self.client |
| if not client.is_connected: return |
| |
| try: |
| ms = client.media_sessions.get(file_id.dc_id) |
| if not ms: |
| if file_id.dc_id != await client.storage.dc_id(): |
| auth_key = await Auth(client, file_id.dc_id, await client.storage.test_mode()).create() |
| ms = Session(client, file_id.dc_id, auth_key, await client.storage.test_mode(), is_media=True) |
| await ms.start() |
| else: ms = client.session |
| client.media_sessions[file_id.dc_id] = ms |
| except: return |
|
|
| loc = raw.types.InputDocumentFileLocation(id=file_id.media_id, access_hash=file_id.access_hash, file_reference=file_id.file_reference, thumb_size=file_id.thumbnail_size) |
| curr = 1 |
| while curr <= part_count: |
| try: |
| r = await ms.invoke(raw.functions.upload.GetFile(location=loc, offset=offset, limit=chunk_size), retries=3) |
| if isinstance(r, raw.types.upload.File): |
| chunk = r.bytes |
| if not chunk: break |
| if part_count == 1: yield chunk[first_part_cut:last_part_cut] |
| elif curr == 1: yield chunk[first_part_cut:] |
| elif curr == part_count: yield chunk[:last_part_cut] |
| else: yield chunk |
| curr += 1 |
| offset += chunk_size |
| else: break |
| except: break |
|
|
| @app.get("/stream/{message_id}/{filename}") |
| async def stream_handler(req: Request, message_id: int, filename: str): |
| if not client.is_connected: raise HTTPException(503, "Bot not connected to Telegram") |
| try: |
| msg = await client.get_messages(STORAGE_CHANNEL, message_id) |
| media = msg.document or msg.video or msg.audio |
| if not media: raise HTTPException(404, "File not found") |
| file_id = FileId.decode(media.file_id) |
| file_size = media.file_size |
| range_header = req.headers.get("Range", 0) |
| from_bytes, until_bytes = 0, file_size - 1 |
| if range_header: |
| s = range_header.replace("bytes=", "").split("-") |
| from_bytes = int(s[0]) |
| if s[1]: until_bytes = int(s[1]) |
| req_len = until_bytes - from_bytes + 1 |
| chunk = 1048576 |
| offset = (from_bytes // chunk) * chunk |
| first_cut = from_bytes - offset |
| last_cut = (until_bytes % chunk) + 1 |
| parts = math.ceil(req_len / chunk) |
| streamer = ByteStreamer(client) |
| body = streamer.yield_file(file_id, offset, first_cut, last_cut, parts, chunk) |
| headers = {"Content-Type": media.mime_type or "application/octet-stream", "Content-Disposition": f'inline; filename="{media.file_name}"', "Accept-Ranges": "bytes", "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}", "Content-Length": str(req_len)} |
| return StreamingResponse(body, status_code=206 if range_header else 200, headers=headers) |
| except: raise HTTPException(404, "File Not Found") |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|