Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI Video Compression Server | |
| Continuously polls Hugging Face dataset for large videos and compresses them. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import asyncio | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Optional | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| try: | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| except ImportError: | |
| print("Missing dependency: huggingface_hub") | |
| sys.exit(1) | |
# Load environment variables
load_dotenv()

# The Hub token is mandatory: every Hub call in this file passes it.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("Error: Missing HF_TOKEN in .env")
    sys.exit(1)

# Configuration
HF_DATASET_REPO = "factorstudios/movs"
READY_VIDEOS_FOLDER = "ready_videos"
SIZE_THRESHOLD_MB = 100  # Compress videos above 100MB
CACHE_DIR = "/tmp/video_compress_cache"
TEMP_DIR = Path("/tmp") / "video_compression_server"
TEMP_DIR.mkdir(exist_ok=True)

# Global state, shared between the scan coroutine and the HTTP handlers.
compression_state = dict(
    is_running=False,            # not read or written anywhere else in this file
    is_polling=False,            # True while scan_and_compress_videos is running
    total_found=0,               # unprocessed videos found by the latest scan
    total_compressed=0,          # successful encodes
    total_uploaded=0,            # successful uploads of encoded files
    current_video=None,          # file name of the video being worked on
    last_error=None,             # text of the most recent exception
    processed_videos=[],         # one record per skipped / uploaded video
    failed_videos=[],            # one record per video that raised
    total_space_saved_mb=0,      # sum of "saved_mb" over uploaded videos
    compression_stats={},        # file name -> stats dict from compress_video
)

app = FastAPI(title="Video Compression Server")
def get_video_duration(video_path: str) -> float:
    """Get video duration in seconds using ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        The value ffprobe prints for ``format=duration``, or ``0.0`` when
        ffprobe cannot be started or its output is not a number (for
        example an unreadable file or an ``N/A`` duration).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        # Bare value only: no section wrappers and no "duration=" key.
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # ffprobe is missing or not executable; use the same "unknown"
        # fallback as for unparseable output instead of crashing the caller.
        return 0.0
    try:
        return float(result.stdout.strip())
    except ValueError:
        return 0.0
def get_video_bitrate(video_path: str) -> float:
    """Get the bitrate of the first video stream in kbps using ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        ffprobe's ``bit_rate`` for stream ``v:0`` divided by 1000, or ``0.0``
        when ffprobe cannot be started or its output is not a number (for
        example an unreadable file or an ``N/A`` bitrate).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=bit_rate",
        # Bare value only: no section wrappers and no "bit_rate=" key.
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # ffprobe is missing or not executable; report "unknown" like the
        # unparseable-output case rather than raising.
        return 0.0
    try:
        return float(result.stdout.strip()) / 1000
    except ValueError:
        return 0.0
def compress_video(input_path: str, output_path: str) -> Optional[Dict]:
    """Compress a video with H.265 (libx265) and report the size change.

    Runs ffmpeg with ``-preset veryslow -crf 20``, 10-bit 4:2:0 pixels and
    192k AAC audio, overwriting ``output_path`` if it exists.

    Args:
        input_path: Path of the source video.
        output_path: Path the encoded video is written to.

    Returns:
        A dict with ``original_size_mb``, ``compressed_size_mb``,
        ``saved_mb``, ``compression_ratio`` and ``duration_seconds``, or
        ``None`` when ffmpeg fails or produces no usable output file. On
        failure any partial output file is removed.

    Raises:
        OSError: If ``input_path`` cannot be stat'ed or ffmpeg cannot be
            started.
    """

    def discard_output() -> None:
        # A failed encode can leave a truncated file behind; do not let it
        # accumulate in the temp directory.
        try:
            os.remove(output_path)
        except OSError:
            pass

    print(f"\n{'='*80}")
    print(f"COMPRESSING: {Path(input_path).name}")
    print(f"{'='*80}")

    # Get video info
    duration = get_video_duration(input_path)
    original_size = os.path.getsize(input_path) / (1024**2)
    print(f"Original: {original_size:.1f} MB | Duration: {duration:.1f}s")

    # H.265 encoding with maximum quality
    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx265",
        "-preset", "veryslow",
        "-crf", "20",
        "-x265-params", "aq-mode=3",
        "-pix_fmt", "yuv420p10le",
        "-c:a", "aac",
        "-b:a", "192k",
        output_path,
    ]
    result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # ffmpeg writes its version/configuration banner first; the reason
        # for the failure is at the end of stderr.
        print(f"✗ Encoding failed: {result.stderr[-500:]}")
        discard_output()
        return None
    if not os.path.exists(output_path):
        print("✗ Output file not created")
        return None

    compressed_size = os.path.getsize(output_path) / (1024**2)
    if compressed_size == 0:
        # An empty file is not a valid encode and would divide by zero below.
        print("✗ Output file is empty")
        discard_output()
        return None

    compression_ratio = original_size / compressed_size
    saved_mb = original_size - compressed_size
    saved_percent = (saved_mb / original_size * 100) if original_size else 0.0
    print(f"✓ Compressed: {original_size:.1f}MB → {compressed_size:.1f}MB")
    print(f" Saved: {saved_mb:.1f}MB ({saved_percent:.1f}%)")
    print(f" Ratio: {compression_ratio:.2f}x")
    return {
        "original_size_mb": round(original_size, 2),
        "compressed_size_mb": round(compressed_size, 2),
        "saved_mb": round(saved_mb, 2),
        "compression_ratio": round(compression_ratio, 2),
        "duration_seconds": duration,
    }
