Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI Video Compression Server | |
| Continuously polls Hugging Face dataset for large videos and compresses them. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import asyncio | |
| import subprocess | |
| import tempfile | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Optional | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| try: | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| except ImportError: | |
| print("Missing dependency: huggingface_hub") | |
| sys.exit(1) | |
# --- Environment ------------------------------------------------------------
# Read variables from a .env file (if present) before looking up the token.
load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # The service cannot talk to the dataset without a token; stop at import.
    print("Error: Missing HF_TOKEN in .env")
    sys.exit(1)

# --- Configuration ----------------------------------------------------------
HF_DATASET_REPO = "factorstudios/movs"
READY_VIDEOS_FOLDER = "ready_videos"    # repo folder scanned for source videos
COMPRESSED_FOLDER = "compressed"        # repo folder receiving the results
SIZE_THRESHOLD_MB = 100                 # videos smaller than this are skipped
CACHE_DIR = "/tmp/video_compress_cache"

# Scratch directory for encoder output and the local progress-file copy.
TEMP_DIR = Path("/tmp/video_compression_server")
TEMP_DIR.mkdir(exist_ok=True)

PROGRESS_FILE = TEMP_DIR / "compression_progress.json"
PROGRESS_FILE_REPO = COMPRESSED_FOLDER + "/compression_progress.json"

# --- Global state -----------------------------------------------------------
# In-memory counters and lists read by the HTTP handlers defined below.
compression_state = {
    "is_running": False,
    "is_polling": False,
    "total_found": 0,
    "total_compressed": 0,
    "total_uploaded": 0,
    "current_video": None,
    "last_error": None,
    "processed_videos": [],
    "failed_videos": [],
    "total_space_saved_mb": 0,
    "compression_stats": {},
}

app = FastAPI(title="Video Compression Server")
def get_video_duration(video_path: str) -> float:
    """Return the duration of *video_path* in seconds using ffprobe.

    Args:
        video_path: Path of the media file to probe.

    Returns:
        The value ffprobe prints for ``format=duration`` as a float, or
        ``0.0`` when ffprobe cannot be started, times out, or prints
        something that does not parse as a number (including no output).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        # The timeout keeps a hung probe from stalling the caller forever.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        return float(result.stdout.strip())
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ffprobe missing / not executable.
        # SubprocessError: includes TimeoutExpired.
        # ValueError: empty or non-numeric output.
        return 0.0
def get_video_bitrate(video_path: str) -> float:
    """Return the bitrate of the first video stream of *video_path* in kbps.

    Args:
        video_path: Path of the media file to probe.

    Returns:
        The ``bit_rate`` value ffprobe prints for stream ``v:0`` divided by
        1000, or ``0.0`` when ffprobe cannot be started, times out, or
        prints something that does not parse as a number.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=bit_rate",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        # The timeout keeps a hung probe from stalling the caller forever.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        return float(result.stdout.strip()) / 1000
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ffprobe missing / not executable.
        # SubprocessError: includes TimeoutExpired.
        # ValueError: empty or non-numeric output (e.g. "N/A").
        return 0.0
