| from __future__ import annotations |
|
|
| import os |
| from dataclasses import asdict |
| from functools import wraps |
| from pathlib import Path |
| from tempfile import TemporaryDirectory |
| from typing import Any |
| from uuid import uuid4 |
|
|
| import gradio as gr |
|
|
| from wcaguar.compliance import AuditEvent, AuditLogger, ReviewFlag, ReviewStatus |
| from wcaguar.compliance.prompts import Verbosity |
| from wcaguar.pipeline import PipelineConfig, PipelineResult, process_video |
| from wcaguar.render.report import ReportMetadata, generate_compliance_report |
| from wcaguar.render.to_vtt import ExportMetadata, summarize_review_status |
| from wcaguar.render import render_json, render_markdown, render_srt, render_vtt |
| from wcaguar.storage import default_output_dir |
|
|
|
|
def _default_output_dir() -> Path:
    """Resolve the directory where generated artifacts are written."""
    return default_output_dir()
|
|
|
|
| def _spaces_gpu(fn): |
| try: |
| import spaces |
|
|
| gpu_fn = spaces.GPU(duration=180)(fn) |
|
|
| @wraps(fn) |
| def wrapped(*args, **kwargs): |
| try: |
| return gpu_fn(*args, **kwargs) |
| except Exception: |
| return fn(*args, **kwargs) |
|
|
| return wrapped |
| except Exception: |
| return fn |
|
|
|
|
| def _materialize_video(video_input: Any) -> tuple[Path | None, TemporaryDirectory | None]: |
| if not video_input: |
| return None, None |
|
|
| if isinstance(video_input, str): |
| return Path(video_input), None |
|
|
| if isinstance(video_input, dict): |
| p = video_input.get("path") or video_input.get("filepath") |
| if isinstance(p, str) and p: |
| return Path(p), None |
| data = video_input.get("data") |
| if isinstance(data, (bytes, bytearray)): |
| video_input = bytes(data) |
|
|
| if isinstance(video_input, (bytes, bytearray)): |
| td = TemporaryDirectory() |
| path = Path(td.name) / "video.mp4" |
| path.write_bytes(bytes(video_input)) |
| return path, td |
|
|
| if hasattr(video_input, "read"): |
| td = TemporaryDirectory() |
| path = Path(td.name) / "video.mp4" |
| path.write_bytes(video_input.read()) |
| return path, td |
|
|
| return None, None |
|
|
|
|
def _segments_table(result: PipelineResult) -> list[list[Any]]:
    """Flatten pipeline segments into rows for the Segments dataframe."""
    return [
        [
            seg.id,
            f"{_ms_to_hms(seg.start_ms)}–{_ms_to_hms(seg.end_ms)}",
            seg.chapter_title,
            seg.status.value,
            f"{seg.confidence:.2f}",
            ", ".join(sorted(flag.value for flag in seg.flags)),
        ]
        for seg in result.segments
    ]
