Spaces:
Running
Running
| import os | |
| import sys | |
| import time | |
| # Add project root to sys.path | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from suspicious_behavior.pipeline.frame_analyzer import FrameAnalyzer | |
| from suspicious_behavior.pipeline.video_processor import VideoProcessor | |
| def main(): | |
| video_path = "samples/FullSizeRender.mov" | |
| output_dir = "outputs" | |
| output_path = os.path.join(output_dir, "FullSizeRender_annotated.mp4") | |
| os.makedirs(output_dir, exist_ok=True) | |
| if not os.path.exists(video_path): | |
| print(f"Error: Sample video not found at {video_path}") | |
| sys.exit(1) | |
| print(f"[Validation] Opening video file: {video_path}...") | |
| processor = VideoProcessor(target_fps=10.0) | |
| metadata = processor.get_metadata(video_path) | |
| print(f"[Validation] Video metadata: {metadata}") | |
| print("[Validation] Initializing FrameAnalyzer (loading models)...") | |
| analyzer = FrameAnalyzer(camera_id="val_camera_1", violence_stride=8) | |
| processed_frames = [] | |
| latencies = [] | |
| all_alerts = [] | |
| print("[Validation] Starting frame-by-frame analysis...") | |
| start_total_time = time.time() | |
| try: | |
| for frame_idx, frame, timestamp in processor.extract_frames_generator(video_path): | |
| t0 = time.time() | |
| annotated, alerts, frame_meta = analyzer.analyze(frame, fps=10.0, output_base64=False) | |
| t_elapsed = (time.time() - t0) * 1000.0 | |
| latencies.append(t_elapsed) | |
| processed_frames.append(annotated) | |
| for alert in alerts: | |
| all_alerts.append((frame_idx, alert)) | |
| # Print frame summary every 10 processed frames | |
| if len(processed_frames) % 10 == 0: | |
| print( | |
| f"Frame {frame_idx:04d} ({timestamp:.2f}s) | " | |
| f"Latency: {t_elapsed:.1f}ms | " | |
| f"People: {frame_meta['active_tracks_count']} | " | |
| f"Threats: {frame_meta['threats_detected']}" | |
| ) | |
| except KeyboardInterrupt: | |
| print("[Validation] Interrupted by user. Compiling what we have...") | |
| total_time = time.time() - start_total_time | |
| if not latencies: | |
| print("[Validation] No frames processed.") | |
| sys.exit(1) | |
| avg_latency = sum(latencies) / len(latencies) | |
| throughput_fps = len(processed_frames) / total_time | |
| print("\n--- Validation Statistics ---") | |
| print(f"Frames Processed: {len(processed_frames)}") | |
| print(f"Total Analysis Time: {total_time:.2f} seconds") | |
| print(f"System Throughput: {throughput_fps:.2f} FPS") | |
| print(f"Average Frame Latency: {avg_latency:.1f} ms") | |
| print(f"Active Alerts Triggered: {len(all_alerts)}") | |
| for f_idx, alert in all_alerts: | |
| print(f" - Frame {f_idx}: [{alert.severity}] {alert.threat_type} ({alert.confidence*100:.1f}%)") | |
| print(f"\n[Validation] Compiling output video to: {output_path}...") | |
| processor.write_frames_to_video( | |
| processed_frames, | |
| output_path, | |
| fps=10.0, | |
| frame_size=(metadata['width'], metadata['height']) | |
| ) | |
| print("[Validation] Finished successfully!") | |
| if __name__ == "__main__": | |
| main() | |