# mdpg4 / app.py
# (Hugging Face Space page residue converted to comments; commit e5e1a0a by Mahiruoshi.)
from flask import Flask, request, jsonify, render_template_string, send_from_directory, send_file
from flask_cors import CORS
import os
import importlib.util
import time
import zipfile
import shutil
import json
from datetime import datetime
from predict_task2 import Predictor
from id_mapping import mapping
from show_stitched import *
import cv2
import supervision as sv
from ultralytics import YOLO
# ---------------------------------------------------------------------------
# Application and configuration bootstrap
# ---------------------------------------------------------------------------
app = Flask(__name__)
CORS(app)

# PC_CONFIG.py is loaded by explicit path, relative to this file, so it is
# found regardless of the current working directory.
config_dir = os.path.abspath(os.path.dirname(__file__))
config_path = os.path.join(config_dir, 'PC_CONFIG.py')
spec = importlib.util.spec_from_file_location("PC_CONFIG", config_path)
PC_CONFIG = importlib.util.module_from_spec(spec)
spec.loader.exec_module(PC_CONFIG)

HOST = PC_CONFIG.HOST
PORT = 7860  # Changed from PC_CONFIG.IMAGE_REC_PORT to 7860

# Working directories: uploads plus the YOLO dataset layout.
UPLOAD_FOLDER = os.path.join(PC_CONFIG.FILE_DIRECTORY, "image-rec", "images")
DATASET_FOLDER = os.path.join(PC_CONFIG.BASE_DIR, "yolo_dataset")
ANNOTATED_FOLDER = os.path.join(DATASET_FOLDER, "annotated_images")
LABELS_FOLDER = os.path.join(DATASET_FOLDER, "labels")
IMAGES_FOLDER = os.path.join(DATASET_FOLDER, "images")
CLASS_MAPPING_FILE = os.path.join(DATASET_FOLDER, "classes.json")

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Model wrapper used by process_file() (project-local Predictor).
predictor = Predictor()

# Ensure every working directory exists before the first request arrives.
for _folder in (UPLOAD_FOLDER, DATASET_FOLDER, ANNOTATED_FOLDER,
                LABELS_FOLDER, IMAGES_FOLDER):
    os.makedirs(_folder, exist_ok=True)

# Seed classes.json once from id_mapping: {"<numeric id>": "<class name>"},
# skipping the -999 sentinel and unnamed keys.
if not os.path.exists(CLASS_MAPPING_FILE):
    initial_classes = {
        str(v): k
        for k, v in mapping.items()
        if v != -999 and k is not None
    }
    with open(CLASS_MAPPING_FILE, 'w', encoding='utf-8') as f:
        json.dump(initial_classes, f, indent=2, ensure_ascii=False)
def get_compatible_class_id(yolo_class_name):
    """Map a YOLO class name to the numeric ID used by id_mapping.

    Resolution order:
      1. the local override table below (directions / shapes / bullseye);
      2. the project-wide ``mapping`` dict from id_mapping.py;
      3. the ``-999`` sentinel when the name is unknown.
    """
    # Overrides for names whose YOLO spelling differs from id_mapping.
    # (Digit and letter entries were previously listed here but commented out;
    # those names now fall through to ``mapping`` instead.)
    yolo_to_id_mapping = {
        # Directions
        "up": 36, "down": 37, "right": 38, "left": 39,
        # Shapes -- bullseye is a non-target, flagged with -1
        "circle": 40, "Bullseye": -1, "bullseye": -1,
    }
    if yolo_class_name in yolo_to_id_mapping:
        return yolo_to_id_mapping[yolo_class_name]
    # Lazy fallback: the original passed mapping.get(...) as dict.get's default
    # argument, which evaluated it eagerly on every call even when the override
    # table already had the answer.
    return mapping.get(yolo_class_name, -999)
