#!/usr/bin/env python3
"""
Full automated setup and demo test for MorphGuard without invoking separate shell scripts.

Usage:
  python3 scripts/full_run.py [--base-url URL] [--verbose]
"""
import os
import sys
import subprocess
import json
import time
import shutil
import argparse

import requests

# Upper bound (seconds) for the login request so an unresponsive server
# cannot hang the script indefinitely.
LOGIN_TIMEOUT_SECONDS = 30


def run_cmd(cmd, verbose=False):
    """Run an external command and exit the script if it fails.

    Args:
        cmd: Command and arguments as a list of strings.
        verbose: When true, echo the command and let the child write directly
            to this process's stdout; otherwise capture the combined
            stdout/stderr and print it once the command finishes.

    Returns:
        The ``subprocess.CompletedProcess`` of a successful run. In verbose
        mode its ``stdout`` is ``None`` because nothing was captured.

    Raises:
        SystemExit: With the child's exit code when that code is non-zero.
    """
    if verbose:
        print(f"[CMD] {' '.join(cmd)}")
    result = subprocess.run(
        cmd,
        stdout=None if verbose else subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    if result.returncode != 0:
        print(f"❌ Command failed (exit {result.returncode}): {' '.join(cmd)}")
        if not verbose and result.stdout:
            print(result.stdout)
        sys.exit(result.returncode)
    if not verbose and result.stdout:
        print(result.stdout)
    return result


def install_requirements(verbose=False):
    """Install the Python packages this run needs via ``pip``.

    Uses ``sys.executable -m pip`` so the packages land in the interpreter
    that is running this script.
    """
    run_cmd(
        [
            sys.executable, '-m', 'pip', 'install',
            'psycopg2-binary', 'opencv-python', 'dronekit',
            'flask-socketio', 'open3d',
        ],
        verbose,
    )


def check_server(session, base_url):
    """Verify that the server answers ``GET base_url`` with HTTP 200.

    Raises:
        SystemExit: With code 1 when the request fails or the status code is
            anything other than 200.
    """
    # Keep the try body to the one call that can raise a transport error.
    try:
        r = session.get(base_url, timeout=5)
    except requests.RequestException as e:
        print(f"❌ Error connecting to server at {base_url}: {e}")
        sys.exit(1)
    if r.status_code != 200:
        print(f"❌ Server responded {r.status_code}, expected 200.")
        sys.exit(1)
    print(f"✔ Server is up at {base_url}")


def login_admin(session, base_url):
    """Log the session in as the admin user through ``{base_url}/login``.

    The login counts as failed when the response status is not 200 or the
    response body contains ``'Invalid credentials'``.

    Raises:
        SystemExit: With code 1 when the request fails or the login is
            rejected.
    """
    login_url = f"{base_url}/login"
    # NOTE: the credentials are hard-coded here.
    data = {'username': 'admin', 'password': 'morph@123'}
    try:
        r = session.post(login_url, data=data, timeout=LOGIN_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        print(f"❌ Admin login failed: {e}")
        sys.exit(1)
    if r.status_code != 200 or 'Invalid credentials' in r.text:
        print("❌ Admin login failed.")
        sys.exit(1)
    print("✔ Logged in as admin.")


def write_db_config():
    """Write the database connection settings to ``db_config.json``.

    The file is created in (or overwritten in) the current working directory.
    """
    cfg = {
        'DB_NAME': 'morphguard',
        'DB_USER': 'morphguard',
        'DB_PASS': 'morphguard',
        'DB_HOST': '127.0.0.1',
        'DB_PORT': 5432,
    }
    with open('db_config.json', 'w', encoding='utf-8') as f:
        json.dump(cfg, f, indent=2)
    print("✔ Wrote db_config.json")


def install_timescaledb(verbose=False):
    """Run the TimescaleDB install script when ``psql`` is not on PATH.

    If ``psql`` is missing and ``apt-get`` is available, runs
    ``scripts/install_timescaledb.sh`` through ``bash``; if ``apt-get`` is
    missing too, only prints a warning. When ``psql`` is already present the
    function just reports that and does nothing else.
    """
    if shutil.which('psql'):
        print("✔ psql found, assuming TimescaleDB ready.")
        return
    if shutil.which('apt-get'):
        run_cmd(['bash', 'scripts/install_timescaledb.sh'], verbose)
    else:
        print("⚠️ psql not found, skipping TimescaleDB setup.")


def _response_payload(response):
    """Return the decoded JSON body of *response*, or its raw text if not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


def api_call(session, base_url, method, endpoint, payload=None, timeout=None):
    """Call an API endpoint, print its response, and exit on an HTTP error.

    Args:
        session: ``requests.Session`` used to send the request.
        base_url: Server root; ``endpoint`` is appended to it verbatim.
        method: ``'get'`` (case-insensitive) issues a GET; any other value
            issues a POST with ``payload`` sent as JSON.
        endpoint: Path appended to ``base_url``.
        payload: JSON-serialisable body for POST requests.
        timeout: Optional timeout in seconds handed to ``requests``. The
            default ``None`` waits indefinitely, as the original code did.

    Returns:
        The decoded JSON body, or the raw response text if it is not JSON.

    Raises:
        SystemExit: With code 1 when the response status is 400 or above.
    """
    url = f"{base_url}{endpoint}"
    if method.lower() == 'get':
        r = session.get(url, timeout=timeout)
    else:
        r = session.post(url, json=payload, timeout=timeout)
    data = _response_payload(r)
    # The body is printed before the status check so that error details
    # returned by the server are visible when the script exits.
    print(f"{endpoint} -> {data}")
    if r.status_code >= 400:
        print(f"❌ API error {r.status_code} at {endpoint}")
        sys.exit(1)
    return data


def fetch_datasets(session, base_url):
    """Post the API keys, then call each dataset setup endpoint in order.

    Keys are read from the ``UNSPLASH_ACCESS_KEY``, ``UNSPLASH_SECRET_KEY``,
    ``PEXELS_API_KEY`` and ``PIXABAY_API_KEY`` environment variables (empty
    string when unset). The Pexels and Pixabay endpoints are only called when
    their key is non-empty.
    """
    keys = {
        'access_key': os.getenv('UNSPLASH_ACCESS_KEY', ''),
        'secret_key': os.getenv('UNSPLASH_SECRET_KEY', ''),
        'pexels_key': os.getenv('PEXELS_API_KEY', ''),
        'pixabay_key': os.getenv('PIXABAY_API_KEY', ''),
    }
    api_call(session, base_url, 'post', '/api/setup/keys', keys)
    api_call(session, base_url, 'post', '/api/setup/fetch_utkface',
             {'count': 1000, 'split': 'train'})
    api_call(session, base_url, 'post', '/api/setup/fetch_lfw',
             {'count': 500, 'split': 'train'})
    if keys['pexels_key']:
        api_call(session, base_url, 'post', '/api/setup/fetch_pexels',
                 {'count': 500, 'split': 'train'})
    if keys['pixabay_key']:
        api_call(session, base_url, 'post', '/api/setup/fetch_pixabay',
                 {'count': 500, 'split': 'train'})
    api_call(session, base_url, 'post', '/api/setup/fetch_real',
             {'count': 500, 'split': 'train'})
    api_call(session, base_url, 'post', '/api/setup/generate_morphs',
             {'count': 4000, 'split': 'train'})


def train_detector(session, base_url):
    """Request detector training via ``/api/train/detector``."""
    # Send empty JSON payload to initiate detector training
    api_call(session, base_url, 'post', '/api/train/detector', {})
    print("✔ Training started.")


def _capture_frames(cv2, max_index=4):
    """Grab one JPEG-encoded frame from each camera index below *max_index*.

    Each device is released as soon as its frame has been read, including
    when the read raises, so no capture handle stays open.

    Returns:
        A list of ``(camera_index, jpeg_bytes)`` tuples for the cameras that
        opened, delivered a frame, and encoded successfully.
    """
    frames = []
    for i in range(max_index):
        cap = cv2.VideoCapture(i)
        try:
            if not cap.isOpened():
                continue
            ret, frame = cap.read()
        finally:
            cap.release()
        if not ret:
            continue
        encoded, buf = cv2.imencode('.jpg', frame)
        if encoded:
            frames.append((i, buf.tobytes()))
    return frames


def test_cameras(session, base_url):
    """Send one frame per detected webcam to the detect and demorph endpoints.

    Every captured frame is posted to ``/api/detect``; only the first captured
    frame is posted to ``/api/demorph``. The function returns without doing
    anything when OpenCV cannot be imported or no camera yields a frame.
    """
    try:
        import cv2
    except ImportError:
        print("⚠️ OpenCV not installed, skipping camera test.")
        return

    frames = _capture_frames(cv2)
    if not frames:
        print("⚠️ No webcams detected for test.")
        return

    for idx, imgb in frames:
        files = {'image': (f'cam{idx}.jpg', imgb, 'image/jpeg')}
        r = session.post(f"{base_url}/api/detect", files=files)
        print(f"Camera {idx} detect: {_response_payload(r)}")

    idx, imgb = frames[0]
    files = {'image': (f'cam{idx}.jpg', imgb, 'image/jpeg')}
    r = session.post(f"{base_url}/api/demorph", files=files)
    print(f"Camera {idx} demorph: {_response_payload(r)}")


def main():
    """Parse the command-line options and run every setup and demo step."""
    parser = argparse.ArgumentParser(description="Full MorphGuard automator")
    parser.add_argument('--base-url', default='http://localhost:5000')
    parser.add_argument('--verbose', action='store_true')
    args = parser.parse_args()

    session = requests.Session()
    install_requirements(args.verbose)
    check_server(session, args.base_url)
    login_admin(session, args.base_url)
    write_db_config()
    # NOTE: the TimescaleDB step is disabled; uncomment to enable it.
    # install_timescaledb(args.verbose)
    fetch_datasets(session, args.base_url)
    train_detector(session, args.base_url)
    time.sleep(2)
    test_cameras(session, args.base_url)
    print("✅ Full run complete.")


if __name__ == '__main__':
    main()