Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| MorphGuard Drone Simulation Test | |
| Simulates a companion computer sending face crops to the ground station. | |
| This script allows testing the ground station API WITHOUT the actual Pi hardware. | |
| It uses your computer's webcam to detect faces and sends them to MorphGuard. | |
| Usage: | |
| python test_drone_simulation.py [--server URL] [--camera N] [--interval SECONDS] | |
| Examples: | |
| # Use default settings (localhost, camera 0, 2 second interval) | |
| python test_drone_simulation.py | |
| # Connect to specific server | |
| python test_drone_simulation.py --server http://192.168.200.112:5000 | |
| # Use different camera | |
| python test_drone_simulation.py --camera 1 | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import base64 | |
| import argparse | |
| import random | |
| from datetime import datetime | |
| from io import BytesIO | |
| import cv2 | |
| import numpy as np | |
| import requests | |
| # Simulated GPS coordinates (Austin, TX area) | |
| SIMULATED_GPS = { | |
| "lat": 30.2672 + random.uniform(-0.01, 0.01), | |
| "lon": -97.7431 + random.uniform(-0.01, 0.01), | |
| "alt": 100.0 + random.uniform(-10, 10) | |
| } | |
| class DroneSimulator: | |
| """Simulates a drone companion computer for testing.""" | |
| def __init__(self, server_url: str, camera_id: int = 0): | |
| self.server_url = server_url.rstrip('/') | |
| self.camera_id = camera_id | |
| self.cap = None | |
| self.face_cascade = None | |
| self.connected = False | |
| self.faces_sent = 0 | |
| self.session_start = None | |
| def initialize(self) -> bool: | |
| """Initialize camera and face detector.""" | |
| print(f"๐ Drone Simulator Starting...") | |
| print(f" Server: {self.server_url}") | |
| print(f" Camera: {self.camera_id}") | |
| print() | |
| # Initialize camera | |
| print("๐ท Initializing camera...") | |
| self.cap = cv2.VideoCapture(self.camera_id) | |
| if not self.cap.isOpened(): | |
| print("โ Failed to open camera!") | |
| return False | |
| # Set camera resolution | |
| self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) | |
| self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) | |
| print(f" Resolution: {int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}") | |
| # Initialize face detector | |
| print("๐ Loading face detector...") | |
| cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' | |
| self.face_cascade = cv2.CascadeClassifier(cascade_path) | |
| if self.face_cascade.empty(): | |
| print("โ Failed to load face cascade!") | |
| return False | |
| print(" Using OpenCV Haar Cascade (simulating Hailo)") | |
| print() | |
| return True | |
| def connect(self) -> bool: | |
| """Register with ground station.""" | |
| print("๐ก Connecting to ground station...") | |
| try: | |
| response = requests.post( | |
| f"{self.server_url}/api/drone/connect", | |
| json={ | |
| "companion_ip": "SIMULATION", | |
| "hardware": "Simulated Pi5 + Hailo 26 TOPS", | |
| "model": "opencv_haar_cascade (simulation)" | |
| }, | |
| timeout=5 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| print(f" โ Connected: {data.get('message')}") | |
| self.connected = True | |
| self.session_start = datetime.now() | |
| return True | |
| else: | |
| print(f" โ Failed: HTTP {response.status_code}") | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| print(f" โ Connection error: {e}") | |
| return False | |
| def disconnect(self): | |
| """Disconnect from ground station.""" | |
| if self.connected: | |
| try: | |
| requests.post(f"{self.server_url}/api/drone/disconnect", timeout=5) | |
| print("๐ Disconnected from ground station") | |
| except: | |
| pass | |
| if self.cap: | |
| self.cap.release() | |
| self.connected = False | |
| def detect_faces(self, frame: np.ndarray) -> list: | |
| """Detect faces in frame using OpenCV.""" | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| faces = self.face_cascade.detectMultiScale( | |
| gray, | |
| scaleFactor=1.1, | |
| minNeighbors=5, | |
| minSize=(60, 60) | |
| ) | |
| return list(faces) | |
| def crop_face(self, frame: np.ndarray, bbox: tuple, margin: float = 0.2) -> np.ndarray: | |
| """Crop face from frame with margin.""" | |
| x, y, w, h = bbox | |
| # Add margin | |
| margin_x = int(w * margin) | |
| margin_y = int(h * margin) | |
| x1 = max(0, x - margin_x) | |
| y1 = max(0, y - margin_y) | |
| x2 = min(frame.shape[1], x + w + margin_x) | |
| y2 = min(frame.shape[0], y + h + margin_y) | |
| return frame[y1:y2, x1:x2].copy() | |
| def encode_crop(self, crop: np.ndarray) -> str: | |
| """Encode face crop to base64 JPEG.""" | |
| # Resize if too large | |
| max_size = 256 | |
| h, w = crop.shape[:2] | |
| if max(h, w) > max_size: | |
| scale = max_size / max(h, w) | |
| crop = cv2.resize(crop, (int(w * scale), int(h * scale))) | |
| # Encode | |
| _, buffer = cv2.imencode('.jpg', crop, [cv2.IMWRITE_JPEG_QUALITY, 85]) | |
| return base64.b64encode(buffer).decode('utf-8') | |
| def send_face_crop(self, crop: np.ndarray, bbox: tuple, confidence: float) -> dict: | |
| """Send face crop to ground station for analysis.""" | |
| # Update simulated GPS (slight drift) | |
| gps = { | |
| "lat": SIMULATED_GPS["lat"] + random.uniform(-0.0001, 0.0001), | |
| "lon": SIMULATED_GPS["lon"] + random.uniform(-0.0001, 0.0001), | |
| "alt": SIMULATED_GPS["alt"] + random.uniform(-1, 1) | |
| } | |
| payload = { | |
| "crop_base64": self.encode_crop(crop), | |
| "confidence": confidence, | |
| "bbox": list(bbox), | |
| "gps_lat": gps["lat"], | |
| "gps_lon": gps["lon"], | |
| "gps_alt": gps["alt"], | |
| "heading": random.uniform(0, 360), | |
| "flight_mode": "SIMULATION" | |
| } | |
| try: | |
| response = requests.post( | |
| f"{self.server_url}/api/drone/analyze", | |
| json=payload, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| return {"error": f"HTTP {response.status_code}"} | |
| except requests.exceptions.RequestException as e: | |
| return {"error": str(e)} | |
| def send_heartbeat(self, fps: float = 30.0): | |
| """Send heartbeat with telemetry.""" | |
| try: | |
| requests.post( | |
| f"{self.server_url}/api/drone/heartbeat", | |
| json={ | |
| "fps": fps, | |
| "gps": SIMULATED_GPS, | |
| "battery": random.randint(70, 100) | |
| }, | |
| timeout=2 | |
| ) | |
| except: | |
| pass | |
| def run(self, interval: float = 2.0, show_preview: bool = True): | |
| """ | |
| Main simulation loop. | |
| Args: | |
| interval: Seconds between sending face crops | |
| show_preview: Show camera preview window | |
| """ | |
| print() | |
| print("=" * 50) | |
| print("๐ DRONE SIMULATION RUNNING") | |
| print("=" * 50) | |
| print() | |
| print("Press 'Q' to quit (in preview window)") | |
| print("Press Ctrl+C to stop") | |
| print() | |
| last_send_time = 0 | |
| frame_count = 0 | |
| fps_start = time.time() | |
| current_fps = 0 | |
| try: | |
| while True: | |
| # Capture frame | |
| ret, frame = self.cap.read() | |
| if not ret: | |
| print("โ ๏ธ Failed to capture frame") | |
| time.sleep(0.1) | |
| continue | |
| # FPS calculation | |
| frame_count += 1 | |
| elapsed = time.time() - fps_start | |
| if elapsed >= 1.0: | |
| current_fps = frame_count / elapsed | |
| frame_count = 0 | |
| fps_start = time.time() | |
| self.send_heartbeat(current_fps) | |
| # Detect faces | |
| faces = self.detect_faces(frame) | |
| # Draw boxes on preview | |
| display_frame = frame.copy() | |
| for (x, y, w, h) in faces: | |
| cv2.rectangle(display_frame, (x, y), (x+w, y+h), (0, 255, 0), 2) | |
| # Add status overlay | |
| status_text = f"FPS: {current_fps:.1f} | Faces: {len(faces)} | Sent: {self.faces_sent}" | |
| cv2.putText(display_frame, status_text, (10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) | |
| gps_text = f"GPS: {SIMULATED_GPS['lat']:.4f}, {SIMULATED_GPS['lon']:.4f}" | |
| cv2.putText(display_frame, gps_text, (10, 60), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) | |
| # Show preview | |
| if show_preview: | |
| cv2.imshow("Drone Simulation - MorphGuard", display_frame) | |
| key = cv2.waitKey(1) & 0xFF | |
| if key == ord('q'): | |
| break | |
| # Send face crops at interval | |
| current_time = time.time() | |
| if faces is not None and len(faces) > 0 and (current_time - last_send_time) >= interval: | |
| # Send first detected face | |
| x, y, w, h = faces[0] | |
| crop = self.crop_face(frame, (x, y, w, h)) | |
| confidence = 0.9 + random.uniform(0, 0.09) # Simulated confidence | |
| print(f"\n๐ค Sending face crop to ground station...") | |
| print(f" Size: {crop.shape[1]}x{crop.shape[0]}") | |
| print(f" Confidence: {confidence:.2%}") | |
| result = self.send_face_crop(crop, (x, y, w, h), confidence) | |
| if "error" in result: | |
| print(f" โ Error: {result['error']}") | |
| else: | |
| is_morph = result.get("is_morph", False) | |
| morph_conf = result.get("confidence", 0) | |
| print(f" โ Result: {'MORPH DETECTED!' if is_morph else 'Genuine'}") | |
| print(f" ๐ Morph Confidence: {morph_conf:.2%}") | |
| self.faces_sent += 1 | |
| last_send_time = current_time | |
| time.sleep(0.01) # Small delay to prevent CPU overload | |
| except KeyboardInterrupt: | |
| print("\n\n๐ Simulation stopped by user") | |
| finally: | |
| cv2.destroyAllWindows() | |
| self.disconnect() | |
| # Print summary | |
| print() | |
| print("=" * 50) | |
| print("๐ SIMULATION SUMMARY") | |
| print("=" * 50) | |
| if self.session_start: | |
| duration = datetime.now() - self.session_start | |
| print(f" Duration: {duration}") | |
| print(f" Face crops sent: {self.faces_sent}") | |
| print() | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Drone Simulation Test for MorphGuard") | |
| parser.add_argument("--server", "-s", default="http://192.168.200.112:5000", | |
| help="Ground station URL (default: http://192.168.200.112:5000)") | |
| parser.add_argument("--camera", "-c", type=int, default=0, | |
| help="Camera ID (default: 0)") | |
| parser.add_argument("--interval", "-i", type=float, default=2.0, | |
| help="Seconds between sending crops (default: 2.0)") | |
| parser.add_argument("--no-preview", action="store_true", | |
| help="Disable camera preview window") | |
| args = parser.parse_args() | |
| # Create simulator | |
| sim = DroneSimulator(args.server, args.camera) | |
| # Initialize | |
| if not sim.initialize(): | |
| print("\nโ Initialization failed!") | |
| sys.exit(1) | |
| # Connect | |
| if not sim.connect(): | |
| print("\nโ Connection failed!") | |
| sys.exit(1) | |
| # Run simulation | |
| sim.run(interval=args.interval, show_preview=not args.no_preview) | |
| if __name__ == "__main__": | |
| main() | |