clipbot-ffmpeg / app.py
imseldrith's picture
Update app.py
55fb787 verified
#!/usr/bin/env python3
"""
FFmpeg Split-Screen Service - HuggingFace Space
Accepts video URLs, processes split-screen, returns download link
"""
from flask import Flask, request, jsonify, send_file
import subprocess, os, uuid, threading, glob, shutil, requests, time
app = Flask(__name__)
WORK_DIR = "/tmp/clipbot"
OUTPUT_DIR = "/tmp/clipbot/output"
GAMEPLAY_DIR = "/tmp/clipbot/gameplay"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(GAMEPLAY_DIR, exist_ok=True)
JOBS = {}
GAMEPLAY_URLS = [
# Add your gameplay clip direct URLs here (e.g. from HuggingFace dataset or GitHub releases)
# "https://your-hf-space.hf.space/gameplay/1.mp4",
]
def download_file(url, dest):
"""Download a file from URL to dest path."""
r = requests.get(url, stream=True, timeout=120)
r.raise_for_status()
with open(dest, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return dest
def run_ffmpeg(main_path, gameplay_path, output_path, dur=58):
"""Run split-screen FFmpeg command."""
filter_complex = (
"[0:v]scale=1080:960:force_original_aspect_ratio=increase,"
"crop=1080:960,setsar=1[top];"
"[1:v]scale=1080:960:force_original_aspect_ratio=increase,"
"crop=1080:960,setsar=1[bot];"
"[top][bot]vstack=inputs=2[stacked];"
"[stacked]drawbox=x=0:y=956:w=1080:h=8:color=white@0.9:t=fill[out]"
)
cmd = [
"ffmpeg", "-y",
"-ss", "0", "-t", str(dur), "-i", main_path,
"-stream_loop", "-1", "-t", str(dur), "-i", gameplay_path,
"-filter_complex", filter_complex,
"-map", "[out]", "-map", "0:a",
"-t", str(dur),
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
"-c:a", "aac", "-b:a", "128k", "-ar", "44100",
"-movflags", "+faststart", "-pix_fmt", "yuv420p",
output_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
return result
# ── /clip ───────────────────────────────────────────────────
@app.route("/clip", methods=["POST"])
def clip():
"""
Accepts:
- mainVideoUrl: direct URL to main video file
- gameplayUrl: direct URL to gameplay clip (optional, picks random if not provided)
- videoId: unique identifier
- duration: max seconds (default 58)
Returns job_id immediately, poll /job/<id> for result
"""
data = request.json
video_id = data.get("videoId", str(uuid.uuid4()))
main_url = data.get("mainVideoUrl")
gameplay_url = data.get("gameplayUrl")
dur = min(int(data.get("duration", 58)), 58)
if not main_url:
return jsonify({"error": "mainVideoUrl is required"}), 400
job_id = str(uuid.uuid4())
JOBS[job_id] = {"status": "running", "videoId": video_id}
def run():
work = f"/tmp/job_{job_id}"
os.makedirs(work, exist_ok=True)
try:
# Download main video
main_path = f"{work}/main.mp4"
download_file(main_url, main_path)
# Download or pick gameplay
gameplay_path = f"{work}/gameplay.mp4"
if gameplay_url:
download_file(gameplay_url, gameplay_path)
else:
# Pick from local gameplay dir
files = glob.glob(f"{GAMEPLAY_DIR}/*.mp4")
if not files:
JOBS[job_id] = {"status": "error", "error": "No gameplay clips available"}
return
import random
shutil.copy(random.choice(files), gameplay_path)
output_path = f"{OUTPUT_DIR}/{video_id}_final.mp4"
result = run_ffmpeg(main_path, gameplay_path, output_path, dur)
if result.returncode == 0 and os.path.exists(output_path):
JOBS[job_id] = {
"status": "done",
"videoId": video_id,
"downloadUrl": f"/download/{video_id}_final.mp4",
"outputPath": output_path
}
else:
JOBS[job_id] = {"status": "error", "error": result.stderr[-500:]}
except Exception as e:
JOBS[job_id] = {"status": "error", "error": str(e)}
finally:
shutil.rmtree(work, ignore_errors=True)
threading.Thread(target=run, daemon=True).start()
return jsonify({"job_id": job_id, "status": "started", "videoId": video_id})
# ── /job/<id> ───────────────────────────────────────────────
@app.route("/job/<job_id>", methods=["GET"])
def job_status(job_id):
return jsonify(JOBS.get(job_id, {"status": "not_found"}))
# ── /download/<filename> ─────────────────────────────────────
@app.route("/download/<filename>", methods=["GET"])
def download_file_route(filename):
path = f"{OUTPUT_DIR}/{filename}"
if not os.path.exists(path):
return jsonify({"error": "File not found"}), 404
return send_file(path, as_attachment=True, mimetype="video/mp4")
# ── /upload_gameplay ─────────────────────────────────────────
@app.route("/upload_gameplay", methods=["POST"])
def upload_gameplay():
"""Upload a gameplay clip to the space."""
if 'file' not in request.files:
return jsonify({"error": "No file provided"}), 400
f = request.files['file']
filename = f"gameplay_{uuid.uuid4().hex[:8]}.mp4"
f.save(f"{GAMEPLAY_DIR}/{filename}")
return jsonify({"status": "done", "filename": filename})
# ── /health ──────────────────────────────────────────────────
@app.route("/health", methods=["GET"])
def health():
# Check ffmpeg available
ffmpeg_ok = subprocess.run(["ffmpeg", "-version"], capture_output=True).returncode == 0
return jsonify({
"status": "ok",
"ffmpeg": ffmpeg_ok,
"gameplay_clips": len(glob.glob(f"{GAMEPLAY_DIR}/*.mp4")),
"active_jobs": len([j for j in JOBS.values() if j.get("status") == "running"])
})
# ── Cleanup old files every hour ────────────────────────────
def cleanup_loop():
while True:
time.sleep(3600)
now = time.time()
for f in glob.glob(f"{OUTPUT_DIR}/*.mp4"):
if now - os.path.getmtime(f) > 7200: # 2 hours
os.remove(f)
threading.Thread(target=cleanup_loop, daemon=True).start()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=True, threaded=True)