async def scan_and_compress_videos():
    """Scan the ready_videos folder of the dataset and compress large videos.

    Lists the files of ``HF_DATASET_REPO``, keeps the ``.mp4`` files under
    ``READY_VIDEOS_FOLDER/<movie>/`` and, for each one not handled yet:
    downloads it, skips it when it is smaller than ``SIZE_THRESHOLD_MB``,
    otherwise encodes it with ``compress_video`` and uploads the result next
    to the original as ``<stem>_compressed.mp4``. Files that are themselves
    ``*_compressed.mp4`` outputs, and originals whose compressed file is
    already in the repository, are not processed again.

    Progress is recorded in the module-level ``compression_state``. After the
    loop a JSON report is written to ``TEMP_DIR`` and uploaded to
    ``READY_VIDEOS_FOLDER/compression_report.json``.

    Returns immediately if a scan is already in progress. An error on one
    video is recorded in ``failed_videos`` and the run continues; any other
    error is stored in ``last_error``. Nothing is raised to the caller.
    """
    if compression_state["is_polling"]:
        print("Already polling, skipping...")
        return
    compression_state["is_polling"] = True

    loop = asyncio.get_running_loop()

    async def run_blocking(func, *args, **kwargs):
        # Hub transfers and ffmpeg encodes are synchronous and long-running.
        # Run them in the default thread pool so the event loop can keep
        # answering HTTP requests in the meantime.
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    compressed_suffix = "_compressed"

    try:
        print("\n" + "="*80)
        print("SCANNING FOR LARGE VIDEOS")
        print("="*80)

        # List all files in the dataset repository
        files = await run_blocking(
            list_repo_files,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        repo_paths = set(files)

        # Collect candidate videos: ready_videos/moviename/segment-XX.mp4
        video_files = []
        for f in files:
            if not (f.startswith(f"{READY_VIDEOS_FOLDER}/") and f.endswith(".mp4")):
                continue
            parts = f.split("/")
            if len(parts) < 3:
                continue
            movie_name, file_name = parts[1], parts[2]
            stem = Path(file_name).stem
            if stem.endswith(compressed_suffix):
                # This is an output of a previous run; compressing it again
                # would produce *_compressed_compressed.mp4 and so on.
                continue
            counterpart = (
                f"{READY_VIDEOS_FOLDER}/{movie_name}/{stem}{compressed_suffix}.mp4"
            )
            if counterpart in repo_paths:
                # In-memory tracking is lost on restart; the repository
                # listing is what shows this video was already done.
                print(f" ⊘ {file_name} (compressed file already in repo)")
                continue
            video_files.append({
                "path": f,
                "movie_name": movie_name,
                "file_name": file_name,
            })
        print(f"Found {len(video_files)} video files")

        # Drop videos this process already handled (successfully or not).
        # NOTE(review): the size check happens after download, so every
        # pending file is downloaded once even if it is below the threshold.
        processed_paths = {pv["path"] for pv in compression_state["processed_videos"]}
        failed_paths = {fv["path"] for fv in compression_state["failed_videos"]}
        large_videos = []
        for video_info in video_files:
            if video_info["path"] in processed_paths:
                print(f" ⊘ {video_info['file_name']} (already processed)")
                continue
            if video_info["path"] in failed_paths:
                print(f" ✗ {video_info['file_name']} (previously failed)")
                continue
            large_videos.append(video_info)

        compression_state["total_found"] = len(large_videos)
        print(f"Unprocessed videos: {len(large_videos)}")
        if not large_videos:
            print("✓ All videos already processed!")
            return

        # Process each video
        for video_info in large_videos:
            compression_state["current_video"] = video_info["file_name"]
            upload_filename = (
                f"{Path(video_info['file_name']).stem}{compressed_suffix}.mp4"
            )
            output_path = TEMP_DIR / upload_filename
            try:
                # Download
                print(f"\nDownloading: {video_info['path']}")
                video_path = await run_blocking(
                    hf_hub_download,
                    repo_id=HF_DATASET_REPO,
                    filename=video_info["path"],
                    repo_type="dataset",
                    token=HF_TOKEN,
                    cache_dir=CACHE_DIR,
                )
                if os.path.islink(video_path):
                    video_path = os.path.realpath(video_path)
                file_size_mb = os.path.getsize(video_path) / (1024**2)
                print(f"✓ Downloaded: {file_size_mb:.1f} MB")

                # Skip if smaller than threshold
                if file_size_mb < SIZE_THRESHOLD_MB:
                    print(f"⊘ File size {file_size_mb:.1f}MB < {SIZE_THRESHOLD_MB}MB threshold, skipping")
                    compression_state["processed_videos"].append({
                        "path": video_info["path"],
                        "file_name": video_info["file_name"],
                        "status": "skipped",
                        "reason": f"File size {file_size_mb:.1f}MB below threshold",
                        "timestamp": datetime.now().isoformat(),
                    })
                    continue

                # Compress
                compression_stats = await run_blocking(
                    compress_video, video_path, str(output_path)
                )
                if not compression_stats:
                    raise Exception("Compression failed")
                compression_state["total_compressed"] += 1

                # Upload compressed video
                upload_path = (
                    f"{READY_VIDEOS_FOLDER}/{video_info['movie_name']}/{upload_filename}"
                )
                print(f"Uploading: {upload_path}")
                await run_blocking(
                    upload_file,
                    path_or_fileobj=str(output_path),
                    path_in_repo=upload_path,
                    repo_id=HF_DATASET_REPO,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    commit_message=f"Add H.265 compressed video: {upload_filename}",
                )
                compression_state["total_uploaded"] += 1
                compression_state["total_space_saved_mb"] += compression_stats["saved_mb"]

                # Track processed video
                compression_state["processed_videos"].append({
                    "path": video_info["path"],
                    "file_name": video_info["file_name"],
                    "upload_path": upload_path,
                    "status": "compressed_uploaded",
                    "stats": compression_stats,
                    "timestamp": datetime.now().isoformat(),
                })
                compression_state["compression_stats"][video_info["file_name"]] = compression_stats
            except Exception as e:
                print(f"✗ Error: {e}")
                compression_state["last_error"] = str(e)
                compression_state["failed_videos"].append({
                    "path": video_info["path"],
                    "file_name": video_info["file_name"],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                })
            finally:
                # Remove the temporary encode on success *and* on failure,
                # otherwise failed uploads leave large files in TEMP_DIR.
                try:
                    os.remove(output_path)
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    print(f"Warning: could not remove {output_path}: {cleanup_error}")

        # Save JSON tracking file
        print(f"\n{'='*80}")
        print("SAVING PROCESSING REPORT")
        print(f"{'='*80}")
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_videos_found": compression_state["total_found"],
            "total_compressed": compression_state["total_compressed"],
            "total_uploaded": compression_state["total_uploaded"],
            "total_space_saved_mb": compression_state["total_space_saved_mb"],
            "processed_videos": compression_state["processed_videos"],
            "failed_videos": compression_state["failed_videos"],
            "compression_stats": compression_state["compression_stats"],
        }
        report_path = TEMP_DIR / "compression_report.json"
        with open(report_path, "w") as f:
            json.dump(report, f, indent=2)

        # Upload JSON report
        print("Uploading report...")
        await run_blocking(
            upload_file,
            path_or_fileobj=str(report_path),
            path_in_repo=f"{READY_VIDEOS_FOLDER}/compression_report.json",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="Update compression processing report",
        )
        print("✓ Report uploaded")

        print(f"\n{'='*80}")
        print("COMPRESSION COMPLETE")
        print(f" Compressed: {compression_state['total_compressed']}")
        print(f" Uploaded: {compression_state['total_uploaded']}")
        print(f" Space saved: {compression_state['total_space_saved_mb']:.1f} MB")
        print(f"{'='*80}\n")
    except Exception as e:
        print(f"Critical error: {e}")
        compression_state["last_error"] = str(e)
    finally:
        # Nothing is being worked on once the scan ends, however it ended.
        compression_state["current_video"] = None
        compression_state["is_polling"] = False
