Spaces:
Runtime error
Runtime error
File size: 5,158 Bytes
d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 d41a400 47875a1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 | import tqdm
import itertools
import operator
import json
import time
import os
from ns_vfs.nsvs import run_nsvs
from ns_vfs.nsvs_yolo import *
from ns_vfs.video.read_mp4 import Mp4Reader
VIDEOS = [
{
"path": "demo_videos/car.mp4",
"query": "car until truck"
}
]
DEVICE = 7 # GPU device index
OPENAI_SAVE_PATH = ""
OUTPUT_DIR = "output"
import itertools
def fill_in_frame_count(arr, entry):
scale = (entry["video_info"].fps) / (entry["metadata"]["sampling_rate_fps"])
runs = []
for _, grp in itertools.groupby(
sorted(arr),
key=lambda x, c=[0]: (x - (c.__setitem__(0, c[0] + 1) or c[0]))
):
g = list(grp)
runs.append((g[0], g[-1]))
real = []
for start_i, end_i in runs:
a = int(round(start_i * scale))
b = int(round(end_i * scale))
if real and a <= real[-1]:
a = real[-1] + 1
real.extend(range(a, b + 1))
return real
def _fill_in_frame_count_pairs(pairs, entry):
if not pairs:
return []
scale = (entry["video_info"].fps) / (entry["metadata"]["sampling_rate_fps"])
pairs = sorted(pairs, key=lambda t: int(t[0]))
sampled_indices = [int(i) for i, _ in pairs]
runs = []
for _, grp in itertools.groupby(
sampled_indices,
key=lambda x, c=[0]: (x - (c.__setitem__(0, c[0] + 1) or c[0]))
):
g = list(grp)
runs.append((g[0], g[-1]))
idx2bbox = {}
for i, bbox in pairs:
i = int(i)
if i not in idx2bbox:
idx2bbox[i] = bbox
expanded: list[tuple[int, tuple[float, float, float, float]]] = []
last_real = -1
for start_i, end_i in runs:
rep_bbox = idx2bbox.get(start_i)
if rep_bbox is None:
for k in range(start_i, end_i + 1):
if k in idx2bbox:
rep_bbox = idx2bbox[k]
break
if rep_bbox is None:
continue
a = int(round(start_i * scale))
b = int(round(end_i * scale))
if expanded and a <= last_real:
a = last_real + 1
for real_i in range(a, b + 1):
expanded.append((real_i, rep_bbox))
last_real = b
return expanded
def process_entry(entry, run_with_yolo=False, cache_path=""):
"""
VLM path (run_with_yolo=False):
- Returns (foi, object_frame_dict_expanded)
where object_frame_dict_expanded: Dict[str, List[int]] (real frame indices)
YOLO path (run_with_yolo=True):
- Expects run_nsvs_yolo to return (foi, object_frame_bounding_boxes)
where object_frame_bounding_boxes: Dict[str, List[(sample_idx, bbox)]]
- Returns (foi, object_frame_bounding_boxes_expanded)
where each bbox is duplicated across the scaled span to real frames:
Dict[str, List[(real_idx, bbox)]]
"""
if run_with_yolo:
foi, object_frame_bounding_boxes = run_nsvs_yolo(
frames=entry["images"],
proposition=entry['tl']['propositions'],
specification=entry['tl']['specification'],
yolo_cache_path=cache_path,
vlm_detection_threshold=0.35,
)
foi = fill_in_frame_count([i for sub in foi for i in sub], entry)
expanded_boxes = {}
for key, pairs in (object_frame_bounding_boxes or {}).items():
expanded_boxes[key] = _fill_in_frame_count_pairs(pairs, entry)
return foi, expanded_boxes
else:
foi, object_frame_dict = run_nsvs(
frames=entry['images'],
proposition=entry['tl']['propositions'],
specification=entry['tl']['specification'],
model_name="InternVL2-8B",
device=DEVICE
)
foi = fill_in_frame_count([i for sub in foi for i in sub], entry)
object_frame_dict = {key: fill_in_frame_count(value, entry) for key, value in (object_frame_dict or {}).items()}
return foi, object_frame_dict
def main():
reader = Mp4Reader(VIDEOS, OPENAI_SAVE_PATH, sampling_rate_fps=1)
data = reader.read_video()
if not data:
return
# cache_path = preprocess_yolo(entry["images"], model_weights="yolov8n.pt",
# device="cuda:0", out_path="yolo_cache.npz")
with tqdm.tqdm(enumerate(data), total=len(data), desc="Processing entries") as pbar:
for i, entry in pbar:
start_time = time.time()
foi = process_entry(entry, run_with_yolo=True)
end_time = time.time()
processing_time = round(end_time - start_time, 3)
if foi:
output = {
"tl": entry["tl"],
"metadata": entry["metadata"],
"video_info": entry["video_info"].to_dict(),
"frames_of_interest": foi,
"processting_time_seconds": processing_time
}
os.makedirs(OUTPUT_DIR, exist_ok=True)
with open(os.path.join(OUTPUT_DIR, f"output_{i}.json"), "w") as f:
json.dump(output, f, indent=4)
if __name__ == "__main__":
main()
|