MorphGuard / scripts /manage_api_keys.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
3.85 kB
#!/usr/bin/env python3
import sys
import os
import argparse
import secrets
import hashlib
import psycopg2
import datetime
# from tabulate import tabulate
# Make modules in the current working directory importable (the script is
# expected to be launched from the project root).
sys.path.append(os.getcwd())

# Database connection settings (matching other scripts). Each value can be
# overridden through the corresponding MORPHGUARD_DB_* environment variable.
DB_PARAMS = dict(
    dbname=os.getenv('MORPHGUARD_DB_NAME', 'morphguard'),
    user=os.getenv('MORPHGUARD_DB_USER', 'morphguard'),
    password=os.getenv('MORPHGUARD_DB_PASS', 'morphguard'),
    host=os.getenv('MORPHGUARD_DB_HOST', 'localhost'),
    # The port arrives as a string when set via the environment; normalise to int.
    port=int(os.getenv('MORPHGUARD_DB_PORT', 5432)),
)
def get_db_connection():
    """Open a new psycopg2 connection using the settings in ``DB_PARAMS``.

    The connection is returned open; this function never closes it, so the
    caller is responsible for doing so.
    """
    connection = psycopg2.connect(**DB_PARAMS)
    return connection
def hash_key(key):
    """Return the hexadecimal SHA-256 digest of the string *key*.

    The string is converted to bytes with ``str.encode()`` (UTF-8 by default)
    before hashing.
    """
    digest = hashlib.sha256()
    digest.update(key.encode())
    return digest.hexdigest()
def create_key(name, permissions=None):
    """Generate a new API key, store its SHA-256 hash, and print the key once.

    Only the hash is written to the ``api_keys`` table; the raw key is shown
    on stdout and returned, and is not stored by this function.

    Args:
        name: Name/description saved in the ``name`` column.
        permissions: List of permission strings saved in the ``permissions``
            column. Defaults to ``['detect']`` when ``None``.

    Returns:
        The raw key string (``mg_`` followed by a URL-safe random token) on
        success, or ``None`` if the database operation failed (the error is
        printed).
    """
    if permissions is None:
        permissions = ['detect']  # Default permission

    raw_key = f"mg_{secrets.token_urlsafe(32)}"
    key_hash = hash_key(raw_key)

    conn = None
    try:
        conn = get_db_connection()
        # The cursor context manager closes the cursor even if the INSERT fails.
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO api_keys (key_hash, name, permissions)
                VALUES (%s, %s, %s::text[])
                RETURNING id
                """,
                (key_hash, name, permissions)
            )
            key_id = cur.fetchone()[0]
        conn.commit()
    except Exception as e:
        print(f"Error creating key: {e}")
        return None
    finally:
        # Always release the connection; closing without a commit discards
        # any uncommitted work, so a failed insert leaves nothing behind.
        if conn is not None:
            conn.close()

    print("\n✅ API Key Created Successfully!")
    # Show the row ID: it is what the `revoke` command expects.
    print(f"ID: {key_id}")
    print(f"Name: {name}")
    print(f"Key: {raw_key}")
    print(f"Permissions: {permissions}")
    print("\n⚠️ SAVE THIS KEY NOW. It cannot be retrieved later.\n")
    return raw_key
def list_keys():
    """Print a table of all rows in ``api_keys``, newest first.

    Columns shown: ID, name, creation time, active flag and permissions.
    Database errors are printed rather than raised. Returns ``None``.
    """
    conn = None
    try:
        conn = get_db_connection()
        # The cursor context manager closes the cursor even if the query fails.
        with conn.cursor() as cur:
            cur.execute("SELECT id, name, created_at, is_active, permissions FROM api_keys ORDER BY created_at DESC")
            rows = cur.fetchall()
    except Exception as e:
        print(f"Error listing keys: {e}")
        return
    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()

    print(f"{'ID':<5} {'Name':<20} {'Created':<30} {'Active':<8} {'Permissions'}")
    print("-" * 80)
    for key_id, name, created_at, is_active, permissions in rows:
        # Convert every cell with str() before applying a width:
        #  * a bool with a format spec is rendered through int formatting
        #    ("1"/"0" instead of "True"/"False");
        #  * None (a NULL column) rejects any non-empty format spec.
        print(
            f"{str(key_id):<5} {str(name):<20} {str(created_at):<30} "
            f"{str(is_active):<8} {permissions}"
        )
def revoke_key(key_id):
    """Deactivate the API key whose row ID is *key_id*.

    Sets ``is_active = FALSE`` on the matching ``api_keys`` row and prints
    whether a row was updated. Database errors are printed rather than
    raised. Returns ``None``.

    Args:
        key_id: Value of the ``id`` column of the key to revoke.
    """
    conn = None
    try:
        conn = get_db_connection()
        # The cursor context manager closes the cursor even if the UPDATE fails.
        with conn.cursor() as cur:
            cur.execute("UPDATE api_keys SET is_active = FALSE WHERE id = %s", (key_id,))
            row_updated = cur.rowcount > 0
        conn.commit()
    except Exception as e:
        print(f"Error revoking key: {e}")
        return
    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()

    # Report only after the commit succeeded, so a failed commit is never
    # preceded by a success message.
    if row_updated:
        print(f"✅ Key ID {key_id} revoked.")
    else:
        print(f"❌ Key ID {key_id} not found.")
def main():
    """Parse the command line and run the requested key-management command.

    Sub-commands:
        create <name> [--perms P ...]  create a key (default permission: detect)
        list                           list all keys
        revoke <id>                    deactivate a key by its integer ID

    When no sub-command is given, the usage help is printed instead.
    """
    cli = argparse.ArgumentParser(description="Manage MorphGuard API Keys")
    commands = cli.add_subparsers(dest='command', help='Command to run')

    # Create
    create_cmd = commands.add_parser('create', help='Create a new API key')
    create_cmd.add_argument('name', help='Name/Description of the key owner')
    create_cmd.add_argument('--perms', nargs='+', default=['detect'], help='List of permissions (detect, train, admin)')

    # List
    commands.add_parser('list', help='List all API keys')

    # Revoke
    revoke_cmd = commands.add_parser('revoke', help='Revoke an API key')
    revoke_cmd.add_argument('id', type=int, help='ID of the key to revoke')

    args = cli.parse_args()

    # Map each sub-command to its action. Lambdas defer both the attribute
    # access on `args` and the handler lookup until the action is chosen.
    actions = {
        'create': lambda: create_key(args.name, args.perms),
        'list': lambda: list_keys(),
        'revoke': lambda: revoke_key(args.id),
    }

    action = actions.get(args.command)
    if action is None:
        # No sub-command supplied: show usage.
        cli.print_help()
        return
    action()


if __name__ == '__main__':
    main()