| import os |
| import subprocess |
| import concurrent.futures |
| import uuid |
| import time |
| from typing import List, Optional, Dict |
| from fastapi import FastAPI, BackgroundTasks, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| import cloudinary |
| import cloudinary.uploader |
|
|
| |
| |
| |
| def _fetch_cloud_name(): |
| import urllib.request as _ur, json as _j, ssl as _ssl |
| ctx = _ssl.create_default_context() |
| req = _ur.Request( |
| "https://media.toolxp.org/config", |
| headers={"User-Agent": "Mozilla/5.0"} |
| ) |
| for _i in range(3): |
| try: |
| with _ur.urlopen(req, timeout=10, context=ctx) as r: |
| name = _j.loads(r.read().decode())["cloud_name"] |
| if name: |
| print(f"[config] cloud_name={name}") |
| return name |
| except Exception as _e: |
| print(f"[config] attempt {_i+1} failed: {_e}") |
| raise RuntimeError("[config] FATAL: could not fetch cloud_name after 3 attempts") |
# Cloudinary cloud that chunk uploads target. Resolved once, when the module
# is imported; see _fetch_cloud_name for where the value comes from.
CLOUD_NAME: str = _fetch_cloud_name()

# Unsigned upload preset passed to cloudinary.uploader.unsigned_upload.
# NOTE(review): a preset literally named "testing" looks like a leftover;
# confirm it is the intended production preset.
UPLOAD_PRESET: str = "testing"

# Origin that proxy_url substitutes for the Cloudinary delivery prefix
# in the URLs returned to clients.
MEDIA_PROXY: str = "https://media.toolxp.org"
|
|
def proxy_url(
    url: str,
    cloud_name: Optional[str] = None,
    proxy: Optional[str] = None,
) -> str:
    """Rewrite a Cloudinary delivery URL so it is served via the media proxy.

    A URL equal to ``https://res.cloudinary.com/<cloud_name>``, or starting
    with that base followed by ``/``, has the base replaced by ``proxy``.
    Any other URL is returned unchanged.

    Args:
        url: URL to rewrite, e.g. the ``secure_url`` of an upload response.
        cloud_name: Cloudinary cloud name. Defaults to the module-level
            ``CLOUD_NAME``.
        proxy: Proxy origin without a trailing slash. Defaults to the
            module-level ``MEDIA_PROXY``.

    Returns:
        The rewritten URL, or ``url`` itself when it is not a delivery URL
        of that cloud.
    """
    if cloud_name is None:
        cloud_name = CLOUD_NAME
    if proxy is None:
        proxy = MEDIA_PROXY
    base = f"https://res.cloudinary.com/{cloud_name}"
    # Only rewrite a leading prefix that ends at a path boundary.
    # str.replace would also rewrite occurrences elsewhere in the URL (e.g. in
    # a query string), and it would treat cloud "abc" as matching "abcdef".
    if url == base or url.startswith(base + "/"):
        return proxy + url[len(base):]
    return url
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# In-memory job registry, keyed by job_id (a uuid4 string). Each value is the
# state dict created by submit_job and then updated by
# process_video_background.
# NOTE(review): nothing visible in this file ever removes entries, so the
# registry grows for the life of the process. Jobs are also lost on restart
# and are not shared between worker processes.
JOBS: Dict[str, dict] = dict()

app = FastAPI()

# NOTE(review): Starlette's docs say wildcard origins/methods/headers should
# not be combined with allow_credentials=True. No endpoint here reads cookies
# or auth headers, so allow_credentials=False is probably sufficient; confirm
# against the frontend before changing.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
    allow_credentials=True,
)
|
|
# Request body for POST /jobs.
class VideoRequest(BaseModel):
    # URL of the source video. submit_job passes it to
    # process_video_background, which hands it to wget. It is not validated
    # here beyond being a string.
    video_url: str
|
|
# Response schema: a status, a message, and an optional list of chunk URLs.
# NOTE(review): no endpoint visible in this file uses this model (the /jobs
# endpoints return plain dicts). Confirm whether it is still needed.
class VideoResponse(BaseModel):
    status: str
    message: str
    # Absent/None unless chunk URLs are being reported.
    uploaded_chunks: Optional[List[str]] = None
