"""API-key and session based authentication helpers for Flask views."""

from functools import wraps
import hashlib
import logging
import os

from flask import request, jsonify, g, session

try:
    import psycopg2
    HAS_POSTGRES = True
except ImportError:
    # psycopg2 is optional; without it every API-key lookup is rejected.
    psycopg2 = None
    HAS_POSTGRES = False

# Set HF_SPACE=1 in the environment to disable all PostgreSQL access.
IS_HF_SPACE = os.environ.get('HF_SPACE', '0') == '1'

_logger = logging.getLogger("APIAuth")


def get_db_connection():
    """Open and return a new psycopg2 connection.

    Connection parameters are read from the optional ``config`` module
    (``DB_NAME``, ``DB_USER``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT``).
    When that module cannot be imported, the ``MORPHGUARD_DB_*`` environment
    variables are used instead. Every parameter falls back to a built-in
    default.

    Returns:
        A new psycopg2 connection; the caller is responsible for closing it.

    Raises:
        RuntimeError: if running with ``HF_SPACE=1`` or psycopg2 is missing.
    """
    if IS_HF_SPACE:
        raise RuntimeError("PostgreSQL database is disabled on HuggingFace Spaces.")
    if not HAS_POSTGRES:
        raise RuntimeError("PostgreSQL / psycopg2 is not installed or available.")

    # Helper to get DB connection (duplicated from telemetry for standalone usage).
    # In a real app the config should be shared properly.
    try:
        import config
        db_params = {
            "dbname": getattr(config, 'DB_NAME', 'morphguard'),
            "user": getattr(config, 'DB_USER', 'morphguard'),
            "password": getattr(config, 'DB_PASSWORD', 'morphguard'),
            "host": getattr(config, 'DB_HOST', 'localhost'),
            "port": getattr(config, 'DB_PORT', 5432),
        }
    except (ImportError, AttributeError):
        db_params = {
            "dbname": os.environ.get('MORPHGUARD_DB_NAME', 'morphguard'),
            "user": os.environ.get('MORPHGUARD_DB_USER', 'morphguard'),
            "password": os.environ.get('MORPHGUARD_DB_PASS', 'morphguard'),
            "host": os.environ.get('MORPHGUARD_DB_HOST', 'localhost'),
            "port": int(os.environ.get('MORPHGUARD_DB_PORT', 5432)),
        }
    return psycopg2.connect(**db_params)


def hash_key(key):
    """Return the hex-encoded SHA-256 digest of the string *key*."""
    return hashlib.sha256(key.encode()).hexdigest()


def validate_api_key(key):
    """Validate an API key against the ``api_keys`` table.

    The key is hashed with :func:`hash_key` and looked up among active rows.
    Any failure (database disabled, driver missing, malformed key, query
    error) is treated as "invalid" rather than raised; query errors are
    logged on the ``APIAuth`` logger.

    Args:
        key: The raw API key string supplied by the client.

    Returns:
        ``(is_valid, key_info)`` where ``key_info`` is a dict with the keys
        ``'id'``, ``'name'`` and ``'permissions'`` when valid, else ``None``.
    """
    if IS_HF_SPACE or not HAS_POSTGRES:
        return False, None
    # A missing or non-string key can never match; reject it before hashing
    # so callers get (False, None) instead of an AttributeError.
    if not key or not isinstance(key, str):
        return False, None

    hashed = hash_key(key)
    try:
        conn = get_db_connection()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT id, name, permissions FROM api_keys WHERE key_hash = %s AND is_active = TRUE",
                    (hashed,)
                )
                row = cur.fetchone()
            finally:
                cur.close()
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
    except Exception as e:
        _logger.error("API Key validation error: %s", e)
        return False, None

    if row:
        return True, {'id': row[0], 'name': row[1], 'permissions': row[2]}
    return False, None


def require_api_1(require_perm=None):
    """Build a view decorator that requires a session login or an API key.

    Request handling order:

    1. If ``session['logged_in']`` is truthy, the view runs immediately
       (``require_perm`` is not checked for session users).
    2. Otherwise the ``X-API-Key`` header is validated with
       :func:`validate_api_key`. When ``require_perm`` is given, the key's
       permissions must contain it or ``'admin'``; if not, a 403 JSON
       response is returned. On success the key info is stored on
       ``g.api_user`` and the view runs.
    3. In every other case a 401 JSON response is returned.

    Args:
        require_perm: Optional permission name the API key must hold.

    Returns:
        A decorator to apply to a Flask view function.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # 1. Check Session
            if session.get('logged_in'):
                # Admin/Mod check if perms needed (simplified).
                # In a robust system, check specific user permissions.
                return f(*args, **kwargs)

            # 2. Check API Key
            api_key = request.headers.get('X-API-Key')
            if api_key:
                is_valid, info = validate_api_key(api_key)
                if is_valid:
                    if require_perm:
                        # The stored permissions may be NULL; treat that as
                        # "no permissions" instead of failing on `in None`.
                        # NOTE(review): assumes permissions is a collection
                        # of names; if it is a plain string, `in` does a
                        # substring match -- confirm against the schema.
                        perms = info.get('permissions') or []
                        if require_perm not in perms and 'admin' not in perms:
                            return jsonify({'error': 'Insufficient API Permissions'}), 403

                    g.api_user = info
                    return f(*args, **kwargs)

            return jsonify({'error': 'Unauthorized. Session or Valid API Key required.'}), 401
        return decorated_function
    return decorator


# Alias just primarily for simple usage
require_api_key = require_api_1