| """ |
| Authentication and Authorization System |
| Implements JWT-based authentication for production deployments |
| """ |
|
|
| import os |
| import secrets |
| from datetime import datetime, timedelta |
| from typing import Optional, Dict, Any |
| import hashlib |
| import logging |
| from functools import wraps |
|
|
| try: |
| import jwt |
| JWT_AVAILABLE = True |
| except ImportError: |
| JWT_AVAILABLE = False |
| logging.warning("PyJWT not installed. Authentication disabled. Install with: pip install PyJWT") |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| SECRET_KEY = os.getenv('SECRET_KEY', secrets.token_urlsafe(32)) |
| ALGORITHM = "HS256" |
| ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', '60')) |
| ENABLE_AUTH = os.getenv('ENABLE_AUTH', 'false').lower() == 'true' |
|
|
|
|
| class AuthManager: |
| """ |
| Authentication manager for API endpoints and dashboard access |
| Supports JWT tokens and basic API key authentication |
| """ |
|
|
| def __init__(self): |
| self.users_db: Dict[str, str] = {} |
| self.api_keys_db: Dict[str, Dict[str, Any]] = {} |
| self._load_credentials() |
|
|
| def _load_credentials(self): |
| """Load credentials from environment variables""" |
| |
| admin_user = os.getenv('ADMIN_USERNAME', 'admin') |
| admin_pass = os.getenv('ADMIN_PASSWORD') |
|
|
| if admin_pass: |
| self.users_db[admin_user] = self._hash_password(admin_pass) |
| logger.info(f"Loaded admin user: {admin_user}") |
|
|
| |
| api_keys_str = os.getenv('API_KEYS', '') |
| if api_keys_str: |
| for key in api_keys_str.split(','): |
| key = key.strip() |
| if key: |
| self.api_keys_db[key] = { |
| 'created_at': datetime.utcnow(), |
| 'name': 'env_key', |
| 'active': True |
| } |
| logger.info(f"Loaded {len(self.api_keys_db)} API keys") |
|
|
| @staticmethod |
| def _hash_password(password: str) -> str: |
| """Hash password using SHA-256""" |
| return hashlib.sha256(password.encode()).hexdigest() |
|
|
| def verify_password(self, username: str, password: str) -> bool: |
| """ |
| Verify username and password |
| |
| Args: |
| username: Username |
| password: Plain text password |
| |
| Returns: |
| True if valid, False otherwise |
| """ |
| if username not in self.users_db: |
| return False |
|
|
| hashed = self._hash_password(password) |
| return secrets.compare_digest(self.users_db[username], hashed) |
|
|
| def create_access_token( |
| self, |
| username: str, |
| expires_delta: Optional[timedelta] = None |
| ) -> str: |
| """ |
| Create JWT access token |
| |
| Args: |
| username: Username |
| expires_delta: Token expiration time |
| |
| Returns: |
| JWT token string |
| """ |
| if not JWT_AVAILABLE: |
| raise RuntimeError("PyJWT not installed") |
|
|
| if expires_delta is None: |
| expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
|
|
| expire = datetime.utcnow() + expires_delta |
| payload = { |
| 'sub': username, |
| 'exp': expire, |
| 'iat': datetime.utcnow() |
| } |
|
|
| token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) |
| return token |
|
|
| def verify_token(self, token: str) -> Optional[str]: |
| """ |
| Verify JWT token and extract username |
| |
| Args: |
| token: JWT token string |
| |
| Returns: |
| Username if valid, None otherwise |
| """ |
| if not JWT_AVAILABLE: |
| return None |
|
|
| try: |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| username: str = payload.get('sub') |
| return username |
| except jwt.ExpiredSignatureError: |
| logger.warning("Token expired") |
| return None |
| except jwt.JWTError as e: |
| logger.warning(f"Invalid token: {e}") |
| return None |
|
|
| def verify_api_key(self, api_key: str) -> bool: |
| """ |
| Verify API key |
| |
| Args: |
| api_key: API key string |
| |
| Returns: |
| True if valid and active, False otherwise |
| """ |
| if api_key not in self.api_keys_db: |
| return False |
|
|
| key_data = self.api_keys_db[api_key] |
| return key_data.get('active', False) |
|
|
| def create_api_key(self, name: str) -> str: |
| """ |
| Create new API key |
| |
| Args: |
| name: Descriptive name for the key |
| |
| Returns: |
| Generated API key |
| """ |
| api_key = secrets.token_urlsafe(32) |
| self.api_keys_db[api_key] = { |
| 'created_at': datetime.utcnow(), |
| 'name': name, |
| 'active': True, |
| 'usage_count': 0 |
| } |
| logger.info(f"Created API key: {name}") |
| return api_key |
|
|
| def revoke_api_key(self, api_key: str) -> bool: |
| """ |
| Revoke API key |
| |
| Args: |
| api_key: API key to revoke |
| |
| Returns: |
| True if revoked, False if not found |
| """ |
| if api_key in self.api_keys_db: |
| self.api_keys_db[api_key]['active'] = False |
| logger.info(f"Revoked API key: {self.api_keys_db[api_key]['name']}") |
| return True |
| return False |
|
|
| def track_usage(self, api_key: str): |
| """Track API key usage""" |
| if api_key in self.api_keys_db: |
| self.api_keys_db[api_key]['usage_count'] = \ |
| self.api_keys_db[api_key].get('usage_count', 0) + 1 |
|
|
|
|
| |
| auth_manager = AuthManager() |
|
|
|
|
| |
|
|
|
|
| def require_auth(func): |
| """ |
| Decorator to require authentication for endpoints |
| Checks for JWT token in Authorization header or API key in X-API-Key header |
| """ |
| @wraps(func) |
| async def wrapper(*args, **kwargs): |
| if not ENABLE_AUTH: |
| |
| return await func(*args, **kwargs) |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| raise NotImplementedError("Integrate with your web framework") |
|
|
| return wrapper |
|
|
|
|
| def require_api_key(func): |
| """Decorator to require API key authentication""" |
| @wraps(func) |
| async def wrapper(*args, **kwargs): |
| if not ENABLE_AUTH: |
| return await func(*args, **kwargs) |
|
|
| |
| raise NotImplementedError("Integrate with your web framework") |
|
|
| return wrapper |
|
|
|
|
| |
|
|
|
|
| def authenticate_user(username: str, password: str) -> Optional[str]: |
| """ |
| Authenticate user and return JWT token |
| |
| Args: |
| username: Username |
| password: Password |
| |
| Returns: |
| JWT token if successful, None otherwise |
| """ |
| if not ENABLE_AUTH: |
| logger.warning("Authentication disabled") |
| return None |
|
|
| if auth_manager.verify_password(username, password): |
| return auth_manager.create_access_token(username) |
|
|
| return None |
|
|
|
|
| def verify_request_auth( |
| authorization: Optional[str] = None, |
| api_key: Optional[str] = None |
| ) -> bool: |
| """ |
| Verify request authentication |
| |
| Args: |
| authorization: Authorization header (Bearer token) |
| api_key: X-API-Key header |
| |
| Returns: |
| True if authenticated, False otherwise |
| """ |
| if not ENABLE_AUTH: |
| return True |
|
|
| |
| if api_key and auth_manager.verify_api_key(api_key): |
| auth_manager.track_usage(api_key) |
| return True |
|
|
| |
| if authorization and authorization.startswith('Bearer '): |
| token = authorization.split(' ')[1] |
| username = auth_manager.verify_token(token) |
| if username: |
| return True |
|
|
| return False |
|
|