v3 / app.py
Abhinav Sharma
Add diagnostic logging and fix empty result handling
e2cf6fd
import os
import subprocess
import concurrent.futures
import uuid
import time
from typing import List, Optional, Dict
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import cloudinary
import cloudinary.uploader
# ------------------------------------------
# CONFIGURATION
# ------------------------------------------
def _fetch_cloud_name():
    """Fetch the Cloudinary cloud name from the media proxy's config endpoint.

    Sends a GET request to ``https://media.toolxp.org/config`` and reads the
    ``cloud_name`` key from the JSON response. Up to three attempts are made;
    each failed attempt (network error, bad JSON, missing or empty
    ``cloud_name``) is logged to stdout.

    Returns:
        The non-empty ``cloud_name`` value from the response.

    Raises:
        RuntimeError: if none of the three attempts yields a usable name.
            The error from the last failed attempt is chained as the cause.
    """
    import urllib.request as _ur
    import json as _j
    import ssl as _ssl

    ctx = _ssl.create_default_context()
    req = _ur.Request(
        "https://media.toolxp.org/config",
        headers={"User-Agent": "Mozilla/5.0"},  # Cloudflare blocks Python-urllib otherwise
    )
    last_error = None
    for _i in range(3):
        try:
            with _ur.urlopen(req, timeout=10, context=ctx) as r:
                name = _j.loads(r.read().decode())["cloud_name"]
            if not name:
                # An empty value is as unusable as a failed request; route it
                # through the same logging path instead of retrying silently.
                raise ValueError("response contained an empty cloud_name")
            print(f"[config] cloud_name={name}")
            return name
        except Exception as _e:
            # Keep a reference: the `as` name is unbound when the handler exits.
            last_error = _e
            print(f"[config] attempt {_i+1} failed: {_e}")
    raise RuntimeError(
        "[config] FATAL: could not fetch cloud_name after 3 attempts"
    ) from last_error
# Resolved once, at import time: importing this module performs the network
# request in _fetch_cloud_name() and propagates its RuntimeError on failure.
CLOUD_NAME = _fetch_cloud_name()
# Upload preset name passed to Cloudinary's unsigned upload call.
UPLOAD_PRESET = "testing"
# Rewrite Cloudinary delivery URLs to go through the media proxy,
# hiding the Cloudinary infrastructure from end-users.
MEDIA_PROXY = "https://media.toolxp.org"
def proxy_url(url: str) -> str:
    """Point a Cloudinary delivery URL at the media proxy instead.

    Every occurrence of ``https://res.cloudinary.com/<CLOUD_NAME>`` in *url*
    is swapped for ``MEDIA_PROXY``; a URL without that prefix is returned
    unchanged.
    """
    cloudinary_prefix = f"https://res.cloudinary.com/{CLOUD_NAME}"
    rewritten = url.replace(cloudinary_prefix, MEDIA_PROXY)
    return rewritten
# ------------------------------------------
# IN-MEMORY JOB STORE
# ------------------------------------------
# Structure:
# {
#   "job_id": {
#     "status": "queued" | "processing" | "completed" | "failed",
#     "progress": "waiting...",      # human-readable progress message
#     "result": None | [{"url": ..., "duration": ...}, ...],
#     "error": None | "message",
#     "created_at": timestamp,       # time.time() at submission
#     "diagnostics": [...]           # step notes; added once the worker starts
#   }
# }
# NOTE: nothing in the code visible here removes entries, so the store grows
# for the lifetime of the process and is lost on restart.
JOBS: Dict[str, dict] = {}
# ------------------------------------------
# APP SETUP
# ------------------------------------------
# The ASGI application; the endpoints below register themselves on it.
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation says allow_origins/methods/headers
# should not be ["*"] when allow_credentials=True. None of the endpoints in this
# file read cookies or auth headers, so allow_credentials=False (or an explicit
# origin list) is probably what is wanted - confirm against the frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class VideoRequest(BaseModel):
    """Request body for job submission."""

    # Source URL of the video; handed to the background worker for download.
    video_url: str
class VideoResponse(BaseModel):
    """Response model with a status, a message and optional chunk URLs.

    NOTE(review): not referenced by any endpoint in the visible source;
    possibly left over from an earlier synchronous API - confirm before removing.
    """

    status: str
    message: str
    # Presumably the URLs of the uploaded chunks; None when not provided.
    uploaded_chunks: Optional[List[str]] = None
# ------------------------------------------
# BACKGROUND WORKER
# ------------------------------------------
def _note(job_id: str, message: str) -> None:
    """Append a diagnostic note to the job record, creating the list if absent."""
    JOBS[job_id].setdefault("diagnostics", []).append(message)


