Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| import sys | |
| import os | |
| import argparse | |
| import secrets | |
| import hashlib | |
| import psycopg2 | |
| import datetime | |
| # from tabulate import tabulate | |
# Make modules under the current working directory importable
# (assumes the script is launched from the project root).
sys.path.append(os.getcwd())

# PostgreSQL connection settings (kept in line with the other scripts).
# Each value can be overridden through its MORPHGUARD_DB_* environment
# variable; the port is always coerced to an int.
DB_PARAMS = dict(
    dbname=os.getenv('MORPHGUARD_DB_NAME', 'morphguard'),
    user=os.getenv('MORPHGUARD_DB_USER', 'morphguard'),
    password=os.getenv('MORPHGUARD_DB_PASS', 'morphguard'),
    host=os.getenv('MORPHGUARD_DB_HOST', 'localhost'),
    port=int(os.getenv('MORPHGUARD_DB_PORT', 5432)),
)
def get_db_connection():
    """Open a new psycopg2 connection using the settings in ``DB_PARAMS``.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller
        is responsible for closing it.
    """
    connection = psycopg2.connect(**DB_PARAMS)
    return connection
def hash_key(key):
    """Return the SHA-256 digest of ``key`` as a lowercase hex string.

    Args:
        key: The raw key text; it is encoded with the default codec
            (UTF-8) before hashing.

    Returns:
        A 64-character hexadecimal string.
    """
    hasher = hashlib.sha256()
    hasher.update(key.encode())
    return hasher.hexdigest()
def create_key(name, permissions=None):
    """Generate a new API key, store its hash, and print the raw key once.

    Only the SHA-256 hash of the key is written to the ``api_keys`` table;
    the raw key is shown on stdout and returned, and is not stored by this
    function.

    Args:
        name: Name/description saved in the ``name`` column.
        permissions: List of permission strings saved in the ``permissions``
            column (cast to ``text[]``). Defaults to ``['detect']``.

    Returns:
        The raw key string (``mg_`` followed by a URL-safe random token) on
        success, or ``None`` if anything failed; the error is printed.
    """
    if permissions is None:
        permissions = ['detect']  # Default permission

    raw_key = f"mg_{secrets.token_urlsafe(32)}"
    key_hash = hash_key(raw_key)

    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO api_keys (key_hash, name, permissions)
            VALUES (%s, %s, %s::text[])
            RETURNING id
            """,
            (key_hash, name, permissions)
        )
        key_id = cur.fetchone()[0]
        conn.commit()

        print(f"\n✅ API Key Created Successfully!")
        # The ID is what the `revoke` command expects, so show it here.
        print(f"ID: {key_id}")
        print(f"Name: {name}")
        print(f"Key: {raw_key}")
        print(f"Permissions: {permissions}")
        print(f"\n⚠️ SAVE THIS KEY NOW. It cannot be retrieved later.\n")
        return raw_key
    except Exception as e:
        print(f"Error creating key: {e}")
        return None
    finally:
        # Release DB resources on every path; previously a failed
        # execute/commit left the cursor and connection open.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
def list_keys():
    """Print a fixed-width table of all rows in ``api_keys``, newest first.

    Columns shown: id, name, created_at, is_active, permissions. Key hashes
    are not selected. Any error is printed rather than raised.
    """
    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute("SELECT id, name, created_at, is_active, permissions FROM api_keys ORDER BY created_at DESC")
        rows = cur.fetchall()

        print(f"{'ID':<5} {'Name':<20} {'Created':<30} {'Active':<8} {'Permissions'}")
        print("-" * 80)
        for key_id, name, created_at, is_active, permissions in rows:
            # `!s` converts to text before padding: a NULL column would
            # otherwise raise TypeError on the width spec, and a bool with a
            # width spec would render as 1/0 instead of True/False.
            print(f"{key_id:<5} {name!s:<20} {created_at!s:<30} {is_active!s:<8} {permissions}")
    except Exception as e:
        print(f"Error listing keys: {e}")
    finally:
        # Release DB resources on every path, including query failures.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
def revoke_key(key_id):
    """Mark the API key with the given ID as inactive.

    Sets ``is_active = FALSE`` on the matching ``api_keys`` row and prints
    whether a row was found. Any error is printed rather than raised.

    Args:
        key_id: Value of the ``id`` column of the key to revoke.
    """
    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute("UPDATE api_keys SET is_active = FALSE WHERE id = %s", (key_id,))
        was_updated = cur.rowcount > 0
        # Commit before reporting, so success is never announced for a
        # change that then fails to persist.
        conn.commit()

        if was_updated:
            print(f"✅ Key ID {key_id} revoked.")
        else:
            print(f"❌ Key ID {key_id} not found.")
    except Exception as e:
        print(f"Error revoking key: {e}")
    finally:
        # Release DB resources on every path, including update failures.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
def main():
    """Parse the command line and run the requested key-management command.

    Sub-commands:
        create NAME [--perms P ...]  -> create_key(NAME, perms)
        list                         -> list_keys()
        revoke ID                    -> revoke_key(ID)

    With no sub-command, the usage help is printed.
    """
    cli = argparse.ArgumentParser(description="Manage MorphGuard API Keys")
    commands = cli.add_subparsers(dest='command', help='Command to run')

    # create
    create_cmd = commands.add_parser('create', help='Create a new API key')
    create_cmd.add_argument('name', help='Name/Description of the key owner')
    create_cmd.add_argument(
        '--perms',
        nargs='+',
        default=['detect'],
        help='List of permissions (detect, train, admin)',
    )

    # list
    commands.add_parser('list', help='List all API keys')

    # revoke
    revoke_cmd = commands.add_parser('revoke', help='Revoke an API key')
    revoke_cmd.add_argument('id', type=int, help='ID of the key to revoke')

    options = cli.parse_args()
    chosen = options.command

    if chosen == 'create':
        create_key(options.name, options.perms)
        return
    if chosen == 'list':
        list_keys()
        return
    if chosen == 'revoke':
        revoke_key(options.id)
        return

    # No (or unrecognised) sub-command: show usage.
    cli.print_help()


if __name__ == '__main__':
    main()