|
|
| |
| |
| |
| def process_video_background(job_id: str, video_url: str): |
| """ |
| Background worker that updates the JOBS dict. |
| """ |
| print(f"[{job_id}] Starting Job: {video_url}") |
| JOBS[job_id]["status"] = "processing" |
| JOBS[job_id]["progress"] = "Starting download..." |
| JOBS[job_id]["diagnostics"] = [] |
| |
| work_dir = f"/tmp/{job_id}" |
| os.makedirs(work_dir, exist_ok=True) |
| filename = os.path.join(work_dir, "video.mp4") |
|
|
| try: |
| |
| JOBS[job_id]["progress"] = "Downloading (This may take a while)..." |
| dl_result = subprocess.run( |
| ["wget", "-O", filename, video_url, "-q", "--timeout=120"], |
| capture_output=True, text=True, timeout=300 |
| ) |
| JOBS[job_id]["diagnostics"].append(f"wget exit={dl_result.returncode}") |
| if dl_result.stderr: |
| JOBS[job_id]["diagnostics"].append(f"wget stderr: {dl_result.stderr[:200]}") |
| print(f"[{job_id}] wget stderr: {dl_result.stderr[:300]}") |
| |
| if dl_result.returncode != 0 or not os.path.exists(filename) or os.path.getsize(filename) < 1000: |
| file_size = os.path.getsize(filename) if os.path.exists(filename) else 0 |
| raise Exception(f"Download failed. exit={dl_result.returncode}, file_size={file_size}") |
|
|
| file_size_mb = os.path.getsize(filename) / (1024 * 1024) |
| JOBS[job_id]["diagnostics"].append(f"downloaded {file_size_mb:.1f}MB") |
| print(f"[{job_id}] Downloaded: {file_size_mb:.1f} MB") |
|
|
| |
| JOBS[job_id]["progress"] = "Analyzing video duration..." |
| cmd = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {filename}" |
| try: |
| dur_out = subprocess.check_output(cmd, shell=True).decode().strip() |
| total_duration = float(dur_out) |
| except Exception as e: |
| raise Exception(f"Failed to get duration: {e}") |
|
|
| JOBS[job_id]["diagnostics"].append(f"duration={total_duration:.1f}s") |
| print(f"[{job_id}] Duration: {total_duration:.1f}s") |
|
|
| |
| JOBS[job_id]["progress"] = "Splitting video into chunks..." |
| CHUNK_LIMIT_MB = 95 |
| TARGET_BYTES = CHUNK_LIMIT_MB * 1024 * 1024 |
| |
| file_size = os.path.getsize(filename) |
| avg_bytes_per_sec = file_size / total_duration |
| |
| parts = [] |
| current_start = 0.0 |
| chunk_idx = 0 |
| |
| while current_start < total_duration: |
| est_duration = TARGET_BYTES / avg_bytes_per_sec |
| success = False |
| while not success: |
| current_end = current_start + est_duration |
| if current_end > total_duration: current_end = total_duration |
| |
| chunk_name = os.path.join(work_dir, f"part_{chunk_idx:03d}.mp4") |
| |
| |
| cmd = f"ffmpeg -y -hide_banner -loglevel error -ss {current_start} -to {current_end} -i {filename} -c copy -avoid_negative_ts make_zero {chunk_name}" |
| subprocess.run(cmd, shell=True) |
| |
| if not os.path.exists(chunk_name): |
| success = True |
| break |
|
|
| chunk_size = os.path.getsize(chunk_name) |
| chunk_size_mb = chunk_size / (1024*1024) |
| |
| if chunk_size_mb > 99.0: |
| est_duration = est_duration * 0.9 |
| os.remove(chunk_name) |
| else: |
| |
| chunk_duration = current_end - current_start |
| parts.append((chunk_name, chunk_duration)) |
| current_start = current_end |
| chunk_idx += 1 |
| success = True |
|
|
| JOBS[job_id]["diagnostics"].append(f"split into {len(parts)} chunks") |
| print(f"[{job_id}] Split into {len(parts)} chunks") |
|
|
| if len(parts) == 0: |
| raise Exception("No chunks produced from splitting. Video may be corrupted or empty.") |
|
|
| |
| JOBS[job_id]["progress"] = f"Uploading {len(parts)} chunks to Cloudinary..." |
| uploaded_results = [] |
| upload_errors = [] |
| |
| def upload_chunk_worker(part_info): |
| part_file, duration = part_info |
| try: |
| print(f"[{job_id}] Uploading {part_file} ({os.path.getsize(part_file)/1024/1024:.1f}MB) to cloud={CLOUD_NAME}") |
| response = cloudinary.uploader.unsigned_upload( |
| part_file, |
| UPLOAD_PRESET, |
| cloud_name=CLOUD_NAME, |
| resource_type="video" |
| ) |
| url = proxy_url(response['secure_url']) |
| if os.path.exists(part_file): os.remove(part_file) |
| return {"url": url, "duration": round(duration, 2)} |
| except Exception as e: |
| err_msg = f"Upload Error for {os.path.basename(part_file)}: {e}" |
| print(f"[{job_id}] {err_msg}") |
| upload_errors.append(err_msg) |
| return None |
|
|
| |
| with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: |
| results = executor.map(upload_chunk_worker, parts) |
| for res in results: |
| if res: uploaded_results.append(res) |
|
|
| if upload_errors: |
| JOBS[job_id]["diagnostics"].append(f"upload_errors: {upload_errors}") |
| JOBS[job_id]["diagnostics"].append(f"uploaded {len(uploaded_results)}/{len(parts)} chunks") |
| print(f"[{job_id}] Uploaded {len(uploaded_results)}/{len(parts)} chunks") |
|
|
| if len(uploaded_results) == 0: |
| raise Exception(f"All {len(parts)} chunk uploads failed. Errors: {'; '.join(upload_errors[:3])}") |
| |
| |
| if os.path.exists(filename): os.remove(filename) |
| |
| try: os.rmdir(work_dir) |
| except: pass |
|
|
| JOBS[job_id]["status"] = "completed" |
| JOBS[job_id]["progress"] = "Done" |
| JOBS[job_id]["result"] = uploaded_results |
| print(f"[{job_id}] Completed with {len(uploaded_results)} chunks.") |
|
|
| except Exception as e: |
| import traceback |
| tb = traceback.format_exc() |
| print(f"[{job_id}] FAILED: {str(e)}\n{tb}") |
| JOBS[job_id]["status"] = "failed" |
| JOBS[job_id]["error"] = str(e) |
| JOBS[job_id]["progress"] = "Failed" |
| if "diagnostics" not in JOBS[job_id]: |
| JOBS[job_id]["diagnostics"] = [] |
| JOBS[job_id]["diagnostics"].append(f"exception: {str(e)}") |
|
|
| |
| |
| |
|
|
@app.post("/jobs")
def submit_job(req: VideoRequest, background_tasks: BackgroundTasks):
    """Register a queued job for ``req.video_url`` and schedule its processing.

    The job's state dict is stored in ``JOBS`` before the background task is
    added, so ``GET /jobs/{job_id}`` can find it as soon as this returns.

    Returns:
        ``{"job_id": <uuid4 string>, "status": "queued"}``.
    """
    new_job_id = str(uuid.uuid4())
    initial_state = dict(
        status="queued",
        progress="Waiting in queue...",
        result=None,
        error=None,
        created_at=time.time(),
    )
    JOBS[new_job_id] = initial_state

    background_tasks.add_task(process_video_background, new_job_id, req.video_url)
    return dict(job_id=new_job_id, status="queued")
|
|
@app.get("/jobs/{job_id}")
def get_job_status(job_id: str):
    """Return the stored state dict for ``job_id``.

    Raises:
        HTTPException: 404 when no such job is registered.
    """
    record = JOBS.get(job_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Job not found")
|
|
@app.get("/")
def home():
    """Root endpoint: return a fixed message confirming the service responds."""
    payload = {"message": "Async Video Processor V2 is Running"}
    return payload
|
|