def load_class_mapping():
    """Return the {"<id>": "<name>"} dict from classes.json, or {} on failure."""
    try:
        with open(CLASS_MAPPING_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or malformed JSON -> behave as "no classes yet".
        # (Was a bare ``except:``, which also swallowed KeyboardInterrupt/SystemExit.)
        return {}
def save_class_mapping(class_mapping):
    """Persist the {"<id>": "<name>"} dict to classes.json (UTF-8, pretty-printed)."""
    with open(CLASS_MAPPING_FILE, 'w', encoding='utf-8') as fh:
        json.dump(class_mapping, fh, indent=2, ensure_ascii=False)
def save_obstacle_image(detection_result, obstacle_num):
    """Copy an annotated detection image into the obstacles display folder.

    Writes ``obstacle_<n>.jpg`` plus a sidecar ``obstacle_<n>_info.json``
    (label / image_id / confidence / timestamp) so the /obstacles page can
    show it later.  No-op when there is no result or no marked image path.
    """
    if not detection_result or not detection_result.get('marked_image_path'):
        return

    obstacles_folder = os.path.join(PC_CONFIG.BASE_DIR, "obstacles_display")
    os.makedirs(obstacles_folder, exist_ok=True)

    source_path = detection_result['marked_image_path']
    target_path = os.path.join(obstacles_folder, f"obstacle_{obstacle_num}.jpg")
    info_path = os.path.join(obstacles_folder, f"obstacle_{obstacle_num}_info.json")

    try:
        if os.path.exists(source_path):
            shutil.copy2(source_path, target_path)
            # Sidecar metadata consumed when rendering the obstacles grid.
            detection_info = {
                'label': detection_result.get('label', 'Unknown'),
                'image_id': detection_result.get('image_id', 'N/A'),
                'confidence': detection_result.get('confidence', 0.0),
                'timestamp': time.time(),
            }
            with open(info_path, 'w', encoding='utf-8') as fh:
                json.dump(detection_info, fh, indent=2, ensure_ascii=False)
            print(f"Saved obstacle {obstacle_num} image and info to {target_path}")
    except Exception as e:
        # Best-effort persistence: log and continue serving the request.
        print(f"Error saving obstacle image: {e}")
def get_obstacle_detection_info(obstacle_num):
    """Load the sidecar JSON for one obstacle; return {} when absent or unreadable."""
    info_path = os.path.join(
        PC_CONFIG.BASE_DIR, "obstacles_display", f"obstacle_{obstacle_num}_info.json"
    )
    try:
        if os.path.exists(info_path):
            with open(info_path, 'r', encoding='utf-8') as fh:
                return json.load(fh)
    except Exception as e:
        print(f"Error loading obstacle info: {e}")
    return {}
def generate_yolo_annotation(results, detection_id, image_width, image_height, class_name):
    """Build one YOLO-format label line for the selected detection.

    Returns ``"class_id x_center y_center width height"`` with coordinates
    normalized to [0, 1] at 6 decimals, or "" when the detection index is
    invalid or the box cannot be extracted.  A class name not yet present in
    classes.json is registered there under the next free numeric ID.
    """
    if not results or not results[0].boxes or detection_id >= len(results[0].boxes):
        return ""

    # Resolve (or mint) the numeric class ID for this class name.
    class_mapping = load_class_mapping()
    class_id = next(
        (int(id_str) for id_str, name in class_mapping.items() if name == class_name),
        None,
    )
    if class_id is None:
        existing_ids = [int(k) for k in class_mapping.keys()]
        class_id = (max(existing_ids) if existing_ids else -1) + 1
        class_mapping[str(class_id)] = class_name
        save_class_mapping(class_mapping)

    try:
        # xyxy box -> plain Python numbers (handles torch tensors and ndarrays).
        box = results[0].boxes.xyxy[detection_id]
        if hasattr(box, 'cpu'):
            box = box.cpu()
        if hasattr(box, 'numpy'):
            box = box.numpy()
        x1, y1, x2, y2 = box.tolist() if hasattr(box, 'tolist') else box

        # Convert corners to YOLO's normalized center/size representation.
        x_center = ((x1 + x2) / 2) / image_width
        y_center = ((y1 + y2) / 2) / image_height
        width = (x2 - x1) / image_width
        height = (y2 - y1) / image_height

        # NOTE: confidence is extracted (and its failure is caught below) but
        # is intentionally not part of the returned label line.
        confidence = results[0].boxes.conf[detection_id]
        if hasattr(confidence, 'item'):
            confidence = confidence.item()

        return f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}"
    except Exception as e:
        print(f"Error generating YOLO annotation: {e}")
        return ""