# Without this registration the function is never called and the initial
# scan never starts.
@app.on_event("startup")
async def startup_event():
    """Start initial compression scan after 15 seconds."""
    async def delayed_scan():
        await asyncio.sleep(15)
        print("\n" + "="*80)
        print("STARTING COMPRESSION SCAN (15 seconds after startup)")
        print("="*80)
        await scan_and_compress_videos()

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected before it finishes. Keep a strong
    # reference on the application state.
    app.state.startup_scan_task = asyncio.create_task(delayed_scan())
# NOTE(review): no route registration for this handler is visible in the
# file, so it was unreachable over HTTP. The path is derived from the
# function name — confirm it against existing clients.
@app.get("/health")
async def health():
    """Health check endpoint.

    Returns a JSON summary of the counters in ``compression_state`` plus the
    number of processed and failed videos.
    """
    state = compression_state
    return JSONResponse({
        "status": "running",
        "service": "Video Compression Server",
        "is_polling": state["is_polling"],
        "total_found": state["total_found"],
        "total_compressed": state["total_compressed"],
        "total_uploaded": state["total_uploaded"],
        "current_video": state["current_video"],
        "space_saved_mb": state["total_space_saved_mb"],
        "processed_count": len(state["processed_videos"]),
        "failed_count": len(state["failed_videos"]),
    })