def compress_video(input_path: str, output_path: str) -> Optional[Dict]:
    """Re-encode *input_path* to H.265 at *output_path* and report the savings.

    The video bitrate is derived from a 50 MB size budget divided by the
    probed duration, then clamped to the 400-1200 kbps range.

    Args:
        input_path: Path of the source video.
        output_path: Path the encoded file is written to (overwritten).

    Returns:
        A dict with ``original_size_mb``, ``compressed_size_mb``,
        ``saved_mb``, ``compression_ratio`` and ``duration_seconds`` on
        success. ``None`` on any failure: unreadable or empty input, ffmpeg
        error, timeout, or missing/empty output. A partially written output
        file is removed before ``None`` is returned.
    """
    def discard_partial_output() -> None:
        # Best effort: a failed encode must not leave junk in the temp dir.
        try:
            os.remove(output_path)
        except OSError:
            pass

    print(f"\n{'='*80}")
    print(f"COMPRESSING: {Path(input_path).name}")
    print(f"{'='*80}")

    # Validate the input before spending time probing or encoding it.
    try:
        original_size = os.path.getsize(input_path) / (1024**2)
    except OSError as e:
        print(f"✗ Cannot read input file: {e}")
        return None
    if original_size <= 0:
        print("✗ Input file is empty")
        return None

    duration = get_video_duration(input_path)
    print(f"Original: {original_size:.1f} MB | Duration: {duration:.1f}s")
    print("Target: ~50MB with high quality preservation")

    # Size budget (MB -> kbit) spread over the duration; max() guards against
    # a zero duration from a failed probe.
    target_mb = 50
    target_bitrate_kbps = int((target_mb * 8 * 1024) / max(duration, 1))
    video_bitrate = max(400, min(target_bitrate_kbps, 1200))
    print(f"Calculated bitrate: {video_bitrate}kbps")

    # NOTE(review): both -crf and -b:v are passed; confirm which rate-control
    # mode the encoder actually applies when given both.
    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx265",
        "-preset", "slow",
        "-crf", "28",
        "-b:v", f"{video_bitrate}k",
        "-maxrate", f"{int(video_bitrate * 1.2)}k",
        "-bufsize", f"{int(video_bitrate * 2)}k",
        "-x265-params", "aq-mode=3:log-level=error",
        "-pix_fmt", "yuv420p",
        "-c:a", "aac",
        "-b:a", "128k",
        "-progress", "pipe:1",
        output_path,
    ]

    try:
        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, timeout=3600)
        if result.returncode != 0:
            print(f"✗ Encoding failed with code {result.returncode}")
            if result.stderr:
                print(f"Error output: {result.stderr[:500]}")
            discard_partial_output()
            return None

        if not os.path.exists(output_path):
            print("✗ Output file not created")
            return None

        compressed_size = os.path.getsize(output_path) / (1024**2)
        if compressed_size <= 0:
            # An empty output would otherwise divide by zero below.
            print("✗ Output file is empty")
            discard_partial_output()
            return None

        compression_ratio = original_size / compressed_size
        saved_mb = original_size - compressed_size
        print(f"✓ Compressed: {original_size:.1f}MB → {compressed_size:.1f}MB")
        print(f" Saved: {saved_mb:.1f}MB ({(saved_mb/original_size*100):.1f}%)")
        print(f" Ratio: {compression_ratio:.2f}x")
        return {
            "original_size_mb": round(original_size, 2),
            "compressed_size_mb": round(compressed_size, 2),
            "saved_mb": round(saved_mb, 2),
            "compression_ratio": round(compression_ratio, 2),
            "duration_seconds": duration,
        }
    except subprocess.TimeoutExpired:
        print("✗ Compression timed out (>1 hour)")
        discard_partial_output()
        return None
    except Exception as e:
        print(f"✗ Compression error: {e}")
        discard_partial_output()
        return None
async def load_progress_file() -> Dict:
    """Download and parse the compression progress JSON from the HF dataset.

    Returns:
        The parsed progress dict. Its ``"compressed"`` and ``"failed"``
        entries are always lists: a missing or non-list value is replaced by
        an empty list so callers can append to them safely. When the file
        cannot be downloaded or parsed, a fresh structure with empty lists
        and a ``"last_updated"`` timestamp is returned instead.
    """
    try:
        print("Attempting to load progress file from dataset...")
        progress_path = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename=PROGRESS_FILE_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir=CACHE_DIR
        )
        # Read the resolved target when the returned path is a symlink.
        if os.path.islink(progress_path):
            progress_path = os.path.realpath(progress_path)

        with open(progress_path, 'r', encoding="utf-8") as f:
            progress = json.load(f)

        if not isinstance(progress, dict):
            raise ValueError("progress file does not contain a JSON object")

        # Guarantee the two lists the scanner appends to.
        for key in ("compressed", "failed"):
            if not isinstance(progress.get(key), list):
                progress[key] = []

        compressed_count = len(progress['compressed'])
        failed_count = len(progress['failed'])
        print(f"✓ Loaded progress: {compressed_count} compressed, {failed_count} failed")
        return progress
    except Exception as e:
        # NOTE(review): every failure (not only "file does not exist yet") ends
        # up here, so a transient download error also yields an empty history;
        # a later save would then presumably replace the remote file — confirm
        # whether that is acceptable.
        print(f"⊘ No existing progress file or load failed: {str(e)[:100]}")
        return {"compressed": [], "failed": [], "last_updated": datetime.now().isoformat()}
