#!/usr/bin/env python3
"""
FastAPI Video Compression Server

Scans a Hugging Face dataset for large videos and compresses them with H.265.
A scan runs once shortly after startup and again whenever POST /scan is called.
"""

import os
import sys
import json
import asyncio
import functools
import subprocess
import tempfile
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import uvicorn

try:
    from huggingface_hub import list_repo_files, hf_hub_download, upload_file
except ImportError:
    print("Missing dependency: huggingface_hub")
    sys.exit(1)

# Load environment variables
load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    sys.exit(1)

# Configuration
HF_DATASET_REPO = "factorstudios/movs"
READY_VIDEOS_FOLDER = "ready_videos"
SIZE_THRESHOLD_MB = 100  # Compress videos of at least 100MB
CACHE_DIR = "/tmp/video_compress_cache"
TEMP_DIR = Path("/tmp/video_compression_server")
TEMP_DIR.mkdir(parents=True, exist_ok=True)
# Suffix appended to the stem of every compressed upload; also used to
# recognise our own outputs so they are never fed back into the compressor.
COMPRESSED_SUFFIX = "_compressed"
SERVER_PORT = 7860

# Global state
compression_state = {
    "is_running": False,
    "is_polling": False,
    "total_found": 0,
    "total_compressed": 0,
    "total_uploaded": 0,
    "current_video": None,
    "last_error": None,
    "processed_videos": [],
    "failed_videos": [],
    "total_space_saved_mb": 0,
    "compression_stats": {}
}

app = FastAPI(title="Video Compression Server")

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Schedule *coro* as a task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    Keeps network transfers and ffmpeg runs off the event loop so the HTTP
    endpoints stay responsive while a scan is in progress.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(func, *args, **kwargs)
    )


def get_video_duration(video_path: str) -> float:
    """Get video duration in seconds using ffprobe.

    Returns 0.0 when ffprobe produces no parsable number.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        return float(result.stdout.strip())
    except ValueError:
        return 0.0


def get_video_bitrate(video_path: str) -> float:
    """Get the bitrate of the first video stream in kbps.

    Returns 0.0 when ffprobe produces no parsable number.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=bit_rate",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        return float(result.stdout.strip()) / 1000
    except ValueError:
        return 0.0


def compress_video(input_path: str, output_path: str) -> Optional[Dict]:
    """
    Compress video using H.265 with maximum quality preservation.

    Blocking: runs ffmpeg to completion. Call it through an executor from
    async code.

    Args:
        input_path: Path of the source video.
        output_path: Path the encoded file is written to (overwritten).

    Returns:
        Dict with compression stats (sizes in MB, saved MB, ratio, duration),
        or None when encoding fails or yields no usable output file.
    """
    print(f"\n{'='*80}")
    print(f"COMPRESSING: {Path(input_path).name}")
    print(f"{'='*80}")

    # Get video info
    duration = get_video_duration(input_path)
    original_size = os.path.getsize(input_path) / (1024**2)

    print(f"Original: {original_size:.1f} MB | Duration: {duration:.1f}s")

    # H.265 encoding with maximum quality
    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx265",
        "-preset", "veryslow",
        "-crf", "20",
        "-x265-params", "aq-mode=3",
        "-pix_fmt", "yuv420p10le",
        "-c:a", "aac",
        "-b:a", "192k",
        output_path
    ]

    result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)

    if result.returncode != 0:
        # ffmpeg prints its banner first and the actual error last, so the
        # tail of stderr is the informative part.
        print(f"✗ Encoding failed: {result.stderr[-500:]}")
        return None

    if not os.path.exists(output_path):
        print(f"✗ Output file not created")
        return None

    compressed_size = os.path.getsize(output_path) / (1024**2)
    if compressed_size <= 0:
        # An empty output would make the ratio below divide by zero.
        print("✗ Output file is empty")
        return None

    compression_ratio = original_size / compressed_size
    saved_mb = original_size - compressed_size
    saved_pct = (saved_mb / original_size * 100) if original_size else 0.0

    print(f"✓ Compressed: {original_size:.1f}MB → {compressed_size:.1f}MB")
    print(f"  Saved: {saved_mb:.1f}MB ({saved_pct:.1f}%)")
    print(f"  Ratio: {compression_ratio:.2f}x")

    return {
        "original_size_mb": round(original_size, 2),
        "compressed_size_mb": round(compressed_size, 2),
        "saved_mb": round(saved_mb, 2),
        "compression_ratio": round(compression_ratio, 2),
        "duration_seconds": duration
    }


