Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| FFmpeg Split-Screen Service - HuggingFace Space | |
| Accepts video URLs, processes split-screen, returns download link | |
| """ | |
| from flask import Flask, request, jsonify, send_file | |
| import subprocess, os, uuid, threading, glob, shutil, requests, time | |
# Flask application instance for this service.
app = Flask(__name__)

# Scratch locations under /tmp.
# NOTE(review): WORK_DIR is not referenced by any code visible in this file;
# per-job work directories are created under /tmp/job_<id> instead.
WORK_DIR = "/tmp/clipbot"
OUTPUT_DIR = "/tmp/clipbot/output"      # finished split-screen renders
GAMEPLAY_DIR = "/tmp/clipbot/gameplay"  # pool of local gameplay clips (*.mp4)

# Both directories are created eagerly at import time.
for _required_dir in (OUTPUT_DIR, GAMEPLAY_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# In-memory job registry: job_id -> status dict. A plain dict, so its
# contents do not survive a process restart.
JOBS = {}

GAMEPLAY_URLS = [
    # Add your gameplay clip direct URLs here (e.g. from HuggingFace dataset or GitHub releases)
    # "https://your-hf-space.hf.space/gameplay/1.mp4",
]
def download_file(url, dest):
    """Stream the resource at ``url`` into the local file ``dest``.

    Args:
        url: Direct HTTP(S) URL of the file to fetch.
        dest: Filesystem path the payload is written to (opened with "wb",
            so an existing file is overwritten).

    Returns:
        ``dest``, unchanged.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeouts.
        OSError: if ``dest`` cannot be opened or written.
    """
    # NOTE(review): callers pass URLs taken from the request body; no
    # scheme/host allow-listing happens here.
    #
    # The response is used as a context manager so the connection is released
    # even when the status check or the disk write fails; with stream=True it
    # would otherwise stay open until the object is garbage collected.
    with requests.get(url, stream=True, timeout=120) as response:
        response.raise_for_status()
        with open(dest, "wb") as out:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty keep-alive chunks
                    out.write(chunk)
    return dest
def run_ffmpeg(main_path, gameplay_path, output_path, dur=58):
    """Render a vertical split-screen video with ffmpeg.

    The main video fills the top 1080x960 pane and the gameplay clip (looped
    indefinitely, then cut to ``dur``) fills the bottom 1080x960 pane, giving
    a 1080x1920 frame. An 8 px white bar is drawn across the seam. Audio is
    taken from the main video only.

    Args:
        main_path: Path to the main (top) video.
        gameplay_path: Path to the gameplay (bottom) video.
        output_path: Path of the MP4 to write (overwritten, ``-y``).
        dur: Output length in seconds, applied to both inputs and the output.

    Returns:
        The ``subprocess.CompletedProcess`` of the ffmpeg run, with stdout and
        stderr captured as text. The caller must check ``returncode``.

    Raises:
        subprocess.TimeoutExpired: if ffmpeg runs longer than 600 seconds.
        FileNotFoundError: if the ``ffmpeg`` executable is not on PATH.
    """
    # Each pane is scaled up to cover 1080x960, centre-cropped, and given
    # square pixels so vstack receives identically shaped inputs.
    pane = (
        "scale=1080:960:force_original_aspect_ratio=increase,"
        "crop=1080:960,setsar=1"
    )
    filter_complex = (
        f"[0:v]{pane}[top];"
        f"[1:v]{pane}[bot];"
        "[top][bot]vstack=inputs=2[stacked];"
        # y=956 with h=8 centres the divider on the seam at y=960.
        "[stacked]drawbox=x=0:y=956:w=1080:h=8:color=white@0.9:t=fill[out]"
    )
    seconds = str(dur)
    cmd = [
        "ffmpeg", "-y",
        "-ss", "0", "-t", seconds, "-i", main_path,
        "-stream_loop", "-1", "-t", seconds, "-i", gameplay_path,
        "-filter_complex", filter_complex,
        # The trailing "?" makes the audio mapping optional, so a main video
        # without an audio stream still renders instead of aborting.
        "-map", "[out]", "-map", "0:a?",
        "-t", seconds,
        "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
        "-c:a", "aac", "-b:a", "128k", "-ar", "44100",
        "-movflags", "+faststart", "-pix_fmt", "yuv420p",
        output_path,
    ]
    return subprocess.run(cmd, capture_output=True, text=True, timeout=600)
# ── /clip ───────────────────────────────────────────────────
def clip():
    """Start an asynchronous split-screen render job.

    Expects a JSON object body with:
      - mainVideoUrl: direct URL to the main video file (required)
      - gameplayUrl: direct URL to a gameplay clip (optional; a random local
        clip from GAMEPLAY_DIR is used when omitted)
      - videoId: identifier echoed back and used to name the output file
        (optional; a UUID is generated when omitted)
      - duration: maximum length in seconds (default 58, clamped to 1..58)

    Returns:
        JSON ``{"job_id", "status": "started", "videoId"}`` immediately; poll
        /job/<id> for the result. Responds with 400 when the body is not a
        JSON object, ``mainVideoUrl`` is missing, or ``duration`` is not an
        integer.
    """
    # NOTE(review): no @app.route(...) decorator is visible on this function
    # in the reviewed text; confirm it is registered (presumably POST /clip).

    # silent=True yields None instead of raising on a missing/invalid body,
    # so the problem is reported as a clean 400.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON body is required"}), 400

    main_url = payload.get("mainVideoUrl")
    if not main_url:
        return jsonify({"error": "mainVideoUrl is required"}), 400
    gameplay_url = payload.get("gameplayUrl")

    try:
        dur = int(payload.get("duration", 58))
    except (TypeError, ValueError):
        return jsonify({"error": "duration must be an integer"}), 400
    dur = max(1, min(dur, 58))  # ffmpeg rejects zero/negative durations

    video_id = payload.get("videoId") or str(uuid.uuid4())
    # videoId is caller-controlled and ends up in a filesystem path: keep
    # only characters that cannot form a path separator.
    safe_id = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "-_." else "_"
        for ch in str(video_id)
    )

    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "running", "videoId": video_id}

    def record(**fields):
        # Every terminal state keeps videoId so pollers can correlate results.
        JOBS[job_id] = {"videoId": video_id, **fields}

    def run():
        """Worker thread: fetch inputs, render, store the outcome in JOBS."""
        work = f"/tmp/job_{job_id}"
        try:
            # Inside the try so a failure here is recorded instead of leaving
            # the job stuck in "running".
            os.makedirs(work, exist_ok=True)

            # Download main video
            main_path = f"{work}/main.mp4"
            download_file(main_url, main_path)

            # Download or pick gameplay
            gameplay_path = f"{work}/gameplay.mp4"
            if gameplay_url:
                download_file(gameplay_url, gameplay_path)
            else:
                # Pick from local gameplay dir
                candidates = glob.glob(f"{GAMEPLAY_DIR}/*.mp4")
                if not candidates:
                    record(status="error", error="No gameplay clips available")
                    return
                import random
                shutil.copy(random.choice(candidates), gameplay_path)

            output_name = f"{safe_id}_final.mp4"
            output_path = f"{OUTPUT_DIR}/{output_name}"
            result = run_ffmpeg(main_path, gameplay_path, output_path, dur)
            if result.returncode == 0 and os.path.exists(output_path):
                record(
                    status="done",
                    downloadUrl=f"/download/{output_name}",
                    outputPath=output_path,
                )
            else:
                # Only the tail of stderr is kept; ffmpeg logs can be long.
                record(status="error", error=result.stderr[-500:])
        except Exception as e:
            # Top-level boundary of the worker thread: surface any failure
            # through the job record rather than losing it with the thread.
            record(status="error", error=str(e))
        finally:
            shutil.rmtree(work, ignore_errors=True)

    threading.Thread(target=run, daemon=True).start()
    return jsonify({"job_id": job_id, "status": "started", "videoId": video_id})
# ── /job/<id> ───────────────────────────────────────────────
def job_status(job_id):
    """Return the stored record for ``job_id`` as JSON.

    Unknown ids produce ``{"status": "not_found"}`` rather than an error
    status code.
    """
    # NOTE(review): no @app.route(...) decorator is visible on this function
    # in the reviewed text; confirm it is registered (presumably /job/<job_id>).
    try:
        record = JOBS[job_id]
    except KeyError:
        record = {"status": "not_found"}
    return jsonify(record)
# ── /download/<filename> ─────────────────────────────────────
def download_file_route(filename):
    """Send a rendered video from OUTPUT_DIR as an MP4 attachment.

    Args:
        filename: Name of the file inside OUTPUT_DIR, as taken from the URL.

    Returns:
        The file as a download, or a JSON 404 when the name does not resolve
        to a regular file inside OUTPUT_DIR.
    """
    # NOTE(review): no @app.route(...) decorator is visible on this function
    # in the reviewed text; confirm it is registered (presumably
    # /download/<filename>).
    not_found = (jsonify({"error": "File not found"}), 404)

    root = os.path.realpath(OUTPUT_DIR)
    try:
        path = os.path.realpath(os.path.join(root, filename))
        # filename is untrusted: after resolving "..", absolute components
        # and symlinks, the target must still live under OUTPUT_DIR.
        inside_root = os.path.commonpath([root, path]) == root
    except ValueError:
        # e.g. an embedded NUL byte in the requested name
        return not_found

    # isfile (not exists) so a directory is never handed to send_file.
    if not inside_root or not os.path.isfile(path):
        return not_found
    return send_file(path, as_attachment=True, mimetype="video/mp4")
# ── /upload_gameplay ─────────────────────────────────────────
def upload_gameplay():
    """Store an uploaded gameplay clip in GAMEPLAY_DIR.

    Expects a multipart form with a ``file`` part. The clip is saved under a
    generated ``gameplay_<8 hex chars>.mp4`` name; the client-supplied name
    is never used for the path.

    Returns:
        JSON ``{"status": "done", "filename": <stored name>}``, or a JSON 400
        when no file was supplied.
    """
    # NOTE(review): no @app.route(...) decorator is visible on this function
    # in the reviewed text; confirm it is registered (presumably POST
    # /upload_gameplay). The payload is also not checked to be a real video.
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    upload = request.files['file']
    # A form submitted without choosing a file still sends a part with an
    # empty filename; saving it would add a zero-byte clip to the pool.
    if not upload.filename:
        return jsonify({"error": "No file provided"}), 400

    filename = f"gameplay_{uuid.uuid4().hex[:8]}.mp4"
    upload.save(f"{GAMEPLAY_DIR}/{filename}")
    return jsonify({"status": "done", "filename": filename})
# ── /health ──────────────────────────────────────────────────
def health():
    """Report service health as JSON.

    Returns:
        JSON with ``status`` (always "ok"), ``ffmpeg`` (whether
        ``ffmpeg -version`` exited with 0), ``gameplay_clips`` (number of
        *.mp4 files in GAMEPLAY_DIR) and ``active_jobs`` (JOBS entries whose
        status is "running").
    """
    # NOTE(review): no @app.route(...) decorator is visible on this function
    # in the reviewed text; confirm it is registered (presumably /health).

    # A missing binary raises FileNotFoundError and a hung one would block
    # the request; both must be reported as ffmpeg=False, not as a crash.
    try:
        probe = subprocess.run(
            ["ffmpeg", "-version"], capture_output=True, timeout=10
        )
        ffmpeg_ok = probe.returncode == 0
    except (OSError, subprocess.SubprocessError):
        ffmpeg_ok = False

    running = sum(1 for job in JOBS.values() if job.get("status") == "running")
    return jsonify({
        "status": "ok",
        "ffmpeg": ffmpeg_ok,
        "gameplay_clips": len(glob.glob(f"{GAMEPLAY_DIR}/*.mp4")),
        "active_jobs": running,
    })
# ── Cleanup old files every hour ────────────────────────────
def cleanup_loop(interval=3600, max_age=7200):
    """Periodically delete old rendered videos from OUTPUT_DIR; never returns.

    Args:
        interval: Seconds to sleep between sweeps (default: one hour).
        max_age: Files whose modification time is older than this many
            seconds are removed (default: two hours).
    """
    while True:
        time.sleep(interval)
        cutoff = time.time() - max_age
        for path in glob.glob(f"{OUTPUT_DIR}/*.mp4"):
            try:
                if os.path.getmtime(path) < cutoff:
                    os.remove(path)
            except OSError:
                # Best effort: the file may have vanished or be unremovable.
                # One bad file must not kill the sweeper thread for good.
                continue
# Start the background sweeper at import time; as a daemon thread it does
# not keep the process alive on shutdown.
threading.Thread(target=cleanup_loop, daemon=True).start()

if __name__ == "__main__":
    # The server binds to all interfaces, and Flask's debug mode enables the
    # interactive Werkzeug debugger, which lets anyone who can reach the port
    # execute code. Debug is therefore off unless explicitly requested.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled, threaded=True)