arm-model / model /package_for_rpi.py
pragadeeshv23's picture
Upload folder using huggingface_hub
5b86813 verified
#!/usr/bin/env python3
"""
Package trained model for Raspberry Pi deployment
Creates a ready-to-deploy package with all necessary files
"""
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
def find_latest_training_run(project_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``train_*`` directory.

    Args:
        project_dir: Directory that is searched (non-recursively) for
            entries whose name starts with ``train_``.

    Returns:
        The matching directory with the newest modification time, or
        ``None`` when ``project_dir`` is not a directory or contains no
        matching directories.
    """
    project_dir = Path(project_dir)
    if not project_dir.is_dir():
        return None

    # Only directories count as training runs; a stray file such as
    # "train_log.txt" also matches the pattern and must not be returned.
    runs = [entry for entry in project_dir.glob("train_*") if entry.is_dir()]
    return max(runs, key=lambda run: run.stat().st_mtime, default=None)
# Directory (relative to the current working directory) that is searched for
# training runs and for the dataset's data.yaml.
_PROJECT_DIR = Path("road-anomaly-detection")

# Class labels written to model_info.json; they mirror CLASS_NAMES in the
# generated detect.py.
_CLASS_NAMES = ["pothole", "crack", "bump", "obstacle", "road_damage"]

# Token inside the detect.py template that is replaced with the packaged
# model's file name, so the script's default --model matches the package.
_MODEL_FILE_TOKEN = "@@MODEL_FILE@@"

# Source of the stand-alone inference script shipped in the package.
_RPI_INFERENCE_SCRIPT = '''#!/usr/bin/env python3
"""
Road Anomaly Detection - Raspberry Pi Inference
Optimized for Raspberry Pi 4 with TFLite
"""

import time
import argparse
from pathlib import Path

import cv2
import numpy as np

# Use tflite_runtime for lighter footprint on RPi
try:
    import tflite_runtime.interpreter as tflite
except ImportError:
    import tensorflow as tf
    tflite = tf.lite

# Colors for each class
COLORS = {
    0: (0, 0, 255),    # Red - pothole
    1: (0, 165, 255),  # Orange - crack
    2: (0, 255, 255),  # Yellow - bump
    3: (255, 0, 0),    # Blue - obstacle
    4: (255, 0, 255),  # Magenta - road_damage
}

CLASS_NAMES = ["pothole", "crack", "bump", "obstacle", "road_damage"]


class RoadAnomalyDetector:
    def __init__(self, model_path: str, conf_thresh: float = 0.25):
        self.conf_thresh = conf_thresh

        # Load TFLite model
        self.interpreter = tflite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()

        # Get I/O details
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Input shape
        self.input_shape = self.input_details[0]['shape']
        self.input_height = self.input_shape[1]
        self.input_width = self.input_shape[2]

        print(f"Model loaded: {model_path}")
        print(f"Input: {self.input_width}x{self.input_height}")

    def preprocess(self, image):
        """Prepare image for inference"""
        img = cv2.resize(image, (self.input_width, self.input_height))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        img = np.expand_dims(img, axis=0)
        return img

    def detect(self, image):
        """Run detection and return list of detections"""
        orig_h, orig_w = image.shape[:2]

        # Preprocess
        input_data = self.preprocess(image)

        # Inference
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(self.output_details[0]['index'])

        # Parse detections
        detections = []

        if output.shape[1] < output.shape[2]:
            output = output.transpose(0, 2, 1)

        output = output[0]

        for det in output:
            x_c, y_c, w, h = det[:4]
            scores = det[4:]

            cls_id = np.argmax(scores)
            conf = scores[cls_id]

            if conf < self.conf_thresh:
                continue

            x1 = int((x_c - w/2) * orig_w)
            y1 = int((y_c - h/2) * orig_h)
            x2 = int((x_c + w/2) * orig_w)
            y2 = int((y_c + h/2) * orig_h)

            detections.append({
                "bbox": [x1, y1, x2, y2],
                "class_id": int(cls_id),
                "confidence": float(conf)
            })

        return detections


def draw_detections(image, detections):
    """Draw boxes on image"""
    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        cls_id = det["class_id"]
        conf = det["confidence"]

        color = COLORS.get(cls_id, (0, 255, 0))
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

        # Fall back to a generic label for class ids outside CLASS_NAMES
        if 0 <= cls_id < len(CLASS_NAMES):
            name = CLASS_NAMES[cls_id]
        else:
            name = f"class_{cls_id}"
        label = f"{name}: {conf:.2f}"
        cv2.putText(image, label, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return image


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", "-m", default="@@MODEL_FILE@@")
    parser.add_argument("--source", "-s", default="0", help="Camera ID or file path")
    parser.add_argument("--conf", type=float, default=0.25)
    args = parser.parse_args()

    detector = RoadAnomalyDetector(args.model, args.conf)

    # Open video source
    source = int(args.source) if args.source.isdigit() else args.source
    cap = cv2.VideoCapture(source)

    if not cap.isOpened():
        print(f"Error: Cannot open {args.source}")
        return

    print("Starting detection... Press 'q' to quit")

    fps_times = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect
        start = time.time()
        detections = detector.detect(frame)
        fps_times.append(time.time() - start)

        # Calculate FPS
        if len(fps_times) > 30:
            fps_times = fps_times[-30:]
        fps = 1.0 / np.mean(fps_times)

        # Draw
        frame = draw_detections(frame, detections)
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Detections: {len(detections)}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        cv2.imshow("Road Anomaly Detection", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
'''

# pip requirements shipped alongside detect.py.
_RPI_REQUIREMENTS = """# Raspberry Pi Requirements for Road Anomaly Detection
tflite-runtime>=2.14.0
opencv-python>=4.8.0
numpy>=1.24.0
"""

# README template; filled in with str.format (model_name, files_section,
# generated). It must not contain literal curly braces.
_README_TEMPLATE = """# Road Anomaly Detection - Raspberry Pi Deployment

## Quick Start

1. Copy this folder to your Raspberry Pi

2. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```

3. Run detection:
   ```bash
   # Webcam
   python detect.py --model {model_name}.tflite --source 0

   # Video file
   python detect.py --model {model_name}.tflite --source video.mp4

   # Image
   python detect.py --model {model_name}.tflite --source image.jpg
   ```

## Expected Performance

- Raspberry Pi 4 (4GB): 10-12 FPS at 416x416
- Raspberry Pi 5: 15-20 FPS

## Files

{files_section}

## Troubleshooting

### Camera not working
```bash
# Check camera
vcgencmd get_camera

# Enable camera in raspi-config
sudo raspi-config
```

### Low FPS
- Reduce input resolution in detect.py
- Close other applications
- Use Raspberry Pi 5 for better performance

## Model Info

- Input size: 416x416
- Classes: pothole, crack, bump, obstacle, road_damage
- Format: TensorFlow Lite (FP16)

Generated: {generated}
"""


def _select_tflite_model(model_dir: Path) -> Optional[Path]:
    """Pick the ``.tflite`` file to package from ``model_dir`` (recursive search)."""
    # Sort so the choice is reproducible; glob order is not guaranteed.
    candidates = sorted(p for p in model_dir.glob("**/*.tflite") if p.is_file())
    if not candidates:
        return None
    if len(candidates) > 1:
        print(f"Warning: {len(candidates)} TFLite models found; using the first by path:")
        for candidate in candidates:
            print(f"  - {candidate}")
    return candidates[0]


def _render_readme(model_name: str, has_data_yaml: bool) -> str:
    """Fill the README template, listing only the files actually packaged."""
    file_lines = [
        f"- `{model_name}.tflite` - TFLite model for inference",
        "- `detect.py` - Detection script",
        "- `requirements.txt` - Python dependencies",
    ]
    if has_data_yaml:
        file_lines.append("- `data.yaml` - Class names and dataset config")
    return _README_TEMPLATE.format(
        model_name=model_name,
        files_section="\n".join(file_lines),
        generated=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    )


def create_deployment_package(
    model_dir: Optional[Path] = None,
    output_dir: Optional[Path] = None,
    model_name: str = "road_anomaly_detector",
) -> Path:
    """Create a deployment package for Raspberry Pi.

    The package directory receives the TFLite model (renamed to
    ``<model_name>.tflite``), ``data.yaml`` when
    ``road-anomaly-detection/data.yaml`` exists, a generated ``detect.py``
    whose default ``--model`` is the packaged model, ``requirements.txt``,
    ``README.md`` and ``model_info.json``.

    Args:
        model_dir: Training run directory searched recursively for
            ``*.tflite``. When ``None``, the latest ``train_*`` directory
            under ``road-anomaly-detection/`` is used.
        output_dir: Directory to create the package in. When ``None``, a
            timestamped ``rpi_deployment_<YYYYmmdd_HHMMSS>`` directory in the
            current working directory is used.
        model_name: Base name (without extension) for the packaged model.

    Returns:
        Path of the package directory.

    Raises:
        SystemExit: With exit code 1 when no training run or no TFLite model
            can be found. Nothing is written to disk in that case.
    """
    print("=" * 60)
    print("CREATING RASPBERRY PI DEPLOYMENT PACKAGE")
    print("=" * 60)

    # Find model directory
    if model_dir is None:
        model_dir = find_latest_training_run(_PROJECT_DIR)
        if model_dir is None:
            print("Error: No training runs found in road-anomaly-detection/")
            print("Run train_road_anomaly_model.py first.")
            sys.exit(1)
    model_dir = Path(model_dir)
    print(f"\nUsing model from: {model_dir}")

    # Locate the model BEFORE creating anything, so a failed run does not
    # leave an empty package directory behind.
    tflite_model = _select_tflite_model(model_dir)
    if tflite_model is None:
        print("Error: No TFLite model found. Run export first.")
        sys.exit(1)
    print(f"Found TFLite model: {tflite_model}")

    # Create output directory
    if output_dir is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = Path(f"rpi_deployment_{timestamp}")
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Copy model
    model_file = f"{model_name}.tflite"
    model_dest = output_dir / model_file
    shutil.copy(tflite_model, model_dest)
    print(f"✓ Copied model to: {model_dest}")

    # Copy data.yaml if exists
    data_yaml = _PROJECT_DIR / "data.yaml"
    has_data_yaml = data_yaml.exists()
    if has_data_yaml:
        shutil.copy(data_yaml, output_dir / "data.yaml")
        print("✓ Copied data.yaml")

    # Create inference script for RPi; its default --model is the file copied
    # above, so "python detect.py --source 0" works for any model_name.
    script = _RPI_INFERENCE_SCRIPT.replace(_MODEL_FILE_TOKEN, model_file)
    (output_dir / "detect.py").write_text(script, encoding="utf-8")
    print("✓ Created detect.py")

    # Create requirements.txt for RPi
    (output_dir / "requirements.txt").write_text(_RPI_REQUIREMENTS, encoding="utf-8")
    print("✓ Created requirements.txt")

    # Create README
    readme = _render_readme(model_name, has_data_yaml)
    (output_dir / "README.md").write_text(readme, encoding="utf-8")
    print("✓ Created README.md")

    # Create deployment info
    info = {
        "model": model_name,
        "format": "tflite",
        # NOTE: fixed value; it is not read from the model file.
        "input_size": 416,
        "classes": list(_CLASS_NAMES),
        "created": datetime.now().isoformat(),
        "source_dir": str(model_dir),
    }
    with open(output_dir / "model_info.json", "w", encoding="utf-8") as f:
        json.dump(info, f, indent=2)
    print("✓ Created model_info.json")

    # Calculate package size (regular files only; a pre-existing output
    # directory may contain subdirectories).
    packaged = sorted(p for p in output_dir.glob("*") if p.is_file())
    total_size = sum(p.stat().st_size for p in packaged)
    size_mb = total_size / (1024 * 1024)

    print(f"\n{'=' * 60}")
    print("✅ DEPLOYMENT PACKAGE READY!")
    print(f"{'=' * 60}")
    print(f"\nPackage: {output_dir}")
    print(f"Size: {size_mb:.2f} MB")
    print("\nContents:")
    for p in packaged:
        fsize = p.stat().st_size / 1024
        print(f"  • {p.name} ({fsize:.1f} KB)")

    print("\n📦 Transfer to Raspberry Pi:")
    print(f"   scp -r {output_dir} pi@raspberrypi.local:/home/pi/")
    print("\n🚀 Run on Raspberry Pi:")
    print(f"   cd {output_dir.name}")
    print("   pip install -r requirements.txt")
    print("   python detect.py --source 0")

    return output_dir
if __name__ == "__main__":
    # Command-line entry point: every option is optional and falls back to
    # the defaults of create_deployment_package.
    import argparse

    cli = argparse.ArgumentParser(description="Create RPi deployment package")
    cli.add_argument("--model-dir", "-d", help="Training run directory")
    cli.add_argument("--output", "-o", help="Output directory")
    cli.add_argument("--name", "-n", default="road_anomaly_detector", help="Model name")
    options = cli.parse_args()

    # Options left unset (or empty) are passed as None so the packaging
    # function applies its own defaults.
    create_deployment_package(
        Path(options.model_dir) if options.model_dir else None,
        Path(options.output) if options.output else None,
        options.name,
    )