Spaces:
Running
Running
| from functools import wraps | |
| from flask import request, jsonify, g, session | |
| import hashlib | |
| try: | |
| import psycopg2 | |
| HAS_POSTGRES = True | |
| except ImportError: | |
| psycopg2 = None | |
| HAS_POSTGRES = False | |
| import os | |
| IS_HF_SPACE = os.environ.get('HF_SPACE', '0') == '1' | |
def get_db_connection():
    """
    Open and return a new psycopg2 connection to the MorphGuard database.

    Connection settings come from the project's ``config`` module when it can
    be imported (missing attributes fall back to built-in defaults); otherwise
    they are read from ``MORPHGUARD_DB_*`` environment variables with the same
    defaults.

    Raises:
        RuntimeError: when running on HuggingFace Spaces (``IS_HF_SPACE``) or
            when psycopg2 could not be imported (``HAS_POSTGRES`` is False).
    """
    if IS_HF_SPACE:
        raise RuntimeError("PostgreSQL database is disabled on HuggingFace Spaces.")
    if not HAS_POSTGRES:
        raise RuntimeError("PostgreSQL / psycopg2 is not installed or available.")

    # (connect kwarg, config attribute, environment variable, default value)
    # NOTE(review): the fallback credentials are hard-coded defaults; confirm
    # they are overridden in any real deployment.
    string_settings = (
        ("dbname", 'DB_NAME', 'MORPHGUARD_DB_NAME', 'morphguard'),
        ("user", 'DB_USER', 'MORPHGUARD_DB_USER', 'morphguard'),
        ("password", 'DB_PASSWORD', 'MORPHGUARD_DB_PASS', 'morphguard'),
        ("host", 'DB_HOST', 'MORPHGUARD_DB_HOST', 'localhost'),
    )

    # Per the original note, this lookup is duplicated from the telemetry
    # module so the helper can be used standalone.
    try:
        import config
        connect_kwargs = {
            kwarg: getattr(config, attr, fallback)
            for kwarg, attr, _env, fallback in string_settings
        }
        connect_kwargs["port"] = getattr(config, 'DB_PORT', 5432)
    except (ImportError, AttributeError):
        read_env = os.environ.get
        connect_kwargs = {
            kwarg: read_env(env_name, fallback)
            for kwarg, _attr, env_name, fallback in string_settings
        }
        # Environment values are strings, so the port is converted explicitly.
        connect_kwargs["port"] = int(read_env('MORPHGUARD_DB_PORT', 5432))

    return psycopg2.connect(**connect_kwargs)
def hash_key(key):
    """Return the hexadecimal SHA-256 digest of the string ``key``."""
    hasher = hashlib.sha256()
    # str.encode() with no argument encodes as UTF-8.
    hasher.update(key.encode())
    return hasher.hexdigest()
def validate_api_key(key):
    """
    Validate an API key against the ``api_keys`` table.

    The key is hashed with ``hash_key`` and looked up by ``key_hash`` among
    rows where ``is_active`` is TRUE.

    Args:
        key: The raw API key string supplied by the client.

    Returns:
        A tuple ``(is_valid, key_info)``. On success ``key_info`` is a dict
        with the row's ``id``, ``name`` and ``permissions``. In every other
        case (missing/empty key, database disabled or unavailable, unknown
        key, or any error during the lookup) the result is ``(False, None)``.
    """
    # A missing or empty key can never be valid; bail out before hashing so a
    # None key does not crash in hash_key().
    if not key:
        return False, None
    if IS_HF_SPACE or not HAS_POSTGRES:
        return False, None

    hashed = hash_key(key)
    try:
        conn = get_db_connection()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT id, name, permissions FROM api_keys WHERE key_hash = %s AND is_active = TRUE",
                    (hashed,)
                )
                row = cur.fetchone()
            finally:
                # Always release the cursor, even when the query fails.
                cur.close()
        finally:
            # Always release the connection; previously it leaked whenever
            # cursor(), execute() or fetchone() raised.
            conn.close()

        if row:
            return True, {'id': row[0], 'name': row[1], 'permissions': row[2]}
        return False, None
    except Exception as e:
        # Deliberate best-effort: any failure is logged and treated as
        # "not valid" rather than propagated to the request handler.
        import logging
        logging.getLogger("APIAuth").error("API Key validation error: %s", e)
        return False, None
def require_api_1(require_perm=None):
    """
    Decorator factory that requires a logged-in session or a valid API key.

    The wrapped view runs when either:
      1. ``session['logged_in']`` is truthy, or
      2. the ``X-API-Key`` request header holds a key accepted by
         ``validate_api_key`` and, if ``require_perm`` is given, the key's
         permissions contain ``require_perm`` or ``'admin'``.

    On API-key success the key info dict is stored on ``g.api_user``.

    Args:
        require_perm: Optional permission name an API key must hold.
            NOTE: it is not enforced for session-authenticated requests.

    Returns:
        A decorator. The decorated view returns a 403 JSON response when the
        key lacks the required permission and a 401 JSON response when
        neither a session nor a valid key is present.
    """
    def decorator(f):
        # Preserve f's metadata. Flask names endpoints after the view
        # function's __name__, so without wraps() every decorated view would
        # be called "decorated_function" and a second route would collide.
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # 1. Check Session
            if session.get('logged_in'):
                # Simplified: any logged-in session is allowed through without
                # a per-user permission check.
                return f(*args, **kwargs)

            # 2. Check API Key
            api_key = request.headers.get('X-API-Key')
            if api_key:
                is_valid, info = validate_api_key(api_key)
                if is_valid:
                    # Check permissions if required
                    if require_perm:
                        # `or []` also covers a stored NULL/None value, which
                        # would otherwise make the `in` tests raise TypeError.
                        # NOTE(review): assumes permissions is a collection of
                        # names; if it is a plain string, `in` does substring
                        # matching - confirm against the schema.
                        perms = info.get('permissions') or []
                        # 'admin' grants every permission.
                        if require_perm not in perms and 'admin' not in perms:
                            return jsonify({'error': 'Insufficient API Permissions'}), 403

                    g.api_user = info
                    return f(*args, **kwargs)

            return jsonify({'error': 'Unauthorized. Session or Valid API Key required.'}), 401
        return decorated_function
    return decorator
| # Alias for simpler usage: require_api_key refers to the same decorator factory as require_api_1. | |
| require_api_key = require_api_1 | |