| |
| """ |
| Package trained model for Raspberry Pi deployment |
| Creates a ready-to-deploy package with all necessary files |
| """ |
|
|
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
|
|
def find_latest_training_run(project_dir: Path) -> Optional[Path]:
    """Find the most recent training run directory.

    Args:
        project_dir: Directory that holds the ``train_*`` run folders.

    Returns:
        The ``train_*`` subdirectory of ``project_dir`` with the newest
        modification time, or ``None`` when no such directory is found.
    """
    # Only directories count as runs: a stray file such as ``train_log.txt``
    # also matches the glob and would otherwise win on modification time.
    run_dirs = [p for p in project_dir.glob("train_*") if p.is_dir()]
    return max(run_dirs, key=lambda p: p.stat().st_mtime, default=None)
|
|
|
|
# Class labels, in class-id order, recorded in model_info.json.  They mirror
# the CLASS_NAMES list hard-coded in the generated detect.py below.
_CLASS_NAMES = ["pothole", "crack", "bump", "obstacle", "road_damage"]

# Project directory searched for training runs and data.yaml by default.
_PROJECT_DIR = Path("road-anomaly-detection")

# The --model default exactly as it is spelled inside _RPI_INFERENCE_SCRIPT;
# it is swapped for the packaged model's file name when detect.py is written.
_TEMPLATE_MODEL_DEFAULT = '"road_anomaly_detector.tflite"'

# Stand-alone inference script shipped in the package as detect.py.
# NOTE(review): the script reports every row above the confidence threshold
# as a detection (no non-maximum suppression) and indexes CLASS_NAMES with
# the raw class id -- confirm both match the exported model's output.
_RPI_INFERENCE_SCRIPT = '''#!/usr/bin/env python3
"""
Road Anomaly Detection - Raspberry Pi Inference
Optimized for Raspberry Pi 4 with TFLite
"""

import time
import argparse
from pathlib import Path

import cv2
import numpy as np

# Use tflite_runtime for lighter footprint on RPi
try:
    import tflite_runtime.interpreter as tflite
except ImportError:
    import tensorflow as tf
    tflite = tf.lite


# Colors for each class
COLORS = {
    0: (0, 0, 255),  # Red - pothole
    1: (0, 165, 255),  # Orange - crack
    2: (0, 255, 255),  # Yellow - bump
    3: (255, 0, 0),  # Blue - obstacle
    4: (255, 0, 255),  # Magenta - road_damage
}

CLASS_NAMES = ["pothole", "crack", "bump", "obstacle", "road_damage"]


class RoadAnomalyDetector:
    def __init__(self, model_path: str, conf_thresh: float = 0.25):
        self.conf_thresh = conf_thresh

        # Load TFLite model
        self.interpreter = tflite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()

        # Get I/O details
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Input shape
        self.input_shape = self.input_details[0]['shape']
        self.input_height = self.input_shape[1]
        self.input_width = self.input_shape[2]

        print(f"Model loaded: {model_path}")
        print(f"Input: {self.input_width}x{self.input_height}")

    def preprocess(self, image):
        """Prepare image for inference"""
        img = cv2.resize(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.expand_dims(img, axis=0)
        return img

    def detect(self, image):
        """Run detection and return list of detections"""
        orig_h, orig_w = image.shape[:2]

        # Preprocess
        input_data = self.preprocess(image)

        # Inference
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(self.output_details[0]['index'])

        # Parse detections
        detections = []

        if output.shape[1] < output.shape[2]:
            output = output.transpose(0, 2, 1)
        output = output[0]

        for det in output:
            x_c, y_c, w, h = det[:4]
            scores = det[4:]
            cls_id = np.argmax(scores)
            conf = scores[cls_id]

            if conf < self.conf_thresh:
                continue

            x1 = int((x_c - w/2) * orig_w)
            y1 = int((y_c - h/2) * orig_h)
            x2 = int((x_c + w/2) * orig_w)
            y2 = int((y_c + h/2) * orig_h)

            detections.append({
                "bbox": [x1, y1, x2, y2],
                "class_id": int(cls_id),
                "confidence": float(conf)
            })

        return detections


def draw_detections(image, detections):
    """Draw boxes on image"""
    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        cls_id = det["class_id"]
        conf = det["confidence"]

        color = COLORS.get(cls_id, (0, 255, 0))
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

        label = f"{CLASS_NAMES[cls_id]}: {conf:.2f}"
        cv2.putText(image, label, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return image


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", "-m", default="road_anomaly_detector.tflite")
    parser.add_argument("--source", "-s", default="0", help="Camera ID or file path")
    parser.add_argument("--conf", type=float, default=0.25)
    args = parser.parse_args()

    detector = RoadAnomalyDetector(args.model, args.conf)

    # Open video source
    source = int(args.source) if args.source.isdigit() else args.source
    cap = cv2.VideoCapture(source)

    if not cap.isOpened():
        print(f"Error: Cannot open {args.source}")
        return

    print("Starting detection... Press 'q' to quit")

    fps_times = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect
        start = time.time()
        detections = detector.detect(frame)
        fps_times.append(time.time() - start)

        # Calculate FPS
        if len(fps_times) > 30:
            fps_times = fps_times[-30:]
        fps = 1.0 / np.mean(fps_times)

        # Draw
        frame = draw_detections(frame, detections)
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Detections: {len(detections)}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        cv2.imshow("Road Anomaly Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
'''

