|
|
|
|
|
""" |
|
|
Optimized ONNX Inference for Raspberry Pi |
|
|
High-performance inference with ONNX Runtime optimizations |
|
|
""" |
|
|
|
|
|
import os |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import onnxruntime as ort |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Tuple, List, Optional |
|
|
|
|
|
class OptimizedONNXInference:
    """
    Optimized ONNX inference engine for Raspberry Pi.

    Wraps an ONNX Runtime session tuned for a 4-core ARM CPU and provides
    YOLOv8-style pre/post-processing for a single-class ("strawberry")
    detector exported to ONNX.
    """

    def __init__(self, model_path: str, conf_threshold: float = 0.5):
        """
        Initialize optimized ONNX inference engine.

        Args:
            model_path: Path to ONNX model
            conf_threshold: Confidence threshold for detections
        """
        self.conf_threshold = conf_threshold
        self.model_path = model_path
        self.session = self._create_optimized_session()
        self.input_name = self.session.get_inputs()[0].name
        self.input_shape = self.session.get_inputs()[0].shape

        # NCHW layout assumed: (batch, channels, height, width).
        self.input_height = self.input_shape[2]
        self.input_width = self.input_shape[3]

        print(f"Optimized ONNX model loaded: {model_path}")
        print(f"Input shape: {self.input_shape}")
        print(f"Confidence threshold: {conf_threshold}")

    def _create_optimized_session(self) -> "ort.InferenceSession":
        """
        Create an ONNX Runtime session with Raspberry Pi optimizations.

        Falls back to a session with default options if the tuned
        configuration fails, rather than aborting.
        """
        # NOTE(review): OpenMP/MKL env vars only take effect if set before
        # the threading runtime initializes — i.e. before the first session
        # in this process; confirm no earlier session was created.
        os.environ["OMP_NUM_THREADS"] = "4"
        os.environ["OMP_THREAD_LIMIT"] = "4"
        os.environ["OMP_WAIT_POLICY"] = "PASSIVE"
        os.environ["MKL_NUM_THREADS"] = "4"

        session_options = ort.SessionOptions()
        session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        # Sequential execution: run one operator at a time, parallelized
        # internally across the Pi's 4 cores.
        session_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
        session_options.intra_op_num_threads = 4
        session_options.inter_op_num_threads = 1
        # Reuse memory allocation patterns between runs.
        session_options.enable_mem_pattern = True
        session_options.enable_mem_reuse = True

        providers = ['CPUExecutionProvider']

        try:
            session = ort.InferenceSession(
                self.model_path,
                sess_options=session_options,
                providers=providers
            )
            return session
        except Exception as e:
            print(f"Failed to create optimized session: {e}")
            # Best-effort fallback with default session options.
            return ort.InferenceSession(self.model_path, providers=providers)

    def preprocess(self, image: np.ndarray) -> np.ndarray:
        """
        Optimized preprocessing for Raspberry Pi.

        Args:
            image: Input image (BGR format, HWC uint8)

        Returns:
            Preprocessed tensor of shape (1, 3, input_height, input_width),
            float32 in [0, 1], RGB channel order.
        """
        # Convert BGR (OpenCV default) to the RGB order the model expects.
        if len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        image = cv2.resize(image, (self.input_width, self.input_height),
                           interpolation=cv2.INTER_LINEAR)

        # Normalize to [0, 1].
        image = image.astype(np.float32) / 255.0

        # HWC -> CHW, then add the batch dimension.
        image = np.transpose(image, (2, 0, 1))
        image = np.expand_dims(image, axis=0)

        return image

    def postprocess(self, outputs: List[np.ndarray]) -> List[dict]:
        """
        Post-process YOLOv8 single-class outputs into detection dicts.

        Args:
            outputs: Raw ``session.run`` result — a list whose first element
                is the detection tensor, shaped (1, 5, N) for a single-class
                YOLOv8 export (cx, cy, w, h, confidence per candidate).

        Returns:
            List of detections, each a dict with 'bbox' [x1, y1, x2, y2]
            in input-tensor pixel coordinates, 'confidence', 'class', and
            'class_name'.
        """
        detections: List[dict] = []

        preds = np.asarray(outputs[0])
        # BUGFIX: drop the batch axis first. Indexing row 4 of a (1, 5, N)
        # tensor previously indexed the batch dimension and raised
        # IndexError instead of selecting the confidence row.
        if preds.ndim == 3:
            preds = preds[0]

        # Keep only candidates above the confidence threshold.
        conf_mask = preds[4] > self.conf_threshold
        filtered = preds[:, conf_mask]

        if filtered.shape[1] == 0:
            return detections

        boxes = filtered[:4].T          # (M, 4) center-format boxes
        scores = filtered[4]            # (M,) confidences

        # Convert (cx, cy, w, h) -> (x1, y1, x2, y2), clipped to the
        # model's input resolution.
        cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        x1 = np.clip(cx - w / 2, 0, self.input_width)
        y1 = np.clip(cy - h / 2, 0, self.input_height)
        x2 = np.clip(cx + w / 2, 0, self.input_width)
        y2 = np.clip(cy + h / 2, 0, self.input_height)

        for i in range(len(scores)):
            detections.append({
                'bbox': [float(x1[i]), float(y1[i]), float(x2[i]), float(y2[i])],
                'confidence': float(scores[i]),
                'class': 0,
                'class_name': 'strawberry'
            })

        return detections

    def predict(self, image: np.ndarray) -> Tuple[List[dict], float]:
        """
        Run optimized inference on a single image.

        Args:
            image: Input image (BGR format)

        Returns:
            Tuple of (detections, inference_time). The reported time covers
            only the ``session.run`` call, not pre/post-processing.
        """
        input_tensor = self.preprocess(image)

        start_time = time.perf_counter()
        outputs = self.session.run(None, {self.input_name: input_tensor})
        inference_time = time.perf_counter() - start_time

        detections = self.postprocess(outputs)

        return detections, inference_time

    def predict_batch(self, images: List[np.ndarray]) -> Tuple[List[List[dict]], float]:
        """
        Run batch inference for multiple images.

        Args:
            images: List of input images (BGR format)

        Returns:
            Tuple of (list_of_detections, total_inference_time)
        """
        if not images:
            return [], 0.0

        # Stack per-image tensors into one (B, 3, H, W) batch.
        input_tensors = [self.preprocess(img) for img in images]
        batch_tensor = np.concatenate(input_tensors, axis=0)

        start_time = time.perf_counter()
        outputs = self.session.run(None, {self.input_name: batch_tensor})
        inference_time = time.perf_counter() - start_time

        all_detections = []
        for i in range(len(images)):
            # Slice one image's predictions, keeping a leading batch dim of
            # 1 and wrapping in a list to match postprocess's input shape.
            single_output = outputs[0][i:i + 1]
            detections = self.postprocess([single_output])
            all_detections.append(detections)

        return all_detections, inference_time