def save_annotated_image(image, results, detection_id, filename):
    """Draw every detection box on ``image`` and save it as annotated_<filename>.

    Returns the saved path, or None when there are no detections.  On any
    drawing/extraction error it falls back to saving the unmodified image so a
    file always exists for the UI.  ``detection_id`` is accepted for interface
    compatibility but all boxes are drawn, not only the selected one.
    """
    if not results or not results[0].boxes:
        return None

    # BUGFIX: the output path previously contained a literal placeholder
    # instead of the caller-supplied (timestamped) filename, so every save
    # overwrote the same file and ``filename`` was never used.
    annotated_path = os.path.join(ANNOTATED_FOLDER, f"annotated_{filename}")
    try:
        annotated_image = image.copy()

        # Pull boxes / confidences / class ids out as host-side arrays.
        boxes = results[0].boxes.xyxy
        if hasattr(boxes, 'cpu'):
            boxes = boxes.cpu().numpy()
        confidences = results[0].boxes.conf
        if hasattr(confidences, 'cpu'):
            confidences = confidences.cpu().numpy()
        class_ids = results[0].boxes.cls
        if hasattr(class_ids, 'cpu'):
            class_ids = class_ids.cpu().numpy().astype(int)

        # Green box plus a filled label background for every detection.
        for box, conf, cls_id in zip(boxes, confidences, class_ids):
            x1, y1, x2, y2 = map(int, box)
            class_name = results[0].names[cls_id]
            cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label = f"{class_name} {conf:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
            cv2.rectangle(annotated_image, (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1), (0, 255, 0), -1)
            cv2.putText(annotated_image, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        cv2.imwrite(annotated_path, annotated_image)
        cleanup_old_annotated_images()
        return annotated_path
    except Exception as e:
        print(f"Error saving annotated image: {e}")
        # Fallback: persist the original image under the same name.
        cv2.imwrite(annotated_path, image)
        cleanup_old_annotated_images()
        return annotated_path
def cleanup_old_annotated_images():
    """Keep only the 4 most recently modified annotated images in ANNOTATED_FOLDER."""
    try:
        if not os.path.exists(ANNOTATED_FOLDER):
            return
        # Collect (name, path, mtime) for every annotated_* image file.
        annotated_files = []
        for filename in os.listdir(ANNOTATED_FOLDER):
            if filename.startswith('annotated_') and filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                filepath = os.path.join(ANNOTATED_FOLDER, filename)
                annotated_files.append((filename, filepath, os.path.getmtime(filepath)))
        # Remove everything beyond the 4 newest files.
        if len(annotated_files) > 4:
            annotated_files.sort(key=lambda x: x[2], reverse=True)
            for filename, filepath, _ in annotated_files[4:]:
                try:
                    os.remove(filepath)
                    # BUGFIX: both messages previously printed a literal
                    # placeholder instead of the file name.
                    print(f"Removed old annotated image: {filename}")
                except Exception as e:
                    print(f"Error removing old image {filename}: {e}")
    except Exception as e:
        print(f"Error during cleanup: {e}")
def process_file(file_path, direction, task_type, filename):
    """Run recognition on an uploaded image and archive the results.

    Loads the image at ``file_path`` (resizing it on disk to 640x640 when
    needed), runs the Predictor, and -- for TASK_2 -- re-selects the winning
    detection by priority (bullseye ranked last) then confidence.  The frame,
    a YOLO label file, and an annotated copy are saved into the dataset
    folders.

    Returns (class_id_str, detection_result_dict), or None when the image
    cannot be read.  ``filename`` is accepted for interface compatibility but
    is not used directly here.
    """
    print("File received and saved successfully.")
    print(f"Direction received: {direction}")
    print(f"Task type received: {task_type}")
    start_time = datetime.now()

    image = cv2.imread(file_path)
    if image is None:
        return None

    # Keep the on-disk image at the YOLO input size of 640x640.
    if image.shape[0] != 640 or image.shape[1] != 640:
        image = cv2.resize(image, (640, 640))
        cv2.imwrite(file_path, image)

    class_name, results, detection_id = predictor.predict_id(file_path, task_type)

    # TASK_2 only: re-rank every detection so bullseye is picked as a last resort.
    if task_type == "TASK_2" and results and results[0].boxes is not None and len(results[0].boxes) > 0:
        boxes = results[0].boxes
        candidates = []
        for i in range(len(boxes)):
            detected_class = results[0].names[int(boxes.cls[i])]
            confidence = float(boxes.conf[i])
            yolo_class_id = int(boxes.cls[i])
            print(f"[APP.PY] Detection {i}: {detected_class} (confidence: {confidence:.2f}, class_id: {yolo_class_id})")
            # Bullseye (matched by name, so it is model-agnostic) gets the
            # lowest priority; every other class ranks equally at 0.
            priority = -10 if detected_class.lower() == 'bullseye' else 0
            candidates.append({
                'index': i,
                'class_name': detected_class,
                'confidence': confidence,
                'priority': priority,
                'yolo_class_id': yolo_class_id,
            })
        if candidates:
            # Highest priority first; ties broken by confidence.
            candidates.sort(key=lambda d: (d['priority'], d['confidence']), reverse=True)
            print(f"\n[APP.PY] Sorted detections:")
            for det in candidates:
                print(f" - {det['class_name']}: priority={det['priority']}, confidence={det['confidence']:.2f}")
            selected = candidates[0]
            class_name = selected['class_name']
            detection_id = selected['index']
            print(f"\n[APP.PY] ✓ Final selection: {class_name} (priority: {selected['priority']}, confidence: {selected['confidence']:.2f})")

    # Translate the winning class name to the numeric protocol ID.
    class_id = str(get_compatible_class_id(class_name))

    detection_result = None
    if class_name and results and results[0].boxes is not None and len(results[0].boxes) > 0:
        timestamp = int(time.time())
        base_filename = f"{class_name}_{timestamp}"

        # Archive the (resized) frame into the dataset.
        image_filename = f"{base_filename}.jpg"
        dataset_image_path = os.path.join(IMAGES_FOLDER, image_filename)
        shutil.copy2(file_path, dataset_image_path)

        # Write the YOLO label file when a valid annotation can be produced.
        h, w = image.shape[:2]
        yolo_annotation = generate_yolo_annotation(results, detection_id, w, h, class_name)
        txt_path = None
        if yolo_annotation:
            txt_path = os.path.join(LABELS_FOLDER, f"{base_filename}.txt")
            with open(txt_path, 'w') as f:
                f.write(yolo_annotation)

        annotated_path = save_annotated_image(image, results, detection_id, image_filename)

        # Extract bbox + confidence for the legacy response format; fall back
        # to zeros rather than failing the whole request.
        try:
            box = results[0].boxes.xyxy[detection_id]
            if hasattr(box, 'cpu'):
                box = box.cpu()
            if hasattr(box, 'numpy'):
                box = box.numpy()
            x1, y1, x2, y2 = box.tolist() if hasattr(box, 'tolist') else box
            confidence = results[0].boxes.conf[detection_id]
            confidence = confidence.item() if hasattr(confidence, 'item') else float(confidence)
        except Exception as e:
            print(f"Error extracting box/confidence: {e}")
            x1, y1, x2, y2 = 0, 0, 0, 0
            confidence = 0.0

        detection_result = {
            "image_id": class_id,
            "label": class_name,
            "confidence": confidence,
            "bbox": [x1, y1, x2, y2],
            "original_image_path": dataset_image_path,
            "marked_image_path": annotated_path,
            "txt_file_path": txt_path,
        }
    else:
        # No usable detection: still archive the frame for record keeping.
        print("No valid detections found")
        timestamp = int(time.time())
        image_filename = f"no_detection_{timestamp}.jpg"
        dataset_image_path = os.path.join(IMAGES_FOLDER, image_filename)
        shutil.copy2(file_path, dataset_image_path)
        detection_result = {
            "image_id": class_id,
            "label": class_name or "unknown",
            "confidence": 0.0,
            "bbox": [0, 0, 0, 0],
            "original_image_path": dataset_image_path,
            "marked_image_path": dataset_image_path,  # no annotated copy exists
            "txt_file_path": None,
        }

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"Predicted ID: {class_id}")
    print(f"Time taken for Predicting Image = {elapsed} s")
    return class_id, detection_result
