| """ |
| Streaming anomaly detection for the M3 predictor. |
| Rolling 15-frame buffer -> predict next frame -> per-frame anomaly score (+ heatmap). |
| """ |
|
|
import glob
import heapq
import os
from collections import deque

import cv2
import numpy as np
import onnxruntime as ort
import torch

from src.data.video_transforms import transform
|
|
|
|
class AnomalyStream:
    """Streaming next-frame-prediction anomaly scorer.

    Keeps a rolling buffer of the most recent ``buffer_size`` preprocessed
    frames. Once the buffer is full, each new frame is scored by running the
    ONNX model on the buffered history and measuring the per-pixel squared
    error between the model output and the actual (preprocessed) frame.
    """

    def __init__(self, onnx_path: str, buffer_size: int = 15):
        """Load the ONNX model and create an empty history buffer.

        Args:
            onnx_path: Path to the exported ONNX predictor.
            buffer_size: Number of past frames fed to the model per
                prediction. Presumably must match the temporal length the
                model was exported with (15 by default) — TODO confirm.

        Raises:
            ValueError: If ``buffer_size`` is less than 1.
        """
        if buffer_size < 1:
            # An empty history would only fail later, inside torch.stack([]).
            raise ValueError(f"buffer_size must be >= 1, got {buffer_size}")
        self.sess = ort.InferenceSession(onnx_path)
        self.input_name = self.sess.get_inputs()[0].name
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=buffer_size)

    def preprocess(self, frame_bgr: np.ndarray) -> torch.Tensor:
        """Convert a raw frame into the model's training input format.

        Args:
            frame_bgr: Frame as read by OpenCV, either (H, W, 3) BGR or an
                already-grayscale (H, W) array.

        Returns:
            The output of ``transform`` with its leading batch axis removed.
            Per the training setup this is intended to be a (1, 128, 128)
            grayscale frame scaled to [-1, 1]; resizing and rescaling are
            delegated to ``transform`` — TODO confirm against
            ``src.data.video_transforms``.

        Raises:
            ValueError: If ``frame_bgr`` is None (e.g. a failed read/decode).
        """
        if frame_bgr is None:
            raise ValueError("frame is None (failed to read or decode?)")

        # Accept already-grayscale input; otherwise convert from OpenCV BGR.
        if frame_bgr.ndim == 2:
            gray = np.ascontiguousarray(frame_bgr)
        else:
            gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)

        # uint8 [0, 255] -> float32 [0, 1] before handing off to transform.
        gray_t = torch.from_numpy(gray).float() / 255.0

        # (H, W) -> (1, 1, H, W): batch and channel axes for transform.
        gray_t = gray_t.unsqueeze(0).unsqueeze(0)
        gray_t = transform(gray_t)

        # Drop the batch axis; the per-frame tensor keeps its channel axis.
        return gray_t.squeeze(0)

    def push(self, frame_bgr: np.ndarray):
        """Add one frame and score it against the model's prediction.

        The first ``buffer_size`` frames only fill the history buffer. After
        that, each frame is predicted from the current buffer (the preceding
        ``buffer_size`` frames) and scored. It is then appended, which evicts
        the oldest buffered frame.

        Args:
            frame_bgr: Raw frame; see ``preprocess``.

        Returns:
            None while warming up. Otherwise ``(score, heatmap, frame_img)``:
            ``score`` is the mean squared prediction error as a Python float,
            ``heatmap`` is ``error_map[0, 0]`` (per-pixel squared error), and
            ``frame_img`` is ``frame_t.numpy()[0]`` (the preprocessed frame
            without its leading axis).
        """
        frame_t = self.preprocess(frame_bgr)

        if len(self.buffer) < self.buffer_size:
            self.buffer.append(frame_t)
            return None

        # Model input: the buffered history stacked along a new time axis,
        # plus a leading batch axis.
        history = torch.stack(list(self.buffer)).unsqueeze(0).numpy()
        pred = self.sess.run(None, {self.input_name: history})[0]

        # Add a batch axis so the target lines up with the model output.
        actual = frame_t.numpy()[None, ...]

        error_map = (pred - actual) ** 2
        heatmap = error_map[0, 0]
        score = float(error_map.mean())

        # The current frame joins the history only after it has been scored.
        self.buffer.append(frame_t)

        return score, heatmap, frame_t.numpy()[0]
