# Scraped from a Hugging Face Space file view (Space status: Sleeping).
# File size: 5,501 Bytes; revision 8d0498c.
# The viewer's line-number gutter (1-181) was fused into the first source line and has been dropped.
from __future__ import annotations
import json
import os
import re
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
# Directory containing this module; every other path below is derived from it.
APP_DIR = Path(__file__).resolve().parent
# Template definitions are stored here as "<id>.json" files.
TEMPLATES_DIR = APP_DIR / "templates"
# Rendered videos are written here as "<job_id>.mp4".
RENDERS_DIR = APP_DIR / "renders"
# Create both working directories at import time so later reads/writes cannot
# fail on a missing parent directory.
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
RENDERS_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title="Video Template API (FFmpeg)", version="0.1.0")
# Expose the bundled front-end assets (APP_DIR / "static") under the /static URL prefix.
app.mount("/static", StaticFiles(directory=str(APP_DIR / "static")), name="static")
@app.get("/", response_class=HTMLResponse)
def home() -> Any:
    """Serve the landing page.

    Returns:
        The UTF-8 decoded contents of ``static/index.html`` next to this module.
    """
    index_file = APP_DIR / "static" / "index.html"
    return index_file.read_text(encoding="utf-8")
def _safe_id(s: str) -> str:
if not re.fullmatch(r"[a-zA-Z0-9_\-]{1,80}", s):
raise HTTPException(400, "Invalid id (use letters, numbers, _ or -, max 80 chars)")
return s
def _template_path(tpl_id: str) -> Path:
    """Return the path of the JSON file that stores template ``tpl_id``.

    The id is used verbatim as the file stem, so callers pass it through
    ``_safe_id`` first.
    """
    filename = f"{tpl_id}.json"
    return TEMPLATES_DIR.joinpath(filename)
def _load_template(tpl_id: str) -> Dict[str, Any]:
    """Read and parse the stored template ``tpl_id``.

    Args:
        tpl_id: Template identifier (already validated by the caller).

    Returns:
        The parsed JSON content of the template file.

    Raises:
        HTTPException: 404 when no template file exists for ``tpl_id``;
            500 when the file exists but does not contain valid JSON.
    """
    p = _template_path(tpl_id)
    # Read first and handle the failure instead of checking exists() up front:
    # the file can disappear between the check and the read.
    try:
        raw = p.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise HTTPException(404, "Template not found") from None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        # A corrupt file is a server-side problem; report it explicitly rather
        # than letting the decoder error escape as an opaque failure.
        raise HTTPException(500, f"Template '{tpl_id}' is not valid JSON") from exc
def _save_template(tpl_id: str, data: Dict[str, Any]) -> None:
    """Write ``data`` as 2-space indented JSON to the file for template ``tpl_id``.

    Any existing file for the same id is overwritten.
    """
    serialized = json.dumps(data, indent=2)
    _template_path(tpl_id).write_text(serialized, encoding="utf-8")
# Request body accepted by POST /api/render.
class RenderRequest(BaseModel):
    # Id of a stored template; render() validates it with _safe_id before use.
    template_id: str
    # Values substituted into the template. _render_job currently reads only
    # the "name" key; a fresh empty dict is used when the field is omitted.
    variables: Dict[str, Any] = Field(default_factory=dict)
# In-memory job store (starter), keyed by job id. Each entry is a dict holding
# "job_id", "status" ("queued", "running", "done" or "error"), "detail", and,
# once a render succeeds, "output_path". Nothing here is written to disk, so the
# contents do not outlive the process. For persistence, push outputs to a
# dataset repo.
JOBS: Dict[str, Dict[str, Any]] = {}
@app.get("/api/templates")
def list_templates() -> Any:
    """List every stored template as ``{"id": ..., "name": ...}``.

    Files are visited in sorted path order. A file that cannot be read or
    parsed is still listed, using its file stem as the id and
    ``"(invalid json)"`` as the name.
    """

    def _summarize(path: Path) -> Dict[str, Any]:
        # Any failure (I/O, bad JSON, unexpected JSON shape) falls back to the
        # placeholder entry so one bad file never breaks the listing.
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
            return {"id": parsed.get("id", path.stem), "name": parsed.get("name", "")}
        except Exception:
            return {"id": path.stem, "name": "(invalid json)"}

    return [_summarize(path) for path in sorted(TEMPLATES_DIR.glob("*.json"))]
