import concurrent.futures
import json
import os
import ssl
import subprocess
import time
import urllib.request
import uuid
from typing import Dict, List, Optional

import cloudinary
import cloudinary.uploader
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ------------------------------------------
# CONFIGURATION
# ------------------------------------------


def _fetch_cloud_name():
    """Fetch the Cloudinary cloud name from the media proxy's config endpoint.

    Requests ``https://media.toolxp.org/config``, parses the body as JSON and
    returns its ``cloud_name`` value. Up to three attempts are made, pausing
    briefly between them so a transient failure has a chance to clear.

    Returns:
        The non-empty ``cloud_name`` value from the config response.

    Raises:
        RuntimeError: If no attempt yields a non-empty ``cloud_name``.
    """
    attempts = 3
    context = ssl.create_default_context()
    request = urllib.request.Request(
        "https://media.toolxp.org/config",
        # Cloudflare blocks the default Python-urllib user agent otherwise.
        headers={"User-Agent": "Mozilla/5.0"},
    )

    for attempt in range(1, attempts + 1):
        try:
            with urllib.request.urlopen(request, timeout=10, context=context) as resp:
                name = json.loads(resp.read().decode())["cloud_name"]
        except Exception as exc:
            # Network, TLS, JSON and missing-key errors are all retryable here.
            print(f"[config] attempt {attempt} failed: {exc}")
        else:
            if name:
                print(f"[config] cloud_name={name}")
                return name
            # An empty value is unusable; log it instead of retrying silently.
            print(f"[config] attempt {attempt} failed: empty cloud_name in response")

        if attempt < attempts:
            # Short linear backoff (1s, 2s); no sleep after the final attempt.
            time.sleep(attempt)

    raise RuntimeError("[config] FATAL: could not fetch cloud_name after 3 attempts")


CLOUD_NAME = _fetch_cloud_name()
UPLOAD_PRESET = "testing"

# Rewrite Cloudinary delivery URLs to go through the media proxy,
# hiding the Cloudinary infrastructure from end-users.
MEDIA_PROXY = "https://media.toolxp.org"


def proxy_url(url: str) -> str:
    """Return *url* with the Cloudinary delivery prefix swapped for the media proxy.

    The prefix replaced is ``https://res.cloudinary.com/<CLOUD_NAME>``; URLs
    that do not contain it are returned unchanged.
    """
    return url.replace(f"https://res.cloudinary.com/{CLOUD_NAME}", MEDIA_PROXY)


# ------------------------------------------
# IN-MEMORY JOB STORE
# ------------------------------------------
# Structure:
# {
#   "job_id": {
#       "status": "queued" | "processing" | "completed" | "failed",
#       "progress": "waiting...",
#       "result": [],
#       "error": None,
#       "created_at": timestamp,
#       "diagnostics": []   # added once the worker starts
#   }
# }
# NOTE(review): entries are never evicted, so this dict grows for the lifetime
# of the process.
JOBS: Dict[str, dict] = {}

# ------------------------------------------
# APP SETUP
# ------------------------------------------
app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class VideoRequest(BaseModel):
    """Request body for ``POST /jobs``."""

    video_url: str


class VideoResponse(BaseModel):
    """Response model with a status, a message and optional chunk URLs."""

    status: str
    message: str
    uploaded_chunks: Optional[List[str]] = None


# ------------------------------------------
# BACKGROUND WORKER
# ------------------------------------------

# Chunk size used to estimate how many seconds of video fit in one chunk.
_CHUNK_LIMIT_MB = 95
# Hard ceiling: a chunk larger than this is discarded and re-cut shorter.
_CHUNK_MAX_MB = 99.0


def _download_video(job_id: str, video_url: str, filename: str) -> None:
    """Download *video_url* to *filename* with wget, recording diagnostics."""
    diagnostics = JOBS[job_id]["diagnostics"]

    # "--" ends option parsing so a URL beginning with "-" cannot be
    # interpreted by wget as a command-line option.
    dl_result = subprocess.run(
        ["wget", "-q", "--timeout=120", "-O", filename, "--", video_url],
        capture_output=True,
        text=True,
        timeout=300,
    )
    diagnostics.append(f"wget exit={dl_result.returncode}")
    if dl_result.stderr:
        diagnostics.append(f"wget stderr: {dl_result.stderr[:200]}")
        print(f"[{job_id}] wget stderr: {dl_result.stderr[:300]}")

    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    # Anything under 1000 bytes is treated as a failed download.
    if dl_result.returncode != 0 or file_size < 1000:
        raise RuntimeError(
            f"Download failed. exit={dl_result.returncode}, file_size={file_size}"
        )

    file_size_mb = file_size / (1024 * 1024)
    diagnostics.append(f"downloaded {file_size_mb:.1f}MB")
    print(f"[{job_id}] Downloaded: {file_size_mb:.1f} MB")


