"""Gradio dashboard that runs a YOLO road-defect detector over drone video.

For every sampled frame the app resizes the frame, runs detection, saves
annotated and geotagged snapshots, writes a per-frame flight-log CSV, and
finally produces an annotated video, a confidence chart, a GPS scatter map
and a PDF report.
"""

import csv
import json
import logging
import os
import time
import zipfile
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional

# Ultralytics resolves its config directory while it is being imported, so the
# override has to be in the environment *before* the import below.
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import piexif
import torch
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from ultralytics import YOLO

logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
FLIGHT_LOG_DIR = "flight_logs"
for _directory in (CAPTURED_FRAMES_DIR, OUTPUT_DIR, FLIGHT_LOG_DIR):
    os.makedirs(_directory, exist_ok=True)
# NOTE(review): world-writable directories are kept from the original
# deployment setup; tighten the mode if the host does not require it.
for _directory in (CAPTURED_FRAMES_DIR, OUTPUT_DIR, FLIGHT_LOG_DIR):
    os.chmod(_directory, 0o777)

# Module-level state shared between the processing pipeline and the helpers.
log_entries: List[str] = []
detected_counts: List[int] = []
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
last_metrics: Dict[str, Any] = {}
frame_count: int = 0

SAVE_IMAGE_INTERVAL = 1
DETECTION_CLASSES = ["Longitudinal", "Pothole", "Transverse"]
MIN_FRAME_PIXELS = 12_000_000  # 12 MP quality floor used by check_image_quality
MAX_LOG_ENTRIES = 50           # in-memory log lines kept while processing
MAX_GALLERY_IMAGES = 5         # most recent geotagged snapshots kept
_GPS_SECONDS_DENOMINATOR = 10_000  # EXIF seconds stored as n/10000

device = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO('./data/best.pt').to(device)
if device == "cuda":
    model.half()


