# clo-up / app.py
# Author: ADXabhi
# Commit ec94a36: "Create app.py" (verified)
import concurrent.futures
import os
import shutil
import subprocess
import tempfile
from typing import List, Optional

import cloudinary
import cloudinary.uploader
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ------------------------------------------
# CONFIGURATION
# ------------------------------------------
# Cloudinary target for unsigned uploads. Set CLOUDINARY_CLOUD_NAME and
# CLOUDINARY_UPLOAD_PRESET in the environment to override; the literals below
# are the original hard-coded values, kept as fallbacks so existing
# deployments behave unchanged. An empty variable is treated as unset.
CLOUD_NAME = os.environ.get("CLOUDINARY_CLOUD_NAME") or "dgfhhszx8"
UPLOAD_PRESET = os.environ.get("CLOUDINARY_UPLOAD_PRESET") or "testing"
# ------------------------------------------
# APP SETUP
# ------------------------------------------
# CORS is left fully open: every origin, method and header is allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# generally discouraged; confirm browser clients really need credentials.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app = FastAPI()
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
class VideoRequest(BaseModel):
    """JSON body accepted by POST /process-video."""

    # Location of the source video; handed unchanged to the download step.
    video_url: str
class VideoResponse(BaseModel):
    """JSON body returned by POST /process-video."""

    # Outcome label; the endpoint in this file sets it to "success".
    status: str
    # Human-readable description of the outcome.
    message: str
    # Cloudinary URLs of the uploaded parts in part order; None when omitted.
    uploaded_chunks: Optional[List[str]] = None
# ------------------------------------------
# HELPER FUNCTIONS
# ------------------------------------------
_WORK_ROOT = "/tmp/data"       # writable storage inside the Docker image
_CHUNK_TARGET_MB = 95          # size each cut aims for
_CHUNK_MAX_MB = 99.0           # largest part accepted for upload
_MAX_SPLIT_ATTEMPTS = 25       # 10%-shrink retries per part before giving up
_UPLOAD_WORKERS = 4            # concurrent Cloudinary uploads


def _download(video_url: str, dest: str) -> bool:
    """Fetch *video_url* into *dest* with wget; return True if a plausible file arrived."""
    # Argument list instead of a shell string: the URL comes from the request
    # body, so it must never be parsed by a shell. "--" stops wget from
    # treating a URL that starts with "-" as an option.
    try:
        ret = subprocess.run(["wget", "-q", "-O", dest, "--", video_url]).returncode
    except OSError as e:  # e.g. wget is not installed
        print(f"❌ Download failed: {e}")
        return False
    # Anything under 1000 bytes is treated as an error page / empty body.
    if ret != 0 or not os.path.exists(dest) or os.path.getsize(dest) < 1000:
        print("❌ Download failed.")
        return False
    return True