|
|
|
|
|
def benchmark_model(model_path: str, test_image_path: str, runs: int = 10) -> dict:
    """
    Benchmark inference latency of an ONNX model on a single test image.

    Args:
        model_path: Path to ONNX model
        test_image_path: Path to test image
        runs: Number of timed benchmark runs

    Returns:
        Dict of latency statistics in milliseconds plus throughput (fps)

    Raises:
        ValueError: If the test image cannot be read.
    """
    engine = OptimizedONNXInference(model_path)

    frame = cv2.imread(test_image_path)
    if frame is None:
        raise ValueError(f"Could not load test image: {test_image_path}")

    # Warm-up pass so one-time initialization cost is excluded from timing.
    _ = engine.predict(frame)

    # Collect per-run latencies in milliseconds.
    latencies_ms = [engine.predict(frame)[1] * 1000 for _ in range(runs)]
    samples = np.array(latencies_ms)

    mean_ms = float(np.mean(samples))
    return {
        'mean_ms': mean_ms,
        'median_ms': float(np.median(samples)),
        'std_ms': float(np.std(samples)),
        'min_ms': float(np.min(samples)),
        'max_ms': float(np.max(samples)),
        'fps': float(1000 / mean_ms),
        'runs': runs
    }
|
|
|
|
|
if __name__ == "__main__":
    # Smoke-test the optimized inference path on a local model + image.
    model_path = "model/detection/yolov8n/best_416.onnx"
    test_image = "test_detection_result.jpg"

    if os.path.exists(model_path) and os.path.exists(test_image):
        print("Testing Optimized ONNX Inference")
        print("=" * 50)

        model = OptimizedONNXInference(model_path)

        image = cv2.imread(test_image)
        detections, inference_time = model.predict(image)

        # BUGFIX: the original prints here were syntactically invalid
        # (nested `print(".2f" print(...))` fragments); reconstructed as
        # coherent formatted output.
        print(f"Inference time: {inference_time * 1000:.2f} ms")
        print(f"Detections found: {len(detections)}")

        print("\nRunning benchmark (10 runs)...")
        results = benchmark_model(model_path, test_image, runs=10)

        print("Benchmark Results:")
        print(f"  Mean:   {results['mean_ms']:.2f} ms")
        print(f"  Median: {results['median_ms']:.2f} ms")
        print(f"  Std:    {results['std_ms']:.2f} ms")
        print(f"  Min:    {results['min_ms']:.2f} ms")
        print(f"  Max:    {results['max_ms']:.2f} ms")
        print(f"  FPS:    {results['fps']:.1f}")

        print("\nOptimized inference test complete!")
    else:
        print("Model or test image not found")
        print(f"Model: {model_path} - {'found' if os.path.exists(model_path) else 'missing'}")
        print(f"Image: {test_image} - {'found' if os.path.exists(test_image) else 'missing'}")