from fastapi import FastAPI, UploadFile, File, Form from fastapi.responses import JSONResponse, FileResponse from fastapi.middleware.cors import CORSMiddleware import os import uuid import shutil import subprocess import threading import time # ========================================= # FASTAPI APP # ========================================= app = FastAPI() # ========================================= # CORS # ========================================= app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ========================================= # FOLDERS # ========================================= UPLOAD_DIR = "uploads" OUTPUT_DIR = "outputs" os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) # ========================================= # TASK STORE # ========================================= tasks = {} # ========================================= # GET VIDEO DURATION # ========================================= def get_video_duration(filepath): command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", filepath ] result = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) try: return float(result.stdout.strip()) except: return 0 # ========================================= # COMPRESS VIDEO WORKER # ========================================= def compress_worker( task_id, input_path, output_path, target_size_mb ): try: # ===================================== # GET DURATION # ===================================== duration = get_video_duration( input_path ) if duration <= 0: tasks[task_id] = { "status": "error", "progress": 0, "message": "Invalid video" } return # ===================================== # BITRATE CALCULATION # ===================================== # Convert MB -> bits total_bits = ( target_size_mb * 1024 * 1024 * 8 ) # Audio bitrate audio_bitrate = 96 # Calculate total bitrate total_bitrate = int( total_bits / duration / 1000 ) # Video bitrate video_bitrate = ( total_bitrate - audio_bitrate ) # Minimum safe bitrate if video_bitrate < 250: video_bitrate = 250 # ===================================== # UPDATE STATUS # ===================================== tasks[task_id] = { "status": "processing", "progress": 0 } # ===================================== # FFMPEG COMMAND # ===================================== command = [ "ffmpeg", "-i", input_path, # VIDEO CODEC "-vcodec", "libx264", # FAST PRESET "-preset", "veryfast", # VIDEO BITRATE "-b:v", f"{video_bitrate}k", "-maxrate", f"{video_bitrate}k", "-bufsize", f"{video_bitrate * 2}k", # AUDIO "-acodec", "aac", "-b:a", f"{audio_bitrate}k", # WEB OPTIMIZATION "-movflags", "+faststart", # MULTI THREAD "-threads", "0", # PROGRESS "-progress", "pipe:1", # OVERWRITE "-y", output_path ] process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, universal_newlines=True ) # ===================================== # REAL PROGRESS # ===================================== while True: line = process.stdout.readline() if not line: break line = line.strip() if "out_time_ms=" in line: try: out_time_ms = int( line.split("=")[1] ) current_time = ( out_time_ms / 1000000 ) percent = int( (current_time / duration) * 100 ) if percent > 100: percent = 100 tasks[task_id] = { "status": "processing", "progress": percent } except: pass process.wait() # ===================================== # CHECK RESULT # ===================================== if process.returncode != 0: tasks[task_id] = { "status": "error", "progress": 0, "message": "Compression failed" } return if not os.path.exists(output_path): tasks[task_id] = { "status": "error", "progress": 0, "message": "Output file missing" } return # ===================================== # FINAL SIZE CHECK # ===================================== output_size_mb = ( os.path.getsize(output_path) / 1024 / 1024 ) # ===================================== # COMPLETE # ===================================== tasks[task_id] = { "status": "completed", "progress": 100, "download_url": f"/download/{task_id}", "output_size_mb": round(output_size_mb, 2) } except Exception as e: tasks[task_id] = { "status": "error", "progress": 0, "message": str(e) } # ========================================= # COMPRESS API # ========================================= @app.post("/compress") async def compress_video( file: UploadFile = File(...), target_size: int = Form(...) ): try: if not file.filename: return JSONResponse( {"error": "No file uploaded"}, status_code=400 ) # ===================================== # TASK ID # ===================================== task_id = str(uuid.uuid4()) input_path = os.path.join( UPLOAD_DIR, f"{task_id}_{file.filename}" ) output_path = os.path.join( OUTPUT_DIR, f"{task_id}.mp4" ) # ===================================== # SAVE FILE # ===================================== with open(input_path, "wb") as buffer: shutil.copyfileobj( file.file, buffer ) # ===================================== # START THREAD # ===================================== thread = threading.Thread( target=compress_worker, args=( task_id, input_path, output_path, target_size ) ) thread.start() return { "success": True, "task_id": task_id } except Exception as e: return JSONResponse( {"error": str(e)}, status_code=500 ) # ========================================= # PROGRESS API # ========================================= @app.get("/progress/{task_id}") async def get_progress(task_id: str): if task_id not in tasks: return JSONResponse( {"error": "Task not found"}, status_code=404 ) return tasks[task_id] # ========================================= # DOWNLOAD API # ========================================= @app.get("/download/{task_id}") async def download_video(task_id: str): filepath = os.path.join( OUTPUT_DIR, f"{task_id}.mp4" ) if not os.path.exists(filepath): return JSONResponse( {"error": "File not found"}, status_code=404 ) return FileResponse( filepath, media_type="video/mp4", filename="compressed-video.mp4" ) # ========================================= # AUTO CLEANUP # ========================================= def cleanup_old_files(): while True: now = time.time() for folder in [ UPLOAD_DIR, OUTPUT_DIR ]: for filename in os.listdir(folder): filepath = os.path.join( folder, filename ) try: age = ( now - os.path.getmtime(filepath) ) # Delete after 1 hour if age > 3600: os.remove(filepath) except: pass time.sleep(1800) cleanup_thread = threading.Thread( target=cleanup_old_files, daemon=True ) cleanup_thread.start() # ========================================= # ROOT # ========================================= @app.get("/") async def home(): return { "message": "Video Compressor API Running" } # ========================================= # RUN APP # ========================================= if __name__ == "__main__": import uvicorn port = int( os.environ.get("PORT", 7860) ) uvicorn.run( "app:app", host="0.0.0.0", port=port )