# wcaguar/app.py
from __future__ import annotations
import os
from dataclasses import asdict, replace
from functools import wraps
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from uuid import uuid4
import gradio as gr
from wcaguar.compliance import AuditEvent, AuditLogger, ReviewFlag, ReviewStatus
from wcaguar.compliance.prompts import Verbosity
from wcaguar.pipeline import PipelineConfig, PipelineResult, process_video
from wcaguar.render.report import ReportMetadata, generate_compliance_report
from wcaguar.render.to_vtt import ExportMetadata, summarize_review_status
from wcaguar.render import render_json, render_markdown, render_srt, render_vtt
from wcaguar.storage import default_output_dir
def _default_output_dir() -> Path:
return default_output_dir()
def _spaces_gpu(fn):
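    """Wrap ``fn`` with ``spaces.GPU`` when the ZeroGPU runtime is available.

    Falls back to calling ``fn`` directly when the ``spaces`` package is
    missing or the GPU worker raises.
    """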
try:
import spaces # type: ignore
gpu_fn = spaces.GPU(duration=180)(fn)
@wraps(fn)
def wrapped(*args, **kwargs):
try:
return gpu_fn(*args, **kwargs)
except Exception:
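                # The ZeroGPU worker can fail (quota, scheduling); retry locally.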
return fn(*args, **kwargs)
return wrapped
except Exception:
return fn
def _materialize_video(video_input: Any) -> tuple[Path | None, TemporaryDirectory | None]:
if not video_input:
return None, None
if isinstance(video_input, str):
return Path(video_input), None
if isinstance(video_input, dict):
p = video_input.get("path") or video_input.get("filepath")
if isinstance(p, str) and p:
return Path(p), None
data = video_input.get("data")
if isinstance(data, (bytes, bytearray)):
video_input = bytes(data)
if isinstance(video_input, (bytes, bytearray)):
td = TemporaryDirectory()
path = Path(td.name) / "video.mp4"
path.write_bytes(bytes(video_input))
return path, td
if hasattr(video_input, "read"):
td = TemporaryDirectory()
path = Path(td.name) / "video.mp4"
path.write_bytes(video_input.read())
return path, td
return None, None
def _segments_table(result: PipelineResult) -> list[list[Any]]:
rows = []
for s in result.segments:
rows.append(
[
s.id,
f"{_ms_to_hms(s.start_ms)}{_ms_to_hms(s.end_ms)}",
s.chapter_title,
s.status.value,
f"{s.confidence:.2f}",
", ".join(sorted([f.value for f in s.flags])),
]
)
return rows
def _ms_to_hms(ms: int) -> str:
s = int(max(0, ms) // 1000)
h = s // 3600
m = (s % 3600) // 60
sec = s % 60
return f"{h:02d}:{m:02d}:{sec:02d}"
def _run_process_impl(
video_input: Any,
verbosity: str,
custom_prompt: str,
whisper_model: str,
chapter_mode: str,
vision_enabled: bool,
vision_model_id: str,
auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
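    """Run the full pipeline on an uploaded video and build the UI payloads."""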
video_path, tmp = _materialize_video(video_input)
if not video_path:
return "Upload a video first.", {}, [], {}
try:
out_dir = _default_output_dir()
        # PipelineConfig and its nested vision config are frozen dataclasses,
        # so build an updated copy instead of assigning to the attribute.
        vision_cfg = PipelineConfig().vision
        if vision_model_id.strip():
            vision_cfg = replace(vision_cfg, model_id=vision_model_id.strip())
cfg = PipelineConfig(
whisper_model=whisper_model,
chapter_mode=("chapter_llama" if chapter_mode == "Chapter-Llama" else "heuristic"),
vision_enabled=bool(vision_enabled),
vision=vision_cfg,
verbosity=Verbosity(verbosity),
custom_prompt=(custom_prompt.strip() or None),
auto_approve_threshold=float(auto_approve_threshold),
output_dir=out_dir,
)
status_lines = []
def prog(p: float, msg: str) -> None:
status_lines.append(f"{int(p*100):3d}% {msg}")
result = process_video(video_path=Path(video_path), cfg=cfg, progress=prog)
chapters_json = [
{
"title": c.title,
"start_ms": c.start_ms,
"end_ms": c.end_ms,
"start": _ms_to_hms(c.start_ms),
"end": _ms_to_hms(c.end_ms),
}
for c in result.chapters
]
state = {
"result": {
"video_id": result.video_id,
"input_path": result.input_path,
"duration_ms": result.duration_ms,
"transcript": result.transcript,
"chapters": chapters_json,
"segments": [
{
"id": s.id,
"start_ms": s.start_ms,
"end_ms": s.end_ms,
"chapter_title": s.chapter_title,
"original_text": s.original_text,
"current_text": s.current_text,
"confidence": s.confidence,
"flags": sorted([f.value for f in s.flags]),
"status": s.status.value,
"notes": s.notes,
}
for s in result.segments
],
"debug": result.debug,
}
}
return "\n".join(status_lines[-6:]) or "Done.", {"chapters": chapters_json, "debug": result.debug}, _segments_table(result), state
except Exception as e:
msg = f"Error: {type(e).__name__}: {e}"
return msg, {"error": msg}, [], {"error": msg}
finally:
if tmp is not None:
try:
tmp.cleanup()
except Exception:
pass
@_spaces_gpu
def _run_process_gpu(
video_input: Any,
verbosity: str,
custom_prompt: str,
whisper_model: str,
chapter_mode: str,
vision_enabled: bool,
vision_model_id: str,
auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
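    """GPU-decorated entry point; the body is identical to the CPU path."""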
return _run_process_impl(
video_input,
verbosity,
custom_prompt,
whisper_model,
chapter_mode,
vision_enabled,
vision_model_id,
auto_approve_threshold,
)
def run_process(
video_input: Any,
verbosity: str,
custom_prompt: str,
whisper_model: str,
chapter_mode: str,
vision_enabled: bool,
vision_model_id: str,
auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
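    """Route to the GPU-decorated runner only when a heavy model is requested."""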
needs_gpu = bool(vision_enabled) or (chapter_mode == "Chapter-Llama")
if needs_gpu:
return _run_process_gpu(
video_input,
verbosity,
custom_prompt,
whisper_model,
chapter_mode,
vision_enabled,
vision_model_id,
auto_approve_threshold,
)
return _run_process_impl(
video_input,
verbosity,
custom_prompt,
whisper_model,
chapter_mode,
vision_enabled,
vision_model_id,
auto_approve_threshold,
)
def _get_segment(state: dict, seg_id: str) -> dict | None:
res = (state or {}).get("result") or {}
for s in res.get("segments", []):
if s.get("id") == seg_id:
return s
return None
def on_select_segment(evt: gr.SelectData, state: dict) -> tuple[str, str, list[str], str, str, str, str]:
row = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
res = (state or {}).get("result") or {}
segs = res.get("segments", [])
    if not segs or row is None or int(row) >= len(segs):
        # Must return one value per wired output (7 components).
        return "", "", [], "0.00", "", ReviewStatus.DRAFT.value, ""
seg = segs[int(row)]
flags = seg.get("flags") or []
return (
seg.get("id") or "",
seg.get("original_text") or "",
flags,
f"{float(seg.get('confidence') or 0.0):.2f}",
seg.get("current_text") or "",
seg.get("status") or ReviewStatus.DRAFT.value,
seg.get("notes") or "",
)
def update_segment(
state: dict,
seg_id: str,
current_text: str,
notes: str,
status: str,
flags: list[str],
) -> tuple[list[list[Any]], dict]:
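    """Write reviewer edits back into state and record an audit event."""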
res = (state or {}).get("result") or {}
seg = _get_segment(state, seg_id)
if not seg:
return _segments_table(_state_to_result(state)), state
seg["current_text"] = current_text
seg["notes"] = notes.strip() or None
seg["status"] = status
seg["flags"] = sorted(set(flags or []))
audit = AuditLogger()
audit.log(
video_id=res.get("video_id") or "video",
event=AuditEvent(
audit_id=str(uuid4()),
video_id=res.get("video_id") or "video",
segment_id=seg_id,
timestamp=AuditEvent.now_iso(),
action="edited",
original_text=seg.get("original_text"),
final_text=current_text,
notes=seg.get("notes"),
),
)
return _segments_table(_state_to_result(state)), state
def _state_to_result(state: dict) -> PipelineResult:
res = (state or {}).get("result") or {}
    # ReviewFlag and ReviewStatus are already imported at module scope; only
    # the segment dataclass is needed here.
    from wcaguar.compliance.models import ReviewSegment
segs = []
for s in res.get("segments", []):
segs.append(
ReviewSegment(
id=s["id"],
start_ms=int(s["start_ms"]),
end_ms=int(s["end_ms"]),
chapter_title=str(s["chapter_title"]),
original_text=str(s["original_text"]),
current_text=str(s["current_text"]),
confidence=float(s["confidence"]),
flags=set(ReviewFlag(f) for f in s.get("flags", [])),
status=ReviewStatus(str(s.get("status"))),
notes=s.get("notes"),
)
)
return PipelineResult(
video_id=str(res.get("video_id", "video")),
input_path=str(res.get("input_path", "")),
duration_ms=int(res.get("duration_ms", 0)),
transcript=str(res.get("transcript", "")),
chapters=[],
segments=segs,
debug=dict(res.get("debug", {})),
)
def export_outputs(
state: dict,
formats: list[str],
reviewed_only: bool,
reviewer_id: str,
reviewer_name: str,
institution: str,
) -> tuple[list[str], str]:
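    """Render the selected formats to disk and log an export audit trail."""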
res = (state or {}).get("result") or {}
if not res:
return [], "Nothing to export yet."
out_dir = _default_output_dir()
out_dir.mkdir(parents=True, exist_ok=True)
result = _state_to_result(state)
segments = result.segments
if reviewed_only:
segments = [s for s in segments if s.status in {ReviewStatus.APPROVED, ReviewStatus.EXPORTED}]
meta = ExportMetadata.now(
input_path=result.input_path,
ai_model=str(res.get("debug", {}).get("vision_model", "unknown")),
review_status=summarize_review_status(result.segments),
reviewer_id=reviewer_id.strip() or None,
reviewer_name=reviewer_name.strip() or None,
institution=institution.strip() or None,
)
written: list[Path] = []
base = Path(result.input_path).stem or result.video_id
if "VTT" in formats:
written.append(render_vtt(segments=segments, metadata=meta, output_path=out_dir / f"{base}.vtt"))
if "SRT" in formats:
written.append(render_srt(segments=segments, output_path=out_dir / f"{base}.srt"))
if "Markdown" in formats:
written.append(render_markdown(segments=segments, output_path=out_dir / f"{base}.md"))
if "JSON" in formats:
written.append(
render_json(
segments=segments,
metadata={"export": asdict(meta), "debug": res.get("debug", {})},
output_path=out_dir / f"{base}.json",
)
)
if "Compliance Report" in formats:
report = generate_compliance_report(
segments=segments,
metadata=ReportMetadata(
input_path=result.input_path,
generated_at=meta.generated_at,
reviewer_id=meta.reviewer_id,
reviewer_name=meta.reviewer_name,
institution=meta.institution,
),
)
report_path = out_dir / f"{base}_compliance_report.txt"
report_path.write_text(report, encoding="utf-8")
written.append(report_path)
audit = AuditLogger()
    # Log the audit trail for the segments that were actually exported.
    for s in segments:
audit.log(
video_id=result.video_id,
event=AuditEvent(
audit_id=str(uuid4()),
video_id=result.video_id,
segment_id=s.id,
timestamp=AuditEvent.now_iso(),
action="exported",
final_text=s.current_text,
user_id=meta.reviewer_id,
user_name=meta.reviewer_name,
),
)
return [str(p) for p in written], f"Wrote {len(written)} file(s) to {out_dir}."
with gr.Blocks(title="wcaguar") as demo:
gr.Markdown("# 🐆 wcaguar\nWCAG-aligned chapters + audio descriptions for long-form university video.")
state = gr.State({})
default_whisper = "small" if os.environ.get("SPACE_ID") else "large-v3"
with gr.Tab("Process"):
with gr.Row():
with gr.Column(scale=1):
video_input = gr.File(label="Upload video", file_types=["video"], type="binary")
verbosity = gr.Radio(["minimal", "standard", "detailed"], value="standard", label="Verbosity")
custom_prompt = gr.Textbox(label="Custom prompt (optional)", lines=3)
whisper_model = gr.Dropdown(
["large-v3", "medium", "small", "base", "tiny"],
value=default_whisper,
label="Whisper model (faster-whisper)",
)
chapter_mode = gr.Dropdown(["Chapter-Llama", "Heuristic"], value="Heuristic", label="Chaptering")
vision_enabled = gr.Checkbox(label="Enable vision (Qwen3-VL)", value=False)
vision_model_id = gr.Textbox(label="Vision model id", value="Qwen/Qwen3-VL-8B-Instruct")
auto_approve_threshold = gr.Slider(0.5, 1.0, value=0.85, step=0.01, label="Auto-approve threshold")
process_btn = gr.Button("🐆 Generate", variant="primary")
with gr.Column(scale=2):
status = gr.Textbox(label="Status", lines=6, interactive=False)
chapters_preview = gr.JSON(label="Chapters + debug")
segments_df = gr.Dataframe(
headers=["ID", "Time", "Chapter", "Status", "Confidence", "Flags"],
label="Segments",
interactive=True,
)
process_btn.click(
fn=run_process,
inputs=[video_input, verbosity, custom_prompt, whisper_model, chapter_mode, vision_enabled, vision_model_id, auto_approve_threshold],
outputs=[status, chapters_preview, segments_df, state],
)
with gr.Tab("Review"):
gr.Markdown("Select a row in **Segments** (Process tab) to edit here.")
seg_id = gr.Textbox(label="Segment ID", interactive=False)
original_text = gr.Textbox(label="Original (AI)", lines=4, interactive=False)
current_text = gr.Textbox(label="Current (editable)", lines=4)
flags = gr.CheckboxGroup(choices=[f.value for f in ReviewFlag], label="Flags")
confidence = gr.Textbox(label="Confidence", interactive=False)
notes = gr.Textbox(label="Reviewer notes", lines=2)
status_choice = gr.Dropdown([s.value for s in ReviewStatus], value=ReviewStatus.DRAFT.value, label="Status")
save_btn = gr.Button("Save edits", variant="primary")
segments_df.select(
fn=on_select_segment,
inputs=[state],
outputs=[seg_id, original_text, flags, confidence, current_text, status_choice, notes],
)
save_btn.click(fn=update_segment, inputs=[state, seg_id, current_text, notes, status_choice, flags], outputs=[segments_df, state])
with gr.Tab("Export"):
formats = gr.CheckboxGroup(["VTT", "SRT", "Markdown", "JSON", "Compliance Report"], value=["VTT", "JSON"], label="Formats")
reviewed_only = gr.Checkbox(label="Export reviewed only", value=True)
reviewer_id = gr.Textbox(label="Reviewer ID (email)")
reviewer_name = gr.Textbox(label="Reviewer name")
institution = gr.Textbox(label="Institution")
export_btn = gr.Button("📥 Export", variant="primary")
downloads = gr.Files(label="Download")
export_status = gr.Textbox(label="Export status", interactive=False)
export_btn.click(fn=export_outputs, inputs=[state, formats, reviewed_only, reviewer_id, reviewer_name, institution], outputs=[downloads, export_status])
with gr.Tab("Settings"):
gr.Markdown(
"For a true RTX 4090 run: set **Chaptering = Chapter-Llama** and enable vision.\n\n"
"If you do not have HF access to Llama 3.1, keep **Heuristic** chaptering."
)
if __name__ == "__main__":
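    # Queue requests so long transcription/vision jobs don't block the server.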
demo.queue()
demo.launch(
server_name=os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0" if os.environ.get("SPACE_ID") else "127.0.0.1"),
server_port=int(os.environ.get("GRADIO_SERVER_PORT", "7860")),
show_error=bool(os.environ.get("SPACE_ID")),
)