async def save_progress_file(progress: Dict):
    """Stamp *progress*, write it to the local progress file and upload it.

    The ``"last_updated"`` key of *progress* is overwritten with the current
    time. Any exception raised while writing or uploading is reported as a
    warning and swallowed, so this never raises to the caller.
    """
    try:
        progress["last_updated"] = datetime.now().isoformat()

        # Local copy first; the upload below sends this file.
        with open(PROGRESS_FILE, 'w') as local_copy:
            json.dump(progress, local_copy, indent=2)

        print(f"Uploading progress file...")
        upload_kwargs = {
            "path_or_fileobj": str(PROGRESS_FILE),
            "path_in_repo": PROGRESS_FILE_REPO,
            "repo_id": HF_DATASET_REPO,
            "repo_type": "dataset",
            "token": HF_TOKEN,
            "commit_message": "Update compression progress",
        }
        upload_file(**upload_kwargs)
        print(f"✓ Progress file uploaded")
    except Exception as err:
        print(f"Warning: Could not save progress file: {err}")
async def scan_and_compress_videos() -> None:
    """Run one scan pass: find unprocessed videos, compress and upload them.

    Steps:
      1. Load the persisted progress and collect already handled paths.
      2. List the dataset and keep ``ready_videos/<movie>/<file>.mp4`` entries
         that are neither compressed outputs nor already recorded.
      3. For each one: download, skip if below ``SIZE_THRESHOLD_MB``,
         otherwise encode, upload under ``COMPRESSED_FOLDER`` and record the
         result. Progress is saved after every video.

    The long-running download, encode and upload calls are dispatched to the
    default executor so an event loop serving HTTP requests is not blocked
    while they run. The in-memory ``processed_videos``, ``failed_videos`` and
    ``compression_stats`` entries of ``compression_state`` are updated as
    videos finish. The downloaded source file and the temporary encoder
    output are deleted after each video, whether it succeeded or not.

    Returns immediately if another pass is already marked as polling. Errors
    are recorded in ``compression_state["last_error"]`` rather than raised.
    """
    if compression_state["is_polling"]:
        print("Already polling, skipping...")
        return

    loop = asyncio.get_running_loop()

    async def run_blocking(func, *args, **kwargs):
        # Run a blocking call in the default executor and await its result.
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    def remove_quietly(path) -> None:
        # Best-effort delete; a missing file is not an error here.
        if path is None:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    compression_state["is_polling"] = True
    try:
        print("\n" + "="*80)
        print("LOADING COMPRESSION PROGRESS")
        print("="*80)

        progress = await load_progress_file()
        # setdefault: tolerate a progress dict that lacks either list.
        compressed_entries = progress.setdefault("compressed", [])
        failed_entries = progress.setdefault("failed", [])
        compressed_files = {item["path"] for item in compressed_entries}
        failed_files = {item["path"] for item in failed_entries}
        print(f"Current state: {len(compressed_files)} compressed, {len(failed_files)} failed")

        print("\n" + "="*80)
        print("SCANNING FOR LARGE VIDEOS")
        print("="*80)

        try:
            print("Connecting to Hugging Face dataset...")
            files = await run_blocking(
                list_repo_files,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                token=HF_TOKEN
            )
            print(f"✓ Found {len(files)} total files in dataset")
        except Exception as e:
            print(f"✗ Error listing files: {e}")
            compression_state["last_error"] = str(e)
            return  # the finally block below clears is_polling

        # Select unprocessed source videos.
        video_files = []
        source_prefix = f"{READY_VIDEOS_FOLDER}/"
        for f in files:
            if not (f.startswith(source_prefix) and f.endswith(".mp4")):
                continue
            short_name = f.split('/')[-1]
            if "_compressed" in f:
                print(f" ⊘ {short_name} (already compressed file)")
                continue
            if f in compressed_files:
                print(f" ⊘ {short_name} (already processed)")
                continue
            if f in failed_files:
                print(f" ✗ {short_name} (previously failed)")
                continue
            # Expected layout: ready_videos/moviename/segment-XX.mp4
            parts = f.split("/")
            if len(parts) >= 3:
                video_files.append({
                    "path": f,
                    "movie_name": parts[1],
                    "file_name": parts[2]
                })

        compression_state["total_found"] = len(video_files)
        print(f"\n✓ Found {len(video_files)} unprocessed video files")
        if not video_files:
            print("✓ All videos already processed!")
            return

        for video_info in video_files:
            compression_state["current_video"] = video_info["file_name"]
            upload_filename = f"{Path(video_info['file_name']).stem}_compressed.mp4"
            output_path = TEMP_DIR / upload_filename
            video_path = None
            try:
                print(f"\nDownloading: {video_info['path']}")
                video_path = await run_blocking(
                    hf_hub_download,
                    repo_id=HF_DATASET_REPO,
                    filename=video_info["path"],
                    repo_type="dataset",
                    token=HF_TOKEN,
                    cache_dir=CACHE_DIR
                )
                if os.path.islink(video_path):
                    video_path = os.path.realpath(video_path)
                file_size_mb = os.path.getsize(video_path) / (1024**2)
                print(f"✓ Downloaded: {file_size_mb:.1f} MB")

                if file_size_mb < SIZE_THRESHOLD_MB:
                    print(f"⊘ Below threshold ({file_size_mb:.1f}MB < {SIZE_THRESHOLD_MB}MB), skipping")
                    # Recorded so the file is not downloaded again next pass.
                    compressed_entries.append({
                        "path": video_info["path"],
                        "file_name": video_info["file_name"],
                        "status": "skipped",
                        "reason": f"Below {SIZE_THRESHOLD_MB}MB threshold",
                        "timestamp": datetime.now().isoformat()
                    })
                    compression_state["processed_videos"].append(video_info["path"])
                    await save_progress_file(progress)
                    continue

                print("✓ Starting H.265 compression...")
                compression_stats = await run_blocking(
                    compress_video, video_path, str(output_path)
                )
                if not compression_stats:
                    raise Exception("Compression failed")
                compression_state["total_compressed"] += 1

                # Mirror the movie folder structure under COMPRESSED_FOLDER.
                upload_path = f"{COMPRESSED_FOLDER}/{video_info['movie_name']}/{upload_filename}"
                print(f"Uploading: {upload_path}")
                await run_blocking(
                    upload_file,
                    path_or_fileobj=str(output_path),
                    path_in_repo=upload_path,
                    repo_id=HF_DATASET_REPO,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    commit_message=f"Add H.265 compressed video: {upload_filename}"
                )
                compression_state["total_uploaded"] += 1
                compression_state["total_space_saved_mb"] += compression_stats["saved_mb"]
                compression_state["processed_videos"].append(video_info["path"])
                compression_state["compression_stats"][video_info["path"]] = compression_stats

                compressed_entries.append({
                    "path": video_info["path"],
                    "file_name": video_info["file_name"],
                    "upload_path": upload_path,
                    "status": "compressed_uploaded",
                    "stats": compression_stats,
                    "timestamp": datetime.now().isoformat()
                })
                await save_progress_file(progress)
            except Exception as e:
                print(f"✗ Error: {e}")
                compression_state["last_error"] = str(e)
                compression_state["failed_videos"].append(video_info["path"])
                failed_entries.append({
                    "path": video_info["path"],
                    "file_name": video_info["file_name"],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })
                await save_progress_file(progress)
            finally:
                # Free disk space on every outcome, not only after success.
                remove_quietly(output_path)
                remove_quietly(video_path)

        print(f"\n{'='*80}")
        print("COMPRESSION COMPLETE")
        print(f" Compressed: {compression_state['total_compressed']}")
        print(f" Uploaded: {compression_state['total_uploaded']}")
        print(f" Space saved: {compression_state['total_space_saved_mb']:.1f} MB")
        print(f" Total compressed: {len(compressed_entries)}")
        print(f" Total failed: {len(failed_entries)}")
        print(f"{'='*80}\n")
    except Exception as e:
        print(f"Critical error: {e}")
        compression_state["last_error"] = str(e)
    finally:
        compression_state["current_video"] = None
        compression_state["is_polling"] = False