def generate_pdf_report(summary: str, screenshots: List[str], log_results: List[str],
                        chart_path: Optional[str], map_path: Optional[str], pdf_path: str,
                        additional_results: Optional[List[str]] = None) -> None:
    """Write the multi-page PDF analysis report to ``pdf_path``.

    Pages, in order: title, summary, first five log lines, first five
    screenshots, trend chart, location map and (only when provided)
    additional results.

    Args:
        summary: Free text for the summary page.
        screenshots: Image paths; at most the first five are embedded.
        log_results: Log lines; at most the first five are printed.
        chart_path: Chart image path, or ``None`` when no chart was produced.
        map_path: Map image path, or ``None`` when no map was produced.
        pdf_path: Destination of the generated PDF.
        additional_results: Optional extra lines for a final page.
    """
    c = canvas.Canvas(pdf_path, pagesize=letter)
    width, height = letter
    margin_left = 0.75 * inch
    margin_right = 0.75 * inch
    margin_top = height - 0.75 * inch
    margin_bottom = 0.75 * inch
    line_height = 14
    max_text_width = width - margin_left - margin_right
    image_width = 5 * inch
    image_height = 3.5 * inch
    y_position = margin_top

    def new_page() -> None:
        """Finish the current page and reset the cursor and body font."""
        nonlocal y_position
        c.showPage()
        y_position = margin_top
        c.setFont("Helvetica", 12)

    def ensure_space(needed: float) -> None:
        """Start a new page when ``needed`` points do not fit above the margin."""
        if y_position - needed < margin_bottom:
            new_page()

    def draw_heading(title: str) -> None:
        """Draw a bold section heading and switch back to the body font."""
        nonlocal y_position
        c.setFont("Helvetica-Bold", 14)
        c.drawString(margin_left, y_position, title)
        y_position -= line_height * 1.5
        c.setFont("Helvetica", 12)

    def draw_wrapped_text(text: str) -> None:
        """Draw ``text`` word-wrapped to the printable width at the cursor."""
        nonlocal y_position
        pending = ""
        for word in text.split():
            candidate = f"{pending} {word}".strip()
            if c.stringWidth(candidate, "Helvetica", 12) <= max_text_width:
                pending = candidate
                continue
            # Flush the full line; a single over-long word is drawn on its own.
            c.drawString(margin_left, y_position, pending or word)
            y_position -= line_height
            if y_position < margin_bottom:
                new_page()
            pending = word if pending else ""
        if pending:
            c.drawString(margin_left, y_position, pending)
            y_position -= line_height

    def draw_figure(path: Optional[str], draw_w: float, draw_h: float,
                    slot_h: float, missing_text: str) -> None:
        """Embed an image in a ``slot_h``-tall slot, or print a notice if absent."""
        nonlocal y_position
        # `path` may be None (e.g. no chart without detections); os.path.exists
        # would raise TypeError on None, so test truthiness first.
        if path and os.path.exists(path):
            # Break the page *before* drawing so the image is never cut off.
            ensure_space(slot_h)
            c.drawImage(path, margin_left, y_position - slot_h,
                        width=draw_w, height=draw_h, preserveAspectRatio=True)
            y_position -= slot_h + 20
        else:
            ensure_space(0)
            draw_wrapped_text(missing_text)
            y_position -= line_height

    # Title page
    c.setFont("Helvetica-Bold", 16)
    c.drawString(margin_left, y_position, "Drone Survey Analysis Report")
    c.setFont("Helvetica", 12)
    y_position -= 30
    c.drawString(margin_left, y_position,
                 f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    new_page()

    # Summary page
    draw_heading("Summary")
    draw_wrapped_text(summary)
    new_page()

    # Log results page
    draw_heading("Log Results for Top 5 Images")
    for log in log_results[:5]:
        ensure_space(0)
        draw_wrapped_text(log)
        y_position -= line_height
    new_page()

    # Screenshots page
    draw_heading("Incident Screenshots")
    for screenshot in screenshots[:5]:
        draw_figure(screenshot, image_width, image_height, image_height,
                    f"Image not found: {screenshot}")
    new_page()

    # Chart page
    draw_heading("Detection Trend Chart")
    draw_figure(chart_path, 4 * inch, 3 * inch, 4 * inch, "Chart not found")
    new_page()

    # Map page
    draw_heading("Issue Locations Map")
    draw_figure(map_path, 4 * inch, 3 * inch, 4 * inch, "Map not found")

    # Additional results page: only opened when there is something to print,
    # so the report does not end with an empty page.
    if additional_results:
        new_page()
        draw_heading("Additional Analysis Results")
        for result in additional_results:
            ensure_space(0)
            draw_wrapped_text(result)
            y_position -= line_height

    c.save()


def zip_all_outputs(report_path: str, video_path: str, chart_path: str, map_path: str) -> str:
    """Bundle the report, media, saved snapshots and flight logs into a ZIP.

    Missing or ``None`` paths are skipped instead of aborting the archive.

    Returns:
        Path of the created archive, or ``""`` when creation failed (the
        failure is appended to ``log_entries``).
    """
    zip_path = os.path.join(
        OUTPUT_DIR, f"drone_analysis_outputs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip")
    # (source path, folder inside the archive); the report sits at the root.
    members = [
        (report_path, ""),
        (video_path, "outputs"),
        (chart_path, "outputs"),
        (map_path, "outputs"),
    ]
    members.extend((snapshot, "captured_frames") for snapshot in detected_issues)
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for source, folder in members:
                if source and os.path.exists(source):
                    zipf.write(source, os.path.join(folder, os.path.basename(source)))
            for root, _, files in os.walk(FLIGHT_LOG_DIR):
                for name in files:
                    zipf.write(os.path.join(root, name), os.path.join("flight_logs", name))
    except Exception as e:  # best effort: report to the UI log, never crash
        log_entries.append(f"Error: Failed to create ZIP: {str(e)}")
        return ""
    log_entries.append(f"Created ZIP: {zip_path}")
    return zip_path


def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str:
    """Save a longitude/latitude scatter plot of ``gps_coords`` and return its path.

    Args:
        gps_coords: ``[latitude, longitude]`` pairs.
        items: Currently unused; kept for interface compatibility.
    """
    map_path = os.path.join(OUTPUT_DIR, f"map_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
    longitudes = [coord[1] for coord in gps_coords]
    latitudes = [coord[0] for coord in gps_coords]
    fig, ax = plt.subplots(figsize=(5, 5))
    try:
        ax.scatter(longitudes, latitudes, c='blue', label='GPS Points')
        ax.set_title("Issue Locations Map")
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.legend()
        fig.savefig(map_path)
    finally:
        plt.close(fig)
    return map_path


def _to_exif_dms(decimal_degrees: float):
    """Convert decimal degrees to EXIF (degrees, minutes, seconds) rationals."""
    # Work in integer units of 1/10000 second so rounding carries correctly
    # (e.g. 59.99999" becomes the next minute rather than 60").
    total = round(abs(decimal_degrees) * 3600 * _GPS_SECONDS_DENOMINATOR)
    degrees, remainder = divmod(total, 3600 * _GPS_SECONDS_DENOMINATOR)
    minutes, seconds = divmod(remainder, 60 * _GPS_SECONDS_DENOMINATOR)
    return ((degrees, 1), (minutes, 1), (seconds, _GPS_SECONDS_DENOMINATOR))


def write_geotag(image_path: str, gps_coord: List[float]) -> bool:
    """Embed ``[latitude, longitude]`` into the image's EXIF GPS block.

    The coordinate is stored as degree/minute/second rationals so the
    fractional part of the position is preserved.

    Returns:
        ``True`` on success; ``False`` on failure (the error is appended to
        ``log_entries``).
    """
    try:
        latitude, longitude = gps_coord[0], gps_coord[1]
        exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}}
        exif_dict["GPS"] = {
            piexif.GPSIFD.GPSLatitudeRef: "N" if latitude >= 0 else "S",
            piexif.GPSIFD.GPSLatitude: _to_exif_dms(latitude),
            piexif.GPSIFD.GPSLongitudeRef: "E" if longitude >= 0 else "W",
            piexif.GPSIFD.GPSLongitude: _to_exif_dms(longitude),
        }
        piexif.insert(piexif.dump(exif_dict), image_path)
        return True
    except Exception as e:  # piexif raises several unrelated exception types
        log_entries.append(f"Error: Failed to geotag {image_path}: {str(e)}")
        return False