|
|
|
|
| def _ms_to_hms(ms: int) -> str: |
| s = int(max(0, ms) // 1000) |
| h = s // 3600 |
| m = (s % 3600) // 60 |
| sec = s % 60 |
| return f"{h:02d}:{m:02d}:{sec:02d}" |
|
|
|
|
def _run_process_impl(
    video_input: Any,
    verbosity: str,
    custom_prompt: str,
    whisper_model: str,
    chapter_mode: str,
    vision_enabled: bool,
    vision_model_id: str,
    auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
    """Run the full processing pipeline over an uploaded video.

    Returns a 4-tuple matching the Process tab outputs:
    (status text, chapters+debug preview JSON, segments table rows,
    session state dict). On failure the error message is mirrored into
    all four payloads instead of raising.
    """
    video_path, tmp = _materialize_video(video_input)
    if not video_path:
        return "Upload a video first.", {}, [], {}

    try:
        out_dir = _default_output_dir()
        # Start from the default vision config; if a model id was typed in,
        # rebuild the same config class with it overridden.
        vision_cfg = PipelineConfig().vision
        if vision_model_id.strip():
            vision_cfg = vision_cfg.__class__(model_id=vision_model_id.strip())

        # Map UI choices onto the pipeline's config vocabulary.
        cfg = PipelineConfig(
            whisper_model=whisper_model,
            chapter_mode=("chapter_llama" if chapter_mode == "Chapter-Llama" else "heuristic"),
            vision_enabled=bool(vision_enabled),
            vision=vision_cfg,
            verbosity=Verbosity(verbosity),
            custom_prompt=(custom_prompt.strip() or None),
            auto_approve_threshold=float(auto_approve_threshold),
            output_dir=out_dir,
        )

        status_lines = []

        # Progress callback: accumulate "NNN% message" lines for the status box.
        def prog(p: float, msg: str) -> None:
            status_lines.append(f"{int(p*100):3d}% {msg}")

        result = process_video(video_path=Path(video_path), cfg=cfg, progress=prog)

        # JSON-friendly chapter dicts (raw ms offsets plus HH:MM:SS strings).
        chapters_json = [
            {
                "title": c.title,
                "start_ms": c.start_ms,
                "end_ms": c.end_ms,
                "start": _ms_to_hms(c.start_ms),
                "end": _ms_to_hms(c.end_ms),
            }
            for c in result.chapters
        ]
        # Session state keeps only JSON-serializable data so the Review and
        # Export tabs can rebuild a PipelineResult from it later.
        state = {
            "result": {
                "video_id": result.video_id,
                "input_path": result.input_path,
                "duration_ms": result.duration_ms,
                "transcript": result.transcript,
                "chapters": chapters_json,
                "segments": [
                    {
                        "id": s.id,
                        "start_ms": s.start_ms,
                        "end_ms": s.end_ms,
                        "chapter_title": s.chapter_title,
                        "original_text": s.original_text,
                        "current_text": s.current_text,
                        "confidence": s.confidence,
                        "flags": sorted([f.value for f in s.flags]),
                        "status": s.status.value,
                        "notes": s.notes,
                    }
                    for s in result.segments
                ],
                "debug": result.debug,
            }
        }

        # Show only the most recent progress lines in the status box.
        return "\n".join(status_lines[-6:]) or "Done.", {"chapters": chapters_json, "debug": result.debug}, _segments_table(result), state
    except Exception as e:
        msg = f"Error: {type(e).__name__}: {e}"
        return msg, {"error": msg}, [], {"error": msg}
    finally:
        # Best-effort removal of the temp dir _materialize_video may have made.
        if tmp is not None:
            try:
                tmp.cleanup()
            except Exception:
                pass
|
|
|
|
@_spaces_gpu
def _run_process_gpu(
    video_input: Any,
    verbosity: str,
    custom_prompt: str,
    whisper_model: str,
    chapter_mode: str,
    vision_enabled: bool,
    vision_model_id: str,
    auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
    """GPU-scheduled variant of ``_run_process_impl`` (identical contract)."""
    return _run_process_impl(
        video_input=video_input,
        verbosity=verbosity,
        custom_prompt=custom_prompt,
        whisper_model=whisper_model,
        chapter_mode=chapter_mode,
        vision_enabled=vision_enabled,
        vision_model_id=vision_model_id,
        auto_approve_threshold=auto_approve_threshold,
    )
|
|
|
|
def run_process(
    video_input: Any,
    verbosity: str,
    custom_prompt: str,
    whisper_model: str,
    chapter_mode: str,
    vision_enabled: bool,
    vision_model_id: str,
    auto_approve_threshold: float,
) -> tuple[str, dict, list[list[Any]], dict]:
    """Dispatch processing to the GPU-wrapped or plain implementation.

    GPU scheduling is only requested when it would actually be used:
    vision captioning or Chapter-Llama chaptering.
    """
    wants_gpu = bool(vision_enabled) or chapter_mode == "Chapter-Llama"
    runner = _run_process_gpu if wants_gpu else _run_process_impl
    return runner(
        video_input,
        verbosity,
        custom_prompt,
        whisper_model,
        chapter_mode,
        vision_enabled,
        vision_model_id,
        auto_approve_threshold,
    )
|
|
|
|
| def _get_segment(state: dict, seg_id: str) -> dict | None: |
| res = (state or {}).get("result") or {} |
| for s in res.get("segments", []): |
| if s.get("id") == seg_id: |
| return s |
| return None |
|
|
|
|
def on_select_segment(evt: gr.SelectData, state: dict) -> tuple[str, str, list[str], str, str, str, str]:
    """Populate the Review tab fields from the dataframe row the user clicked.

    Returns a 7-tuple — (segment id, original text, flags, confidence,
    current text, status, notes) — matching the seven output components
    wired to ``segments_df.select``.
    """
    # Gradio may report the selection as (row, col) or as a bare row index.
    row = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
    res = (state or {}).get("result") or {}
    segs = res.get("segments", [])
    if not segs or row is None or int(row) >= len(segs):
        # BUG FIX: this branch previously returned only 5 values for the 7
        # wired outputs, making Gradio raise on empty/out-of-range selects.
        return "", "", [], "0.00", "", ReviewStatus.DRAFT.value, ""
    seg = segs[int(row)]
    flags = seg.get("flags") or []
    return (
        seg.get("id") or "",
        seg.get("original_text") or "",
        flags,
        f"{float(seg.get('confidence') or 0.0):.2f}",
        seg.get("current_text") or "",
        seg.get("status") or ReviewStatus.DRAFT.value,
        seg.get("notes") or "",
    )
|
|
|
|
def update_segment(
    state: dict,
    seg_id: str,
    current_text: str,
    notes: str,
    status: str,
    flags: list[str],
) -> tuple[list[list[Any]], dict]:
    """Apply the reviewer's edits to one segment and audit-log the change.

    Returns the refreshed segments table plus the (mutated) session state.
    Unknown ``seg_id`` values are a no-op apart from re-rendering the table.
    """
    res = (state or {}).get("result") or {}
    seg = _get_segment(state, seg_id)
    if seg is None:
        return _segments_table(_state_to_result(state)), state

    # Mutate the segment dict held inside the session state in place.
    seg["current_text"] = current_text
    seg["notes"] = notes.strip() or None
    seg["status"] = status
    seg["flags"] = sorted(set(flags or []))

    # Record an immutable audit trail entry for this edit.
    vid = res.get("video_id") or "video"
    AuditLogger().log(
        video_id=vid,
        event=AuditEvent(
            audit_id=str(uuid4()),
            video_id=vid,
            segment_id=seg_id,
            timestamp=AuditEvent.now_iso(),
            action="edited",
            original_text=seg.get("original_text"),
            final_text=current_text,
            notes=seg.get("notes"),
        ),
    )

    return _segments_table(_state_to_result(state)), state
|
|
|
|
def _state_to_result(state: dict) -> PipelineResult:
    """Rehydrate a ``PipelineResult`` from the JSON-friendly session state."""
    res = (state or {}).get("result") or {}
    from wcaguar.compliance.models import ReviewFlag, ReviewSegment, ReviewStatus

    segments = [
        ReviewSegment(
            id=raw["id"],
            start_ms=int(raw["start_ms"]),
            end_ms=int(raw["end_ms"]),
            chapter_title=str(raw["chapter_title"]),
            original_text=str(raw["original_text"]),
            current_text=str(raw["current_text"]),
            confidence=float(raw["confidence"]),
            flags={ReviewFlag(f) for f in raw.get("flags", [])},
            status=ReviewStatus(str(raw.get("status"))),
            notes=raw.get("notes"),
        )
        for raw in res.get("segments", [])
    ]
    return PipelineResult(
        video_id=str(res.get("video_id", "video")),
        input_path=str(res.get("input_path", "")),
        duration_ms=int(res.get("duration_ms", 0)),
        transcript=str(res.get("transcript", "")),
        chapters=[],  # chapters are not needed for table/export rebuilds
        segments=segments,
        debug=dict(res.get("debug", {})),
    )
|
|
|
|
def export_outputs(
    state: dict,
    formats: list[str],
    reviewed_only: bool,
    reviewer_id: str,
    reviewer_name: str,
    institution: str,
) -> tuple[list[str], str]:
    """Write the selected export formats to disk and audit-log the export.

    Returns (list of written file paths as strings, status message).
    """
    res = (state or {}).get("result") or {}
    if not res:
        return [], "Nothing to export yet."

    out_dir = _default_output_dir()
    out_dir.mkdir(parents=True, exist_ok=True)

    result = _state_to_result(state)
    segments = result.segments
    if reviewed_only:
        # Restrict the exported content to human-approved segments.
        segments = [s for s in segments if s.status in {ReviewStatus.APPROVED, ReviewStatus.EXPORTED}]

    # NOTE: the review-status summary covers ALL segments (not just the
    # exported subset), so the metadata reflects overall review progress.
    meta = ExportMetadata.now(
        input_path=result.input_path,
        ai_model=str(res.get("debug", {}).get("vision_model", "unknown")),
        review_status=summarize_review_status(result.segments),
        reviewer_id=reviewer_id.strip() or None,
        reviewer_name=reviewer_name.strip() or None,
        institution=institution.strip() or None,
    )

    # One output file per requested format, all named after the source video.
    written: list[Path] = []
    base = Path(result.input_path).stem or result.video_id
    if "VTT" in formats:
        written.append(render_vtt(segments=segments, metadata=meta, output_path=out_dir / f"{base}.vtt"))
    if "SRT" in formats:
        written.append(render_srt(segments=segments, output_path=out_dir / f"{base}.srt"))
    if "Markdown" in formats:
        written.append(render_markdown(segments=segments, output_path=out_dir / f"{base}.md"))
    if "JSON" in formats:
        written.append(
            render_json(
                segments=segments,
                metadata={"export": asdict(meta), "debug": res.get("debug", {})},
                output_path=out_dir / f"{base}.json",
            )
        )
    if "Compliance Report" in formats:
        report = generate_compliance_report(
            segments=segments,
            metadata=ReportMetadata(
                input_path=result.input_path,
                generated_at=meta.generated_at,
                reviewer_id=meta.reviewer_id,
                reviewer_name=meta.reviewer_name,
                institution=meta.institution,
            ),
        )
        report_path = out_dir / f"{base}_compliance_report.txt"
        report_path.write_text(report, encoding="utf-8")
        written.append(report_path)

    # Audit every segment as exported — all segments, even when the files
    # above contained only the reviewed subset.
    audit = AuditLogger()
    for s in result.segments:
        audit.log(
            video_id=result.video_id,
            event=AuditEvent(
                audit_id=str(uuid4()),
                video_id=result.video_id,
                segment_id=s.id,
                timestamp=AuditEvent.now_iso(),
                action="exported",
                final_text=s.current_text,
                user_id=meta.reviewer_id,
                user_name=meta.reviewer_name,
            ),
        )

    return [str(p) for p in written], f"Wrote {len(written)} file(s) to {out_dir}."
|
|
|
|
# ---- Gradio UI definition (module-level so `demo` is importable) ----
with gr.Blocks(title="wcaguar") as demo:
    gr.Markdown("# 🐆 wcaguar\nWCAG-aligned chapters + audio descriptions for long-form university video.")

    # Per-session state holding the JSON-serializable pipeline result.
    state = gr.State({})
    # Smaller default Whisper model on HF Spaces (SPACE_ID set) than locally.
    default_whisper = "small" if os.environ.get("SPACE_ID") else "large-v3"

    # --- Process tab: upload + pipeline configuration + run ---
    with gr.Tab("Process"):
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.File(label="Upload video", file_types=["video"], type="binary")
                verbosity = gr.Radio(["minimal", "standard", "detailed"], value="standard", label="Verbosity")
                custom_prompt = gr.Textbox(label="Custom prompt (optional)", lines=3)
                whisper_model = gr.Dropdown(
                    ["large-v3", "medium", "small", "base", "tiny"],
                    value=default_whisper,
                    label="Whisper model (faster-whisper)",
                )
                chapter_mode = gr.Dropdown(["Chapter-Llama", "Heuristic"], value="Heuristic", label="Chaptering")
                vision_enabled = gr.Checkbox(label="Enable vision (Qwen3-VL)", value=False)
                vision_model_id = gr.Textbox(label="Vision model id", value="Qwen/Qwen3-VL-8B-Instruct")
                auto_approve_threshold = gr.Slider(0.5, 1.0, value=0.85, step=0.01, label="Auto-approve threshold")
                process_btn = gr.Button("🐆 Generate", variant="primary")
            with gr.Column(scale=2):
                status = gr.Textbox(label="Status", lines=6, interactive=False)
                chapters_preview = gr.JSON(label="Chapters + debug")
                segments_df = gr.Dataframe(
                    headers=["ID", "Time", "Chapter", "Status", "Confidence", "Flags"],
                    label="Segments",
                    interactive=True,
                )

        process_btn.click(
            fn=run_process,
            inputs=[video_input, verbosity, custom_prompt, whisper_model, chapter_mode, vision_enabled, vision_model_id, auto_approve_threshold],
            outputs=[status, chapters_preview, segments_df, state],
        )

    # --- Review tab: edit the segment selected in the Process tab table ---
    with gr.Tab("Review"):
        gr.Markdown("Select a row in **Segments** (Process tab) to edit here.")
        seg_id = gr.Textbox(label="Segment ID", interactive=False)
        original_text = gr.Textbox(label="Original (AI)", lines=4, interactive=False)
        current_text = gr.Textbox(label="Current (editable)", lines=4)
        flags = gr.CheckboxGroup(choices=[f.value for f in ReviewFlag], label="Flags")
        confidence = gr.Textbox(label="Confidence", interactive=False)
        notes = gr.Textbox(label="Reviewer notes", lines=2)
        status_choice = gr.Dropdown([s.value for s in ReviewStatus], value=ReviewStatus.DRAFT.value, label="Status")
        save_btn = gr.Button("Save edits", variant="primary")

        # Row selection fills the edit fields; Save writes them back to state.
        segments_df.select(
            fn=on_select_segment,
            inputs=[state],
            outputs=[seg_id, original_text, flags, confidence, current_text, status_choice, notes],
        )
        save_btn.click(fn=update_segment, inputs=[state, seg_id, current_text, notes, status_choice, flags], outputs=[segments_df, state])

    # --- Export tab: write chosen formats + compliance report to disk ---
    with gr.Tab("Export"):
        formats = gr.CheckboxGroup(["VTT", "SRT", "Markdown", "JSON", "Compliance Report"], value=["VTT", "JSON"], label="Formats")
        reviewed_only = gr.Checkbox(label="Export reviewed only", value=True)
        reviewer_id = gr.Textbox(label="Reviewer ID (email)")
        reviewer_name = gr.Textbox(label="Reviewer name")
        institution = gr.Textbox(label="Institution")
        export_btn = gr.Button("📥 Export", variant="primary")
        downloads = gr.Files(label="Download")
        export_status = gr.Textbox(label="Export status", interactive=False)

        export_btn.click(fn=export_outputs, inputs=[state, formats, reviewed_only, reviewer_id, reviewer_name, institution], outputs=[downloads, export_status])

    # --- Settings tab: static usage notes only ---
    with gr.Tab("Settings"):
        gr.Markdown(
            "For a true RTX 4090 run: set **Chaptering = Chapter-Llama** and enable vision.\n\n"
            "If you do not have HF access to Llama 3.1, keep **Heuristic** chaptering."
        )
|
|
|
|
if __name__ == "__main__":
    # Enable the request queue (needed for progress updates / long jobs).
    demo.queue()
    demo.launch(
        # Bind to all interfaces on HF Spaces; localhost-only otherwise.
        server_name=os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0" if os.environ.get("SPACE_ID") else "127.0.0.1"),
        server_port=int(os.environ.get("GRADIO_SERVER_PORT", "7860")),
        # Surface tracebacks in the UI only when running on Spaces.
        show_error=bool(os.environ.get("SPACE_ID")),
    )
|
|