| | import gradio as gr |
| | import os |
| | import numpy as np |
| | |
| | |
| | from pathlib import Path |
| | import sys |
| | import asyncio |
| | import json |
# On Windows, force the selector-based event loop: the default proactor loop
# is incompatible with some libraries' socket handling (common gradio/uvicorn
# workaround).
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
| |
|
| | |
def get_absolute_path(relative_path):
    """Resolve *relative_path* against the current working directory."""
    absolute = os.path.abspath(relative_path)
    return absolute
| |
|
def check_file_exists(file_path):
    """Return the absolute path of *file_path* if it exists, else None.

    Prints a warning for missing files so startup logs show which sample
    assets could not be located.
    """
    candidate = get_absolute_path(file_path)
    if not os.path.exists(candidate):
        print(f"Warning: File not found: {file_path}")
        return None
    return candidate
| |
|
| | |
# Registry of per-sample asset paths, keyed by sample id (2..8). Each value
# maps asset kind -> absolute path, or None when the file is missing on disk.
SAMPLES = {}
for i in range(2, 9):
    sample_data = {
        "input": f"video/Assault{i:03d}_x264.mp4",
        "optical_flow": f"optical_flow/Assault{i:03d}_x264.mp4",
        "yolo": f"yolo/Assault{i:03d}_x264.mp4",
        "vggt": f"vggt/Assault{i:03d}_x264.mp4",
        "qa": f"qa/Assault{i:03d}_x264.json",
    }
    # Resolve every relative path up front; missing files become None.
    SAMPLES[i] = {key: check_file_exists(path) for key, path in sample_data.items()}
| |
|
def load_qa_data(qa_file):
    """Read a QA JSON file and return [[question, answer], ...] pairs.

    The returned list is shaped for direct use as a gr.Chatbot value.
    Returns [] when the file is missing, unreadable, or has no 'qa_pairs'.
    """
    try:
        if not qa_file or not os.path.exists(qa_file):
            return []

        with open(qa_file, 'r', encoding='utf-8') as f:
            qa_data = json.load(f)

        # Missing 'question'/'answer' fields degrade to empty strings.
        return [
            [pair.get('question', ''), pair.get('answer', '')]
            for pair in qa_data.get('qa_pairs', [])
        ]
    except Exception as e:
        print(f"Error loading QA data {qa_file}: {e}")
        return []
| |
|
def get_qa_metadata(qa_file):
    """Summarize the header fields of a QA JSON file.

    Returns a dict with video_name, timestamp, model_path, max_num_frames
    and total_questions; an empty dict when the file is missing or unreadable.
    """
    try:
        if not qa_file or not os.path.exists(qa_file):
            return {}

        with open(qa_file, 'r', encoding='utf-8') as f:
            qa_data = json.load(f)

        summary = {
            key: qa_data.get(key, default)
            for key, default in (
                ('video_name', ''),
                ('timestamp', ''),
                ('model_path', ''),
                ('max_num_frames', 0),
            )
        }
        summary['total_questions'] = len(qa_data.get('qa_pairs', []))
        return summary
    except Exception as e:
        print(f"Error loading QA metadata {qa_file}: {e}")
        return {}