# Contents of the packaged requirements.txt.
_RPI_REQUIREMENTS = """# Raspberry Pi Requirements for Road Anomaly Detection
tflite-runtime>=2.14.0
opencv-python>=4.8.0
numpy>=1.24.0
"""


def _render_readme(model_name: str) -> str:
    """Return the README.md text for a package whose model is *model_name*."""
    generated = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return f"""# Road Anomaly Detection - Raspberry Pi Deployment

## Quick Start

1. Copy this folder to your Raspberry Pi
2. Install dependencies:
```bash
pip install -r requirements.txt
```

3. Run detection:
```bash
# Webcam
python detect.py --model {model_name}.tflite --source 0

# Video file
python detect.py --model {model_name}.tflite --source video.mp4

# Image
python detect.py --model {model_name}.tflite --source image.jpg
```

## Expected Performance
- Raspberry Pi 4 (4GB): 10-12 FPS at 416x416
- Raspberry Pi 5: 15-20 FPS

## Files
- `{model_name}.tflite` - TFLite model for inference
- `detect.py` - Detection script
- `requirements.txt` - Python dependencies
- `data.yaml` - Class names and dataset config

## Troubleshooting

### Camera not working
```bash
# Check camera
vcgencmd get_camera
# Enable camera in raspi-config
sudo raspi-config
```

### Low FPS
- Reduce input resolution in detect.py
- Close other applications
- Use Raspberry Pi 5 for better performance

## Model Info
- Input size: 416x416
- Classes: pothole, crack, bump, obstacle, road_damage
- Format: TensorFlow Lite (FP16)

Generated: {generated}
"""


def _select_tflite_model(model_dir: Path) -> Optional[Path]:
    """Return the TFLite file to package from *model_dir*, or None if absent."""
    # Glob order is filesystem-dependent; sorting makes the choice
    # reproducible when a run contains several exported .tflite files.
    candidates = sorted(model_dir.glob("**/*.tflite"))
    return candidates[0] if candidates else None


