""" Telegram Multi-Part File Streamer - Main Application High-performance file upload and streaming service with zero-disk buffering """ import asyncio import logging from typing import AsyncGenerator, Optional from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, Response from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware import uvicorn from session_manager import SessionManager from database import Database, FileMetadata from utils import ( calculate_part_and_offset, generate_unique_id, CHUNK_SIZE, MAX_PART_SIZE ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Global instances session_manager: Optional[SessionManager] = None database: Optional[Database] = None @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager""" global session_manager, database logger.info("Initializing application...") # Initialize database database = Database() await database.connect() # Initialize session manager session_manager = SessionManager() await session_manager.initialize() logger.info("Application initialized successfully") yield # Cleanup logger.info("Shutting down application...") await session_manager.cleanup() await database.disconnect() logger.info("Application shutdown complete") # Initialize FastAPI app app = FastAPI( title="Telegram Multi-Part File Streamer", description="High-performance file upload and streaming service", version="1.0.0", lifespan=lifespan ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): """Health check endpoint""" return { "status": "online", "service": "Telegram Multi-Part File Streamer", "version": "1.0.0" } @app.get("/health") async def health_check(): """Detailed health check""" session_count = len(session_manager.sessions) if session_manager else 0 db_connected = database.is_connected() if database else False return { "status": "healthy" if (session_count > 0 and db_connected) else "degraded", "sessions": session_count, "database": "connected" if db_connected else "disconnected" } @app.post("/upload") async def upload_file(request: Request, filename: Optional[str] = None): """ High-speed zero-disk file upload endpoint Streams data directly from HTTP to Telegram with auto-splitting """ if not session_manager or not database: raise HTTPException(status_code=503, detail="Service not initialized") logger.info(f"Upload request received: filename={filename}") unique_id = generate_unique_id() file_parts = [] total_size = 0 part_number = 0 try: # Create async generator from request stream async def request_stream() -> AsyncGenerator[bytes, None]: async for chunk in request.stream(): yield chunk # Buffer for part assembly part_buffer = bytearray() async for chunk in request_stream(): part_buffer.extend(chunk) # Check if we need to upload this part while len(part_buffer) >= MAX_PART_SIZE: part_number += 1 part_data = bytes(part_buffer[:MAX_PART_SIZE]) part_buffer = part_buffer[MAX_PART_SIZE:] logger.info(f"Uploading part {part_number} ({len(part_data)} bytes)") # Upload part to Telegram file_id = await session_manager.upload_part( part_data, f"{filename or unique_id}_part_{part_number}" ) file_parts.append({ "part_number": part_number, "file_id": file_id, "size": len(part_data) }) total_size += len(part_data) logger.info( f"Part {part_number} uploaded successfully. " f"Total size: {total_size / (1024**3):.2f} GB" ) # Upload remaining data as final part if len(part_buffer) > 0: part_number += 1 part_data = bytes(part_buffer) logger.info(f"Uploading final part {part_number} ({len(part_data)} bytes)") file_id = await session_manager.upload_part( part_data, f"{filename or unique_id}_part_{part_number}" ) file_parts.append({ "part_number": part_number, "file_id": file_id, "size": len(part_data) }) total_size += len(part_data) # Store metadata in database metadata = FileMetadata( unique_id=unique_id, filename=filename or f"file_{unique_id}", total_size=total_size, parts=file_parts, part_count=part_number ) await database.save_file_metadata(metadata) logger.info( f"Upload completed: unique_id={unique_id}, " f"parts={part_number}, total_size={total_size / (1024**3):.2f} GB" ) return { "success": True, "unique_id": unique_id, "filename": metadata.filename, "total_size": total_size, "parts": part_number, "download_url": f"/dl/{unique_id}" } except Exception as e: logger.error(f"Upload failed: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") @app.get("/dl/{unique_id}") async def stream_file(unique_id: str, request: Request): """ High-speed streaming endpoint with full range request support Supports multi-part concatenation and parallel connections """ if not session_manager or not database: raise HTTPException(status_code=503, detail="Service not initialized") # Fetch file metadata metadata = await database.get_file_metadata(unique_id) if not metadata: raise HTTPException(status_code=404, detail="File not found") # Parse range header range_header = request.headers.get("range") start = 0 end = metadata.total_size - 1 status_code = 200 if range_header: # Parse range: bytes=start-end range_str = range_header.replace("bytes=", "") range_parts = range_str.split("-") if range_parts[0]: start = int(range_parts[0]) if range_parts[1]: end = int(range_parts[1]) status_code = 206 # Partial Content # Validate range if start < 0 or end >= metadata.total_size or start > end: raise HTTPException(status_code=416, detail="Range not satisfiable") logger.info( f"Streaming request: unique_id={unique_id}, " f"range={start}-{end}, size={end - start + 1}" ) # Create streaming response content_length = end - start + 1 headers = { "Content-Type": "application/octet-stream", "Content-Length": str(content_length), "Accept-Ranges": "bytes", "Content-Disposition": f'attachment; filename="{metadata.filename}"', } if status_code == 206: headers["Content-Range"] = f"bytes {start}-{end}/{metadata.total_size}" async def stream_generator() -> AsyncGenerator[bytes, None]: """Generate stream from Telegram parts""" bytes_sent = 0 current_position = 0 for part in metadata.parts: part_start = current_position part_end = current_position + part["size"] - 1 # Check if this part overlaps with requested range if part_end < start: current_position += part["size"] continue if part_start > end: break # Calculate offset within this part offset_in_part = max(0, start - part_start) bytes_to_read = min( part["size"] - offset_in_part, content_length - bytes_sent ) logger.debug( f"Streaming part {part['part_number']}: " f"offset={offset_in_part}, bytes={bytes_to_read}" ) # Stream this part with retry logic retry_count = 0 max_retries = 3 while retry_count < max_retries: try: async for chunk in session_manager.stream_part( part["file_id"], offset=offset_in_part, limit=bytes_to_read ): chunk_size = len(chunk) # Ensure we don't send more than requested if bytes_sent + chunk_size > content_length: chunk = chunk[:content_length - bytes_sent] chunk_size = len(chunk) yield chunk bytes_sent += chunk_size if bytes_sent >= content_length: return break # Success except Exception as e: retry_count += 1 if retry_count >= max_retries: logger.error( f"Failed to stream part {part['part_number']}: {str(e)}" ) raise wait_time = 2 ** retry_count logger.warning( f"Retry {retry_count}/{max_retries} for part " f"{part['part_number']} after {wait_time}s" ) await asyncio.sleep(wait_time) current_position += part["size"] return StreamingResponse( stream_generator(), status_code=status_code, headers=headers, media_type="application/octet-stream" ) @app.get("/info/{unique_id}") async def get_file_info(unique_id: str): """Get file metadata and information""" if not database: raise HTTPException(status_code=503, detail="Service not initialized") metadata = await database.get_file_metadata(unique_id) if not metadata: raise HTTPException(status_code=404, detail="File not found") return { "unique_id": metadata.unique_id, "filename": metadata.filename, "total_size": metadata.total_size, "total_size_gb": f"{metadata.total_size / (1024**3):.2f}", "parts": metadata.part_count, "uploaded_at": metadata.uploaded_at, "download_url": f"/dl/{unique_id}" } @app.delete("/delete/{unique_id}") async def delete_file(unique_id: str): """Delete file and all its parts""" if not session_manager or not database: raise HTTPException(status_code=503, detail="Service not initialized") # Get metadata metadata = await database.get_file_metadata(unique_id) if not metadata: raise HTTPException(status_code=404, detail="File not found") # Delete from Telegram (best effort) deleted_parts = 0 for part in metadata.parts: try: await session_manager.delete_part(part["file_id"]) deleted_parts += 1 except Exception as e: logger.warning(f"Failed to delete part {part['part_number']}: {str(e)}") # Delete from database await database.delete_file_metadata(unique_id) logger.info(f"Deleted file: unique_id={unique_id}, parts={deleted_parts}") return { "success": True, "unique_id": unique_id, "deleted_parts": deleted_parts, "total_parts": metadata.part_count } if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, workers=1, # Single worker for shared session state log_level="info" )