#!/usr/bin/env python3
"""Command-line tool for managing MorphGuard API keys.

Sub-commands:
    create NAME [--perms P ...]   insert a new key and print it once
    list                          print all stored keys
    revoke ID                     mark a key as inactive

Only the SHA-256 hash of a key is written to the ``api_keys`` table; the raw
key is shown a single time when it is created.
"""
import sys
import os
import argparse
import contextlib
import secrets
import hashlib
import datetime

try:
    import psycopg2
except ImportError:
    # Keep --help, argument parsing and hash_key() usable without the driver;
    # get_db_connection() reports the missing dependency when a DB is needed.
    psycopg2 = None
# from tabulate import tabulate

# Add project root to path
sys.path.append(os.getcwd())

# DB Config (matching other scripts)
DB_PARAMS = {
    "dbname": os.environ.get('MORPHGUARD_DB_NAME', 'morphguard'),
    "user": os.environ.get('MORPHGUARD_DB_USER', 'morphguard'),
    "password": os.environ.get('MORPHGUARD_DB_PASS', 'morphguard'),
    "host": os.environ.get('MORPHGUARD_DB_HOST', 'localhost'),
    "port": int(os.environ.get('MORPHGUARD_DB_PORT', 5432)),
}


def get_db_connection():
    """Open and return a new psycopg2 connection built from ``DB_PARAMS``.

    Raises:
        RuntimeError: if the psycopg2 driver is not installed.
    """
    if psycopg2 is None:
        raise RuntimeError(
            "psycopg2 is not installed; install it to use database commands"
        )
    return psycopg2.connect(**DB_PARAMS)


@contextlib.contextmanager
def _db_cursor(commit=False):
    """Yield a cursor on a fresh connection and always release both.

    When ``commit`` is true the transaction is committed after the ``with``
    body finishes without raising. If the body raises, nothing is committed
    and the connection is simply closed.
    """
    conn = get_db_connection()
    try:
        with contextlib.closing(conn.cursor()) as cur:
            yield cur
        if commit:
            conn.commit()
    finally:
        conn.close()


def hash_key(key):
    """Return the hex-encoded SHA-256 digest of the string ``key``."""
    return hashlib.sha256(key.encode()).hexdigest()


def create_key(name, permissions=None):
    """Generate a new API key, store its hash, and print the raw key.

    Args:
        name: Name/description stored alongside the key.
        permissions: List of permission strings; defaults to ``['detect']``.

    Returns:
        The raw key string on success, or ``None`` if anything failed (the
        error is printed).
    """
    if permissions is None:
        permissions = ['detect']  # Default permission

    raw_key = f"mg_{secrets.token_urlsafe(32)}"
    key_hash = hash_key(raw_key)

    try:
        with _db_cursor(commit=True) as cur:
            cur.execute(
                """
                INSERT INTO api_keys (key_hash, name, permissions)
                VALUES (%s, %s, %s::text[])
                RETURNING id
                """,
                (key_hash, name, permissions),
            )
            key_id = cur.fetchone()[0]
    except Exception as e:
        print(f"Error creating key: {e}")
        return None

    # Reached only after the commit succeeded, so the key really exists.
    print("\n✅ API Key Created Successfully!")
    print(f"ID: {key_id}")
    print(f"Name: {name}")
    print(f"Key: {raw_key}")
    print(f"Permissions: {permissions}")
    print("\n⚠️ SAVE THIS KEY NOW. It cannot be retrieved later.\n")
    return raw_key


def format_key_row(row):
    """Format one ``(id, name, created, active, permissions)`` row as a line.

    The first four cells are converted with ``str()`` before padding so that
    booleans render as ``True``/``False`` (a format spec would otherwise
    render them as ``1``/``0``) and ``None`` values do not raise.
    """
    key_id, name, created, active, permissions = row
    return (
        f"{str(key_id):<5} {str(name):<20} {str(created):<30} "
        f"{str(active):<8} {permissions}"
    )


def list_keys():
    """Print a table of all API keys, newest first."""
    try:
        with _db_cursor() as cur:
            cur.execute(
                "SELECT id, name, created_at, is_active, permissions "
                "FROM api_keys ORDER BY created_at DESC"
            )
            rows = cur.fetchall()
    except Exception as e:
        print(f"Error listing keys: {e}")
        return

    print(format_key_row(("ID", "Name", "Created", "Active", "Permissions")))
    print("-" * 80)
    for row in rows:
        print(format_key_row(row))


def revoke_key(key_id):
    """Mark the key with id ``key_id`` as inactive and report the outcome."""
    try:
        with _db_cursor(commit=True) as cur:
            cur.execute(
                "UPDATE api_keys SET is_active = FALSE WHERE id = %s",
                (key_id,),
            )
            revoked = cur.rowcount > 0
    except Exception as e:
        print(f"Error revoking key: {e}")
        return

    # Report only after the commit, so "revoked" is never printed for an
    # update that failed to persist.
    if revoked:
        print(f"✅ Key ID {key_id} revoked.")
    else:
        print(f"❌ Key ID {key_id} not found.")


def build_parser():
    """Build the argument parser with the create/list/revoke sub-commands."""
    parser = argparse.ArgumentParser(description="Manage MorphGuard API Keys")
    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Create
    create_parser = subparsers.add_parser('create', help='Create a new API key')
    create_parser.add_argument('name', help='Name/Description of the key owner')
    create_parser.add_argument(
        '--perms',
        nargs='+',
        default=['detect'],
        help='List of permissions (detect, train, admin)',
    )

    # List
    subparsers.add_parser('list', help='List all API keys')

    # Revoke
    revoke_parser = subparsers.add_parser('revoke', help='Revoke an API key')
    revoke_parser.add_argument('id', type=int, help='ID of the key to revoke')

    return parser


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()

    if args.command == 'create':
        create_key(args.name, args.perms)
    elif args.command == 'list':
        list_keys()
    elif args.command == 'revoke':
        revoke_key(args.id)
    else:
        # No sub-command given.
        parser.print_help()


if __name__ == '__main__':
    main()