Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import uuid | |
| import cv2 | |
| import subprocess | |
| import numpy as np | |
| import gradio as gr | |
| import tempfile | |
| from typing import Dict, List, Iterable, Tuple | |
| from ns_vfs.video.read_mp4 import Mp4Reader | |
| from execute_with_mp4 import process_entry | |
def _load_entry_from_reader(video_path, query_text):
    """Read one video/query pair through ``Mp4Reader`` and return its first entry.

    A single ``{"path": ..., "query": ...}`` record is handed to the reader,
    which is configured with an empty ``openai_save_path`` and a sampling rate
    of 0.5 fps.

    Raises:
        RuntimeError: if ``read_video()`` returns nothing (empty or falsy).
    """
    request = {"path": video_path, "query": query_text}
    entries = Mp4Reader(
        [request],
        openai_save_path="",
        sampling_rate_fps=0.5,
    ).read_video()
    if entries:
        return entries[0]
    raise RuntimeError("No data returned by Mp4Reader (check video path)")
def _make_empty_video(path, width=320, height=240, fps=1.0):
    """Write a single all-black frame to ``path`` as an ``mp4v`` video.

    Used as a placeholder output so callers always have a video file to
    return. Returns ``path`` unchanged.
    """
    codec = cv2.VideoWriter_fourcc(*"mp4v")
    sink = cv2.VideoWriter(path, codec, fps, (width, height))
    # One zero-filled BGR frame (rows=height, cols=width, 3 channels).
    sink.write(np.zeros((height, width, 3), dtype=np.uint8))
    sink.release()
    return path
