"""FastAPI service that downloads a video, splits it and uploads the parts.

The video is fetched with ``wget``, cut into size-limited chunks with
``ffmpeg`` (stream copy, no re-encode) and every chunk is uploaded to
Cloudinary through an unsigned upload preset.
"""

import concurrent.futures
import os
import shutil
import subprocess
import tempfile
from typing import List, Optional

import cloudinary
import cloudinary.uploader
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ------------------------------------------
# CONFIGURATION
# ------------------------------------------
# Values can be overridden through environment variables; the defaults are
# the values that used to be hard-coded here.
CLOUD_NAME = os.environ.get("CLOUDINARY_CLOUD_NAME", "dgfhhszx8")
UPLOAD_PRESET = os.environ.get("CLOUDINARY_UPLOAD_PRESET", "testing")

# Use /tmp for writable storage in Docker.
_WORK_ROOT = "/tmp/data"
# Size each chunk is aimed at, and the hard ceiling above which it is re-cut.
_CHUNK_LIMIT_MB = 95
_TARGET_BYTES = _CHUNK_LIMIT_MB * 1024 * 1024
_MAX_CHUNK_MB = 99.0
# Downloads smaller than this are treated as failed.
_MIN_DOWNLOAD_BYTES = 1000
# An oversized chunk is re-cut with its duration multiplied by this factor.
_SHRINK_FACTOR = 0.9
# Upper bound on re-cuts of a single chunk so the splitter always terminates.
_MAX_SHRINK_ATTEMPTS = 25
_UPLOAD_WORKERS = 4

# ------------------------------------------
# APP SETUP
# ------------------------------------------
app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider listing the real front-end origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class VideoRequest(BaseModel):
    """Request body of ``POST /process-video``."""

    # URL the source video is downloaded from.
    video_url: str


class VideoResponse(BaseModel):
    """Response body of ``POST /process-video``."""

    status: str
    message: str
    # Cloudinary URLs of the uploaded chunks, in chunk order.
    uploaded_chunks: Optional[List[str]] = None


# ------------------------------------------
# HELPER FUNCTIONS
# ------------------------------------------
def _download(video_url: str, dest: str) -> bool:
    """Download *video_url* to *dest* with wget; return True on success."""
    try:
        # Argument list (no shell) plus "--" so the caller-supplied URL can
        # neither inject shell syntax nor be parsed as a wget option.
        result = subprocess.run(
            ["wget", "-q", "-O", dest, "--", video_url], check=False
        )
    except OSError:
        # wget is missing or could not be started.
        return False
    return (
        result.returncode == 0
        and os.path.exists(dest)
        and os.path.getsize(dest) >= _MIN_DOWNLOAD_BYTES
    )


def _probe_duration(path: str) -> float:
    """Return the container duration of *path* in seconds, via ffprobe."""
    output = subprocess.check_output(
        [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            path,
        ]
    )
    return float(output.decode().strip())


def _split_into_chunks(
    source: str, work_dir: str, total_duration: float
) -> List[str]:
    """Cut *source* into consecutive chunks of at most ``_MAX_CHUNK_MB``.

    Chunk durations are estimated from the file's average byte rate. A chunk
    that turns out too large is deleted and re-cut with a shorter duration.
    Returns the chunk paths in playback order; splitting stops early (and the
    chunks produced so far are returned) when ffmpeg yields no output file or
    a chunk cannot be brought under the size ceiling.
    """
    avg_bytes_per_sec = os.path.getsize(source) / total_duration
    parts: List[str] = []
    start = 0.0

    while start < total_duration:
        est_duration = _TARGET_BYTES / avg_bytes_per_sec
        chunk_name = os.path.join(work_dir, f"part_{len(parts):03d}.mp4")

        for _ in range(_MAX_SHRINK_ATTEMPTS):
            end = min(start + est_duration, total_duration)
            try:
                subprocess.run(
                    [
                        "ffmpeg", "-y", "-hide_banner",
                        "-loglevel", "error",
                        "-ss", str(start),
                        "-to", str(end),
                        "-i", source,
                        "-c", "copy",
                        "-avoid_negative_ts", "make_zero",
                        chunk_name,
                    ],
                    check=False,
                )
            except OSError as exc:
                print(f"❌ Could not run ffmpeg: {exc}")
                return parts

            if not os.path.exists(chunk_name):
                # End of file or ffmpeg error. Retrying the same cut would
                # never advance `start`, so stop splitting here.
                print(f"❌ No output for chunk starting at {start}; stopping split.")
                return parts

            if os.path.getsize(chunk_name) / (1024 * 1024) <= _MAX_CHUNK_MB:
                break

            # Too large: discard and try again with a shorter duration.
            os.remove(chunk_name)
            est_duration *= _SHRINK_FACTOR
        else:
            print(f"❌ Chunk starting at {start} stayed over the size limit; stopping split.")
            return parts

        parts.append(chunk_name)
        start = end

    return parts


