"""Azure Blob Storage access and YOLO model setup for the video annotator."""

import logging
import os
import tempfile
from datetime import datetime

import cv2
import gradio as gr
from azure.storage.blob import BlobClient, BlobServiceClient
from ultralytics import YOLO

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Azure Storage Configuration
AZURE_ACCOUNT_NAME = "assentian"
# SECURITY: a SAS token is a credential and should not live in source control.
# Supply it through the AZURE_STORAGE_SAS_TOKEN environment variable; the
# literal below is kept only as a backward-compatible fallback and should be
# rotated and removed.
AZURE_SAS_TOKEN = os.environ.get("AZURE_STORAGE_SAS_TOKEN") or (
    "sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2025-04-30T04:25:22Z&st=2025-04-16T20:25:22Z&spr=https&sig=HYrJBoOYc4PRe%2BoqBMl%2FmoL5Kz4ZYugbTLuEh63sbeo%3D"
)
CONTAINER_NAME = "logs"
VIDEO_PREFIX = ""

# Initialize YOLO Model
# Loaded once at import time; a failure is logged and re-raised so the module
# does not finish importing without a usable model.
try:
    YOLO_MODEL = YOLO("./best_yolov11.pt")
    logger.info("YOLO model loaded successfully")
except Exception as e:
    logger.error(f"Failed to load YOLO model: {e}")
    raise


# Azure Blob Service Client
def get_blob_service_client():
    """Return a BlobServiceClient for the configured account, authenticated with the SAS token."""
    return BlobServiceClient(
        account_url=f"https://{AZURE_ACCOUNT_NAME}.blob.core.windows.net",
        credential=AZURE_SAS_TOKEN,
    )


def _iter_video_blobs():
    """Yield listing entries under VIDEO_PREFIX whose name ends in ".mp4" (case-insensitive)."""
    container_client = get_blob_service_client().get_container_client(CONTAINER_NAME)
    for blob in container_client.list_blobs(name_starts_with=VIDEO_PREFIX):
        if blob.name.lower().endswith(".mp4"):
            yield blob


def list_azure_videos():
    """Return the names of all ".mp4" blobs in the container.

    Returns:
        A list of blob names, or an empty list if the listing fails
        (the error is logged).
    """
    try:
        return [blob.name for blob in _iter_video_blobs()]
    except Exception as e:
        logger.error(f"Error listing videos: {e}")
        return []


def get_latest_azure_video():
    """Return the name of the most recently modified ".mp4" blob.

    The listing entries already carry ``last_modified``, so no per-blob
    properties request is made. On equal timestamps the blob listed first wins.

    Returns:
        The blob name, or None if there are no videos or the listing fails
        (the error is logged).
    """
    try:
        latest_blob = max(
            _iter_video_blobs(),
            key=lambda blob: blob.last_modified,
            default=None,
        )
        return latest_blob.name if latest_blob is not None else None
    except Exception as e:
        logger.error(f"Error finding latest video: {e}")
        return None

# Detections below this confidence are neither drawn nor counted.
_CONFIDENCE_THRESHOLD = 0.5
# Used when the source video does not report a usable frame rate.
_FALLBACK_FPS = 30.0