@app.get("/api/templates/{tpl_id}")
def get_template(tpl_id: str) -> Any:
    """Return the stored template ``tpl_id``.

    The id is validated first; an invalid id or a missing template is reported
    through the HTTP errors raised by ``_safe_id`` and ``_load_template``.
    """
    return _load_template(_safe_id(tpl_id))
@app.post("/api/templates/{tpl_id}")
def upsert_template(tpl_id: str, body: Dict[str, Any]) -> Any:
    """Create or overwrite the template ``tpl_id`` from the request body.

    The stored ``id`` is always taken from the URL, and ``name`` defaults to
    the id when absent. The body must contain ``width``, ``height``, ``fps``
    and ``duration_sec``.

    Raises:
        HTTPException: 400 for an invalid id or for the first missing
            required key.
    """
    tpl_id = _safe_id(tpl_id)
    # Work on a copy so the caller's dict is never mutated; the URL id wins.
    template = {**body, "id": tpl_id}
    template.setdefault("name", tpl_id)

    required = ("width", "height", "fps", "duration_sec")
    missing = next((key for key in required if key not in template), None)
    if missing is not None:
        raise HTTPException(400, f"Missing '{missing}' in template")

    _save_template(tpl_id, template)
    return {"ok": True, "id": tpl_id}
def _render_job(job_id: str, req: RenderRequest) -> None:
try:
JOBS[job_id]["status"] = "running"
tpl = _load_template(_safe_id(req.template_id))
w = int(tpl["width"])
h = int(tpl["height"])
fps = int(tpl["fps"])
dur = float(tpl["duration_sec"])
bg = str(tpl.get("bg_color", "#111827"))
text = tpl.get("text", {})
raw_text = str(text.get("value", "{name}"))
name = str(req.variables.get("name", "Friend"))
final_text = raw_text.replace("{name}", name)
x = int(text.get("x", 80))
y = int(text.get("y", h - 120))
fontsize = int(text.get("fontsize", 48))
out_path = RENDERS_DIR / f"{job_id}.mp4"
fontfile = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
vf = (
f"drawtext=fontfile='{fontfile}':"
f"text='{final_text}':"
f"x={x}:y={y}:fontsize={fontsize}:fontcolor=white"
)
cmd = [
"ffmpeg", "-y",
"-f", "lavfi", "-i", f"color=c={bg}:s={w}x{h}:r={fps}",
"-t", str(dur),
"-vf", vf,
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
str(out_path),
]
JOBS[job_id]["detail"] = "Rendering…"
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
raise RuntimeError(proc.stderr[-2000:] or "ffmpeg failed")
JOBS[job_id]["status"] = "done"
JOBS[job_id]["detail"] = "Done"
JOBS[job_id]["output_path"] = str(out_path)
except Exception as e:
JOBS[job_id]["status"] = "error"
JOBS[job_id]["detail"] = str(e)
@app.post("/api/render")
def render(req: RenderRequest, background: BackgroundTasks) -> Any:
    """Queue a render of ``req.template_id`` and return the new job's id.

    The template id is validated and the template loaded once up front, so an
    invalid id or unknown template is rejected before any job is created. The
    render itself runs later as a background task.

    Returns:
        ``{"job_id": <hex id>, "status": "queued"}``.
    """
    # Loaded only for its validation side effect; the result is not needed here.
    _load_template(_safe_id(req.template_id))

    job_id = uuid.uuid4().hex
    JOBS[job_id] = dict(job_id=job_id, status="queued", detail="Queued")
    background.add_task(_render_job, job_id, req)
    return {"job_id": job_id, "status": "queued"}
@app.get("/api/jobs/{job_id}")
def job_status(job_id: str) -> Any:
    """Return the stored state of job ``job_id`` without its ``output_path``.

    Raises:
        HTTPException: 404 when no such job is known.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    # Copy before stripping the server-side file path so the stored entry is untouched.
    public_view = dict(job)
    public_view.pop("output_path", None)
    return public_view
@app.get("/api/jobs/{job_id}/download")
def job_download(job_id: str) -> Any:
    """Send the rendered MP4 of a finished job as ``<job_id>.mp4``.

    Raises:
        HTTPException: 404 when the job is unknown, 400 when its status is not
            ``"done"``, and 404 when the recorded output file no longer exists.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    if job.get("status") != "done":
        raise HTTPException(400, "Job not finished")

    path = job.get("output_path")
    if path and os.path.exists(path):
        return FileResponse(path, media_type="video/mp4", filename=f"{job_id}.mp4")
    raise HTTPException(404, "Output missing (Space may have restarted)")
|