def _find_candidate_videos(files: List[str]) -> List[Dict]:
    """Select source .mp4 files under READY_VIDEOS_FOLDER from a repo listing.

    Expected layout is ready_videos/<movie>/<file>.mp4. Files that are our own
    compressed outputs, or whose compressed sibling is already in the listing,
    are left out so a later scan never re-compresses finished work.
    """
    existing = set(files)
    prefix = f"{READY_VIDEOS_FOLDER}/"
    candidates = []

    for repo_path in files:
        if not (repo_path.startswith(prefix) and repo_path.endswith(".mp4")):
            continue

        parts = repo_path.split("/")
        if len(parts) < 3:
            continue

        # Use the last component as the file name so deeper nesting does not
        # mistake a directory for the video file.
        file_name = parts[-1]
        parent = repo_path.rsplit("/", 1)[0]
        stem = Path(file_name).stem

        if stem.endswith(COMPRESSED_SUFFIX):
            continue

        upload_path = f"{parent}/{stem}{COMPRESSED_SUFFIX}.mp4"
        if upload_path in existing:
            print(f"  ⊘ {file_name} (compressed copy already in repo)")
            continue

        candidates.append({
            "path": repo_path,
            "movie_name": parts[1],
            "file_name": file_name,
            "upload_path": upload_path
        })

    return candidates


async def _process_video(video_info: Dict) -> None:
    """Download one video, compress it if large enough, and upload the result.

    Updates compression_state counters and processed_videos. Raises on any
    failure so the caller can record the video as failed.
    """
    # Download
    print(f"\nDownloading: {video_info['path']}")
    video_path = await _run_blocking(
        hf_hub_download,
        repo_id=HF_DATASET_REPO,
        filename=video_info["path"],
        repo_type="dataset",
        token=HF_TOKEN,
        cache_dir=CACHE_DIR
    )

    if os.path.islink(video_path):
        video_path = os.path.realpath(video_path)

    file_size_mb = os.path.getsize(video_path) / (1024**2)
    print(f"✓ Downloaded: {file_size_mb:.1f} MB")

    # Skip if smaller than threshold
    if file_size_mb < SIZE_THRESHOLD_MB:
        print(f"⊘ File size {file_size_mb:.1f}MB < {SIZE_THRESHOLD_MB}MB threshold, skipping")
        compression_state["processed_videos"].append({
            "path": video_info["path"],
            "file_name": video_info["file_name"],
            "status": "skipped",
            "reason": f"File size {file_size_mb:.1f}MB below threshold",
            "timestamp": datetime.now().isoformat()
        })
        return

    upload_filename = f"{Path(video_info['file_name']).stem}{COMPRESSED_SUFFIX}.mp4"
    upload_path = video_info["upload_path"]
    output_path = TEMP_DIR / upload_filename

    try:
        # Compress
        compression_stats = await _run_blocking(
            compress_video, video_path, str(output_path)
        )
        if not compression_stats:
            raise RuntimeError("Compression failed")

        compression_state["total_compressed"] += 1

        # Upload compressed video
        print(f"Uploading: {upload_path}")
        await _run_blocking(
            upload_file,
            path_or_fileobj=str(output_path),
            path_in_repo=upload_path,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Add H.265 compressed video: {upload_filename}"
        )

        compression_state["total_uploaded"] += 1
        compression_state["total_space_saved_mb"] += compression_stats["saved_mb"]

        # Track processed video
        compression_state["processed_videos"].append({
            "path": video_info["path"],
            "file_name": video_info["file_name"],
            "upload_path": upload_path,
            "status": "compressed_uploaded",
            "stats": compression_stats,
            "timestamp": datetime.now().isoformat()
        })

        # Keyed by repo path: file names such as segment-01.mp4 repeat across
        # movies and would overwrite each other.
        compression_state["compression_stats"][video_info["path"]] = compression_stats
    finally:
        # Cleanup runs on failure too, so partial encodes do not pile up.
        try:
            output_path.unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"Warning: could not remove temp file {output_path}: {e}")


async def _save_and_upload_report() -> None:
    """Write the JSON processing report to TEMP_DIR and upload it to the repo."""
    print(f"\n{'='*80}")
    print("SAVING PROCESSING REPORT")
    print(f"{'='*80}")

    report = {
        "timestamp": datetime.now().isoformat(),
        "total_videos_found": compression_state["total_found"],
        "total_compressed": compression_state["total_compressed"],
        "total_uploaded": compression_state["total_uploaded"],
        "total_space_saved_mb": compression_state["total_space_saved_mb"],
        "processed_videos": compression_state["processed_videos"],
        "failed_videos": compression_state["failed_videos"],
        "compression_stats": compression_state["compression_stats"]
    }

    report_path = TEMP_DIR / "compression_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)

    # Upload JSON report
    print(f"Uploading report...")
    await _run_blocking(
        upload_file,
        path_or_fileobj=str(report_path),
        path_in_repo=f"{READY_VIDEOS_FOLDER}/compression_report.json",
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message="Update compression processing report"
    )
    print(f"✓ Report uploaded")