def _probe_duration(filename: str) -> float:
    """Return the duration of *filename* in seconds as reported by ffprobe."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filename,
    ]
    try:
        total_duration = float(subprocess.check_output(cmd).decode().strip())
    except Exception as e:
        raise RuntimeError(f"Failed to get duration: {e}") from e

    # Written as "not > 0" so NaN is rejected too; a non-positive duration
    # would otherwise surface later as a division by zero.
    if not total_duration > 0:
        raise RuntimeError(f"Failed to get duration: invalid value {total_duration}")
    return total_duration


def _split_video(job_id: str, filename: str, work_dir: str, total_duration: float) -> List[tuple]:
    """Cut *filename* into stream-copied chunks no larger than ``_CHUNK_MAX_MB``.

    Returns a list of ``(chunk_path, duration_seconds)`` tuples in playback
    order. Raises ``RuntimeError`` if ffmpeg fails to produce a chunk file.
    """
    target_bytes = _CHUNK_LIMIT_MB * 1024 * 1024
    avg_bytes_per_sec = os.path.getsize(filename) / total_duration

    parts = []
    current_start = 0.0
    chunk_idx = 0

    while current_start < total_duration:
        # Each chunk starts from the average-bitrate estimate and is shortened
        # only if the resulting file turns out too large.
        est_duration = target_bytes / avg_bytes_per_sec
        chunk_name = os.path.join(work_dir, f"part_{chunk_idx:03d}.mp4")

        while True:
            current_end = min(current_start + est_duration, total_duration)
            cut = subprocess.run(
                [
                    "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
                    "-ss", str(current_start), "-to", str(current_end),
                    "-i", filename,
                    "-c", "copy", "-avoid_negative_ts", "make_zero",
                    chunk_name,
                ],
                capture_output=True,
                text=True,
            )
            if cut.stderr:
                print(f"[{job_id}] ffmpeg stderr: {cut.stderr[:300]}")

            if not os.path.exists(chunk_name):
                # Without a chunk file the start position cannot advance, so
                # fail the job rather than repeat the same cut forever.
                raise RuntimeError(
                    f"ffmpeg produced no output for chunk {chunk_idx} "
                    f"(exit={cut.returncode}): {cut.stderr[:200]}"
                )

            chunk_size_mb = os.path.getsize(chunk_name) / (1024 * 1024)
            if chunk_size_mb > _CHUNK_MAX_MB:
                # Too big: drop it and retry with a 10% shorter window.
                est_duration *= 0.9
                os.remove(chunk_name)
                continue

            parts.append((chunk_name, current_end - current_start))
            current_start = current_end
            chunk_idx += 1
            break

    return parts


def _upload_chunks(job_id: str, parts: List[tuple]) -> tuple:
    """Upload every chunk to Cloudinary using four worker threads.

    Returns ``(uploaded_results, upload_errors)``: a list of
    ``{"url", "duration"}`` dicts for the chunks that uploaded, and a list of
    error messages for those that did not, both in chunk order.
    """

    def upload_chunk_worker(part_info):
        part_file, duration = part_info
        try:
            print(f"[{job_id}] Uploading {part_file} ({os.path.getsize(part_file)/1024/1024:.1f}MB) to cloud={CLOUD_NAME}")
            response = cloudinary.uploader.unsigned_upload(
                part_file,
                UPLOAD_PRESET,
                cloud_name=CLOUD_NAME,
                resource_type="video",
            )
            url = proxy_url(response['secure_url'])
        except Exception as e:
            err_msg = f"Upload Error for {os.path.basename(part_file)}: {e}"
            print(f"[{job_id}] {err_msg}")
            return None, err_msg

        # Free disk space early. A failed delete must not turn a successful
        # upload into an error; the caller removes the work dir regardless.
        try:
            os.remove(part_file)
        except OSError:
            pass
        return {"url": url, "duration": round(duration, 2)}, None

    uploaded_results = []
    upload_errors = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # executor.map yields results in input order, so chunks stay sorted.
        for result, error in executor.map(upload_chunk_worker, parts):
            if result:
                uploaded_results.append(result)
            if error:
                upload_errors.append(error)
    return uploaded_results, upload_errors


def process_video_background(job_id: str, video_url: str):
    """Background worker: download, split and upload a video, updating JOBS.

    Progress, diagnostics, the final result and any error are written to
    ``JOBS[job_id]``. Every failure is caught and recorded as status
    ``"failed"``. The per-job work directory under ``/tmp`` is removed on
    both success and failure.
    """
    import shutil
    import traceback

    print(f"[{job_id}] Starting Job: {video_url}")
    job = JOBS[job_id]
    job["status"] = "processing"
    job["progress"] = "Starting download..."
    job["diagnostics"] = []  # Track steps for debugging

    work_dir = f"/tmp/{job_id}"
    filename = os.path.join(work_dir, "video.mp4")

    try:
        # Created inside the try so a filesystem error marks the job failed
        # instead of leaving it stuck in "processing".
        os.makedirs(work_dir, exist_ok=True)

        # 1. DOWNLOAD
        job["progress"] = "Downloading (This may take a while)..."
        _download_video(job_id, video_url, filename)

        # 2. ANALYZE
        job["progress"] = "Analyzing video duration..."
        total_duration = _probe_duration(filename)
        job["diagnostics"].append(f"duration={total_duration:.1f}s")
        print(f"[{job_id}] Duration: {total_duration:.1f}s")

        # 3. SPLIT
        job["progress"] = "Splitting video into chunks..."
        parts = _split_video(job_id, filename, work_dir, total_duration)
        job["diagnostics"].append(f"split into {len(parts)} chunks")
        print(f"[{job_id}] Split into {len(parts)} chunks")
        if not parts:
            raise RuntimeError("No chunks produced from splitting. Video may be corrupted or empty.")

        # 4. UPLOAD
        job["progress"] = f"Uploading {len(parts)} chunks to Cloudinary..."
        uploaded_results, upload_errors = _upload_chunks(job_id, parts)
        if upload_errors:
            job["diagnostics"].append(f"upload_errors: {upload_errors}")
        job["diagnostics"].append(f"uploaded {len(uploaded_results)}/{len(parts)} chunks")
        print(f"[{job_id}] Uploaded {len(uploaded_results)}/{len(parts)} chunks")
        # A partial upload still completes the job; only a total failure raises.
        if not uploaded_results:
            raise RuntimeError(f"All {len(parts)} chunk uploads failed. Errors: {'; '.join(upload_errors[:3])}")

        # 5. FINISH
        job["status"] = "completed"
        job["progress"] = "Done"
        job["result"] = uploaded_results
        print(f"[{job_id}] Completed with {len(uploaded_results)} chunks.")

    except Exception as e:
        tb = traceback.format_exc()
        print(f"[{job_id}] FAILED: {str(e)}\n{tb}")
        job["status"] = "failed"
        job["error"] = str(e)
        job["progress"] = "Failed"
        job.setdefault("diagnostics", []).append(f"exception: {str(e)}")

    finally:
        # Always clean up the download and any chunks that were not uploaded.
        shutil.rmtree(work_dir, ignore_errors=True)


# ------------------------------------------
# API ENDPOINTS
# ------------------------------------------

@app.post("/jobs")
def submit_job(req: VideoRequest, background_tasks: BackgroundTasks):
    """Register a new job, schedule the background worker and return its id."""
    job_id = str(uuid.uuid4())

    # Initialize Job
    JOBS[job_id] = {
        "status": "queued",
        "progress": "Waiting in queue...",
        "result": None,
        "error": None,
        "created_at": time.time(),
    }

    # Start Background Task
    background_tasks.add_task(process_video_background, job_id, req.video_url)

    return {"job_id": job_id, "status": "queued"}


@app.get("/jobs/{job_id}")
def get_job_status(job_id: str):
    """Return the stored job record, or respond 404 if the id is unknown."""
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return job


@app.get("/")
def home():
    """Liveness endpoint returning a static message."""
    return {"message": "Async Video Processor V2 is Running"}