def _remove_quietly(path):
    """Delete *path* if it is set and exists; log (but do not raise) other OS errors."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {e}")


def download_azure_video(blob_name):
    """Download a blob from the configured container into a temporary ".mp4" file.

    Args:
        blob_name: Name of the blob inside CONTAINER_NAME.

    Returns:
        Path of the downloaded temporary file (the caller owns and must delete
        it), or None if the download fails (the error is logged and any
        partial file is removed).
    """
    temp_path = None
    try:
        blob_client = get_blob_service_client().get_blob_client(
            container=CONTAINER_NAME,
            blob=blob_name,
        )
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
            temp_path = temp_file.name
            # Stream straight to disk instead of buffering the whole video in memory.
            blob_client.download_blob().readinto(temp_file)
        return temp_path
    except Exception as e:
        logger.error(f"Download failed: {e}")
        _remove_quietly(temp_path)
        return None


def _annotate_frame(frame):
    """Run YOLO on *frame* and draw boxes, labels and a per-class count summary in place."""
    class_counts = {}
    for result in YOLO_MODEL(frame):
        for box in result.boxes:
            conf = float(box.conf[0])
            if conf < _CONFIDENCE_THRESHOLD:
                continue

            cls_id = int(box.cls[0])
            class_name = YOLO_MODEL.names[cls_id]

            # Bounding box (colour is in BGR order)
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Text label: normally just above the box; when the box touches the
            # top of the frame, put it just below the top edge so it stays visible.
            label = f"{class_name} {conf:.2f}"
            label_y = y1 - 10 if y1 - 10 > 10 else y1 + 20
            cv2.putText(frame, label, (x1, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            class_counts[class_name] = class_counts.get(class_name, 0) + 1

    # Summary overlay in the top-left corner
    summary_text = " | ".join([f"{k}: {v}" for k, v in class_counts.items()])
    cv2.putText(frame, summary_text, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)


def annotate_video(input_video_path):
    """Write a copy of the video with YOLO detections drawn on every frame.

    On success the input file is deleted and the path of a new temporary
    ".mp4" file is returned. On failure the input file is left in place and
    no output file is left behind.

    Args:
        input_video_path: Path of a local video file.

    Returns:
        Path of the annotated video, or None if the input is missing, cannot
        be opened, or processing fails (the error is logged).
    """
    if not input_video_path or not os.path.exists(input_video_path):
        logger.error("Invalid input video path")
        return None

    cap = None
    writer = None
    output_path = None
    succeeded = False
    try:
        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            logger.error("Failed to open video file")
            return None

        # Video writer setup
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also catches NaN
            logger.warning(f"Source FPS unavailable, falling back to {_FALLBACK_FPS}")
            fps = _FALLBACK_FPS

        # Reserve a unique output path; the handle is closed so OpenCV can open it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_output:
            output_path = temp_output.name

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not writer.isOpened():
            logger.error("Failed to open video writer")
            return None

        # Processing loop
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            _annotate_frame(frame)
            writer.write(frame)

        succeeded = True
    except Exception as e:
        logger.error(f"Annotation failed: {e}")
        return None
    finally:
        # Release handles on every path, then drop a partial output file.
        if cap is not None:
            cap.release()
        if writer is not None:
            writer.release()
        if not succeeded:
            _remove_quietly(output_path)

    # The input is consumed on success (handles are already released above).
    _remove_quietly(input_video_path)
    return output_path


def process_video(blob_name):
    """Download the named blob and return the path of its annotated copy.

    Args:
        blob_name: Name of the video blob; empty/None (e.g. nothing selected
            in the dropdown) is rejected without contacting storage.

    Returns:
        Path of the annotated video, or None on any failure (logged).
    """
    if not blob_name:
        logger.error("No video selected")
        return None

    local_path = None
    try:
        local_path = download_azure_video(blob_name)
        if not local_path:
            return None
        return annotate_video(local_path)
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return None
    finally:
        # annotate_video deletes the download on success; this covers failures.
        _remove_quietly(local_path)


# Gradio Interface
with gr.Blocks(title="PRISM Video Annotator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎥 PRISM Site Diary - Video Analyzer")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Azure Storage Controls")
            refresh_btn = gr.Button("🔄 Refresh Video List", variant="secondary")
            video_dropdown = gr.Dropdown(
                label="Available Videos",
                choices=list_azure_videos(),
                interactive=True,
            )
            latest_btn = gr.Button("⏩ Process Latest Video", variant="primary")
            selected_btn = gr.Button("✅ Process Selected Video", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("## Annotated Output")
            output_video = gr.Video(
                label="Processed Video",
                format="mp4",
                interactive=False,
            )
            # NOTE: no event handler below writes to this textbox.
            status = gr.Textbox(label="Processing Status")

    def update_ui():
        """Re-list the container and return an update with the new dropdown choices."""
        # gr.update is used instead of gr.Dropdown.update, which Gradio 4 removed.
        return gr.update(choices=list_azure_videos())

    def handle_latest():
        """Process the most recently modified video; return its path or None."""
        latest = get_latest_azure_video()
        return process_video(latest) if latest else None

    # Event handlers
    refresh_btn.click(
        fn=update_ui,
        outputs=video_dropdown,
        queue=False,
    )

    latest_btn.click(
        fn=handle_latest,
        outputs=output_video,
        api_name="process_latest",
    )

    selected_btn.click(
        fn=process_video,
        inputs=video_dropdown,
        outputs=output_video,
        api_name="process_selected",
    )

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
    )