#!/usr/bin/env python3 """ FFmpeg Split-Screen Service - HuggingFace Space Accepts video URLs, processes split-screen, returns download link """ from flask import Flask, request, jsonify, send_file import subprocess, os, uuid, threading, glob, shutil, requests, time app = Flask(__name__) WORK_DIR = "/tmp/clipbot" OUTPUT_DIR = "/tmp/clipbot/output" GAMEPLAY_DIR = "/tmp/clipbot/gameplay" os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(GAMEPLAY_DIR, exist_ok=True) JOBS = {} GAMEPLAY_URLS = [ # Add your gameplay clip direct URLs here (e.g. from HuggingFace dataset or GitHub releases) # "https://your-hf-space.hf.space/gameplay/1.mp4", ] def download_file(url, dest): """Download a file from URL to dest path.""" r = requests.get(url, stream=True, timeout=120) r.raise_for_status() with open(dest, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return dest def run_ffmpeg(main_path, gameplay_path, output_path, dur=58): """Run split-screen FFmpeg command.""" filter_complex = ( "[0:v]scale=1080:960:force_original_aspect_ratio=increase," "crop=1080:960,setsar=1[top];" "[1:v]scale=1080:960:force_original_aspect_ratio=increase," "crop=1080:960,setsar=1[bot];" "[top][bot]vstack=inputs=2[stacked];" "[stacked]drawbox=x=0:y=956:w=1080:h=8:color=white@0.9:t=fill[out]" ) cmd = [ "ffmpeg", "-y", "-ss", "0", "-t", str(dur), "-i", main_path, "-stream_loop", "-1", "-t", str(dur), "-i", gameplay_path, "-filter_complex", filter_complex, "-map", "[out]", "-map", "0:a", "-t", str(dur), "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26", "-c:a", "aac", "-b:a", "128k", "-ar", "44100", "-movflags", "+faststart", "-pix_fmt", "yuv420p", output_path ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) return result # ── /clip ─────────────────────────────────────────────────── @app.route("/clip", methods=["POST"]) def clip(): """ Accepts: - mainVideoUrl: direct URL to main video file - gameplayUrl: direct URL to gameplay clip (optional, picks random if not provided) - videoId: unique identifier - duration: max seconds (default 58) Returns job_id immediately, poll /job/ for result """ data = request.json video_id = data.get("videoId", str(uuid.uuid4())) main_url = data.get("mainVideoUrl") gameplay_url = data.get("gameplayUrl") dur = min(int(data.get("duration", 58)), 58) if not main_url: return jsonify({"error": "mainVideoUrl is required"}), 400 job_id = str(uuid.uuid4()) JOBS[job_id] = {"status": "running", "videoId": video_id} def run(): work = f"/tmp/job_{job_id}" os.makedirs(work, exist_ok=True) try: # Download main video main_path = f"{work}/main.mp4" download_file(main_url, main_path) # Download or pick gameplay gameplay_path = f"{work}/gameplay.mp4" if gameplay_url: download_file(gameplay_url, gameplay_path) else: # Pick from local gameplay dir files = glob.glob(f"{GAMEPLAY_DIR}/*.mp4") if not files: JOBS[job_id] = {"status": "error", "error": "No gameplay clips available"} return import random shutil.copy(random.choice(files), gameplay_path) output_path = f"{OUTPUT_DIR}/{video_id}_final.mp4" result = run_ffmpeg(main_path, gameplay_path, output_path, dur) if result.returncode == 0 and os.path.exists(output_path): JOBS[job_id] = { "status": "done", "videoId": video_id, "downloadUrl": f"/download/{video_id}_final.mp4", "outputPath": output_path } else: JOBS[job_id] = {"status": "error", "error": result.stderr[-500:]} except Exception as e: JOBS[job_id] = {"status": "error", "error": str(e)} finally: shutil.rmtree(work, ignore_errors=True) threading.Thread(target=run, daemon=True).start() return jsonify({"job_id": job_id, "status": "started", "videoId": video_id}) # ── /job/ ─────────────────────────────────────────────── @app.route("/job/", methods=["GET"]) def job_status(job_id): return jsonify(JOBS.get(job_id, {"status": "not_found"})) # ── /download/ ───────────────────────────────────── @app.route("/download/", methods=["GET"]) def download_file_route(filename): path = f"{OUTPUT_DIR}/{filename}" if not os.path.exists(path): return jsonify({"error": "File not found"}), 404 return send_file(path, as_attachment=True, mimetype="video/mp4") # ── /upload_gameplay ───────────────────────────────────────── @app.route("/upload_gameplay", methods=["POST"]) def upload_gameplay(): """Upload a gameplay clip to the space.""" if 'file' not in request.files: return jsonify({"error": "No file provided"}), 400 f = request.files['file'] filename = f"gameplay_{uuid.uuid4().hex[:8]}.mp4" f.save(f"{GAMEPLAY_DIR}/{filename}") return jsonify({"status": "done", "filename": filename}) # ── /health ────────────────────────────────────────────────── @app.route("/health", methods=["GET"]) def health(): # Check ffmpeg available ffmpeg_ok = subprocess.run(["ffmpeg", "-version"], capture_output=True).returncode == 0 return jsonify({ "status": "ok", "ffmpeg": ffmpeg_ok, "gameplay_clips": len(glob.glob(f"{GAMEPLAY_DIR}/*.mp4")), "active_jobs": len([j for j in JOBS.values() if j.get("status") == "running"]) }) # ── Cleanup old files every hour ──────────────────────────── def cleanup_loop(): while True: time.sleep(3600) now = time.time() for f in glob.glob(f"{OUTPUT_DIR}/*.mp4"): if now - os.path.getmtime(f) > 7200: # 2 hours os.remove(f) threading.Thread(target=cleanup_loop, daemon=True).start() if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True, threaded=True)