| import gradio as gr |
| from azure.storage.blob import BlobServiceClient, BlobClient |
| import os |
| import cv2 |
| import tempfile |
| from ultralytics import YOLO |
| import logging |
| from datetime import datetime |
|
|
| |
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
|
|
| |
# Azure Blob Storage settings used by the helpers below.
AZURE_ACCOUNT_NAME = "assentian"
# SECURITY(review): a SAS token is a credential and should not live in source
# control. The token is now taken from the AZURE_STORAGE_SAS_TOKEN environment
# variable when it is set; the embedded literal remains only as a
# backward-compatible fallback (its `se=` field shows it expires on
# 2025-04-30) and should be rotated and removed.
AZURE_SAS_TOKEN = os.environ.get(
    "AZURE_STORAGE_SAS_TOKEN",
    "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2025-04-30T04:25:22Z&st=2025-04-16T20:25:22Z&spr=https&sig=HYrJBoOYc4PRe%2BoqBMl%2FmoL5Kz4ZYugbTLuEh63sbeo%3D",
)
CONTAINER_NAME = "logs"
# Blob-name prefix used to narrow listings; empty means "every blob".
VIDEO_PREFIX = ""
|
|
| |
# Load the detection weights once at import time; a failure is logged and
# re-raised because nothing below can work without the model.
try:
    YOLO_MODEL = YOLO("./best_yolov11.pt")
except Exception as load_error:
    logger.error(f"Failed to load YOLO model: {load_error}")
    raise
else:
    logger.info("YOLO model loaded successfully")
|
|
| |
def get_blob_service_client():
    """Build a BlobServiceClient for AZURE_ACCOUNT_NAME, authenticated with AZURE_SAS_TOKEN."""
    endpoint = f"https://{AZURE_ACCOUNT_NAME}.blob.core.windows.net"
    return BlobServiceClient(account_url=endpoint, credential=AZURE_SAS_TOKEN)
|
|
def list_azure_videos():
    """Return the names of all ``.mp4`` blobs under VIDEO_PREFIX.

    The extension check is case-insensitive. Any error is logged and an
    empty list is returned instead of raising.
    """
    try:
        container = get_blob_service_client().get_container_client(CONTAINER_NAME)
        video_names = []
        for entry in container.list_blobs(name_starts_with=VIDEO_PREFIX):
            if entry.name.lower().endswith(".mp4"):
                video_names.append(entry.name)
        return video_names
    except Exception as err:
        logger.error(f"Error listing videos: {err}")
        return []
|
|
def get_latest_azure_video():
    """Return the name of the most recently modified ``.mp4`` blob.

    Returns:
        The blob name, or None when the container holds no ``.mp4`` blobs
        or when listing fails (the error is logged).
    """
    try:
        container_client = get_blob_service_client().get_container_client(CONTAINER_NAME)
        videos = (
            blob
            for blob in container_client.list_blobs(name_starts_with=VIDEO_PREFIX)
            if blob.name.lower().endswith(".mp4")
        )
        # Each listed item already carries last_modified, so no per-blob
        # get_blob_properties() round trip is needed. On ties max() keeps
        # the first blob seen, matching the previous strict ">" comparison.
        latest = max(videos, key=lambda blob: blob.last_modified, default=None)
        return latest.name if latest is not None else None
    except Exception as e:
        logger.error(f"Error finding latest video: {e}")
        return None
|
|
def download_azure_video(blob_name):
    """Download a blob from CONTAINER_NAME into a temporary ``.mp4`` file.

    Args:
        blob_name: Name of the blob to fetch.

    Returns:
        Path of the temporary file (the caller owns and must delete it),
        or None when the download fails (the error is logged).
    """
    temp_path = None
    try:
        blob_client = get_blob_service_client().get_blob_client(
            container=CONTAINER_NAME,
            blob=blob_name,
        )

        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
            temp_path = temp_file.name
            # Stream straight into the file instead of buffering the whole
            # video in memory with readall().
            blob_client.download_blob().readinto(temp_file)
        return temp_path

    except Exception as e:
        logger.error(f"Download failed: {e}")
        # The temp file is created with delete=False, so a partial download
        # would otherwise be left behind on disk.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; the download error is already logged
        return None