def create_deployment_package(
    model_dir: Optional[Path] = None,
    output_dir: Optional[Path] = None,
    model_name: str = "road_anomaly_detector",
) -> Path:
    """Create deployment package for Raspberry Pi.

    Copies the run's TFLite model (and ``data.yaml`` when it exists in the
    project directory) into ``output_dir`` and generates ``detect.py``,
    ``requirements.txt``, ``README.md`` and ``model_info.json`` next to it.
    Progress is reported on stdout.

    Args:
        model_dir: Training run directory searched recursively for
            ``*.tflite``.  When ``None``, the latest ``train_*`` run under
            ``road-anomaly-detection/`` is used.
        output_dir: Destination directory, created if missing.  When
            ``None``, a timestamped ``rpi_deployment_*`` directory is used.
        model_name: Base name (without extension) for the packaged model
            file; also used as the default ``--model`` of ``detect.py``.

    Returns:
        The directory the package was written to.

    Raises:
        SystemExit: With status 1 when no training run or no TFLite model
            can be found.
    """
    print("=" * 60)
    print("CREATING RASPBERRY PI DEPLOYMENT PACKAGE")
    print("=" * 60)

    if model_dir is None:
        model_dir = find_latest_training_run(_PROJECT_DIR)
        if model_dir is None:
            print("Error: No training runs found in road-anomaly-detection/")
            print("Run train_road_anomaly_model.py first.")
            sys.exit(1)
    model_dir = Path(model_dir)

    print(f"\nUsing model from: {model_dir}")

    if output_dir is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = Path(f"rpi_deployment_{timestamp}")
    output_dir = Path(output_dir)

    # Locate the model before creating anything on disk so that a failed
    # run does not leave an empty package directory behind.
    tflite_model = _select_tflite_model(model_dir)
    if tflite_model is None:
        print("Error: No TFLite model found. Run export first.")
        sys.exit(1)
    print(f"Found TFLite model: {tflite_model}")

    output_dir.mkdir(parents=True, exist_ok=True)

    model_filename = f"{model_name}.tflite"
    model_dest = output_dir / model_filename
    shutil.copy(tflite_model, model_dest)
    print(f"✓ Copied model to: {model_dest}")

    data_yaml = _PROJECT_DIR / "data.yaml"
    if data_yaml.exists():
        shutil.copy(data_yaml, output_dir / "data.yaml")
        print("✓ Copied data.yaml")

    # Point detect.py's default --model at the file actually packaged, so
    # the script also runs without --model when model_name is customised.
    # json.dumps yields a double-quoted literal that is valid Python source.
    detect_script = _RPI_INFERENCE_SCRIPT.replace(
        _TEMPLATE_MODEL_DEFAULT,
        json.dumps(model_filename, ensure_ascii=False),
        1,
    )
    (output_dir / "detect.py").write_text(detect_script, encoding="utf-8")
    print("✓ Created detect.py")

    (output_dir / "requirements.txt").write_text(
        _RPI_REQUIREMENTS, encoding="utf-8"
    )
    print("✓ Created requirements.txt")

    (output_dir / "README.md").write_text(
        _render_readme(model_name), encoding="utf-8"
    )
    print("✓ Created README.md")

    info = {
        "model": model_name,
        "format": "tflite",
        "input_size": 416,
        "classes": list(_CLASS_NAMES),
        "created": datetime.now().isoformat(),
        "source_dir": str(model_dir),
    }
    with open(output_dir / "model_info.json", "w", encoding="utf-8") as fh:
        json.dump(info, fh, indent=2)
    print("✓ Created model_info.json")

    # Summarise regular files only; a directory's own st_size says nothing
    # about the size of the package.
    package_files = sorted(p for p in output_dir.glob("*") if p.is_file())
    size_mb = sum(p.stat().st_size for p in package_files) / (1024 * 1024)

    print(f"\n{'=' * 60}")
    print("✅ DEPLOYMENT PACKAGE READY!")
    print(f"{'=' * 60}")
    print(f"\nPackage: {output_dir}")
    print(f"Size: {size_mb:.2f} MB")
    print("\nContents:")
    for entry in package_files:
        entry_kb = entry.stat().st_size / 1024
        print(f" • {entry.name} ({entry_kb:.1f} KB)")

    print("\n📦 Transfer to Raspberry Pi:")
    print(f" scp -r {output_dir} pi@raspberrypi.local:/home/pi/")
    print("\n🚀 Run on Raspberry Pi:")
    print(f" cd {output_dir.name}")
    print(" pip install -r requirements.txt")
    # Name the model explicitly, matching the README's quick-start commands.
    print(f" python detect.py --model {model_filename} --source 0")

    return output_dir
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Command-line entry point: every flag is optional and falls back to the
    # defaults of create_deployment_package when omitted.
    cli = argparse.ArgumentParser(description="Create RPi deployment package")
    for flags, options in (
        (("--model-dir", "-d"), {"help": "Training run directory"}),
        (("--output", "-o"), {"help": "Output directory"}),
        (("--name", "-n"), {"default": "road_anomaly_detector", "help": "Model name"}),
    ):
        cli.add_argument(*flags, **options)

    opts = cli.parse_args()

    # Empty or missing path arguments are passed on as None.
    create_deployment_package(
        Path(opts.model_dir) if opts.model_dir else None,
        Path(opts.output) if opts.output else None,
        opts.name,
    )
|
|