async def startup_event():
    """Report encoder tool availability and schedule the first scan.

    Checks that both ``ffmpeg`` and ``ffprobe`` can be executed (printing a
    warning otherwise, never raising), then starts a daemon thread that waits
    30 seconds and runs ``scan_and_compress_videos`` in its own event loop.

    NOTE(review): no startup-hook registration with ``app`` is visible on
    this function in this view — confirm how it is wired up.
    """
    print("\n" + "="*80)
    print("STARTUP EVENT TRIGGERED")
    print("="*80)

    # ffmpeg does the encoding and ffprobe the duration probe; check both.
    for tool in ("ffmpeg", "ffprobe"):
        try:
            result = subprocess.run([tool, "-version"], capture_output=True, timeout=5)
            if result.returncode == 0:
                print(f"✓ {tool} is available")
            else:
                print(f"✗ {tool} check failed")
        except FileNotFoundError:
            print(f"✗ WARNING: {tool} not found in PATH - compression will fail")
        except Exception as e:
            print(f"✗ Error checking {tool}: {e}")

    def run_delayed_scan():
        # Runs in a separate thread with its own event loop via asyncio.run.
        print("\nWaiting 30 seconds before starting compression scan...")
        time.sleep(30)
        print("Starting compression scan now...")
        asyncio.run(scan_and_compress_videos())

    scan_thread = threading.Thread(target=run_delayed_scan, daemon=True)
    scan_thread.start()
    print("✓ Background scan thread scheduled")
