#!/usr/bin/env python3 """ MorphGuard Drone Simulation Test Simulates a companion computer sending face crops to the ground station. This script allows testing the ground station API WITHOUT the actual Pi hardware. It uses your computer's webcam to detect faces and sends them to MorphGuard. Usage: python test_drone_simulation.py [--server URL] [--camera N] [--interval SECONDS] Examples: # Use default settings (localhost, camera 0, 2 second interval) python test_drone_simulation.py # Connect to specific server python test_drone_simulation.py --server http://192.168.200.112:5000 # Use different camera python test_drone_simulation.py --camera 1 """ import os import sys import time import json import base64 import argparse import random from datetime import datetime from io import BytesIO import cv2 import numpy as np import requests # Simulated GPS coordinates (Austin, TX area) SIMULATED_GPS = { "lat": 30.2672 + random.uniform(-0.01, 0.01), "lon": -97.7431 + random.uniform(-0.01, 0.01), "alt": 100.0 + random.uniform(-10, 10) } class DroneSimulator: """Simulates a drone companion computer for testing.""" def __init__(self, server_url: str, camera_id: int = 0): self.server_url = server_url.rstrip('/') self.camera_id = camera_id self.cap = None self.face_cascade = None self.connected = False self.faces_sent = 0 self.session_start = None def initialize(self) -> bool: """Initialize camera and face detector.""" print(f"🚁 Drone Simulator Starting...") print(f" Server: {self.server_url}") print(f" Camera: {self.camera_id}") print() # Initialize camera print("šŸ“· Initializing camera...") self.cap = cv2.VideoCapture(self.camera_id) if not self.cap.isOpened(): print("āŒ Failed to open camera!") return False # Set camera resolution self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) print(f" Resolution: {int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}") # Initialize face detector print("šŸ” Loading face detector...") cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' self.face_cascade = cv2.CascadeClassifier(cascade_path) if self.face_cascade.empty(): print("āŒ Failed to load face cascade!") return False print(" Using OpenCV Haar Cascade (simulating Hailo)") print() return True def connect(self) -> bool: """Register with ground station.""" print("šŸ“” Connecting to ground station...") try: response = requests.post( f"{self.server_url}/api/drone/connect", json={ "companion_ip": "SIMULATION", "hardware": "Simulated Pi5 + Hailo 26 TOPS", "model": "opencv_haar_cascade (simulation)" }, timeout=5 ) if response.status_code == 200: data = response.json() print(f" āœ… Connected: {data.get('message')}") self.connected = True self.session_start = datetime.now() return True else: print(f" āŒ Failed: HTTP {response.status_code}") return False except requests.exceptions.RequestException as e: print(f" āŒ Connection error: {e}") return False def disconnect(self): """Disconnect from ground station.""" if self.connected: try: requests.post(f"{self.server_url}/api/drone/disconnect", timeout=5) print("šŸ”Œ Disconnected from ground station") except: pass if self.cap: self.cap.release() self.connected = False def detect_faces(self, frame: np.ndarray) -> list: """Detect faces in frame using OpenCV.""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self.face_cascade.detectMultiScale( gray, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60) ) return list(faces) def crop_face(self, frame: np.ndarray, bbox: tuple, margin: float = 0.2) -> np.ndarray: """Crop face from frame with margin.""" x, y, w, h = bbox # Add margin margin_x = int(w * margin) margin_y = int(h * margin) x1 = max(0, x - margin_x) y1 = max(0, y - margin_y) x2 = min(frame.shape[1], x + w + margin_x) y2 = min(frame.shape[0], y + h + margin_y) return frame[y1:y2, x1:x2].copy() def encode_crop(self, crop: np.ndarray) -> str: """Encode face crop to base64 JPEG.""" # Resize if too large max_size = 256 h, w = crop.shape[:2] if max(h, w) > max_size: scale = max_size / max(h, w) crop = cv2.resize(crop, (int(w * scale), int(h * scale))) # Encode _, buffer = cv2.imencode('.jpg', crop, [cv2.IMWRITE_JPEG_QUALITY, 85]) return base64.b64encode(buffer).decode('utf-8') def send_face_crop(self, crop: np.ndarray, bbox: tuple, confidence: float) -> dict: """Send face crop to ground station for analysis.""" # Update simulated GPS (slight drift) gps = { "lat": SIMULATED_GPS["lat"] + random.uniform(-0.0001, 0.0001), "lon": SIMULATED_GPS["lon"] + random.uniform(-0.0001, 0.0001), "alt": SIMULATED_GPS["alt"] + random.uniform(-1, 1) } payload = { "crop_base64": self.encode_crop(crop), "confidence": confidence, "bbox": list(bbox), "gps_lat": gps["lat"], "gps_lon": gps["lon"], "gps_alt": gps["alt"], "heading": random.uniform(0, 360), "flight_mode": "SIMULATION" } try: response = requests.post( f"{self.server_url}/api/drone/analyze", json=payload, timeout=10 ) if response.status_code == 200: return response.json() else: return {"error": f"HTTP {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": str(e)} def send_heartbeat(self, fps: float = 30.0): """Send heartbeat with telemetry.""" try: requests.post( f"{self.server_url}/api/drone/heartbeat", json={ "fps": fps, "gps": SIMULATED_GPS, "battery": random.randint(70, 100) }, timeout=2 ) except: pass def run(self, interval: float = 2.0, show_preview: bool = True): """ Main simulation loop. Args: interval: Seconds between sending face crops show_preview: Show camera preview window """ print() print("=" * 50) print("🚁 DRONE SIMULATION RUNNING") print("=" * 50) print() print("Press 'Q' to quit (in preview window)") print("Press Ctrl+C to stop") print() last_send_time = 0 frame_count = 0 fps_start = time.time() current_fps = 0 try: while True: # Capture frame ret, frame = self.cap.read() if not ret: print("āš ļø Failed to capture frame") time.sleep(0.1) continue # FPS calculation frame_count += 1 elapsed = time.time() - fps_start if elapsed >= 1.0: current_fps = frame_count / elapsed frame_count = 0 fps_start = time.time() self.send_heartbeat(current_fps) # Detect faces faces = self.detect_faces(frame) # Draw boxes on preview display_frame = frame.copy() for (x, y, w, h) in faces: cv2.rectangle(display_frame, (x, y), (x+w, y+h), (0, 255, 0), 2) # Add status overlay status_text = f"FPS: {current_fps:.1f} | Faces: {len(faces)} | Sent: {self.faces_sent}" cv2.putText(display_frame, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) gps_text = f"GPS: {SIMULATED_GPS['lat']:.4f}, {SIMULATED_GPS['lon']:.4f}" cv2.putText(display_frame, gps_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) # Show preview if show_preview: cv2.imshow("Drone Simulation - MorphGuard", display_frame) key = cv2.waitKey(1) & 0xFF if key == ord('q'): break # Send face crops at interval current_time = time.time() if faces is not None and len(faces) > 0 and (current_time - last_send_time) >= interval: # Send first detected face x, y, w, h = faces[0] crop = self.crop_face(frame, (x, y, w, h)) confidence = 0.9 + random.uniform(0, 0.09) # Simulated confidence print(f"\nšŸ“¤ Sending face crop to ground station...") print(f" Size: {crop.shape[1]}x{crop.shape[0]}") print(f" Confidence: {confidence:.2%}") result = self.send_face_crop(crop, (x, y, w, h), confidence) if "error" in result: print(f" āŒ Error: {result['error']}") else: is_morph = result.get("is_morph", False) morph_conf = result.get("confidence", 0) print(f" āœ… Result: {'MORPH DETECTED!' if is_morph else 'Genuine'}") print(f" šŸ“Š Morph Confidence: {morph_conf:.2%}") self.faces_sent += 1 last_send_time = current_time time.sleep(0.01) # Small delay to prevent CPU overload except KeyboardInterrupt: print("\n\nšŸ›‘ Simulation stopped by user") finally: cv2.destroyAllWindows() self.disconnect() # Print summary print() print("=" * 50) print("šŸ“Š SIMULATION SUMMARY") print("=" * 50) if self.session_start: duration = datetime.now() - self.session_start print(f" Duration: {duration}") print(f" Face crops sent: {self.faces_sent}") print() def main(): parser = argparse.ArgumentParser(description="Drone Simulation Test for MorphGuard") parser.add_argument("--server", "-s", default="http://192.168.200.112:5000", help="Ground station URL (default: http://192.168.200.112:5000)") parser.add_argument("--camera", "-c", type=int, default=0, help="Camera ID (default: 0)") parser.add_argument("--interval", "-i", type=float, default=2.0, help="Seconds between sending crops (default: 2.0)") parser.add_argument("--no-preview", action="store_true", help="Disable camera preview window") args = parser.parse_args() # Create simulator sim = DroneSimulator(args.server, args.camera) # Initialize if not sim.initialize(): print("\nāŒ Initialization failed!") sys.exit(1) # Connect if not sim.connect(): print("\nāŒ Connection failed!") sys.exit(1) # Run simulation sim.run(interval=args.interval, show_preview=not args.no_preview) if __name__ == "__main__": main()