| """ |
| Navora β Gradio interface for assistive vision navigation |
| """ |
|
|
| import gradio as gr |
| import cv2 |
| import numpy as np |
| from PIL import Image |
| import sys |
| from pathlib import Path |
|
|
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from app.models.loader import load_models |
| from app.services.pipeline import run_pipeline_frame_data |
|
|
# Lazily-initialized model bundle shared across requests; stays None
# until initialize_models() populates it (see below).
models = None
|
|
def initialize_models():
    """Load the model bundle once and cache it (idempotent).

    Populates the module-level `models` singleton on first call; later
    calls return the cached bundle without reloading.
    """
    global models
    if models is None:
        print("π Loading models...")
        models = load_models()
        print("β Models loaded")
    return models
|
|
def process_frame(image):
    """Run the full perception pipeline on one webcam frame.

    Args:
        image: PIL.Image from the Gradio webcam component, or None when
            no frame is available yet.

    Returns:
        Tuple of (guidance string for the textbox, annotated PIL image
        or None on error / missing input).
    """
    if image is None:
        return "π· Waiting for camera input...", None

    try:
        global models
        # Lazy-load in case startup initialization was skipped.
        if models is None:
            models = initialize_models()

        # Gradio delivers RGB; OpenCV and the pipeline expect BGR.
        frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        # Downscale wide frames to cap inference cost, preserving aspect.
        h, w = frame.shape[:2]
        TARGET_WIDTH = 640
        if w > TARGET_WIDTH:
            frame = cv2.resize(frame, (TARGET_WIDTH, int(h * (TARGET_WIDTH / w))))
            h, w = frame.shape[:2]

        result = run_pipeline_frame_data(frame, models)
        detections = result.get("detections", [])

        # Pick the riskiest obstacle and turn it into an action + message.
        priority = choose_priority_obstacle(detections, w, h)
        action, guidance_text = guidance_from_priority(priority)

        annotated_frame = draw_detections(frame, detections, action)
        # Convert back to RGB for display in the Gradio image component.
        annotated_image = Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))

        action_emoji = {"forward": "β¬οΈ", "stop": "π", "left": "β¬οΈ", "right": "β‘οΈ"}
        formatted_guidance = f"{action_emoji.get(action, 'π§')} {guidance_text}"

        return formatted_guidance, annotated_image

    except Exception as e:
        # Broad catch keeps the video stream responsive; the error is
        # surfaced to the user instead of crashing the handler.
        print(f"β Error: {e}")
        return f"β Error: {str(e)}", None