# HTML template for the frontend
# Single-page UI: recent-results grid, upload form, dataset stats and class
# mapping.  The embedded JS polls /latest-images and /dataset-stats, POSTs the
# form to /image, and downloads a ZIP from /download-dataset.  Served verbatim
# via render_template_string(), so edits here change the UI directly.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>YOLO Image Recognition System</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.container {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-bottom: 30px;
}
.card {
background-color: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.images-display {
grid-column: span 2;
}
.images-grid {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 15px;
margin-top: 15px;
}
.image-slot {
background-color: #f8f9fa;
border: 2px dashed #dee2e6;
border-radius: 8px;
padding: 10px;
text-align: center;
min-height: 200px;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
}
.image-slot.has-image {
border: 2px solid #007bff;
border-style: solid;
background-color: white;
}
.image-slot img {
max-width: 100%;
max-height: 150px;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.image-label {
margin-top: 8px;
font-size: 14px;
font-weight: bold;
color: #007bff;
}
.image-timestamp {
font-size: 12px;
color: #666;
margin-top: 5px;
}
.empty-slot {
color: #6c757d;
font-style: italic;
}
.status {
padding: 10px;
border-radius: 5px;
margin-bottom: 15px;
font-weight: bold;
}
.status.waiting {
background-color: #fff3cd;
color: #856404;
}
.status.updated {
background-color: #d4edda;
color: #155724;
}
.upload-section {
text-align: center;
}
.upload-form {
display: inline-block;
text-align: left;
}
.form-group {
margin-bottom: 15px;
}
.form-group label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
.form-group input, .form-group select {
width: 300px;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
}
.btn {
background-color: #007bff;
color: white;
border: none;
padding: 12px 24px;
border-radius: 5px;
font-size: 16px;
cursor: pointer;
transition: background-color 0.3s;
}
.btn:hover {
background-color: #0056b3;
}
.btn:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
.dataset-info {
grid-column: span 2;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 15px;
}
.stat-card {
background-color: #f8f9fa;
padding: 15px;
border-radius: 5px;
text-align: center;
}
.stat-number {
font-size: 24px;
font-weight: bold;
color: #007bff;
}
.timestamp {
color: #666;
font-size: 12px;
margin-top: 10px;
}
.class-mapping {
max-height: 200px;
overflow-y: auto;
background-color: #f8f9fa;
padding: 10px;
border-radius: 5px;
font-family: monospace;
font-size: 12px;
}
</style>
</head>
<body>
<div class="header">
<h1>YOLO Image Recognition System</h1>
<p>Real-time image processing with dataset management</p>
<div style="margin-top: 15px;">
<a href="/" style="margin: 0 10px; padding: 8px 16px; background-color: #007bff; color: white; text-decoration: none; border-radius: 4px;">Main Interface</a>
<a href="/obstacles" style="margin: 0 10px; padding: 8px 16px; background-color: #28a745; color: white; text-decoration: none; border-radius: 4px;">Obstacles Display</a>
</div>
</div>
<div class="container">
<div class="card images-display">
<h3>Recent Recognition Results (Latest 4 Images)</h3>
<div id="status" class="status waiting">Waiting for recognition results...</div>
<div class="images-grid" id="imagesGrid">
<div class="image-slot" id="slot1">
<div class="empty-slot">Waiting for first image...</div>
</div>
<div class="image-slot" id="slot2">
<div class="empty-slot">Waiting for second image...</div>
</div>
<div class="image-slot" id="slot3">
<div class="empty-slot">Waiting for third image...</div>
</div>
<div class="image-slot" id="slot4">
<div class="empty-slot">Waiting for fourth image...</div>
</div>
</div>
</div>
<div class="card">
<h3>Upload Image for Recognition</h3>
<div class="upload-section">
<form id="uploadForm" class="upload-form" enctype="multipart/form-data">
<div class="form-group">
<label for="file">Select Image:</label>
<input type="file" id="file" name="file" accept="image/*" required>
</div>
<div class="form-group">
<label for="direction">Direction:</label>
<select id="direction" name="direction" required>
<option value="north">North</option>
<option value="south">South</option>
<option value="east">East</option>
<option value="west">West</option>
</select>
</div>
<div class="form-group">
<label for="task_type">Task Type:</label>
<select id="task_type" name="task_type" required>
<option value="TASK_1">Task 1</option>
<option value="TASK_2" selected>Task 2</option>
</select>
</div>
<div class="form-group">
<label for="num_obstacles">Obstacle Number (Optional):</label>
<select id="num_obstacles" name="NUM_OBSTACLES">
<option value="0">None</option>
<option value="1">Obstacle 1</option>
<option value="2">Obstacle 2</option>
<option value="3">Obstacle 3</option>
<option value="4">Obstacle 4</option>
<option value="5">Obstacle 5</option>
<option value="6">Obstacle 6</option>
<option value="7">Obstacle 7</option>
<option value="8">Obstacle 8</option>
</select>
</div>
<button type="submit" class="btn">Upload and Predict</button>
</form>
<div id="uploadResult" style="margin-top: 15px;"></div>
</div>
</div>
<div class="card dataset-info">
<h3>Dataset Information</h3>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-number" id="totalImages">0</div>
<div>Total Images</div>
</div>
<div class="stat-card">
<div class="stat-number" id="totalClasses">0</div>
<div>Total Classes</div>
</div>
<div class="stat-card">
<div class="stat-number" id="annotatedImages">0</div>
<div>Annotated Images</div>
</div>
<div class="stat-card">
<button id="downloadBtn" class="btn" onclick="downloadDataset()">
Download Dataset
</button>
</div>
</div>
<div style="margin-top: 20px;">
<h4>Class Mapping:</h4>
<div id="classMapping" class="class-mapping">Loading...</div>
</div>
<div id="downloadInfo" style="margin-top: 10px; color: #666;"></div>
</div>
</div>
<script>
let imageHistory = [];
// Check for latest results
async function checkLatestImages() {
try {
const response = await fetch('/latest-images');
const data = await response.json();
if (data.success && data.images) {
updateImageDisplay(data.images);
}
} catch (error) {
console.error('Failed to check latest images:', error);
}
}
// Update image display with latest 4 images
function updateImageDisplay(images) {
const slots = ['slot1', 'slot2', 'slot3', 'slot4'];
// Update image history
imageHistory = images;
// Update status
const statusDiv = document.getElementById('status');
if (images.length > 0) {
statusDiv.className = 'status updated';
statusDiv.textContent = `Displaying ${images.length} recent recognition results`;
} else {
statusDiv.className = 'status waiting';
statusDiv.textContent = 'Waiting for recognition results...';
}
// Update each slot
slots.forEach((slotId, index) => {
const slot = document.getElementById(slotId);
if (index < images.length) {
const image = images[index];
slot.className = 'image-slot has-image';
slot.innerHTML = `
<img src="/annotated/${image.filename}?t=${new Date().getTime()}"
alt="Recognition Result ${index + 1}"
onerror="this.style.display='none'">
<div class="image-label">${image.label || 'Unknown'}</div>
<div class="image-timestamp">${new Date(image.timestamp * 1000).toLocaleString()}</div>
`;
} else {
slot.className = 'image-slot';
slot.innerHTML = `<div class="empty-slot">Waiting for ${getOrdinal(index + 1)} image...</div>`;
}
});
}
// Helper function to get ordinal numbers
function getOrdinal(num) {
const ordinals = ['first', 'second', 'third', 'fourth'];
return ordinals[num - 1] || `${num}th`;
}
// Update dataset statistics
async function updateDatasetStats() {
try {
const response = await fetch('/dataset-stats');
const data = await response.json();
document.getElementById('totalImages').textContent = data.total_images || 0;
document.getElementById('totalClasses').textContent = data.total_classes || 0;
document.getElementById('annotatedImages').textContent = data.annotated_images || 0;
// Update class mapping
const mappingDiv = document.getElementById('classMapping');
if (data.class_mapping) {
let mappingText = '';
for (const [id, name] of Object.entries(data.class_mapping)) {
mappingText += `${id}: ${name}\\n`;
}
mappingDiv.textContent = mappingText || 'No classes defined yet';
} else {
mappingDiv.textContent = 'No classes defined yet';
}
} catch (error) {
console.error('Failed to update dataset stats:', error);
}
}
// Handle file upload
document.getElementById('uploadForm').addEventListener('submit', async function(e) {
e.preventDefault();
const formData = new FormData(this);
const uploadResult = document.getElementById('uploadResult');
const submitBtn = this.querySelector('button[type="submit"]');
submitBtn.disabled = true;
submitBtn.textContent = 'Processing...';
uploadResult.innerHTML = '<div style="color: #007bff;">Processing image...</div>';
try {
const response = await fetch('/image', {
method: 'POST',
body: formData
});
const result = await response.json();
if (response.ok) {
// Handle both legacy format (predicted_id) and new format (result.image_id)
const predictedId = result.predicted_id || result.image_id || result.result?.image_id;
const obstacleId = result.obstacle_id || result.result?.obstacle_id;
const numObstacles = result.num_obstacles;
let successMessage = `
<div style="color: #28a745;">
<strong>Success!</strong><br>
Recognition completed!
`;
if (numObstacles && numObstacles !== '0') {
if (predictedId === '-999') {
successMessage += `<br><span style="color: #ffc107;">⚠️ Obstacle ${numObstacles} not saved - No valid detection</span>`;
} else {
successMessage += `<br><span style="color: #28a745;">✓ Saved as Obstacle ${numObstacles}</span>`;
successMessage += `<br><a href="/obstacles" target="_blank" style="color: #007bff; text-decoration: underline;">View Obstacles Display</a>`;
}
}
successMessage += `</div>`;
uploadResult.innerHTML = successMessage;
// Refresh dataset stats and images immediately after upload
updateDatasetStats();
setTimeout(checkLatestImages, 500); // Small delay to ensure file is saved
} else {
uploadResult.innerHTML = `<div style="color: #dc3545;">Error: ${result.error}</div>`;
}
} catch (error) {
uploadResult.innerHTML = `<div style="color: #dc3545;">Error: ${error.message}</div>`;
} finally {
submitBtn.disabled = false;
submitBtn.textContent = 'Upload and Predict';
}
});
// Download dataset
async function downloadDataset() {
const downloadBtn = document.getElementById('downloadBtn');
const downloadInfo = document.getElementById('downloadInfo');
try {
downloadBtn.disabled = true;
downloadBtn.textContent = 'Preparing...';
downloadInfo.textContent = 'Creating ZIP file, please wait...';
downloadInfo.style.color = '#007bff';
const response = await fetch('/download-dataset');
if (response.ok) {
const blob = await response.blob();
const url = window.URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `yolo_dataset_${new Date().toISOString().slice(0,10)}.zip`;
document.body.appendChild(a);
a.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(a);
downloadInfo.textContent = 'Dataset downloaded successfully!';
downloadInfo.style.color = '#28a745';
} else {
const errorData = await response.json();
downloadInfo.textContent = 'Error: ' + (errorData.message || 'Failed to download');
downloadInfo.style.color = '#dc3545';
}
} catch (error) {
downloadInfo.textContent = 'Error: ' + error.message;
downloadInfo.style.color = '#dc3545';
} finally {
downloadBtn.disabled = false;
downloadBtn.textContent = 'Download Dataset';
setTimeout(() => { downloadInfo.textContent = ''; }, 5000);
}
}
// Initialize
checkLatestImages();
updateDatasetStats();
// Auto-refresh every 5 seconds
setInterval(checkLatestImages, 5000);
setInterval(updateDatasetStats, 15000);
</script>
</body>
</html>
"""
# Obstacles display HTML template
# Grid of 8 obstacle cards.  The embedded JS polls /obstacles-data every 5 s
# and loads card images from /obstacles-images/<filename>.  Served verbatim
# via render_template_string(), so edits here change the UI directly.
OBSTACLES_HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Obstacles Display - YOLO Image Recognition</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 1400px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.nav {
text-align: center;
margin-bottom: 20px;
}
.nav a {
display: inline-block;
padding: 10px 20px;
margin: 0 10px;
background-color: #007bff;
color: white;
text-decoration: none;
border-radius: 5px;
transition: background-color 0.3s;
}
.nav a:hover {
background-color: #0056b3;
}
.obstacles-grid {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 20px;
margin-bottom: 30px;
}
.obstacle-card {
background-color: white;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
overflow: hidden;
position: relative;
}
.obstacle-header {
background-color: #007bff;
color: white;
padding: 10px;
text-align: center;
font-weight: bold;
}
.obstacle-content {
padding: 15px;
text-align: center;
min-height: 280px;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
}
.obstacle-image {
max-width: 100%;
max-height: 150px;
border-radius: 5px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
margin-bottom: 10px;
}
.obstacle-placeholder {
color: #666;
font-style: italic;
padding: 40px 20px;
}
.obstacle-info {
margin-top: 10px;
font-size: 12px;
color: #666;
}
.label-display {
width: 100%;
margin-top: 10px;
padding: 8px;
background-color: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 4px;
font-size: 14px;
min-height: 60px;
}
.label-row {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 5px;
padding: 2px 0;
}
.label-key {
font-weight: bold;
color: #495057;
margin-right: 8px;
}
.label-value {
color: #007bff;
font-family: monospace;
background-color: #e9ecef;
padding: 2px 6px;
border-radius: 3px;
font-size: 13px;
}
.no-detection {
color: #6c757d;
font-style: italic;
}
.refresh-btn {
position: fixed;
top: 20px;
right: 20px;
background-color: #28a745;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
font-size: 14px;
}
.refresh-btn:hover {
background-color: #1e7e34;
}
.status {
text-align: center;
margin-bottom: 20px;
padding: 10px;
background-color: #d4edda;
color: #155724;
border-radius: 5px;
}
</style>
</head>
<body>
<button class="refresh-btn" onclick="refreshObstacles()">Refresh</button>
<div class="header">
<h1>Obstacles Display</h1>
<p>Real-time display of 8 obstacle recognition results</p>
</div>
<div class="nav">
<a href="/">Main Interface</a>
<a href="/obstacles">Obstacles Display</a>
</div>
<div class="status" id="status">
Loading obstacles data...
</div>
<div class="obstacles-grid" id="obstaclesGrid">
<!-- Obstacles will be populated here -->
</div>
<script>
let obstaclesData = {};
async function loadObstacles() {
try {
const response = await fetch('/obstacles-data');
const data = await response.json();
obstaclesData = data;
renderObstacles();
updateStatus();
} catch (error) {
console.error('Error loading obstacles:', error);
document.getElementById('status').textContent = 'Error loading obstacles data';
document.getElementById('status').style.backgroundColor = '#f8d7da';
document.getElementById('status').style.color = '#721c24';
}
}
function renderObstacles() {
const grid = document.getElementById('obstaclesGrid');
grid.innerHTML = '';
for (let i = 1; i <= 8; i++) {
const obstacle = obstaclesData[i.toString()];
const card = document.createElement('div');
card.className = 'obstacle-card';
const header = document.createElement('div');
header.className = 'obstacle-header';
header.textContent = `Obstacle ${i}`;
const content = document.createElement('div');
content.className = 'obstacle-content';
if (obstacle && obstacle.exists) {
const img = document.createElement('img');
img.className = 'obstacle-image';
img.src = `/obstacles-images/${obstacle.filename}?t=${new Date().getTime()}`;
img.alt = `Obstacle ${i}`;
img.onerror = function() {
this.style.display = 'none';
content.innerHTML = '<div class="obstacle-placeholder">Image failed to load</div>';
};
// Create label display box
const labelBox = document.createElement('div');
labelBox.className = 'label-display';
if (obstacle.label && obstacle.image_id) {
labelBox.innerHTML = `
<div class="label-row">
<span class="label-key">Label:</span>
<span class="label-value">${obstacle.label}</span>
</div>
<div class="label-row">
<span class="label-key">ID:</span>
<span class="label-value">${obstacle.image_id}</span>
</div>
<div class="label-row">
<span class="label-key">Confidence:</span>
<span class="label-value">${(obstacle.confidence * 100).toFixed(1)}%</span>
</div>
`;
} else {
labelBox.innerHTML = '<div class="no-detection">No detection data available</div>';
}
const info = document.createElement('div');
info.className = 'obstacle-info';
info.textContent = `Last updated: ${new Date(obstacle.timestamp * 1000).toLocaleString()}`;
content.appendChild(img);
content.appendChild(labelBox);
content.appendChild(info);
} else {
const placeholder = document.createElement('div');
placeholder.className = 'obstacle-placeholder';
placeholder.textContent = 'No image available';
const labelBox = document.createElement('div');
labelBox.className = 'label-display';
labelBox.innerHTML = '<div class="no-detection">Waiting for detection...</div>';
content.appendChild(placeholder);
content.appendChild(labelBox);
}
card.appendChild(header);
card.appendChild(content);
grid.appendChild(card);
}
}
function updateStatus() {
const existingCount = Object.values(obstaclesData).filter(o => o && o.exists).length;
const statusEl = document.getElementById('status');
if (existingCount === 0) {
statusEl.style.backgroundColor = '#fff3cd';
statusEl.style.color = '#856404';
statusEl.textContent = 'No obstacles detected yet. Upload images with NUM_OBSTACLES parameter (1-8) and valid detections (ID ≠ -999).';
} else {
statusEl.style.backgroundColor = '#d4edda';
statusEl.style.color = '#155724';
statusEl.textContent = `Displaying ${existingCount}/8 obstacles with valid detections`;
}
}
function refreshObstacles() {
loadObstacles();
}
// Initialize
loadObstacles();
// Auto-refresh every 5 seconds
setInterval(loadObstacles, 5000);
</script>
</body>
</html>
"""
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route('/')
def index():
    """Serve the main single-page web interface."""
    page = HTML_TEMPLATE
    return render_template_string(page)
@app.route('/status', methods=['GET'])
def server_status():
    """Health-check endpoint; reports OK whenever the server is reachable."""
    payload = {'status': 'OK'}
    return jsonify(payload)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Legacy upload endpoint.

    Kept for backward compatibility with older clients; delegates all
    handling to the unified /image endpoint to avoid code duplication.
    """
    return image_predict()
@app.route('/image', methods=['POST'])
def image_predict():
    """
    Main endpoint for the image prediction algorithm.

    Expects a multipart upload under 'file' plus optional form fields:
      direction     -- capture direction, default 'north'
      task_type     -- default 'TASK_2'
      NUM_OBSTACLES -- obstacle display slot, '1'..'8'

    Returns the original API format (a JSON object with "obstacle_id",
    "image_id" and a "detection" sub-object) when all parameters are at
    their defaults, otherwise the simplified web-interface format.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    filename = file.filename
    if filename == '':
        return jsonify({'error': 'No selected file'}), 400

    # Save to uploads folder first
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)

    # Parameters from both the old and new client formats.
    direction = request.form.get('direction', 'north')
    task_type = request.form.get('task_type', 'TASK_2')
    num_obstacles = request.form.get('NUM_OBSTACLES', '0')
    # Parse the obstacle slot exactly once; None when absent/invalid/out of range.
    obstacle_slot = int(num_obstacles) if num_obstacles.isdigit() else None
    if obstacle_slot is not None and not 1 <= obstacle_slot <= 8:
        obstacle_slot = None

    # Try to parse filename format "<timestamp>_<obstacle_id>_<signal>.jpeg",
    # but stay flexible with other formats: the len() guards make indexing safe.
    constituents = filename.split("_")
    obstacle_id = "unknown"
    signal = direction  # fall back to the direction parameter as the signal
    if len(constituents) >= 2:
        obstacle_id = constituents[1]
    if len(constituents) >= 3:
        # Strip a known image extension from the signal component.
        signal_part = constituents[2]
        for ext in ('.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG'):
            if signal_part.endswith(ext):
                signal = signal_part[:-len(ext)]
                break
        else:
            signal = signal_part

    # Process the file and predict.
    class_id, detection_result = process_file(file_path, signal, task_type, filename)

    # Persist into the obstacles display only when the slot is valid and the
    # detection succeeded (image_id of '-999' marks an invalid detection).
    if obstacle_slot is not None:
        if detection_result and detection_result.get('image_id') != '-999':
            save_obstacle_image(detection_result, obstacle_slot)
            print(f"Obstacle {num_obstacles} saved with valid detection (ID: {detection_result.get('image_id')})")
        elif detection_result:
            print(f"Obstacle {num_obstacles} NOT saved - invalid detection (ID: -999)")
        else:
            print(f"Obstacle {num_obstacles} NOT saved - no detection result")

    if detection_result is None:
        return jsonify({'error': 'Failed to process image'}), 500

    image_id = detection_result["image_id"]
    print(f"Original image saved to: {detection_result.get('original_image_path', 'N/A')}")
    print(f"Annotated image saved to: {detection_result['marked_image_path']}")
    print(f"YOLO txt file saved to: {detection_result.get('txt_file_path', 'N/A')}")

    # Decide the response format: any non-default parameter means the web UI.
    # BUG FIX: the default task_type is 'TASK_2' but the old check compared
    # against 'TASK_1', so all-default requests (e.g. from the robot client)
    # were mis-routed to the web-interface format instead of the original API
    # format documented in this function's docstring.
    if direction != 'north' or task_type != 'TASK_2' or num_obstacles != '0':
        # Web interface format
        return jsonify({
            'message': 'File successfully uploaded and processed',
            'predicted_id': image_id,
            'direction': direction,
            'task_type': task_type,
            'num_obstacles': num_obstacles
        }), 200
    else:
        # Original API format
        result = {
            "obstacle_id": obstacle_id,
            "image_id": image_id,
            "detection": {
                "label": detection_result["label"],
                "confidence": detection_result["confidence"],
                "bbox_coordinates": detection_result["bbox"],
                "original_image_path": detection_result.get("original_image_path"),
                "annotated_image_path": detection_result["marked_image_path"],
                "txt_file_path": detection_result.get("txt_file_path")
            }
        }
        return jsonify(result)
@app.route('/latest-result')
def get_latest_result():
    """Return the newest annotated image filename and mtime (legacy endpoint)."""
    if not os.path.exists(ANNOTATED_FOLDER):
        return jsonify({"success": False, "message": "Annotated folder not found"})
    # Collect (filename, mtime) pairs for every annotated image.
    candidates = [
        (name, os.path.getmtime(os.path.join(ANNOTATED_FOLDER, name)))
        for name in os.listdir(ANNOTATED_FOLDER)
        if name.startswith('annotated_')
        and name.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]
    if not candidates:
        return jsonify({"success": False, "message": "No annotated images found"})
    # Most recently modified file wins.
    latest_name, latest_mtime = max(candidates, key=lambda pair: pair[1])
    return jsonify({
        "success": True,
        "image_path": latest_name,
        "timestamp": latest_mtime
    })
@app.route('/latest-images')
def get_latest_images():
    """Return the latest 4 annotated images with best-effort labels.

    Labels are parsed from filenames of the expected form
    ``annotated_<class_name>_<timestamp>.<ext>``; 'Unknown' when parsing fails.
    """
    if not os.path.exists(ANNOTATED_FOLDER):
        return jsonify({"success": False, "message": "Annotated folder not found", "images": []})
    annotated_files = []
    for filename in os.listdir(ANNOTATED_FOLDER):
        if not (filename.startswith('annotated_')
                and filename.lower().endswith(('.png', '.jpg', '.jpeg'))):
            continue
        filepath = os.path.join(ANNOTATED_FOLDER, filename)
        mtime = os.path.getmtime(filepath)
        # Best-effort label extraction from the filename.
        label = 'Unknown'
        try:
            base_name = filename[len('annotated_'):]  # strip the prefix
            if '.' in base_name:
                base_name = base_name[:base_name.rfind('.')]  # drop extension
            # First underscore-separated token is treated as the class name.
            # NOTE(review): this truncates class names that themselves contain
            # underscores - confirm against the code that writes these files.
            parts = base_name.split('_')
            if parts:
                label = parts[0]
        except Exception:
            # BUG FIX: was a bare ``except:``, which also swallowed
            # SystemExit/KeyboardInterrupt.
            label = 'Unknown'
        annotated_files.append({
            'filename': filename,
            'timestamp': mtime,
            'label': label
        })
    # Sort by modification time, newest first, and keep the latest 4.
    annotated_files.sort(key=lambda x: x['timestamp'], reverse=True)
    return jsonify({
        "success": True,
        "images": annotated_files[:4]
    })
@app.route('/annotated/<filename>')
def serve_annotated_image(filename):
    """Serve a previously generated annotated image by filename."""
    source_folder = ANNOTATED_FOLDER
    return send_from_directory(source_folder, filename)
@app.route('/dataset-stats')
def get_dataset_stats():
    """Summarize the dataset: image counts, class count, and class mapping."""
    image_exts = ('.png', '.jpg', '.jpeg')

    def _count_images(folder):
        # Number of image files in a folder; 0 when the folder is absent.
        if not os.path.exists(folder):
            return 0
        return sum(1 for f in os.listdir(folder) if f.lower().endswith(image_exts))

    class_mapping = load_class_mapping()
    stats = {
        'total_images': _count_images(IMAGES_FOLDER),
        'total_classes': len(class_mapping),
        'annotated_images': _count_images(ANNOTATED_FOLDER),
        'class_mapping': class_mapping
    }
    return jsonify(stats)
@app.route('/download-dataset')
def download_dataset():
    """Download the complete YOLO dataset as a ZIP archive.

    The archive is built on disk, then buffered into memory before the
    temporary file is removed, so the cleanup in ``finally`` cannot race
    with the response being streamed.
    """
    import io  # local import: only needed for the in-memory send below

    if not os.path.exists(DATASET_FOLDER):
        return jsonify({"success": False, "message": "Dataset folder not found"}), 404
    # Check if there is anything to download at all.
    has_files = any(
        os.path.exists(folder) and os.listdir(folder)
        for folder in (IMAGES_FOLDER, LABELS_FOLDER, ANNOTATED_FOLDER)
    )
    if not has_files:
        return jsonify({"success": False, "message": "No files found in dataset"}), 404
    # Timestamped archive name so repeated downloads don't collide.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = f"yolo_dataset_{timestamp}.zip"
    zip_path = os.path.join(UPLOAD_FOLDER, zip_filename)
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Mirror the dataset directory structure inside the archive.
            for root, dirs, files in os.walk(DATASET_FOLDER):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, DATASET_FOLDER)
                    zipf.write(file_path, arcname)
        # BUG FIX: the temp zip used to be handed to send_file by path and
        # then deleted in ``finally`` before the response was streamed -
        # that relies on POSIX delete-while-open semantics and breaks on
        # Windows. Buffer the archive in memory so deletion is always safe.
        with open(zip_path, 'rb') as f:
            buffer = io.BytesIO(f.read())
        return send_file(buffer, as_attachment=True,
                         download_name=zip_filename,
                         mimetype='application/zip')
    except Exception as e:
        return jsonify({"success": False, "message": f"Error creating zip: {str(e)}"}), 500
    finally:
        # Clean up the temporary archive (best-effort).
        try:
            if os.path.exists(zip_path):
                os.remove(zip_path)
        except OSError:
            pass