|
|
def _draw_detections(frame, results, min_conf=0.5):
    """Draw boxes and labels on ``frame`` in place; return per-class counts.

    Detections whose confidence is below ``min_conf`` are skipped.
    """
    class_counts = {}
    for result in results:
        for box in result.boxes:
            conf = float(box.conf[0])
            if conf < min_conf:
                continue

            cls_id = int(box.cls[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            class_name = YOLO_MODEL.names[cls_id]

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label = f"{class_name} {conf:.2f}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            class_counts[class_name] = class_counts.get(class_name, 0) + 1
    return class_counts


def annotate_video(input_video_path):
    """Run YOLO on every frame of a video and write an annotated copy.

    Each frame gets a box and label per detection with confidence >= 0.5,
    plus a per-class count summary in the top-left corner.

    Args:
        input_video_path: Path of the local video to annotate. The file is
            deleted after a successful run.

    Returns:
        Path of the annotated temporary ``.mp4`` file, or None on failure
        (the error is logged and any partial output is removed).
    """
    if not input_video_path or not os.path.exists(input_video_path):
        logger.error("Invalid input video path")
        return None

    cap = None
    writer = None
    output_path = None
    succeeded = False
    try:
        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            logger.error("Failed to open video file")
            return None

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Missing FPS metadata (0 or NaN) would give the writer an
            # invalid rate; fall back to a conventional one.
            fps = 30.0

        # Only the unique file name is needed; the writer reopens the path.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_output:
            output_path = temp_output.name

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not writer.isOpened():
            logger.error("Failed to open video writer")
            return None

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            class_counts = _draw_detections(frame, YOLO_MODEL(frame))

            summary_text = " | ".join([f"{k}: {v}" for k, v in class_counts.items()])
            cv2.putText(frame, summary_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

            writer.write(frame)

        succeeded = True

    except Exception as e:
        logger.error(f"Annotation failed: {e}")

    finally:
        # Always release the native handles, including on early returns.
        if cap is not None:
            cap.release()
        if writer is not None:
            writer.release()
        # Do not leave a partial or empty output file behind on failure.
        if not succeeded and output_path is not None:
            try:
                os.remove(output_path)
            except OSError:
                pass  # best-effort cleanup

    if not succeeded:
        return None

    # The input is consumed on success. Failing to delete it must not
    # discard an otherwise valid annotated video.
    try:
        os.remove(input_video_path)
    except OSError as e:
        logger.error(f"Could not remove input video: {e}")

    return output_path
|
|
def process_video(blob_name):
    """Download a blob and return the path of its annotated copy.

    Args:
        blob_name: Name of the blob to process. May be empty or None when
            nothing is selected in the UI.

    Returns:
        Path of the annotated video, or None when no blob was given or
        when the download or annotation fails.
    """
    if not blob_name:
        logger.error("No video selected")
        return None

    try:
        local_path = download_azure_video(blob_name)
        if not local_path:
            return None
        try:
            return annotate_video(local_path)
        finally:
            # annotate_video deletes its input only on success; remove the
            # download here so a failed run does not leak the temp file.
            if os.path.exists(local_path):
                os.remove(local_path)
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return None
|
|
| |
# Gradio UI: the left column lists and selects videos from Azure, the right
# column shows the annotated result.
with gr.Blocks(title="PRISM Video Annotator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎥 PRISM Site Diary - Video Analyzer")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Azure Storage Controls")
            refresh_btn = gr.Button("🔄 Refresh Video List", variant="secondary")
            video_dropdown = gr.Dropdown(
                label="Available Videos",
                choices=list_azure_videos(),
                interactive=True
            )
            latest_btn = gr.Button("⏩ Process Latest Video", variant="primary")
            selected_btn = gr.Button("✅ Process Selected Video", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("## Annotated Output")
            output_video = gr.Video(
                label="Processed Video",
                format="mp4",
                interactive=False
            )
            # NOTE(review): this textbox is rendered but no handler below
            # lists it as an output, so it never shows anything.
            status = gr.Textbox(label="Processing Status")

    def update_ui():
        """Re-query Azure and refresh the dropdown choices."""
        # gr.update works on Gradio 3.x and 4.x alike; the per-component
        # gr.Dropdown.update classmethod was removed in Gradio 4.
        return gr.update(choices=list_azure_videos())

    def handle_latest():
        """Process the most recently modified video, if there is one."""
        latest = get_latest_azure_video()
        return process_video(latest) if latest else None

    refresh_btn.click(
        fn=update_ui,
        outputs=video_dropdown,
        queue=False
    )

    latest_btn.click(
        fn=handle_latest,
        outputs=output_video,
        api_name="process_latest"
    )

    selected_btn.click(
        fn=process_video,
        inputs=video_dropdown,
        outputs=output_video,
        api_name="process_selected"
    )
|
|
if __name__ == "__main__":
    # Serve the app on port 7860 of every network interface, with errors
    # shown in the UI and no public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "share": False,
    }
    demo.launch(**launch_options)