def _upload_chunk(part_file: str) -> Optional[str]:
    """Upload one chunk to Cloudinary; return its secure URL or None."""
    try:
        # Unsigned upload
        response = cloudinary.uploader.unsigned_upload(
            part_file,
            UPLOAD_PRESET,
            cloud_name=CLOUD_NAME,
            resource_type="video",
        )
        url = response["secure_url"]
    except Exception as e:  # worker boundary: one bad chunk must not kill the job
        print(f"❌ Upload failed for {part_file}: {e}")
        return None

    # Free disk space right away; the per-job directory is removed later
    # anyway, so a failed delete is harmless.
    try:
        os.remove(part_file)
    except OSError:
        pass
    return url


def process_video_task(video_url: str) -> List[str]:
    """Download, split and upload the video at *video_url*.

    Args:
        video_url: Location of the source video, handed to ``wget``.

    Returns:
        The Cloudinary URLs of the chunks that uploaded successfully, in
        chunk order. Chunks whose upload failed are omitted. An empty list
        means the download or the duration probe failed, or no chunk was
        produced or uploaded.
    """
    os.makedirs(_WORK_ROOT, exist_ok=True)
    print(f"\n[JOB START] Processing: {video_url}")

    # A private directory per job keeps concurrent requests from overwriting
    # each other's files and guarantees cleanup on every exit path.
    with tempfile.TemporaryDirectory(dir=_WORK_ROOT) as work_dir:
        filename = os.path.join(work_dir, "temp_video.mp4")

        # A. DOWNLOAD (using wget)
        if not _download(video_url, filename):
            print("❌ Download failed.")
            return []

        # B. SPLIT
        try:
            total_duration = _probe_duration(filename)
        except (subprocess.CalledProcessError, OSError, ValueError) as e:
            print(f"❌ Failed to get duration: {e}")
            return []
        if not total_duration > 0:
            # Also rejects NaN; a zero duration would divide by zero below.
            print(f"❌ Failed to get duration: invalid value {total_duration}")
            return []

        parts = _split_into_chunks(filename, work_dir, total_duration)
        print(f"✅ Split into {len(parts)} chunks.")

        # C. UPLOAD (Parallel)
        # The Cloudinary SDK call is synchronous, so threads are used to
        # overlap the network waits. executor.map yields results in the order
        # of `parts`, which keeps the URLs in chunk order.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=_UPLOAD_WORKERS
        ) as executor:
            uploaded_urls = [
                url for url in executor.map(_upload_chunk, parts) if url
            ]

    print(f"✅ [JOB DONE] Uploaded {len(uploaded_urls)} chunks.")
    return uploaded_urls


# ------------------------------------------
# ENDPOINTS
# ------------------------------------------
@app.post("/process-video", response_model=VideoResponse)
def process_video_endpoint(req: VideoRequest):
    """Process a video synchronously and return the uploaded chunk URLs.

    The request blocks until download, split and upload have finished, so
    long videos may run into platform request timeouts.

    Raises:
        HTTPException: 500 when processing failed or produced no chunks.
    """
    urls = process_video_task(req.video_url)
    if not urls:
        raise HTTPException(
            status_code=500, detail="Processing failed or produced 0 chunks"
        )
    return VideoResponse(
        status="success",
        message="Video processed successfully",
        uploaded_chunks=urls,
    )


@app.get("/")
def home():
    """Liveness endpoint returning a static status message."""
    return {"message": "Hugging Face Video Processor is Running"}