| |
|
def load_point_cloud_plotly(pcd_file):
    """Load a point-cloud file and build a dark-themed 3D Plotly figure.

    Args:
        pcd_file: path to a point-cloud file readable by open3d, or None.

    Returns:
        A plotly Figure, or None when the file is missing/empty or any
        loading step fails (errors are printed, not raised).
    """
    try:
        if not pcd_file or not os.path.exists(pcd_file):
            return None

        # BUG FIX: `o3d` and `go` were referenced but never imported anywhere
        # in this file, so every call failed with a NameError that the broad
        # except swallowed. Import lazily so the rest of the app still runs
        # when open3d/plotly are not installed.
        import open3d as o3d
        import plotly.graph_objects as go

        pcd = o3d.io.read_point_cloud(pcd_file)
        points = np.asarray(pcd.points)
        colors = np.asarray(pcd.colors) if pcd.has_colors() else None

        if len(points) == 0:
            return None

        # Randomly downsample large clouds so the browser-side plot stays
        # responsive.
        if len(points) > 10000:
            indices = np.random.choice(len(points), 10000, replace=False)
            points = points[indices]
            if colors is not None:
                colors = colors[indices]

        if colors is not None and len(colors) > 0:
            # open3d stores colors in [0, 1]; plotly's rgb() wants 0-255 ints.
            if colors.max() <= 1.0:
                colors = (colors * 255).astype(int)
            color_rgb = [f'rgb({r},{g},{b})' for r, g, b in colors]
            marker = dict(size=1.7, color=color_rgb)
        else:
            # No per-point colors: shade by height (Z) with a colorbar.
            marker = dict(
                size=2,
                color=points[:, 2],
                colorscale='Viridis',
                showscale=True,
            )

        # Single Scatter3d construction (the original duplicated this call in
        # both color branches with only the marker differing).
        fig = go.Figure(data=[go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=marker,
            text=[f'Point {i}' for i in range(len(points))],
            hovertemplate='<b>Point %{text}</b><br>X: %{x}<br>Y: %{y}<br>Z: %{z}<extra></extra>'
        )])

        fig.update_layout(
            title=f'3D Point Cloud Visualization - {os.path.basename(pcd_file)}',
            scene=dict(
                xaxis_title='X',
                yaxis_title='Y',
                zaxis_title='Z',
                camera=dict(
                    eye=dict(x=1.5, y=1.5, z=1.5)
                ),
                bgcolor='rgb(10, 10, 10)',
            ),
            margin=dict(l=0, r=0, t=50, b=0),
            paper_bgcolor='rgb(20, 20, 20)',
            plot_bgcolor='rgb(20, 20, 20)',
            font=dict(color='white')
        )

        return fig
    except Exception as e:
        print(f"Error loading point cloud {pcd_file}: {e}")
        return None
| |
|
def create_sample_gallery(sample_id):
    """Return the media paths and point-cloud figure for one sample.

    Args:
        sample_id: key into SAMPLES (2..8).

    Returns:
        (input_video, optical_flow, yolo, vggt, pcd_figure) — paths may be
        None when the file is missing; pcd_figure is a plotly Figure or None.
    """
    sample = SAMPLES[sample_id]

    # BUG FIX: SAMPLES entries are built without a "pcd" key, so the original
    # sample["pcd"] lookup raised KeyError on every call. Use .get() so a
    # missing point-cloud entry simply yields no 3D plot.
    pcd_plot = load_point_cloud_plotly(sample.get("pcd"))

    return (
        sample["input"],
        sample["optical_flow"],
        sample["yolo"],
        sample["vggt"],
        pcd_plot,
    )
| |
|
def create_overview_gallery():
    """Collect (path, caption) pairs for every available sample video.

    Returns a flat list suitable for a gr.Gallery value; samples whose files
    were not found are skipped.

    BUG FIX: the original iterated range(1, 6), but SAMPLES is keyed 2..8,
    so SAMPLES[1] raised KeyError immediately (and samples 6-8 were never
    shown). Iterate the actual keys instead.
    """
    gallery_items = []
    for i in sorted(SAMPLES):
        sample = SAMPLES[i]
        for key, caption in (
            ("input", "Input"),
            ("optical_flow", "Optical Flow"),
            ("yolo", "YOLO"),
            ("vggt", "VGGT"),
        ):
            if sample[key]:
                gallery_items.append((sample[key], f"Sample {i} - {caption}"))
    return gallery_items