def write_flight_log(frame_count: int, gps_coord: List[float], timestamp: str) -> str:
    """Write a one-row flight-log CSV for a frame and return its path.

    Speed, satellite count and altitude are fixed placeholder values
    (5.0, 12, 60). Returns ``""`` when the file could not be written.
    """
    log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
    header = ["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"]
    row = [frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60]
    try:
        with open(log_path, 'w', newline='') as csvfile:
            csv.writer(csvfile).writerows([header, row])
        return log_path
    except Exception as e:
        log_entries.append(f"Error: Failed to write flight log {log_path}: {str(e)}")
        return ""


def check_image_quality(frame: np.ndarray, input_resolution: int) -> bool:
    """Return whether ``frame`` has at least 12 MP and at least ``input_resolution`` pixels.

    Rejections are appended to ``log_entries`` tagged with the global
    ``frame_count``.
    """
    # Slice instead of a three-way unpack so 2-D (grayscale) frames work too.
    height, width = frame.shape[:2]
    frame_resolution = width * height
    if frame_resolution < MIN_FRAME_PIXELS:
        log_entries.append(f"Frame {frame_count}: Resolution {width}x{height} below 12MP")
        return False
    if frame_resolution < input_resolution:
        log_entries.append(f"Frame {frame_count}: Output resolution below input")
        return False
    return True