# NOTE(review): no route registration for this handler is visible in the
# file, so it was unreachable over HTTP. The path is derived from the
# function name — confirm it against existing clients.
@app.get("/status")
async def get_status():
    """Get detailed status.

    Returns the counters from ``compression_state`` together with the full
    ``processed_videos`` and ``failed_videos`` lists and the last error text.
    """
    state = compression_state
    return JSONResponse({
        "is_polling": state["is_polling"],
        "total_found": state["total_found"],
        "total_compressed": state["total_compressed"],
        "total_uploaded": state["total_uploaded"],
        "total_space_saved_mb": round(state["total_space_saved_mb"], 2),
        "current_video": state["current_video"],
        "processed_videos": state["processed_videos"],
        "failed_videos": state["failed_videos"],
        "last_error": state["last_error"],
    })
# NOTE(review): no route registration for this handler is visible in the
# file, so it was unreachable over HTTP. Method and path are chosen from the
# function's purpose — confirm them against existing clients.
@app.post("/scan")
async def trigger_scan():
    """Manually trigger a scan and compression run.

    Responds with HTTP 409 when a scan is already in progress; otherwise
    schedules ``scan_and_compress_videos`` in the background and returns
    immediately.
    """
    if compression_state["is_polling"]:
        return JSONResponse({
            "status": "already_running",
            "message": "Compression scan already in progress"
        }, status_code=409)

    # The event loop keeps only weak references to tasks; hold a strong one
    # so the scan cannot be garbage-collected while it is running.
    app.state.manual_scan_task = asyncio.create_task(scan_and_compress_videos())
    return JSONResponse({
        "status": "started",
        "message": "Compression scan started"
    })
# NOTE(review): no route registration for this handler is visible in the
# file, so it was unreachable over HTTP. The path is derived from the
# function name — confirm it against existing clients.
@app.get("/stats")
async def get_stats():
    """Get compression statistics.

    Returns the per-file stats, the total space saved (rounded to two
    decimals) and the mean ``compression_ratio`` over all recorded files,
    which is ``0`` when nothing has been compressed yet.
    """
    stats = compression_state["compression_stats"]
    ratios = [entry["compression_ratio"] for entry in stats.values()]
    average_ratio = round(sum(ratios) / len(ratios), 2) if ratios else 0
    return JSONResponse({
        "compression_stats": stats,
        "total_space_saved_mb": round(compression_state["total_space_saved_mb"], 2),
        "average_compression_ratio": average_ratio,
    })
# NOTE(review): no route registration for this handler is visible in the
# file, so it was unreachable over HTTP. Method and path are chosen from the
# function's purpose — confirm them against existing clients.
@app.post("/reset")
async def reset_state():
    """Reset all tracking (for testing).

    Clears the counters, the processed/failed lists, the per-file stats and
    the last error. ``is_polling`` and ``current_video`` are left alone
    because a scan may be running while this is called.
    """
    compression_state["total_found"] = 0
    compression_state["total_compressed"] = 0
    compression_state["total_uploaded"] = 0
    compression_state["processed_videos"] = []
    compression_state["failed_videos"] = []
    compression_state["compression_stats"] = {}
    compression_state["total_space_saved_mb"] = 0
    # A stale error from before the reset would be misleading in the status
    # output.
    compression_state["last_error"] = None
    return JSONResponse({"status": "reset"})
if __name__ == "__main__":
    # One value feeds both the banner and the bind call, so the log can no
    # longer announce a different port than the one actually used.
    port = 7860
    print(f"Starting Video Compression Server on port {port}...")
    print("Scanning for videos on startup...")
    uvicorn.run(app, host="0.0.0.0", port=port)