def _crop_video_ffmpeg(input_path, output_path, frame_indices, prop_matrix):
    """Keep only ``frame_indices`` of ``input_path`` and write them to ``output_path``.

    Consecutive frame indices are merged into runs; each run becomes one
    ffmpeg ``trim`` chain and the runs are joined with ``concat``. When
    ``frame_indices`` is empty, a one-frame black video with the source
    dimensions is written instead.

    ``prop_matrix`` is accepted for signature parity but is not used here.

    Raises:
        RuntimeError: if the source cannot be opened (empty-selection path).
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    if not len(frame_indices):
        probe = cv2.VideoCapture(str(input_path))
        if not probe.isOpened():
            raise RuntimeError(f"Could not open video: {input_path}")
        frame_w = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_h = int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT))
        probe.release()
        _make_empty_video(output_path, frame_w, frame_h, fps=1.0)
        return

    def consecutive_runs(frames):
        """Return sorted ``(start, end_exclusive)`` runs of consecutive frames."""
        if not frames:
            return []
        ordered = sorted(set(frames))
        runs = [[ordered[0], ordered[0] + 1]]
        for idx in ordered[1:]:
            if idx == runs[-1][1]:
                # Extends the current run: push the exclusive end forward.
                runs[-1][1] = idx + 1
            else:
                runs.append([idx, idx + 1])
        return [tuple(run) for run in runs]

    segments = consecutive_runs(frame_indices)

    # One trim chain per run, each reset to start at PTS 0 ...
    stages = [
        f"[0:v]trim=start_frame={first}:end_frame={stop},setpts=PTS-STARTPTS[v{n}]"
        for n, (first, stop) in enumerate(segments)
    ]
    # ... followed by a single video-only concat over all of them.
    pads = "".join(f"[v{n}]" for n in range(len(segments)))
    stages.append(f"{pads}concat=n={len(segments)}:v=1:a=0[outv]")

    command = ["ffmpeg", "-y", "-i", input_path]
    command += ["-filter_complex", "; ".join(stages)]
    command += ["-map", "[outv]"]
    command += ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]
    command.append(output_path)
    subprocess.run(command, check=True)
def _group_consecutive(frames: Iterable[int]) -> List[Tuple[int, int]]:
    """Collapse frame indices into sorted ``(start, end_exclusive)`` runs."""
    runs: List[Tuple[int, int]] = []
    for idx in sorted({int(x) for x in frames}):
        if runs and runs[-1][1] == idx:
            runs[-1] = (runs[-1][0], idx + 1)
        else:
            runs.append((idx, idx + 1))
    return runs


def _ass_timestamp(t_sec: float) -> str:
    """Format seconds as an ASS ``H:MM:SS.cc`` timestamp (centiseconds)."""
    whole_seconds, centis = divmod(int(round(t_sec * 100)), 100)
    whole_minutes, seconds = divmod(whole_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    return f"{hours}:{minutes:02d}:{seconds:02d}.{centis:02d}"


def _label_spans(
    frame_indices: Iterable[int],
    prop_matrix: Dict[str, List[int]],
) -> List[Tuple[int, int, Tuple[str, ...]]]:
    """Return ``(start, end_exclusive, labels)`` spans for the kept frames.

    Only frames that are both in ``frame_indices`` and have at least one
    proposition are considered; consecutive frames with an identical
    (sorted, de-duplicated) label set are merged into one span.
    """
    props_by_frame: Dict[int, set] = {}
    for prop, frames in (prop_matrix or {}).items():
        for fi in frames:
            props_by_frame.setdefault(int(fi), set()).add(prop)

    kept = {int(x) for x in frame_indices}
    spans: List[Tuple[int, int, Tuple[str, ...]]] = []
    for fi in sorted(kept & props_by_frame.keys()):
        labels = tuple(sorted(props_by_frame[fi]))
        if spans and spans[-1][1] == fi and spans[-1][2] == labels:
            spans[-1] = (spans[-1][0], fi + 1, labels)
        else:
            spans.append((fi, fi + 1, labels))
    return spans


def _build_ass_script(
    width: int,
    height: int,
    fps: float,
    spans: List[Tuple[int, int, Tuple[str, ...]]],
) -> str:
    """Render an ASS subtitle script showing each span's labels top-right."""
    lines = [
        "[Script Info]",
        "ScriptType: v4.00+",
        "ScaledBorderAndShadow: yes",
        f"PlayResX: {width}",
        f"PlayResY: {height}",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, "
        "Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, "
        "Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        # Font size 18; Alignment=9 places the text in the top-right corner.
        "Style: Default,DejaVu Sans,18,&H00FFFFFF,&H000000FF,&H00000000,&H64000000,"
        "0,0,0,0,100,100,0,0,1,2,0.8,9,16,16,16,1",
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]
    for start_f, end_f, labels in spans:
        if not labels:
            continue
        start_t = _ass_timestamp(start_f / fps)
        end_t = _ass_timestamp(end_f / fps)
        text = r"\N".join(labels)  # one proposition per stacked line
        lines.append(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{text}")
    return "\n".join(lines)


def _build_trim_filtergraph(ass_path: str, ranges: List[Tuple[int, int]]) -> str:
    """Build the ffmpeg filtergraph: burn subtitles, then trim and concat.

    ``ranges`` must be non-empty. The final video pad is labelled ``[outv]``.
    """
    # Only backslashes are doubled for the filter argument.
    # NOTE(review): other filtergraph-special characters in the temp path
    # (e.g. ':' or "'") are not escaped here - confirm if TMPDIR can contain them.
    ass_arg = ass_path.replace("\\", "\\\\")
    stages = [f"[0:v]subtitles='{ass_arg}'[sub]"]

    if len(ranges) == 1:
        sources = ["[sub]"]
    else:
        # A filter output pad can feed only one consumer, so fan it out.
        sources = [f"[s{i}]" for i in range(len(ranges))]
        stages.append(f"[sub]split={len(ranges)}{''.join(sources)}")

    outputs = []
    for i, (src, (start, end)) in enumerate(zip(sources, ranges)):
        out = f"[v{i}]"
        stages.append(
            f"{src}trim=start_frame={start}:end_frame={end},setpts=PTS-STARTPTS{out}"
        )
        outputs.append(out)

    stages.append(f"{''.join(outputs)}concat=n={len(ranges)}:v=1:a=0[outv]")
    return "; ".join(stages)


def _crop_video(input_path: str, output_path: str, frame_indices: List[int], prop_matrix: Dict[str, List[int]]):
    """Write only ``frame_indices`` of the input video, with proposition labels burned in.

    The source is probed with OpenCV for width/height/fps (fps falls back to
    30.0 when it is not a positive number). Labels from ``prop_matrix``
    (``{proposition: [frame indices]}``) are rendered as ASS subtitles timed on
    the *source* timeline (``frame / fps``), because the ``subtitles`` filter
    runs before the frames are trimmed and concatenated.

    If ``frame_indices`` is empty or ``None``, a one-frame black video with the
    source dimensions is written instead.

    Raises:
        RuntimeError: if the source video cannot be opened.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    import shutil  # function-scope so this block carries its own dependency

    input_path = str(input_path)
    output_path = str(output_path)

    # Probe width/height/fps; always release the capture handle.
    cap = cv2.VideoCapture(input_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {input_path}")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = float(cap.get(cv2.CAP_PROP_FPS)) or 0.0
    finally:
        cap.release()
    if not fps > 0:  # written this way so NaN also falls back
        fps = 30.0

    # Nothing to keep: emit a 1-frame empty video. ``len`` is used instead of
    # truthiness so array-like inputs do not raise.
    if frame_indices is None or len(frame_indices) == 0:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, 1.0, (width, height))
        out.write(np.zeros((height, width, 3), dtype=np.uint8))
        out.release()
        return

    spans = _label_spans(frame_indices, prop_matrix)
    ranges = _group_consecutive(frame_indices)

    tmp_dir = tempfile.mkdtemp(prefix="props_ass_")
    try:
        ass_path = os.path.join(tmp_dir, "props.ass")
        with open(ass_path, "w", encoding="utf-8") as fh:
            fh.write(_build_ass_script(width, height, fps, spans))

        cmd = [
            "ffmpeg", "-y",
            "-i", input_path,
            "-filter_complex", _build_trim_filtergraph(ass_path, ranges),
            "-map", "[outv]",
            "-c:v", "libx264", "-preset", "fast", "-crf", "23",
            output_path,
        ]
        subprocess.run(cmd, check=True)
    finally:
        # Best-effort cleanup that also runs when writing the script fails.
        shutil.rmtree(tmp_dir, ignore_errors=True)
| def _format_prop_ranges(prop_matrix: Dict[str, List[int]]) -> str: | |
| def group_into_ranges(frames: Iterable[int]) -> List[Tuple[int, int]]: | |
| f = sorted(set(int(x) for x in frames)) | |
| if not f: | |
| return [] | |
| ranges: List[Tuple[int, int]] = [] | |
| s = p = f[0] | |
| for x in f[1:]: | |
| if x == p + 1: | |
| p = x | |
| else: | |
| ranges.append((s, p)) # inclusive end for display | |
| s = p = x | |
| ranges.append((s, p)) | |
| return ranges | |
| if not prop_matrix: | |
| return "No propositions detected." | |
| lines = [] | |
| for prop, frames in prop_matrix.items(): | |
| ranges = group_into_ranges(frames) | |
| pretty = prop.replace("_", " ").title() | |
| if not ranges: | |
| lines.append(f"{pretty}: —") | |
| continue | |
| parts = [f"{a}" if a == b else f"{a}-{b}" for (a, b) in ranges] | |
| lines.append(f"{pretty}: {', '.join(parts)}") | |
| return "\n".join(lines) | |
| # ----------------------------- | |
| # Gradio handler | |
| # ----------------------------- | |
def run_pipeline(input_video, mode, query_text, propositions_json, specification_text):
    """
    Gradio handler: compute the frames of interest and write a cropped video.

    ``input_video`` may be a path string or a dict carrying the path under
    ``"name"``. In ``"Natural language query"`` mode the entry comes from the
    reader using ``query_text``; in any other mode ``propositions_json`` (a
    JSON array) and ``specification_text`` are validated first and then set as
    the entry's ``"tl"`` section.

    Every failure - missing input, bad JSON, reader, processing or encoding
    errors - is reported through the third output rather than raised, so the
    UI always receives three values.

    Returns: (cropped_video_path, prop_ranges_text, tl_text)
    """
    def _err(msg, width=320, height=240):  # keep outputs shape consistent
        tmp_out = os.path.join(tempfile.gettempdir(), f"empty_{uuid.uuid4().hex}.mp4")
        _make_empty_video(tmp_out, width=width, height=height, fps=1.0)
        return (
            tmp_out,
            "No propositions detected.",
            f"Error: {msg}"
        )

    # Resolve video path
    if isinstance(input_video, dict) and "name" in input_video:
        video_path = input_video["name"]
    elif isinstance(input_video, str):
        video_path = input_video
    else:
        video_path = None
    if not video_path:
        return _err("Please provide a video.")

    # Validate the text inputs before touching the video, so bad input is
    # rejected without paying for a reader pass.
    props = None
    if mode == "Natural language query":
        if not query_text or not query_text.strip():
            return _err("Please enter a query.")
        reader_query = query_text
    else:
        if not (propositions_json and propositions_json.strip()) or not (specification_text and specification_text.strip()):
            return _err("Please provide both Propositions (array) and Specification.")
        try:
            props = json.loads(propositions_json)
        except (ValueError, RecursionError) as e:  # JSONDecodeError is a ValueError
            return _err(f"Failed to parse propositions JSON: {e}")
        if not isinstance(props, list):
            return _err("Propositions must be a JSON array.")
        reader_query = "dummy-query"

    # Build entry; reader failures become an error tuple like every other step.
    try:
        entry = _load_entry_from_reader(video_path, reader_query)
    except Exception as e:
        return _err(f"Failed to read video: {e}")
    if props is not None:
        entry["tl"] = {
            "propositions": props,
            "specification": specification_text
        }

    # Compute FOI
    try:
        foi, prop_matrix = process_entry(entry)  # list of frame indices & {prop: [frames]}
        print(foi)
        print(prop_matrix)
    except Exception as e:
        return _err(f"Processing error: {e}")

    # Write cropped video
    try:
        out_path = os.path.join(tempfile.gettempdir(), f"cropped_{uuid.uuid4().hex}.mp4")
        _crop_video(video_path, out_path, foi, prop_matrix)
        print(f"Wrote cropped video to: {out_path}")
    except Exception as e:
        return _err(f"Failed to write cropped video: {e}")

    # Build right-side text sections
    prop_ranges_text = _format_prop_ranges(prop_matrix)
    tl_text = (
        f"Propositions: {json.dumps(entry['tl']['propositions'], ensure_ascii=False)}\n"
        f"Specification: {entry['tl']['specification']}"
    )
    return out_path, prop_ranges_text, tl_text
| # ----------------------------- | |
| # UI | |
| # ----------------------------- | |
def _toggle_fields(m):
    """Show the query box in natural-language mode, else the props/spec boxes.

    Returns three visibility updates, in order: query, propositions,
    specification.
    """
    show_query = m == "Natural language query"
    return (
        gr.update(visible=show_query),
        gr.update(visible=not show_query),
        gr.update(visible=not show_query),
    )


# Two equal-width flex columns: inputs on the left, results on the right.
with gr.Blocks(
    css="""
#io-col {display: flex; gap: 1rem;}
#left {flex: 1;}
#right {flex: 1;}
""",
    title="NSVS-TL",
) as demo:
    gr.Markdown("# Neuro-Symbolic Visual Search with Temporal Logic")
    gr.Markdown(
        "Upload a video and either provide a natural-language **Query** *or* directly supply **Propositions** (array) + **Specification**. "
        "On the right, you'll get a **cropped video** containing only the frames of interest, a **Propositions by Frames** summary, and the combined TL summary."
    )

    with gr.Row(elem_id="io-col"):
        # ---- Left column: inputs ----
        with gr.Column(elem_id="left"):
            mode = gr.Radio(
                label="Input mode",
                choices=["Natural language query", "Props/Spec"],
                value="Natural language query",
            )
            video = gr.Video(label="Upload Video")
            query = gr.Textbox(
                label="Query (natural language)",
                placeholder="e.g., a man is jumping and panting until he falls down",
            )
            # The two props/spec fields start hidden; the radio toggles them.
            propositions = gr.Textbox(
                label="Propositions (JSON array)",
                placeholder='e.g., ["man_jumps", "man_pants", "man_falls_down"]',
                lines=4,
                visible=False,
            )
            specification = gr.Textbox(
                label="Specification",
                placeholder='e.g., ("woman_jumps" & "woman_claps") U "candle_is_blown"',
                visible=False,
            )
            mode.change(
                _toggle_fields,
                inputs=[mode],
                outputs=[query, propositions, specification],
            )

            run_btn = gr.Button("Run", variant="primary")

            gr.Examples(
                label="Examples (dummy paths + queries)",
                examples=[
                    ["demo_videos/dog_jump.mp4", "a dog jumps until a red tube is in view"],
                    ["demo_videos/blue_shirt.mp4", "a girl in a green shirt until a candle is blown"],
                    ["demo_videos/car.mp4", "red car until a truck"],
                ],
                inputs=[video, query],
                cache_examples=False,
            )

        # ---- Right column: outputs ----
        with gr.Column(elem_id="right"):
            cropped_video = gr.Video(label="Cropped Video (Frames of Interest Only)")
            prop_ranges_out = gr.Textbox(
                label="Propositions by Frames",
                lines=6,
                interactive=False,
            )
            tl_out = gr.Textbox(
                label="TL (Propositions & Specification)",
                lines=8,
                interactive=False,
            )

    # Wire the button to the handler; output order matches run_pipeline's tuple.
    run_btn.click(
        fn=run_pipeline,
        inputs=[video, mode, query, propositions, specification],
        outputs=[cropped_video, prop_ranges_out, tl_out],
    )

if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)