async def health():
    """Return a JSON health summary built from the in-memory state.

    NOTE(review): no route decorator is visible on this handler in this
    view — confirm how it is registered with ``app``.
    """
    state = compression_state
    summary = {
        "status": "running",
        "service": "Video Compression Server",
    }
    # Counters copied straight from the shared state under the same key.
    for key in ("is_polling", "total_found", "total_compressed",
                "total_uploaded", "current_video"):
        summary[key] = state[key]
    summary["space_saved_mb"] = state["total_space_saved_mb"]
    summary["processed_count"] = len(state["processed_videos"])
    summary["failed_count"] = len(state["failed_videos"])
    return JSONResponse(summary)
async def get_status():
    """Return the detailed in-memory state, including lists and last error.

    NOTE(review): no route decorator is visible on this handler in this
    view — confirm how it is registered with ``app``.
    """
    state = compression_state
    saved_mb = round(state["total_space_saved_mb"], 2)
    details = {
        "is_polling": state["is_polling"],
        "total_found": state["total_found"],
        "total_compressed": state["total_compressed"],
        "total_uploaded": state["total_uploaded"],
        "total_space_saved_mb": saved_mb,
        "current_video": state["current_video"],
        "processed_videos": state["processed_videos"],
        "failed_videos": state["failed_videos"],
        "last_error": state["last_error"],
    }
    return JSONResponse(details)
# Strong references to in-flight scan tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_scan_tasks = set()


async def trigger_scan():
    """Start a scan-and-compress pass in the background.

    Returns:
        A 409 JSON response when a pass is already marked as polling,
        otherwise a JSON response confirming the pass was started. The pass
        itself runs as a background task and is not awaited here.

    NOTE(review): no route decorator is visible on this handler in this
    view — confirm how it is registered with ``app``.
    """
    if compression_state["is_polling"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Compression scan already in progress"
        }, status_code=409)

    task = asyncio.create_task(scan_and_compress_videos())
    _scan_tasks.add(task)
    # Drop the reference once the task completes.
    task.add_done_callback(_scan_tasks.discard)

    return JSONResponse({
        "status": "started",
        "message": "Compression scan started"
    })
async def get_stats():
    """Return per-video compression stats plus aggregate figures.

    The average ratio is the mean of each entry's ``compression_ratio``
    rounded to two decimals, or ``0`` when no stats are recorded.

    NOTE(review): no route decorator is visible on this handler in this
    view — confirm how it is registered with ``app``.
    """
    stats_by_video = compression_state["compression_stats"]
    saved_mb = round(compression_state["total_space_saved_mb"], 2)

    if stats_by_video:
        ratio_total = sum(entry["compression_ratio"] for entry in stats_by_video.values())
        average_ratio = round(ratio_total / max(len(stats_by_video), 1), 2)
    else:
        average_ratio = 0

    return JSONResponse({
        "compression_stats": stats_by_video,
        "total_space_saved_mb": saved_mb,
        "average_compression_ratio": average_ratio
    })
async def reset_state():
    """Zero the counters and empty the tracking collections (for testing).

    Only the keys listed below are touched; the polling flags, the current
    video and the last error are left as they are.

    NOTE(review): no route decorator is visible on this handler in this
    view — confirm how it is registered with ``app``.
    """
    compression_state.update({
        "total_found": 0,
        "total_compressed": 0,
        "total_uploaded": 0,
        "processed_videos": [],
        "failed_videos": [],
        "compression_stats": {},
        "total_space_saved_mb": 0,
    })
    return JSONResponse({"status": "reset"})
if __name__ == "__main__":
    # Script entry point: announce startup, then serve the app with uvicorn
    # on all interfaces, port 7860.
    for banner_line in (
        "Starting Video Compression Server on port 7860...",
        "Scanning for videos on startup...",
    ):
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=7860)