File size: 12,801 Bytes
2978bba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#!/usr/bin/env python3
"""
MorphGuard Drone Simulation Test
Simulates a companion computer sending face crops to the ground station.

This script allows testing the ground station API WITHOUT the actual Pi hardware.
It uses your computer's webcam to detect faces and sends them to MorphGuard.

Usage:
    python test_drone_simulation.py [--server URL] [--camera N] [--interval SECONDS]

Examples:
    # Use default settings (localhost, camera 0, 2 second interval)
    python test_drone_simulation.py
    
    # Connect to specific server
    python test_drone_simulation.py --server http://192.168.200.112:5000
    
    # Use different camera
    python test_drone_simulation.py --camera 1
"""

import os
import sys
import time
import json
import base64
import argparse
import random
from datetime import datetime
from io import BytesIO

import cv2
import numpy as np
import requests

# Simulated GPS coordinates (Austin, TX area)
SIMULATED_GPS = {
    "lat": 30.2672 + random.uniform(-0.01, 0.01),
    "lon": -97.7431 + random.uniform(-0.01, 0.01),
    "alt": 100.0 + random.uniform(-10, 10)
}


class DroneSimulator:
    """Simulates a drone companion computer for testing."""
    
    def __init__(self, server_url: str, camera_id: int = 0):
        self.server_url = server_url.rstrip('/')
        self.camera_id = camera_id
        self.cap = None
        self.face_cascade = None
        self.connected = False
        self.faces_sent = 0
        self.session_start = None
        
    def initialize(self) -> bool:
        """Initialize camera and face detector."""
        print(f"🚁 Drone Simulator Starting...")
        print(f"   Server: {self.server_url}")
        print(f"   Camera: {self.camera_id}")
        print()
        
        # Initialize camera
        print("πŸ“· Initializing camera...")
        self.cap = cv2.VideoCapture(self.camera_id)
        if not self.cap.isOpened():
            print("❌ Failed to open camera!")
            return False
        
        # Set camera resolution
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        print(f"   Resolution: {int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}")
        
        # Initialize face detector
        print("πŸ” Loading face detector...")
        cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        self.face_cascade = cv2.CascadeClassifier(cascade_path)
        if self.face_cascade.empty():
            print("❌ Failed to load face cascade!")
            return False
        print("   Using OpenCV Haar Cascade (simulating Hailo)")
        
        print()
        return True
    
    def connect(self) -> bool:
        """Register with ground station."""
        print("πŸ“‘ Connecting to ground station...")
        
        try:
            response = requests.post(
                f"{self.server_url}/api/drone/connect",
                json={
                    "companion_ip": "SIMULATION",
                    "hardware": "Simulated Pi5 + Hailo 26 TOPS",
                    "model": "opencv_haar_cascade (simulation)"
                },
                timeout=5
            )
            
            if response.status_code == 200:
                data = response.json()
                print(f"   βœ… Connected: {data.get('message')}")
                self.connected = True
                self.session_start = datetime.now()
                return True
            else:
                print(f"   ❌ Failed: HTTP {response.status_code}")
                return False
                
        except requests.exceptions.RequestException as e:
            print(f"   ❌ Connection error: {e}")
            return False
    
    def disconnect(self):
        """Disconnect from ground station."""
        if self.connected:
            try:
                requests.post(f"{self.server_url}/api/drone/disconnect", timeout=5)
                print("πŸ”Œ Disconnected from ground station")
            except:
                pass
        
        if self.cap:
            self.cap.release()
        
        self.connected = False
    
    def detect_faces(self, frame: np.ndarray) -> list:
        """Detect faces in frame using OpenCV."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(
            gray, 
            scaleFactor=1.1, 
            minNeighbors=5,
            minSize=(60, 60)
        )
        return list(faces)
    
    def crop_face(self, frame: np.ndarray, bbox: tuple, margin: float = 0.2) -> np.ndarray:
        """Crop face from frame with margin."""
        x, y, w, h = bbox
        
        # Add margin
        margin_x = int(w * margin)
        margin_y = int(h * margin)
        
        x1 = max(0, x - margin_x)
        y1 = max(0, y - margin_y)
        x2 = min(frame.shape[1], x + w + margin_x)
        y2 = min(frame.shape[0], y + h + margin_y)
        
        return frame[y1:y2, x1:x2].copy()
    
    def encode_crop(self, crop: np.ndarray) -> str:
        """Encode face crop to base64 JPEG."""
        # Resize if too large
        max_size = 256
        h, w = crop.shape[:2]
        if max(h, w) > max_size:
            scale = max_size / max(h, w)
            crop = cv2.resize(crop, (int(w * scale), int(h * scale)))
        
        # Encode
        _, buffer = cv2.imencode('.jpg', crop, [cv2.IMWRITE_JPEG_QUALITY, 85])
        return base64.b64encode(buffer).decode('utf-8')
    
    def send_face_crop(self, crop: np.ndarray, bbox: tuple, confidence: float) -> dict:
        """Send face crop to ground station for analysis."""
        # Update simulated GPS (slight drift)
        gps = {
            "lat": SIMULATED_GPS["lat"] + random.uniform(-0.0001, 0.0001),
            "lon": SIMULATED_GPS["lon"] + random.uniform(-0.0001, 0.0001),
            "alt": SIMULATED_GPS["alt"] + random.uniform(-1, 1)
        }
        
        payload = {
            "crop_base64": self.encode_crop(crop),
            "confidence": confidence,
            "bbox": list(bbox),
            "gps_lat": gps["lat"],
            "gps_lon": gps["lon"],
            "gps_alt": gps["alt"],
            "heading": random.uniform(0, 360),
            "flight_mode": "SIMULATION"
        }
        
        try:
            response = requests.post(
                f"{self.server_url}/api/drone/analyze",
                json=payload,
                timeout=10
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                return {"error": f"HTTP {response.status_code}"}
                
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
    
    def send_heartbeat(self, fps: float = 30.0):
        """Send heartbeat with telemetry."""
        try:
            requests.post(
                f"{self.server_url}/api/drone/heartbeat",
                json={
                    "fps": fps,
                    "gps": SIMULATED_GPS,
                    "battery": random.randint(70, 100)
                },
                timeout=2
            )
        except:
            pass
    
    def run(self, interval: float = 2.0, show_preview: bool = True):
        """
        Main simulation loop.
        
        Args:
            interval: Seconds between sending face crops
            show_preview: Show camera preview window
        """
        print()
        print("=" * 50)
        print("🚁 DRONE SIMULATION RUNNING")
        print("=" * 50)
        print()
        print("Press 'Q' to quit (in preview window)")
        print("Press Ctrl+C to stop")
        print()
        
        last_send_time = 0
        frame_count = 0
        fps_start = time.time()
        current_fps = 0
        
        try:
            while True:
                # Capture frame
                ret, frame = self.cap.read()
                if not ret:
                    print("⚠️  Failed to capture frame")
                    time.sleep(0.1)
                    continue
                
                # FPS calculation
                frame_count += 1
                elapsed = time.time() - fps_start
                if elapsed >= 1.0:
                    current_fps = frame_count / elapsed
                    frame_count = 0
                    fps_start = time.time()
                    self.send_heartbeat(current_fps)
                
                # Detect faces
                faces = self.detect_faces(frame)
                
                # Draw boxes on preview
                display_frame = frame.copy()
                for (x, y, w, h) in faces:
                    cv2.rectangle(display_frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
                
                # Add status overlay
                status_text = f"FPS: {current_fps:.1f} | Faces: {len(faces)} | Sent: {self.faces_sent}"
                cv2.putText(display_frame, status_text, (10, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                
                gps_text = f"GPS: {SIMULATED_GPS['lat']:.4f}, {SIMULATED_GPS['lon']:.4f}"
                cv2.putText(display_frame, gps_text, (10, 60),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                
                # Show preview
                if show_preview:
                    cv2.imshow("Drone Simulation - MorphGuard", display_frame)
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                
                # Send face crops at interval
                current_time = time.time()
                if faces is not None and len(faces) > 0 and (current_time - last_send_time) >= interval:
                    # Send first detected face
                    x, y, w, h = faces[0]
                    crop = self.crop_face(frame, (x, y, w, h))
                    confidence = 0.9 + random.uniform(0, 0.09)  # Simulated confidence
                    
                    print(f"\nπŸ“€ Sending face crop to ground station...")
                    print(f"   Size: {crop.shape[1]}x{crop.shape[0]}")
                    print(f"   Confidence: {confidence:.2%}")
                    
                    result = self.send_face_crop(crop, (x, y, w, h), confidence)
                    
                    if "error" in result:
                        print(f"   ❌ Error: {result['error']}")
                    else:
                        is_morph = result.get("is_morph", False)
                        morph_conf = result.get("confidence", 0)
                        print(f"   βœ… Result: {'MORPH DETECTED!' if is_morph else 'Genuine'}")
                        print(f"   πŸ“Š Morph Confidence: {morph_conf:.2%}")
                    
                    self.faces_sent += 1
                    last_send_time = current_time
                
                time.sleep(0.01)  # Small delay to prevent CPU overload
                
        except KeyboardInterrupt:
            print("\n\nπŸ›‘ Simulation stopped by user")
        finally:
            cv2.destroyAllWindows()
            self.disconnect()
            
            # Print summary
            print()
            print("=" * 50)
            print("πŸ“Š SIMULATION SUMMARY")
            print("=" * 50)
            if self.session_start:
                duration = datetime.now() - self.session_start
                print(f"   Duration: {duration}")
            print(f"   Face crops sent: {self.faces_sent}")
            print()


def main():
    parser = argparse.ArgumentParser(description="Drone Simulation Test for MorphGuard")
    parser.add_argument("--server", "-s", default="http://192.168.200.112:5000",
                       help="Ground station URL (default: http://192.168.200.112:5000)")
    parser.add_argument("--camera", "-c", type=int, default=0,
                       help="Camera ID (default: 0)")
    parser.add_argument("--interval", "-i", type=float, default=2.0,
                       help="Seconds between sending crops (default: 2.0)")
    parser.add_argument("--no-preview", action="store_true",
                       help="Disable camera preview window")
    
    args = parser.parse_args()
    
    # Create simulator
    sim = DroneSimulator(args.server, args.camera)
    
    # Initialize
    if not sim.initialize():
        print("\n❌ Initialization failed!")
        sys.exit(1)
    
    # Connect
    if not sim.connect():
        print("\n❌ Connection failed!")
        sys.exit(1)
    
    # Run simulation
    sim.run(interval=args.interval, show_preview=not args.no_preview)


if __name__ == "__main__":
    main()