|
|
|
|
def process_video(video_path, onnx_path, top_n=5):
    """Run the anomaly stream over a video file and collect per-frame scores.

    Args:
        video_path: Path to a video readable by ``cv2.VideoCapture``.
        onnx_path: Path to the ONNX predictor passed to ``AnomalyStream``.
        top_n: Number of highest-scoring frames to return. Values <= 0
            return no anomalies.

    Returns:
        ``(scores, top_anomalies, fps)`` where:

        - ``scores`` has one entry per decoded frame. The entry is ``None``
          for warm-up frames and the float anomaly score otherwise.
        - ``top_anomalies`` holds up to ``top_n`` dicts with keys
          ``frame_idx``, ``score``, ``heatmap`` and ``frame``. They are
          sorted by descending score, with earlier frames first on ties.
        - ``fps`` is the value of ``cap.get(cv2.CAP_PROP_FPS)``.

    Raises:
        OSError: If the video cannot be opened.
    """
    stream = AnomalyStream(onnx_path)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise OSError(f"could not open video: {video_path}")

    scores = []
    # Min-heap holding only the current top_n records, so memory stays
    # bounded on long videos instead of retaining every heatmap and frame.
    # Entries are (score, -frame_idx, heatmap, frame_img). The
    # (score, -frame_idx) prefix is unique, so the arrays are never
    # compared. On equal scores the later frame is evicted first, matching
    # a stable descending sort that keeps earlier frames.
    top_heap = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            result = stream.push(frame)
            if result is None:
                scores.append(None)
            else:
                score, heatmap, frame_img = result
                scores.append(score)
                if top_n > 0:
                    entry = (score, -frame_idx, heatmap, frame_img)
                    if len(top_heap) < top_n:
                        heapq.heappush(top_heap, entry)
                    else:
                        heapq.heappushpop(top_heap, entry)
            frame_idx += 1
    finally:
        # Release the capture even if decoding or inference raises.
        cap.release()

    # Descending (score, -frame_idx): highest score first, earliest frame on ties.
    ranked = sorted(top_heap, key=lambda r: (r[0], r[1]), reverse=True)
    top_anomalies = [
        {"frame_idx": -neg_idx, "score": float(s), "heatmap": hmap, "frame": img}
        for (s, neg_idx, hmap, img) in ranked
    ]

    return scores, top_anomalies, fps
|
|
|
|
def process_frames(frame_dir: str, onnx_path: str, pattern: str = "*.tif"):
    """Run the anomaly stream over a directory of ordered frame images.

    This mirrors ``process_video`` but reads image files (UCSD ``.tif``
    format by default) instead of decoding a video. It is used to verify
    that the streaming pipeline matches eval scoring.

    Args:
        frame_dir: Directory containing the frames.
        onnx_path: Path to the ONNX predictor passed to ``AnomalyStream``.
        pattern: Glob pattern selecting frame files inside ``frame_dir``.
            Files are processed in lexicographic path order, so names must
            sort in frame order (e.g. zero-padded numbers).

    Returns:
        One entry per frame: ``None`` for warm-up frames, otherwise the
        float anomaly score.

    Raises:
        FileNotFoundError: If no files in ``frame_dir`` match ``pattern``.
        OSError: If a matched file cannot be read as an image.
    """
    frame_paths = sorted(glob.glob(os.path.join(frame_dir, pattern)))
    if not frame_paths:
        raise FileNotFoundError(f"no frames matching {pattern!r} in {frame_dir}")

    stream = AnomalyStream(onnx_path)

    scores = []
    for path in frame_paths:
        frame = cv2.imread(path)
        if frame is None:
            # Without this check the failure would surface as a cryptic
            # error inside preprocessing.
            raise OSError(f"could not read frame: {path}")
        result = stream.push(frame)
        # push() returns (score, heatmap, frame_img); only the score is kept.
        scores.append(None if result is None else result[0])

    return scores
|
|
|
|
if __name__ == "__main__":
    # Demo run: score a test clip and list its most anomalous frames.
    onnx_path = "checkpoints/model.onnx"
    video_path = "/tmp/test001.mp4"

    # process_video returns (scores, top_anomalies, fps); fps is unused here.
    scores, top, fps = process_video(video_path, onnx_path, top_n=5)
    print(f"scored: {len([s for s in scores if s is not None])}, top anomalies: {len(top)}")
    for t in top:
        print(f" frame {t['frame_idx']}: score {t['score']:.6e}, heatmap {t['heatmap'].shape}, frame {t['frame'].shape}")