def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarise detections as per-label counts, a total and a timestamp."""
    counts = Counter(det["label"] for det in detections)
    return {
        "items": [{"type": label, "count": count} for label, count in counts.items()],
        "total_detections": len(detections),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def generate_line_chart(detections: List[Dict[str, Any]]) -> Optional[str]:
    """Plot detection confidence against frame number.

    Returns:
        Path of the saved PNG, or ``None`` when ``detections`` is empty.
    """
    if not detections:
        return None
    frames = [det["frame"] for det in detections]
    confidences = [det["conf"] for det in detections]
    chart_path = os.path.join(OUTPUT_DIR, f"chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
    fig, ax = plt.subplots(figsize=(5, 3))
    try:
        ax.plot(frames, confidences, marker='o', color='#FF8C00')
        ax.set_title("Detections Over Time")
        ax.set_xlabel("Frame")
        ax.set_ylabel("Confidence")
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(chart_path)
    finally:
        plt.close(fig)
    return chart_path


def _error_result(log_message: str, error: str):
    """Log ``log_message`` and build the 7-tuple the UI expects on failure."""
    log_entries.append(log_message)
    return (None, json.dumps({"error": error}, indent=2), "\n".join(log_entries),
            [], None, None, None)


def _collect_detections(boxes, frame_no: int, gps_coord: List[float], timestamp_str: str,
                        additional_results: List[str]) -> List[Dict[str, Any]]:
    """Turn one frame's YOLO boxes into detection records for tracked classes."""
    frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_no:06d}.jpg")
    found: List[Dict[str, Any]] = []
    for detection in boxes:
        label = model.names[int(detection.cls)]
        if label not in DETECTION_CLASSES:
            continue
        conf = float(detection.conf)
        box = detection.xyxy[0].cpu().numpy().astype(int).tolist()
        found.append({
            "label": label,
            "box": box,
            "conf": conf,
            "gps": gps_coord,
            "timestamp": timestamp_str,
            "frame": frame_no,
            "path": frame_path,
        })
        log_entries.append(
            f"Frame {frame_no} at {timestamp_str}: Detected {label} with confidence {conf:.2f}")
        if label == "Pothole":
            additional_results.append(f"Pothole detected with quality score: {conf:.2f}")
    return found


def _save_detection_frame(annotated_frame, frame_no: int, gps_coord: List[float]) -> None:
    """Save and geotag an annotated frame, keeping only the most recent snapshots."""
    captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_no:06d}.jpg")
    if not cv2.imwrite(captured_frame_path, annotated_frame):
        log_entries.append(f"Error: Failed to save {captured_frame_path}")
        return
    if not write_geotag(captured_frame_path, gps_coord):
        log_entries.append(f"Frame {frame_no}: Geotagging failed")
        return
    detected_issues.append(captured_frame_path)
    if len(detected_issues) > MAX_GALLERY_IMAGES:
        detected_issues.pop(0)