@app.route('/obstacles')
def obstacles_display():
    """Render the obstacles display page."""
    page = OBSTACLES_HTML_TEMPLATE
    return render_template_string(page)
@app.route('/obstacles-data')
def get_obstacles_data():
    """Return per-slot metadata (label, ID, confidence) for obstacle slots 1-8.

    Responds with an empty object when the display folder does not exist yet.
    """
    obstacles_folder = os.path.join(PC_CONFIG.BASE_DIR, "obstacles_display")
    obstacles = {}
    if not os.path.exists(obstacles_folder):
        return jsonify(obstacles)
    for slot in range(1, 9):  # slots 1..8
        name = f"obstacle_{slot}.jpg"
        path = os.path.join(obstacles_folder, name)
        if os.path.exists(path):
            # Pull the matching detection info for this slot.
            info = get_obstacle_detection_info(slot)
            entry = {
                'filename': name,
                'timestamp': os.path.getmtime(path),
                'exists': True,
                'label': info.get('label', 'Unknown'),
                'image_id': info.get('image_id', 'N/A'),
                'confidence': info.get('confidence', 0.0)
            }
        else:
            entry = {
                'filename': None,
                'timestamp': None,
                'exists': False,
                'label': None,
                'image_id': None,
                'confidence': None
            }
        obstacles[str(slot)] = entry
    return jsonify(obstacles)