| |
|
| | |
# Page-level CSS injected into the gradio Blocks app.
# NOTE(review): the leading '#' lines below are not valid CSS comments (CSS
# uses /* ... */); browsers drop the invalid rule, so the max-width override
# is effectively disabled — confirm that was the intent.
custom_css = """
# .gradio-container {
#     max-width: 1200px !important;
# }
.gallery-item {
    border-radius: 10px;
}
h1 {
    text-align: center;
    color: #2c3e50;
    margin-bottom: 30px;
}
.tab-nav {
    margin-bottom: 20px;
}
.qa-section-header {
    font-size: 1.2em;
    color: #2c3e50;
    margin-top: 20px;
}
.qa-metadata {
    background-color: #f8f9fa;
    padding: 15px;
    border-radius: 8px;
    border-left: 4px solid #007bff;
}
.qa-info {
    background-color: #e7f3ff;
    padding: 10px;
    border-radius: 5px;
    font-style: italic;
}
"""
| |
|
| | |
# Top-level UI definition. One tab per sample (SAMPLES keys 2..8, displayed
# as "Sample 1".."Sample 7"), each laid out as: input video + Q&A chat on the
# first row, VGGT on the second, optical-flow + YOLO on the third.
# NOTE(review): several string literals here contain mojibake (e.g. "๐ฅ")
# that look like mis-decoded emoji — confirm the file's original encoding.
with gr.Blocks(css=custom_css, title="Anomalous Event Detection") as demo:
    gr.Markdown("# ๐ฅ Results Gallery")

    with gr.Tabs() as tabs:
        # Sample ids start at 2, so the visible tab label is shifted by one.
        for i in range(2, 9):
            with gr.Tab(f"๐ฌ Sample {i-1}"):
                gr.Markdown(f"## Sample {i-1} - Detailed View")

                sample = SAMPLES[i]

                with gr.Row():
                    # Left column: the raw surveillance clip.
                    with gr.Column(scale=1):
                        gr.Markdown("### ๐น Input Video")
                        if sample["input"]:
                            input_video = gr.Video(
                                value=sample["input"],
                                label="Original Input",
                                show_label=True
                            )
                        else:
                            gr.Markdown("โ Input video not found")

                    # Right column: the pre-recorded model Q&A transcript.
                    with gr.Column(scale=1, min_width=400):
                        gr.Markdown("### ๐ฌ Q&A Chat History")

                        if sample["qa"]:
                            # Session header sourced from the QA JSON metadata.
                            qa_metadata = get_qa_metadata(sample["qa"])
                            if qa_metadata:
                                gr.Markdown(f"""
                                **๐ Chat Session Info:**
                                - **Video:** {qa_metadata.get('video_name', 'N/A')}
                                - **Total Questions:** {qa_metadata.get('total_questions', 0)}
                                - **Max Frames:** {qa_metadata.get('max_num_frames', 0)}
                                - **Timestamp:** {qa_metadata.get('timestamp', 'N/A')[:19].replace('T', ' ')}
                                """)

                            # Replay the stored question/answer pairs.
                            qa_history = load_qa_data(sample["qa"])
                            if qa_history:
                                chatbot = gr.Chatbot(
                                    value=qa_history,
                                    label="Video Analysis Q&A",
                                    show_label=True,
                                    height=500,
                                    # NOTE(review): gr.Chatbot avatar_images
                                    # expects image paths/URLs, not emoji
                                    # strings — verify this renders.
                                    avatar_images=["๐ค", "๐ค"]
                                )

                                gr.Markdown("""
                                ๐ก **About this Q&A:** Questions asked by humans about the video content and answers from an AI model trained for video analysis.
                                """)
                            else:
                                gr.Markdown("โ No Q&A data available for this sample")
                        else:
                            gr.Markdown("โ Q&A file not found for this sample")

                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### ๐ฎ VGGT")

                        if sample["vggt"]:
                            vggt_video = gr.Video(
                                value=sample["vggt"],
                                label="VGGT Processing",
                                show_label=True
                            )
                        else:
                            gr.Markdown("โ VGGT video not found")

                    # Empty spacer column keeps the VGGT video at half width.
                    with gr.Column():
                        pass

                with gr.Row():
                    with gr.Column(scale=2):

                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("### ๐ Optical Flow")
                                if sample["optical_flow"]:
                                    optical_flow_video = gr.Video(
                                        value=sample["optical_flow"],
                                        label="Motion Analysis",
                                        show_label=True
                                    )
                                else:
                                    gr.Markdown("โ Optical flow video not found")

                            with gr.Column():
                                gr.Markdown("### ๐ฏ YOLO Detection")
                                if sample["yolo"]:
                                    yolo_video = gr.Video(
                                        value=sample["yolo"],
                                        label="Object Detection",
                                        show_label=True
                                    )
                                else:
                                    gr.Markdown("โ YOLO video not found")
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
if __name__ == "__main__":
    # Report which sample assets were resolved when SAMPLES was built, so a
    # misconfigured data directory is obvious before the UI opens.
    print("=== File Status Check ===")
    for i in range(2, 9):
        print(f"\nSample {i}:")
        for key, path in SAMPLES[i].items():
            # NOTE(review): the status emoji appear mojibake-mangled in this
            # file (the original line was even split by a stray control
            # character) — confirm the intended "found/missing" markers.
            status = "โ Found" if path else "โ Missing"
            print(f"  {key}: {status}")

    print(f"\n=== Starting Gradio App ===")
    demo.launch(
        share=True,       # also expose a public gradio.live link
        show_error=True,  # surface server-side exceptions in the UI
        inbrowser=True    # open the local URL in the default browser
    )
| |
|