def process_video(video, resize_width=4000, resize_height=3000, frame_skip=5):
    """Run defect detection over a video and build every dashboard artifact.

    Args:
        video: Path of the uploaded video, or ``None``.
        resize_width: Output frame width in pixels.
        resize_height: Output frame height in pixels.
        frame_skip: Analyse every ``frame_skip``-th frame; the annotated frame
            is repeated so the output keeps the input's frame count.

    Returns:
        A 7-tuple ``(video_path, metrics_json, log_text, snapshot_paths,
        chart_path, map_path, pdf_path)``. On failure the first element is
        ``None`` and ``metrics_json`` carries an ``"error"`` key.
    """
    global frame_count, last_metrics
    frame_count = 0
    detected_counts.clear()
    detected_issues.clear()
    gps_coordinates.clear()
    log_entries.clear()
    last_metrics = {}

    if video is None:
        return _error_result("Error: No video uploaded", "No video uploaded")

    # UI sliders may deliver floats; cv2 and range() need ints, and a skip
    # below 1 would divide by zero in the sampling check.
    out_width, out_height = int(resize_width), int(resize_height)
    frame_skip = max(1, int(frame_skip))

    log_entries.append("Starting video processing...")
    start_time = time.time()
    cap = cv2.VideoCapture(video)
    if not cap.isOpened():
        cap.release()
        return _error_result("Error: Could not open video file", "Could not open video file")

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    input_resolution = frame_width * frame_height
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_entries.append(f"Input video: {frame_width}x{frame_height}, {fps} FPS, {total_frames} frames")

    output_path = os.path.join(OUTPUT_DIR, "processed_output.mp4")
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (out_width, out_height))
    if not out.isOpened():
        cap.release()
        out.release()
        return _error_result("Error: Failed to initialize mp4v codec", "mp4v codec failed")

    all_detections: List[Dict[str, Any]] = []
    additional_results: List[str] = []
    detection_frame_count = 0
    output_frame_count = 0
    last_annotated_frame = None

    # try/finally guarantees both video handles are released even when
    # inference or file I/O raises part-way through the video.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % frame_skip != 0:
                continue

            frame = cv2.resize(frame, (out_width, out_height))
            if not check_image_quality(frame, input_resolution):
                continue

            results = model(frame, verbose=False, conf=0.5, iou=0.7)
            annotated_frame = results[0].plot()

            frame_timestamp = frame_count / fps if fps > 0 else 0
            timestamp_str = f"{int(frame_timestamp // 60)}:{int(frame_timestamp % 60):02d}"
            # Simulated GPS track: a fixed origin offset linearly per frame.
            gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)]
            gps_coordinates.append(gps_coord)

            frame_detections = _collect_detections(
                results[0].boxes, frame_count, gps_coord, timestamp_str, additional_results)
            if frame_detections:
                detection_frame_count += 1
                if detection_frame_count % SAVE_IMAGE_INTERVAL == 0:
                    _save_detection_frame(annotated_frame, frame_count, gps_coord)

            write_flight_log(frame_count, gps_coord, timestamp_str)

            # Repeat the annotated frame for the skipped ones so the output
            # keeps the input's duration.
            for _ in range(frame_skip):
                out.write(annotated_frame)
                output_frame_count += 1
            last_annotated_frame = annotated_frame

            detected_counts.append(len(frame_detections))
            all_detections.extend(frame_detections)

            # Several lines can be added per frame, so trim down to the cap
            # rather than dropping a single entry.
            if len(log_entries) > MAX_LOG_ENTRIES:
                del log_entries[:-MAX_LOG_ENTRIES]

        # Pad with the last annotated frame up to the input's frame count.
        while output_frame_count < total_frames and last_annotated_frame is not None:
            out.write(last_annotated_frame)
            output_frame_count += 1
    finally:
        cap.release()
        out.release()

    last_metrics = update_metrics(all_detections)

    # Re-open the written file to report what actually ended up on disk.
    check = cv2.VideoCapture(output_path)
    try:
        output_frames = int(check.get(cv2.CAP_PROP_FRAME_COUNT))
        output_fps = check.get(cv2.CAP_PROP_FPS)
    finally:
        check.release()
    output_duration = output_frames / output_fps if output_fps > 0 else 0
    total_time = time.time() - start_time
    log_entries.append(
        f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds")

    # Generate chart and map
    chart_path = generate_line_chart(all_detections)
    map_path = generate_map(gps_coordinates[-5:], all_detections)

    # Generate the report with additional results
    pdf_path = os.path.join(
        OUTPUT_DIR, f"drone_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf")
    generate_pdf_report(
        "Drone analysis completed. Top 5 detections included.",
        detected_issues,
        log_entries,
        chart_path,
        map_path,
        pdf_path,
        additional_results,
    )

    log_entries.append(f"Processing completed in {total_time:.2f} seconds")
    return (
        output_path,
        json.dumps(last_metrics, indent=2),
        "\n".join(log_entries[-10:]),
        detected_issues,
        chart_path,
        map_path,
        pdf_path,
    )


with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface:
    gr.Markdown("# NHAI Road Defect Detection Dashboard")
    with gr.Row():
        with gr.Column(scale=3):
            video_input = gr.Video(label="Upload Video (12MP recommended)")
            width_slider = gr.Slider(320, 4000, value=4000, label="Output Width", step=1)
            height_slider = gr.Slider(240, 3000, value=3000, label="Output Height", step=1)
            skip_slider = gr.Slider(1, 10, value=5, label="Frame Skip", step=1)
            process_btn = gr.Button("Process Video", variant="primary")
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False)
    with gr.Row():
        video_output = gr.Video(label="Processed Video")
        issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto",
                                   object_fit="contain")
    with gr.Row():
        chart_output = gr.Image(label="Detection Trend")
        map_output = gr.Image(label="Issue Locations Map")
    with gr.Row():
        logs_output = gr.Textbox(label="Logs", lines=5, interactive=False)
    with gr.Row():
        gr.Markdown("## Download Results")
    with gr.Row():
        output_pdf_download = gr.File(label="Download Report (PDF)")

    process_btn.click(
        fn=process_video,
        inputs=[video_input, width_slider, height_slider, skip_slider],
        outputs=[
            video_output,
            metrics_output,
            logs_output,
            issue_gallery,
            chart_output,
            map_output,
            output_pdf_download,
        ],
    )

if __name__ == "__main__":
    iface.launch()