async def scan_and_compress_videos():
    """Scan ready_videos folder and compress large videos.

    Only one scan runs at a time; a call made while another scan is active
    returns immediately. Per-video failures are recorded in
    compression_state["failed_videos"] and do not abort the scan.
    """
    if compression_state["is_polling"]:
        print("Already polling, skipping...")
        return

    compression_state["is_polling"] = True

    try:
        print("\n" + "="*80)
        print("SCANNING FOR LARGE VIDEOS")
        print("="*80)

        # List all files in the dataset repo
        files = await _run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN
        )

        # Source videos in ready_videos (sizes are checked after download)
        video_files = _find_candidate_videos(files)
        print(f"Found {len(video_files)} video files")

        # Build lookup sets once instead of rescanning the lists per video
        processed_paths = {pv["path"] for pv in compression_state["processed_videos"]}
        failed_paths = {fv["path"] for fv in compression_state["failed_videos"]}

        pending_videos = []
        for video_info in video_files:
            # Skip if already processed
            if video_info["path"] in processed_paths:
                print(f"  ⊘ {video_info['file_name']} (already processed)")
                continue

            # Skip if already failed
            if video_info["path"] in failed_paths:
                print(f"  ✗ {video_info['file_name']} (previously failed)")
                continue

            pending_videos.append(video_info)

        compression_state["total_found"] = len(pending_videos)
        print(f"Unprocessed videos: {len(pending_videos)}")

        if not pending_videos:
            print("✓ All videos already processed!")
            return

        # Process each video
        for video_info in pending_videos:
            compression_state["current_video"] = video_info["file_name"]
            try:
                await _process_video(video_info)
            except Exception as e:
                print(f"✗ Error: {e}")
                compression_state["last_error"] = str(e)
                compression_state["failed_videos"].append({
                    "path": video_info["path"],
                    "file_name": video_info["file_name"],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })

        await _save_and_upload_report()

        print(f"\n{'='*80}")
        print("COMPRESSION COMPLETE")
        print(f"  Compressed: {compression_state['total_compressed']}")
        print(f"  Uploaded: {compression_state['total_uploaded']}")
        print(f"  Space saved: {compression_state['total_space_saved_mb']:.1f} MB")
        print(f"{'='*80}\n")

    except Exception as e:
        print(f"Critical error: {e}")
        compression_state["last_error"] = str(e)

    finally:
        # Nothing is being worked on once the scan ends, however it ended.
        compression_state["current_video"] = None
        compression_state["is_polling"] = False


@app.on_event("startup")
async def startup_event():
    """Start initial compression scan after 15 seconds."""
    async def delayed_scan():
        await asyncio.sleep(15)
        print("\n" + "="*80)
        print("STARTING COMPRESSION SCAN (15 seconds after startup)")
        print("="*80)
        await scan_and_compress_videos()

    _spawn_background(delayed_scan())


@app.get("/")
async def health():
    """Health check endpoint."""
    return JSONResponse({
        "status": "running",
        "service": "Video Compression Server",
        "is_polling": compression_state["is_polling"],
        "total_found": compression_state["total_found"],
        "total_compressed": compression_state["total_compressed"],
        "total_uploaded": compression_state["total_uploaded"],
        "current_video": compression_state["current_video"],
        "space_saved_mb": compression_state["total_space_saved_mb"],
        "processed_count": len(compression_state["processed_videos"]),
        "failed_count": len(compression_state["failed_videos"])
    })


@app.get("/status")
async def get_status():
    """Get detailed status."""
    return JSONResponse({
        "is_polling": compression_state["is_polling"],
        "total_found": compression_state["total_found"],
        "total_compressed": compression_state["total_compressed"],
        "total_uploaded": compression_state["total_uploaded"],
        "total_space_saved_mb": round(compression_state["total_space_saved_mb"], 2),
        "current_video": compression_state["current_video"],
        "processed_videos": compression_state["processed_videos"],
        "failed_videos": compression_state["failed_videos"],
        "last_error": compression_state["last_error"]
    })


@app.post("/scan")
async def trigger_scan():
    """Manually trigger a scan and compression run.

    Responds 409 when a scan is already in progress.
    """
    if compression_state["is_polling"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Compression scan already in progress"
        }, status_code=409)

    _spawn_background(scan_and_compress_videos())
    return JSONResponse({
        "status": "started",
        "message": "Compression scan started"
    })


@app.get("/stats")
async def get_stats():
    """Get compression statistics, including the mean compression ratio."""
    stats = compression_state["compression_stats"]
    ratios = [s["compression_ratio"] for s in stats.values()]
    average_ratio = round(sum(ratios) / len(ratios), 2) if ratios else 0

    return JSONResponse({
        "compression_stats": stats,
        "total_space_saved_mb": round(compression_state["total_space_saved_mb"], 2),
        "average_compression_ratio": average_ratio
    })


@app.post("/reset")
async def reset_state():
    """Reset all tracking (for testing)."""
    compression_state["total_found"] = 0
    compression_state["total_compressed"] = 0
    compression_state["total_uploaded"] = 0
    compression_state["processed_videos"] = []
    compression_state["failed_videos"] = []
    compression_state["compression_stats"] = {}
    compression_state["total_space_saved_mb"] = 0
    compression_state["last_error"] = None
    return JSONResponse({"status": "reset"})


if __name__ == "__main__":
    print(f"Starting Video Compression Server on port {SERVER_PORT}...")
    print("Scanning for videos on startup...")
    uvicorn.run(app, host="0.0.0.0", port=SERVER_PORT)