def _probe_duration(path: str) -> Optional[float]:
    """Return the container duration of *path* in seconds via ffprobe, or None."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        out = subprocess.check_output(cmd).decode().strip()
        duration = float(out)
    except (OSError, subprocess.CalledProcessError, ValueError) as e:
        print(f"❌ Failed to get duration: {e}")
        return None
    # float() accepts "nan"/"inf"; the bitrate estimate needs a finite,
    # positive duration (0 would divide by zero).
    if not 0 < duration < float("inf"):
        print(f"❌ Failed to get duration: invalid value {out!r}")
        return None
    return duration


def _cut_segment(src: str, start: float, end: float, dest: str) -> bool:
    """Stream-copy seconds [start, end] of *src* into *dest*; True if dest was written."""
    cmd = [
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-ss", str(start), "-to", str(end), "-i", src,
        "-c", "copy", "-avoid_negative_ts", "make_zero",
        dest,
    ]
    try:
        result = subprocess.run(cmd)
    except OSError as e:  # e.g. ffmpeg is not installed
        print(f"❌ ffmpeg could not be run: {e}")
        return False
    if result.returncode != 0:
        # Discard partial output so it is never mistaken for a good part.
        if os.path.exists(dest):
            os.remove(dest)
        return False
    return os.path.exists(dest)


def _split_video(src: str, out_dir: str, total_duration: float) -> List[str]:
    """Cut *src* into parts of at most _CHUNK_MAX_MB in *out_dir*; return paths in order.

    Each cut is sized for _CHUNK_TARGET_MB from the file's average bitrate.
    An oversized cut is deleted and retried with a 10% shorter span, at most
    _MAX_SPLIT_ATTEMPTS times. If ffmpeg fails, or a part cannot be shrunk
    enough, splitting stops and the parts cut so far are returned.
    """
    target_bytes = _CHUNK_TARGET_MB * 1024 * 1024
    max_bytes = _CHUNK_MAX_MB * 1024 * 1024
    avg_bytes_per_sec = os.path.getsize(src) / total_duration

    parts: List[str] = []
    start = 0.0
    while start < total_duration:
        # Fresh estimate for every part; only shrunk by oversize retries.
        est_duration = target_bytes / avg_bytes_per_sec
        chunk_name = os.path.join(out_dir, f"part_{len(parts):03d}.mp4")
        for _ in range(_MAX_SPLIT_ATTEMPTS):
            end = min(start + est_duration, total_duration)
            if not _cut_segment(src, start, end, chunk_name):
                # Stop here: retrying the identical cut would loop forever.
                print(f"⚠️ ffmpeg produced no part at {start:.2f}s; stopping split.")
                return parts
            if os.path.getsize(chunk_name) <= max_bytes:
                parts.append(chunk_name)
                start = end
                break
            # Too big: shrink the span by 10% and cut again.
            os.remove(chunk_name)
            est_duration *= 0.9
        else:
            print(
                f"❌ Could not cut a part under {_CHUNK_MAX_MB} MB at "
                f"{start:.2f}s; stopping split."
            )
            return parts
    return parts


def _upload_chunks(parts: List[str]) -> List[str]:
    """Upload *parts* to Cloudinary in parallel; return secure URLs in part order.

    Uploads are best-effort per part: a failed part is logged and left out
    of the result, and a summary warning reports how many failed.
    """

    def upload_one(part_file: str) -> Optional[str]:
        try:
            response = cloudinary.uploader.unsigned_upload(
                part_file,
                UPLOAD_PRESET,
                cloud_name=CLOUD_NAME,
                resource_type="video",
            )
            url = response['secure_url']
        except Exception as e:  # deliberate: one failed part must not sink the others
            print(f"❌ Upload failed for {part_file}: {e}")
            return None
        # Free disk space as soon as the part is uploaded; the job directory
        # is removed afterwards anyway, so a failed delete is harmless.
        try:
            os.remove(part_file)
        except OSError:
            pass
        return url

    # The Cloudinary SDK call is blocking network I/O, so threads overlap the
    # waits. executor.map yields results in input order, keeping part order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=_UPLOAD_WORKERS) as executor:
        results = list(executor.map(upload_one, parts))
    uploaded = [url for url in results if url]
    failed = len(results) - len(uploaded)
    if failed:
        print(f"⚠️ {failed} of {len(parts)} chunk uploads failed; result is incomplete.")
    return uploaded


def process_video_task(video_url: str) -> List[str]:
    """Download *video_url*, split it into size-limited parts, and upload them.

    Every call works in its own temporary directory under /tmp/data. This
    means simultaneous requests can never overwrite each other's download or
    part files. The directory is removed when the job finishes, even on error.

    Returns:
        Cloudinary ``secure_url`` strings of the uploaded parts, in playback
        order. An empty list means the download, duration probe, or split
        failed, or that every upload failed.
    """
    print(f"\n[JOB START] Processing: {video_url}")
    os.makedirs(_WORK_ROOT, exist_ok=True)
    job_dir = tempfile.mkdtemp(dir=_WORK_ROOT)
    try:
        filename = os.path.join(job_dir, "temp_video.mp4")
        if not _download(video_url, filename):
            return []
        total_duration = _probe_duration(filename)
        if total_duration is None:
            return []
        parts = _split_video(filename, job_dir, total_duration)
        print(f"✅ Split into {len(parts)} chunks.")
        uploaded_urls = _upload_chunks(parts)
    finally:
        # Removes the source download and any part left by a failed upload.
        shutil.rmtree(job_dir, ignore_errors=True)
    print(f"✅ [JOB DONE] Uploaded {len(uploaded_urls)} chunks.")
    return uploaded_urls
# ------------------------------------------
# ENDPOINTS
# ------------------------------------------
@app.post("/process-video", response_model=VideoResponse)
def process_video_endpoint(req: VideoRequest):
    """Run the download/split/upload job for ``req.video_url`` within the request.

    The whole job finishes before any response is sent, so long videos make
    this call slow. NOTE(review): the original note warned about timeouts
    beyond roughly 60 s on the hosting tier; confirm where that limit comes
    from before relying on it.

    Raises:
        HTTPException: status 500 when the job returns no chunk URLs.
    """
    chunk_urls = process_video_task(req.video_url)
    if chunk_urls:
        return VideoResponse(
            uploaded_chunks=chunk_urls,
            message="Video processed successfully",
            status="success",
        )
    raise HTTPException(status_code=500, detail="Processing failed or produced 0 chunks")
@app.get("/")
def home():
    """Root endpoint returning a fixed status message (usable as a basic health check)."""
    status_payload = {"message": "Hugging Face Video Processor is Running"}
    return status_payload