|
|
def draw_detections(frame, detections, action):
    """Render detection overlays onto a copy of the frame.

    Draws a full-width status banner at the top (color-coded by the
    current action), a thin bounding box + label tag + center dot for
    each detection, and a vertical center guide line.

    Args:
        frame: BGR image (numpy array, as produced by cv2).
        detections: list of dicts with "box" (x1, y1, x2, y2),
            "label" (str) and "confidence" (float).
        action: one of "forward", "stop", "left", "right"; anything
            else falls back to a white banner.

    Returns:
        A new annotated BGR image; the input frame is not modified.
    """
    annotated = frame.copy()
    h, w = frame.shape[:2]

    # Banner color keyed by action (BGR tuples).
    action_colors = {"forward": (0, 255, 0), "stop": (0, 0, 255),
                     "left": (255, 165, 0), "right": (255, 165, 0)}
    color = action_colors.get(action, (255, 255, 255))

    cv2.rectangle(annotated, (0, 0), (w, 50), color, -1)
    # BUGFIX: cv2.FONT_HERSHEY_BOLD does not exist (AttributeError at
    # runtime); use FONT_HERSHEY_DUPLEX, the heavier Hershey face, with
    # the same thickness for a bold look.
    cv2.putText(annotated, action.upper(), (w//2 - 50, 35),
                cv2.FONT_HERSHEY_DUPLEX, 1.0, (255, 255, 255), 2)

    for det in detections:
        # Boxes may arrive as floats from the detector; cv2 drawing
        # functions require integer coordinates.
        x1, y1, x2, y2 = (int(v) for v in det["box"])
        label = det["label"]
        conf = det["confidence"]

        # Red boxes while stopped, green otherwise.
        box_color = (0, 0, 255) if action == "stop" else (0, 255, 0)
        cv2.rectangle(annotated, (x1, y1), (x2, y2), box_color, 1)

        # Filled label tag above the box: "<label> <confidence>".
        text = f"{label} {conf:.2f}"
        (text_w, text_h), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        cv2.rectangle(annotated, (x1, y1 - text_h - 6), (x1 + text_w + 4, y1), box_color, -1)
        cv2.putText(annotated, text, (x1 + 2, y1 - 3),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

        # Dot marking the box center.
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        cv2.circle(annotated, (center_x, center_y), 3, box_color, -1)

    # Vertical guide at the frame's horizontal midpoint (below banner).
    center_x = w // 2
    cv2.line(annotated, (center_x, 50), (center_x, h), (255, 255, 0), 1)

    return annotated
|
|
def choose_priority_obstacle(detections, frame_width, frame_height):
    """Select the single detection that poses the greatest navigation risk.

    Risk combines detection confidence, relative box area, horizontal
    position (center band weighted 1.3x), and whether the label is a
    known hazard class (weighted 1.5x).

    Args:
        detections: list of dicts with "box" (x1, y1, x2, y2), "label"
            and "confidence".
        frame_width: frame width in pixels.
        frame_height: frame height in pixels.

    Returns:
        None when there are no detections; otherwise a summary dict with
        "label", "confidence", "direction" (left/center/right) and
        "area_ratio" (box area / frame area, rounded to 4 decimals).
    """
    if not detections:
        return None

    DANGER_LABELS = {"person", "car", "truck", "bus", "motorcycle", "bicycle", "dog", "cat"}
    CENTER_BAND_START = 0.42
    CENTER_BAND_END = 0.58
    frame_area = float(frame_width * frame_height)

    def direction_from_box(box):
        # Classify horizontal position by the box midpoint.
        midpoint = (box[0] + box[2]) / 2.0
        if midpoint < frame_width * CENTER_BAND_START:
            return "left"
        if midpoint > frame_width * CENTER_BAND_END:
            return "right"
        return "center"

    def area_fraction(box):
        # Share of the frame covered by the box (clamped non-negative).
        left, top, right, bottom = box
        return max(0, (right - left) * (bottom - top)) / max(1.0, frame_area)

    def risk_score(det):
        score = det["confidence"] * max(area_fraction(det["box"]), 1e-4)
        score *= 1.3 if direction_from_box(det["box"]) == "center" else 1.0
        score *= 1.5 if det["label"].lower() in DANGER_LABELS else 1.0
        return score

    top_det = max(detections, key=risk_score)
    return {
        "label": top_det["label"],
        "confidence": top_det["confidence"],
        "direction": direction_from_box(top_det["box"]),
        "area_ratio": round(area_fraction(top_det["box"]), 4),
    }
|
|
def guidance_from_priority(priority_obstacle):
    """Translate the priority obstacle into an (action, message) pair.

    Args:
        priority_obstacle: summary dict from choose_priority_obstacle
            ("label", "confidence", "direction", "area_ratio"), or None
            when the path is clear.

    Returns:
        Tuple of (action, message) where action is one of "forward",
        "stop", "left", "right".
    """
    if priority_obstacle is None:
        return "forward", "Path clear. Move forward."

    DANGER_LABELS = {"person", "car", "truck", "bus", "motorcycle", "bicycle", "dog", "cat"}
    STOP_CONFIDENCE_THRESHOLD = 0.55
    STOP_AREA_THRESHOLD = 0.06

    label = priority_obstacle["label"].lower()
    direction = priority_obstacle["direction"]
    confidence = priority_obstacle["confidence"]
    area_ratio = priority_obstacle.get("area_ratio", 0.0)
    name = label.title()

    # A hazardous object dead ahead warrants a full stop when it is
    # either close (large area) or detected with high confidence.
    if direction == "center" and label in DANGER_LABELS:
        if confidence >= STOP_CONFIDENCE_THRESHOLD and area_ratio >= STOP_AREA_THRESHOLD:
            return "stop", f"Stop. {name} ahead."
        if confidence >= 0.65 and area_ratio >= 0.03:
            return "stop", f"Stop. {name} detected."

    # Side obstacles: steer toward the opposite side.
    sidestep = {
        "left": ("right", f"{name} on left. Move right."),
        "right": ("left", f"{name} on right. Move left."),
    }
    if direction in sidestep:
        return sidestep[direction]

    return "forward", f"{name} ahead. Continue carefully."
|
|
| |
| custom_css = """ |
| #component-0 { |
| max-width: 100%; |
| margin: 0 auto; |
| } |
| .gradio-container { |
| font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif !important; |
| } |
| #guidance_box { |
| font-size: 1.5rem !important; |
| font-weight: 600 !important; |
| text-align: center !important; |
| padding: 1.5rem !important; |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; |
| color: white !important; |
| border-radius: 12px !important; |
| border: none !important; |
| } |
| #camera_input { |
| border-radius: 12px !important; |
| border: 2px solid #e0e0e0 !important; |
| } |
| #output_image { |
| border-radius: 12px !important; |
| border: 2px solid #e0e0e0 !important; |
| } |
| .gr-button-primary { |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; |
| border: none !important; |
| border-radius: 12px !important; |
| padding: 1rem 2rem !important; |
| font-size: 1.1rem !important; |
| font-weight: 600 !important; |
| } |
| footer { |
| display: none !important; |
| } |
| """ |
|
|
| |
| tts_js = """ |
| function speak(text) { |
| if ('speechSynthesis' in window) { |
| // Cancel any ongoing speech |
| window.speechSynthesis.cancel(); |
| |
| // Create utterance |
| const utterance = new SpeechSynthesisUtterance(text); |
| utterance.rate = 1.0; |
| utterance.pitch = 1.0; |
| utterance.volume = 1.0; |
| utterance.lang = 'en-US'; |
| |
| // Speak |
| window.speechSynthesis.speak(utterance); |
| } |
| } |
| |
| // Auto-speak when guidance text changes |
| const observer = new MutationObserver((mutations) => { |
| mutations.forEach((mutation) => { |
| if (mutation.type === 'childList' || mutation.type === 'characterData') { |
| const guidanceBox = document.querySelector('#guidance_box textarea'); |
| if (guidanceBox && guidanceBox.value && !guidanceBox.value.includes('Waiting')) { |
| speak(guidanceBox.value); |
| } |
| } |
| }); |
| }); |
| |
| // Start observing |
| setTimeout(() => { |
| const guidanceBox = document.querySelector('#guidance_box textarea'); |
| if (guidanceBox) { |
| observer.observe(guidanceBox, { |
| childList: true, |
| characterData: true, |
| subtree: true, |
| attributes: true, |
| attributeFilter: ['value'] |
| }); |
| } |
| }, 1000); |
| """ |
|
|
| |
# Build the Gradio UI. BUGFIX: `css` and `theme` are gr.Blocks()
# constructor options — launch() does not accept them — so they are
# applied here instead of at demo.launch().
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # π§ Navora
        ### AI Navigation Assistant
        """,
        elem_id="header"
    )

    with gr.Row():
        input_image = gr.Image(
            label="Camera",
            type="pil",
            sources=["webcam"],
            streaming=False,
            elem_id="camera_input"
        )

    with gr.Row():
        guidance_text = gr.Textbox(
            label="",
            lines=2,
            interactive=False,
            elem_id="guidance_box",
            value="π· Tap camera to start"
        )

    with gr.Row():
        output_image = gr.Image(
            label="Detection View",
            elem_id="output_image"
        )

    # Re-run the pipeline whenever a new webcam snapshot arrives.
    input_image.change(
        fn=process_frame,
        inputs=[input_image],
        outputs=[guidance_text, output_image]
    )

    gr.Markdown(
        """
        ---
        **How to use:** Allow camera access, point at your path, and receive real-time guidance.

        Powered by BLIP-2, YOLOv8, and MiDaS.
        """,
        elem_id="footer_text"
    )


# Inject the browser-side TTS script when the page loads.
# NOTE(review): confirm the `js` hook executes a raw script (not only a
# function expression) in the installed Gradio version.
demo.load(None, None, None, js=tts_js)
|
|
| |
print("π Starting Navora...")
# Eagerly load models at import time so the first request is fast.
# Intentional module-level side effect for app-server deployments.
initialize_models()


if __name__ == "__main__":
    # BUGFIX: `css` and `theme` are gr.Blocks() constructor options;
    # Blocks.launch() does not accept them and raises TypeError.
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        share=False,
    )
|
|