@app.route('/obstacles-images/<filename>')
def serve_obstacle_image(filename):
    """Serve a stored obstacle-slot image by filename."""
    folder = os.path.join(PC_CONFIG.BASE_DIR, "obstacles_display")
    return send_from_directory(folder, filename)
@app.route('/display_stitched', methods=['POST'])
def display_stitched():
    """Trigger on-screen display of the stitched annotated images."""
    try:
        showAnnotatedStitched()
        status = {'display_stitched': 'OK'}
        return jsonify(status)
    except Exception as exc:
        # Surface the failure reason to the caller as a 500.
        return jsonify({'error': str(exc)}), 500
if __name__ == '__main__':
    # Startup banner with the resolved paths and endpoints.
    print()
    print(f"UPLOAD FOLDER: {UPLOAD_FOLDER}")
    print(f"DATASET FOLDER: {DATASET_FOLDER}")
    print(f"Starting Enhanced Image Recognition Server...")
    print(f"Web interface available at: http://{HOST}:{PORT}")
    print(f"Obstacles display available at: http://{HOST}:{PORT}/obstacles")
    print(f"API endpoints: /image (unified endpoint), /upload (legacy redirect)")
    try:
        app.run(host=HOST, port=PORT, debug=False)
    except Exception:
        # BUG FIX: was a bare ``except:``, which also swallowed
        # KeyboardInterrupt/SystemExit and made Ctrl-C restart the server
        # on the fallback address instead of exiting.
        print('Unable to connect to configured host and port. Switching to localhost:7860.')
        print(f"Web interface: http://localhost:7860")
        print(f"Obstacles display: http://localhost:7860/obstacles")
        # SECURITY NOTE(review): debug=True exposes the Werkzeug debugger on
        # 0.0.0.0 (remote code execution risk) - confirm this is intentional.
        app.run(host='0.0.0.0', port=7860, debug=True)