from flask import Flask, request, jsonify, render_template_string, send_from_directory, send_file from flask_cors import CORS import os import importlib.util import time import zipfile import shutil import json from datetime import datetime from predict_task2 import Predictor from id_mapping import mapping from show_stitched import * import cv2 import supervision as sv from ultralytics import YOLO app = Flask(__name__) CORS(app) # Load configuration config_dir = os.path.abspath(os.path.dirname(__file__)) config_path = os.path.join(config_dir, 'PC_CONFIG.py') spec = importlib.util.spec_from_file_location("PC_CONFIG", config_path) PC_CONFIG = importlib.util.module_from_spec(spec) spec.loader.exec_module(PC_CONFIG) HOST = PC_CONFIG.HOST PORT = 7860 # Changed from PC_CONFIG.IMAGE_REC_PORT to 7860 UPLOAD_FOLDER = os.path.join(PC_CONFIG.FILE_DIRECTORY, "image-rec", "images") DATASET_FOLDER = os.path.join(PC_CONFIG.BASE_DIR, "yolo_dataset") ANNOTATED_FOLDER = os.path.join(DATASET_FOLDER, "annotated_images") LABELS_FOLDER = os.path.join(DATASET_FOLDER, "labels") IMAGES_FOLDER = os.path.join(DATASET_FOLDER, "images") CLASS_MAPPING_FILE = os.path.join(DATASET_FOLDER, "classes.json") app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER # Initialize predictor predictor = Predictor() # Ensure directories exist os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(DATASET_FOLDER, exist_ok=True) os.makedirs(ANNOTATED_FOLDER, exist_ok=True) os.makedirs(LABELS_FOLDER, exist_ok=True) os.makedirs(IMAGES_FOLDER, exist_ok=True) # Initialize class mapping file if not os.path.exists(CLASS_MAPPING_FILE): # Create initial class mapping from id_mapping.py reverse_mapping = {str(v): k for k, v in mapping.items() if v not in [-999] and k is not None} with open(CLASS_MAPPING_FILE, 'w', encoding='utf-8') as f: json.dump(reverse_mapping, f, indent=2, ensure_ascii=False) # Create YOLO to ID mapping compatibility function def get_compatible_class_id(yolo_class_name): """Map YOLO class names to id_mapping values""" # Handle different naming conventions between YOLO model and id_mapping yolo_to_id_mapping = { # Numbers # "one": 11, "two": 12, "three": 13, "four": 14, "five": 15, #"six": 16, "seven": 17, "eight": 18, "nine": 19, # Letters # "A": 20, "B": 21, "C": 22, "D": 23, "E": 24, "F": 25, # "G": 26, "H": 27, "S": 28, "T": 29, "U": 30, "V": 31, # "W": 32, "X": 33, "Y": 34, "Z": 35, # Directions "up": 36, "down": 37, "right": 38, "left": 39, # Shapes "circle": 40, "Bullseye": -1, "bullseye": -1 } return yolo_to_id_mapping.get(yolo_class_name, mapping.get(yolo_class_name, -999)) def load_class_mapping(): """Load class mapping from JSON file""" try: with open(CLASS_MAPPING_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: return {} def save_class_mapping(class_mapping): """Save class mapping to JSON file""" with open(CLASS_MAPPING_FILE, 'w', encoding='utf-8') as f: json.dump(class_mapping, f, indent=2, ensure_ascii=False) def save_obstacle_image(detection_result, obstacle_num): """Save obstacle image for display""" if not detection_result or not detection_result.get('marked_image_path'): return # Create obstacles display folder obstacles_folder = os.path.join(PC_CONFIG.BASE_DIR, "obstacles_display") os.makedirs(obstacles_folder, exist_ok=True) # Copy annotated image to obstacles display folder source_path = detection_result['marked_image_path'] target_filename = f"obstacle_{obstacle_num}.jpg" target_path = os.path.join(obstacles_folder, target_filename) # Save detection info to JSON file for later retrieval info_filename = f"obstacle_{obstacle_num}_info.json" info_path = os.path.join(obstacles_folder, info_filename) try: if os.path.exists(source_path): shutil.copy2(source_path, target_path) # Save detection info detection_info = { 'label': detection_result.get('label', 'Unknown'), 'image_id': detection_result.get('image_id', 'N/A'), 'confidence': detection_result.get('confidence', 0.0), 'timestamp': time.time() } with open(info_path, 'w', encoding='utf-8') as f: json.dump(detection_info, f, indent=2, ensure_ascii=False) print(f"Saved obstacle {obstacle_num} image and info to {target_path}") except Exception as e: print(f"Error saving obstacle image: {e}") def get_obstacle_detection_info(obstacle_num): """Get detection info for a specific obstacle""" obstacles_folder = os.path.join(PC_CONFIG.BASE_DIR, "obstacles_display") info_filename = f"obstacle_{obstacle_num}_info.json" info_path = os.path.join(obstacles_folder, info_filename) try: if os.path.exists(info_path): with open(info_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Error loading obstacle info: {e}") return {} def generate_yolo_annotation(results, detection_id, image_width, image_height, class_name): """Generate YOLO format annotation string""" if not results or not results[0].boxes or detection_id >= len(results[0].boxes): return "" # Get class mapping class_mapping = load_class_mapping() # Get class ID from mapping, if not found, add it class_id = None for id_str, name in class_mapping.items(): if name == class_name: class_id = int(id_str) break if class_id is None: # Add new class to mapping max_id = max([int(k) for k in class_mapping.keys()]) if class_mapping else -1 class_id = max_id + 1 class_mapping[str(class_id)] = class_name save_class_mapping(class_mapping) # Get bounding box - handle tensor conversion properly try: box = results[0].boxes.xyxy[detection_id] if hasattr(box, 'cpu'): box = box.cpu() if hasattr(box, 'numpy'): box = box.numpy() x1, y1, x2, y2 = box.tolist() if hasattr(box, 'tolist') else box # Convert to YOLO format (normalized) x_center = ((x1 + x2) / 2) / image_width y_center = ((y1 + y2) / 2) / image_height width = (x2 - x1) / image_width height = (y2 - y1) / image_height confidence = results[0].boxes.conf[detection_id] if hasattr(confidence, 'item'): confidence = confidence.item() # YOLO format: class_id x_center y_center width height return f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}" except Exception as e: print(f"Error generating YOLO annotation: {e}") return "" def save_annotated_image(image, results, detection_id, filename): """Save annotated image with bounding boxes""" if not results or not results[0].boxes: return None try: # Simple annotation without supervision library annotated_image = image.copy() # Get detection boxes and info boxes = results[0].boxes.xyxy if hasattr(boxes, 'cpu'): boxes = boxes.cpu().numpy() confidences = results[0].boxes.conf if hasattr(confidences, 'cpu'): confidences = confidences.cpu().numpy() class_ids = results[0].boxes.cls if hasattr(class_ids, 'cpu'): class_ids = class_ids.cpu().numpy().astype(int) # Draw bounding boxes for i, (box, conf, cls_id) in enumerate(zip(boxes, confidences, class_ids)): x1, y1, x2, y2 = map(int, box) class_name = results[0].names[cls_id] # Draw rectangle cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2) # Draw label label = f"{class_name} {conf:.2f}" label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0] cv2.rectangle(annotated_image, (x1, y1 - label_size[1] - 10), (x1 + label_size[0], y1), (0, 255, 0), -1) cv2.putText(annotated_image, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1) # Save annotated image with timestamp to ensure proper ordering annotated_path = os.path.join(ANNOTATED_FOLDER, f"annotated_{filename}") cv2.imwrite(annotated_path, annotated_image) # Clean up old images if more than 4 exist cleanup_old_annotated_images() return annotated_path except Exception as e: print(f"Error saving annotated image: {e}") # Fallback: just copy the original image annotated_path = os.path.join(ANNOTATED_FOLDER, f"annotated_{filename}") cv2.imwrite(annotated_path, image) cleanup_old_annotated_images() return annotated_path def cleanup_old_annotated_images(): """Keep only the latest 4 annotated images""" try: if not os.path.exists(ANNOTATED_FOLDER): return # Get all annotated images annotated_files = [] for filename in os.listdir(ANNOTATED_FOLDER): if filename.startswith('annotated_') and filename.lower().endswith(('.png', '.jpg', '.jpeg')): filepath = os.path.join(ANNOTATED_FOLDER, filename) mtime = os.path.getmtime(filepath) annotated_files.append((filename, filepath, mtime)) # If we have more than 4 images, remove the oldest ones if len(annotated_files) > 4: # Sort by modification time (newest first) annotated_files.sort(key=lambda x: x[2], reverse=True) # Remove files beyond the first 4 for filename, filepath, _ in annotated_files[4:]: try: os.remove(filepath) print(f"Removed old annotated image: {filename}") except Exception as e: print(f"Error removing old image {filename}: {e}") except Exception as e: print(f"Error during cleanup: {e}") def process_file(file_path, direction, task_type, filename): """Process uploaded file and generate predictions""" print("File received and saved successfully.") print(f"Direction received: {direction}") print(f"Task type received: {task_type}") startTime = datetime.now() # Load image and ensure it's 640x640 image = cv2.imread(file_path) if image is None: return None # Resize image to 640x640 if needed (to match YOLO model input) if image.shape[0] != 640 or image.shape[1] != 640: image = cv2.resize(image, (640, 640)) # Save the resized image back to ensure consistency cv2.imwrite(file_path, image) # Perform prediction class_name, results, detection_id = predictor.predict_id(file_path, task_type) # For TASK_2, apply priority-based selection AFTER getting all detections if task_type == "TASK_2" and results and results[0].boxes is not None and len(results[0].boxes) > 0: detections_list = [] boxes = results[0].boxes for i in range(len(boxes)): detected_class = results[0].names[int(boxes.cls[i])] confidence = float(boxes.conf[i]) yolo_class_id = int(boxes.cls[i]) print(f"[APP.PY] Detection {i}: {detected_class} (confidence: {confidence:.2f}, class_id: {yolo_class_id})") # Only set Bullseye to lowest priority, all others have equal priority (0) # Check by class name to be model-agnostic if detected_class.lower() == 'bullseye': priority = -10 # Lowest priority for bullseye else: priority = 0 # Equal priority for all non-bullseye detections detections_list.append({ 'index': i, 'class_name': detected_class, 'confidence': confidence, 'priority': priority, 'yolo_class_id': yolo_class_id }) if detections_list: # Sort by priority (descending), then by confidence (descending) detections_list.sort(key=lambda x: (x['priority'], x['confidence']), reverse=True) print(f"\n[APP.PY] Sorted detections:") for det in detections_list: print(f" - {det['class_name']}: priority={det['priority']}, confidence={det['confidence']:.2f}") # Override with highest priority detection selected = detections_list[0] class_name = selected['class_name'] detection_id = selected['index'] print(f"\n[APP.PY] ✓ Final selection: {class_name} (priority: {selected['priority']}, confidence: {selected['confidence']:.2f})") # Use compatible mapping function class_id = str(get_compatible_class_id(class_name)) detection_result = None if class_name and results and results[0].boxes is not None and len(results[0].boxes) > 0: # Generate filename timestamp = int(time.time()) base_filename = f"{class_name}_{timestamp}" # Save original image to dataset image_filename = f"{base_filename}.jpg" dataset_image_path = os.path.join(IMAGES_FOLDER, image_filename) shutil.copy2(file_path, dataset_image_path) # Generate and save YOLO annotation h, w = image.shape[:2] yolo_annotation = generate_yolo_annotation(results, detection_id, w, h, class_name) txt_path = None if yolo_annotation: txt_filename = f"{base_filename}.txt" txt_path = os.path.join(LABELS_FOLDER, txt_filename) with open(txt_path, 'w') as f: f.write(yolo_annotation) # Save annotated image annotated_path = save_annotated_image(image, results, detection_id, image_filename) # Get bounding box and confidence for compatibility try: box = results[0].boxes.xyxy[detection_id] if hasattr(box, 'cpu'): box = box.cpu() if hasattr(box, 'numpy'): box = box.numpy() x1, y1, x2, y2 = box.tolist() if hasattr(box, 'tolist') else box confidence = results[0].boxes.conf[detection_id] if hasattr(confidence, 'item'): confidence = confidence.item() else: confidence = float(confidence) except Exception as e: print(f"Error extracting box/confidence: {e}") x1, y1, x2, y2 = 0, 0, 0, 0 confidence = 0.0 # Create detection result in compatible format detection_result = { "image_id": class_id, "label": class_name, "confidence": confidence, "bbox": [x1, y1, x2, y2], "original_image_path": dataset_image_path, "marked_image_path": annotated_path, "txt_file_path": txt_path } else: # Handle case when no detection found print("No valid detections found") # Still save the image for record keeping timestamp = int(time.time()) base_filename = f"no_detection_{timestamp}" image_filename = f"{base_filename}.jpg" dataset_image_path = os.path.join(IMAGES_FOLDER, image_filename) shutil.copy2(file_path, dataset_image_path) detection_result = { "image_id": class_id, "label": class_name or "unknown", "confidence": 0.0, "bbox": [0, 0, 0, 0], "original_image_path": dataset_image_path, "marked_image_path": dataset_image_path, # Use original as no annotation "txt_file_path": None } endTime = datetime.now() totalTime = (endTime - startTime).total_seconds() print(f"Predicted ID: {class_id}") print(f"Time taken for Predicting Image = {totalTime} s") return class_id, detection_result # HTML template for the frontend HTML_TEMPLATE = """
Real-time image processing with dataset management
Real-time display of 8 obstacle recognition results