def _download_video(job_id: str, video_url: str, filename: str) -> None:
    """Download *video_url* to *filename* with wget; raise if the result is unusable."""
    dl_result = subprocess.run(
        # "--" ends option parsing, so a URL that starts with "-" cannot be
        # interpreted by wget as a command-line option.
        ["wget", "-O", filename, "-q", "--timeout=120", "--", video_url],
        capture_output=True, text=True, timeout=300,
    )
    _note(job_id, f"wget exit={dl_result.returncode}")
    if dl_result.stderr:
        _note(job_id, f"wget stderr: {dl_result.stderr[:200]}")
        print(f"[{job_id}] wget stderr: {dl_result.stderr[:300]}")

    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    # Anything under 1000 bytes is treated as a failed download.
    if dl_result.returncode != 0 or file_size < 1000:
        raise Exception(f"Download failed. exit={dl_result.returncode}, file_size={file_size}")

    file_size_mb = file_size / (1024 * 1024)
    _note(job_id, f"downloaded {file_size_mb:.1f}MB")
    print(f"[{job_id}] Downloaded: {file_size_mb:.1f} MB")


def _probe_duration(job_id: str, filename: str) -> float:
    """Return the duration of *filename* in seconds as reported by ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filename,
    ]
    try:
        dur_out = subprocess.check_output(cmd).decode().strip()
        total_duration = float(dur_out)
    except Exception as e:
        raise Exception(f"Failed to get duration: {e}") from e
    # Rejects zero, negative, NaN and infinite values, which would otherwise
    # surface later as a division error or a loop that never ends.
    if not 0 < total_duration < float("inf"):
        raise Exception(f"Failed to get duration: invalid value {dur_out!r}")
    _note(job_id, f"duration={total_duration:.1f}s")
    print(f"[{job_id}] Duration: {total_duration:.1f}s")
    return total_duration


def _split_video(job_id: str, filename: str, work_dir: str, total_duration: float) -> list:
    """Cut *filename* into consecutive stream-copied chunks inside *work_dir*.

    Chunk length is estimated from the file's average byte rate so that each
    chunk lands near CHUNK_LIMIT_MB; a chunk that still exceeds MAX_CHUNK_MB is
    deleted and re-cut 10% shorter. Splitting stops early (with a diagnostic
    note) if ffmpeg produces no output or a chunk cannot be made small enough.

    Returns:
        A list of ``(chunk_path, duration_seconds)`` tuples in playback order.
    """
    CHUNK_LIMIT_MB = 95          # size each chunk is aimed at
    MAX_CHUNK_MB = 99.0          # size above which a chunk is re-cut
    MIN_CHUNK_SECONDS = 1.0      # stop shrinking below this length
    target_bytes = CHUNK_LIMIT_MB * 1024 * 1024
    avg_bytes_per_sec = os.path.getsize(filename) / total_duration

    parts = []  # tuples of (chunk_path, duration_seconds)
    current_start = 0.0
    while current_start < total_duration:
        est_duration = target_bytes / avg_bytes_per_sec
        chunk_name = os.path.join(work_dir, f"part_{len(parts):03d}.mp4")
        while True:
            current_end = min(current_start + est_duration, total_duration)
            subprocess.run([
                "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
                "-ss", str(current_start), "-to", str(current_end),
                "-i", filename,
                "-c", "copy", "-avoid_negative_ts", "make_zero",
                chunk_name,
            ])
            if not os.path.exists(chunk_name) or os.path.getsize(chunk_name) == 0:
                # ffmpeg produced nothing. Retrying the same cut would repeat
                # forever because current_start never advances, so stop here.
                _note(job_id, f"ffmpeg produced no output at {current_start:.1f}s; splitting stopped")
                return parts

            chunk_size_mb = os.path.getsize(chunk_name) / (1024 * 1024)
            if chunk_size_mb <= MAX_CHUNK_MB:
                break

            # Too large: discard and retry with a shorter window.
            os.remove(chunk_name)
            est_duration *= 0.9
            if est_duration < MIN_CHUNK_SECONDS:
                _note(job_id, f"chunk at {current_start:.1f}s cannot be cut below {MAX_CHUNK_MB}MB; splitting stopped")
                return parts

        parts.append((chunk_name, current_end - current_start))
        current_start = current_end
    return parts


def _upload_chunks(job_id: str, parts: list) -> tuple:
    """Upload every chunk to Cloudinary using up to four worker threads.

    Returns:
        ``(uploaded, errors)`` where *uploaded* is a list of
        ``{"url", "duration"}`` dicts for successful uploads, in chunk order,
        and *errors* is a list of messages for the chunks that failed.
    """
    upload_errors = []

    def upload_chunk_worker(part_info):
        part_file, duration = part_info
        try:
            print(f"[{job_id}] Uploading {part_file} ({os.path.getsize(part_file)/1024/1024:.1f}MB) to cloud={CLOUD_NAME}")
            response = cloudinary.uploader.unsigned_upload(
                part_file,
                UPLOAD_PRESET,
                cloud_name=CLOUD_NAME,
                resource_type="video",
            )
            url = proxy_url(response['secure_url'])
            # Free disk space as soon as the chunk is safely uploaded.
            if os.path.exists(part_file):
                os.remove(part_file)
            return {"url": url, "duration": round(duration, 2)}
        except Exception as e:
            err_msg = f"Upload Error for {os.path.basename(part_file)}: {e}"
            print(f"[{job_id}] {err_msg}")
            upload_errors.append(err_msg)
            return None

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # executor.map yields results in input order, keeping chunks sequenced.
        uploaded = [res for res in executor.map(upload_chunk_worker, parts) if res]
    return uploaded, upload_errors


def process_video_background(job_id: str, video_url: str):
    """Background worker: download, split and upload a video, updating JOBS.

    The job record ``JOBS[job_id]`` must already exist. Its ``status``,
    ``progress``, ``diagnostics`` and, on completion, ``result`` (a list of
    ``{"url", "duration"}`` dicts) are updated as the work advances. Any
    failure is caught: the job is marked ``"failed"`` with the message in
    ``error`` rather than raising. The per-job working directory under /tmp is
    always removed, whether the job succeeds or fails.

    Args:
        job_id: Key of the job record in JOBS; also names the work directory.
        video_url: URL of the video to download.
    """
    import shutil
    import traceback

    print(f"[{job_id}] Starting Job: {video_url}")
    job = JOBS[job_id]
    job["status"] = "processing"
    job["progress"] = "Starting download..."
    job["diagnostics"] = []  # Track steps for debugging

    work_dir = f"/tmp/{job_id}"
    filename = os.path.join(work_dir, "video.mp4")

    try:
        # Inside the try so a failure here marks the job failed instead of
        # leaving it stuck in "processing".
        os.makedirs(work_dir, exist_ok=True)

        # 1. DOWNLOAD
        job["progress"] = "Downloading (This may take a while)..."
        _download_video(job_id, video_url, filename)

        # 2. ANALYZE
        job["progress"] = "Analyzing video duration..."
        total_duration = _probe_duration(job_id, filename)

        # 3. SPLIT
        job["progress"] = "Splitting video into chunks..."
        parts = _split_video(job_id, filename, work_dir, total_duration)
        _note(job_id, f"split into {len(parts)} chunks")
        print(f"[{job_id}] Split into {len(parts)} chunks")
        if not parts:
            raise Exception("No chunks produced from splitting. Video may be corrupted or empty.")

        # 4. UPLOAD
        job["progress"] = f"Uploading {len(parts)} chunks to Cloudinary..."
        uploaded_results, upload_errors = _upload_chunks(job_id, parts)
        if upload_errors:
            _note(job_id, f"upload_errors: {upload_errors}")
        _note(job_id, f"uploaded {len(uploaded_results)}/{len(parts)} chunks")
        print(f"[{job_id}] Uploaded {len(uploaded_results)}/{len(parts)} chunks")
        if not uploaded_results:
            raise Exception(f"All {len(parts)} chunk uploads failed. Errors: {'; '.join(upload_errors[:3])}")

        # 5. FINISH
        job["status"] = "completed"
        job["progress"] = "Done"
        job["result"] = uploaded_results
        print(f"[{job_id}] Completed with {len(uploaded_results)} chunks.")

    except Exception as e:
        tb = traceback.format_exc()
        print(f"[{job_id}] FAILED: {str(e)}\n{tb}")
        job["status"] = "failed"
        job["error"] = str(e)
        job["progress"] = "Failed"
        _note(job_id, f"exception: {str(e)}")
    finally:
        # Remove the download and any leftover chunks on every exit path;
        # previously they were leaked on failure or after a failed upload.
        shutil.rmtree(work_dir, ignore_errors=True)
# ------------------------------------------
# API ENDPOINTS
# ------------------------------------------
@app.post("/jobs")
def submit_job(req: VideoRequest, background_tasks: BackgroundTasks):
    """Register a new job and schedule its processing in the background.

    A fresh UUID identifies the job. Its record is stored in JOBS in the
    "queued" state before the worker is scheduled, so the job can be polled
    as soon as this call returns.

    Returns:
        A dict with the new ``job_id`` and the initial ``status``.
    """
    new_id = str(uuid.uuid4())

    # Seed the record the worker and the status endpoint will share.
    initial_record = {
        "status": "queued",
        "progress": "Waiting in queue...",
        "result": None,
        "error": None,
        "created_at": time.time(),
    }
    JOBS[new_id] = initial_record

    # Hand the actual work to FastAPI's background task runner.
    background_tasks.add_task(process_video_background, new_id, req.video_url)

    response = {"job_id": new_id, "status": "queued"}
    return response
@app.get("/jobs/{job_id}")
def get_job_status(job_id: str):
    """Return the stored record for *job_id*.

    Raises:
        HTTPException: 404 when no record exists for the given id.
    """
    record = JOBS.get(job_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Job not found")
@app.get("/")
def home():
    """Root endpoint: return a fixed message showing the service is up."""
    banner = {"message": "Async Video Processor V2 is Running"}
    return banner