Stream / main.py
Mafia2008's picture
Update main.py
b351d03 verified
import os
import math
import logging
import socket
import requests # Make sure 'requests' is in requirements.txt
from contextlib import asynccontextmanager
from pyrogram import Client
from pyrogram.session import Session, Auth
from pyrogram import raw
from pyrogram.file_id import FileId
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
import uvicorn
# --- 1. GLOBAL IPv4 PATCH (The "Nuclear Option") ---
# This forces the entire Python script to ignore IPv6.
# If this doesn't fix it, your Space IP is blacklisted.
def force_ipv4():
old_getaddrinfo = socket.getaddrinfo
def new_getaddrinfo(*args, **kwargs):
responses = old_getaddrinfo(*args, **kwargs)
return [response for response in responses if response[0] == socket.AF_INET]
socket.getaddrinfo = new_getaddrinfo
force_ipv4()
# --- 2. HTTP CONNECTIVITY CHECK ---
def check_telegram_api():
print("πŸ“‘ TEST: Checking connection to Telegram API via HTTP...")
try:
# We try to reach the standard HTTP API.
# If this fails, the NETWORK is blocked.
r = requests.get(f"https://api.telegram.org/bot{os.environ.get('BOT_TOKEN')}/getMe", timeout=10)
if r.status_code == 200:
print("βœ… HTTP TEST PASSED: Network is fine. Bot Token is valid.")
return True
else:
print(f"⚠️ HTTP TEST FAILED: Status {r.status_code} - {r.text}")
return False
except Exception as e:
print(f"❌ HTTP TEST FAILED: Could not reach Telegram. The IP is likely blocked. Error: {e}")
return False
# --- CONFIGURATION ---
try:
API_ID = int(os.environ.get("API_ID"))
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
STORAGE_CHANNEL = int(os.environ.get("STORAGE_CHANNEL"))
except ValueError:
print("❌ ERROR: Env vars are missing or invalid!")
exit(1)
# --- BOT CLIENT ---
client = Client(
"worker_bot",
api_id=API_ID,
api_hash=API_HASH,
bot_token=BOT_TOKEN,
in_memory=True,
no_updates=True,
ipv6=False,
workdir="/tmp"
)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Run the diagnostic
network_ok = check_telegram_api()
if network_ok:
print("⏳ Connecting via Pyrogram (MTProto)...")
try:
await client.start()
me = await client.get_me()
print(f"βœ… Worker Started as: {me.first_name} (@{me.username})")
except Exception as e:
print(f"❌ PYROGRAM ERROR: HTTP worked, but MTProto failed. {e}")
else:
print("πŸ›‘ ABORTING: Cannot reach Telegram API.")
yield
try:
if client.is_connected: await client.stop()
except: pass
app = FastAPI(lifespan=lifespan)
# --- STREAMING LOGIC ---
class ByteStreamer:
def __init__(self, client: Client): self.client = client
async def yield_file(self, file_id, offset, first_part_cut, last_part_cut, part_count, chunk_size):
client = self.client
if not client.is_connected: return
try:
ms = client.media_sessions.get(file_id.dc_id)
if not ms:
if file_id.dc_id != await client.storage.dc_id():
auth_key = await Auth(client, file_id.dc_id, await client.storage.test_mode()).create()
ms = Session(client, file_id.dc_id, auth_key, await client.storage.test_mode(), is_media=True)
await ms.start()
else: ms = client.session
client.media_sessions[file_id.dc_id] = ms
except: return
loc = raw.types.InputDocumentFileLocation(id=file_id.media_id, access_hash=file_id.access_hash, file_reference=file_id.file_reference, thumb_size=file_id.thumbnail_size)
curr = 1
while curr <= part_count:
try:
r = await ms.invoke(raw.functions.upload.GetFile(location=loc, offset=offset, limit=chunk_size), retries=3)
if isinstance(r, raw.types.upload.File):
chunk = r.bytes
if not chunk: break
if part_count == 1: yield chunk[first_part_cut:last_part_cut]
elif curr == 1: yield chunk[first_part_cut:]
elif curr == part_count: yield chunk[:last_part_cut]
else: yield chunk
curr += 1
offset += chunk_size
else: break
except: break
@app.get("/stream/{message_id}/{filename}")
async def stream_handler(req: Request, message_id: int, filename: str):
if not client.is_connected: raise HTTPException(503, "Bot not connected to Telegram")
try:
msg = await client.get_messages(STORAGE_CHANNEL, message_id)
media = msg.document or msg.video or msg.audio
if not media: raise HTTPException(404, "File not found")
file_id = FileId.decode(media.file_id)
file_size = media.file_size
range_header = req.headers.get("Range", 0)
from_bytes, until_bytes = 0, file_size - 1
if range_header:
s = range_header.replace("bytes=", "").split("-")
from_bytes = int(s[0])
if s[1]: until_bytes = int(s[1])
req_len = until_bytes - from_bytes + 1
chunk = 1048576
offset = (from_bytes // chunk) * chunk
first_cut = from_bytes - offset
last_cut = (until_bytes % chunk) + 1
parts = math.ceil(req_len / chunk)
streamer = ByteStreamer(client)
body = streamer.yield_file(file_id, offset, first_cut, last_cut, parts, chunk)
headers = {"Content-Type": media.mime_type or "application/octet-stream", "Content-Disposition": f'inline; filename="{media.file_name}"', "Accept-Ranges": "bytes", "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}", "Content-Length": str(req_len)}
return StreamingResponse(body, status_code=206 if range_header else 200, headers=headers)
except: raise HTTPException(404, "File Not Found")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)