from typing import Dict, List, Any, Optional from pydantic import BaseModel from datetime import datetime, timedelta import jwt from passlib.hash import bcrypt from enum import Enum import uuid class UserRole(Enum): ADMIN = "admin" USER = "user" VIEWER = "viewer" class Permission(Enum): READ = "read" WRITE = "write" EXECUTE = "execute" MANAGE = "manage" class User(BaseModel): """User model""" id: str username: str email: str hashed_password: str role: UserRole tenant_id: str permissions: List[Permission] metadata: Dict[str, Any] created_at: datetime last_login: Optional[datetime] is_active: bool = True class Tenant(BaseModel): """Tenant model for multi-tenancy""" id: str name: str config: Dict[str, Any] created_at: datetime features: List[str] is_active: bool = True class AuthManager: """Handles authentication and authorization""" def __init__(self, config: Dict[str, Any]): self.config = config self.secret_key = config['secret_key'] self.token_expiry = config.get('token_expiry', 3600) # 1 hour default async def create_user( self, username: str, email: str, password: str, role: UserRole, tenant_id: str, permissions: List[Permission] ) -> User: """Create new user""" hashed_password = bcrypt.hash(password) user = User( id=str(uuid.uuid4()), username=username, email=email, hashed_password=hashed_password, role=role, tenant_id=tenant_id, permissions=permissions, metadata={}, created_at=datetime.now() ) # Store user in database await self.store_user(user) return user async def authenticate(self, username: str, password: str) -> Optional[str]: """Authenticate user and return JWT token""" user = await self.get_user_by_username(username) if not user or not user.is_active: return None if not bcrypt.verify(password, user.hashed_password): return None # Update last login user.last_login = datetime.now() await self.update_user(user) # Generate JWT token token_data = { 'sub': user.id, 'username': user.username, 'role': user.role.value, 'tenant_id': user.tenant_id, 'permissions': [p.value for p in user.permissions], 'exp': datetime.utcnow() + timedelta(seconds=self.token_expiry) } return jwt.encode(token_data, self.secret_key, algorithm='HS256') async def verify_token(self, token: str) -> Optional[Dict[str, Any]]: """Verify JWT token and return payload""" try: payload = jwt.decode(token, self.secret_key, algorithms=['HS256']) return payload except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None async def has_permission( self, user_id: str, permission: Permission, resource: str ) -> bool: """Check if user has specific permission""" user = await self.get_user(user_id) if not user or not user.is_active: return False # Admin role has all permissions if user.role == UserRole.ADMIN: return True # Check specific permission return permission in user.permissions class AuditLogger: """Handles audit logging""" def __init__(self, storage_manager: Any): self.storage = storage_manager async def log_event( self, event_type: str, user_id: str, tenant_id: str, resource: str, action: str, status: str, details: Dict[str, Any] ): """Log audit event""" event = { 'event_type': event_type, 'user_id': user_id, 'tenant_id': tenant_id, 'resource': resource, 'action': action, 'status': status, 'details': details, 'timestamp': datetime.now().isoformat() } await self.storage.store( type=StorageType.AUDIT, data=event, tenant_id=tenant_id ) class SecurityManager: """Main security manager""" def __init__(self, config: Dict[str, Any], storage_manager: Any): self.config = config self.auth_manager = AuthManager(config) self.audit_logger = AuditLogger(storage_manager) async def initialize_tenant( self, name: str, features: List[str], config: Dict[str, Any] ) -> Tenant: """Initialize new tenant""" tenant = Tenant( id=str(uuid.uuid4()), name=name, config=config, features=features, created_at=datetime.now() ) # Store tenant await self.store_tenant(tenant) return tenant async def handle_request( self, token: str, resource: str, action: Permission ) -> Optional[Dict[str, Any]]: """Handle authenticated request""" # Verify token payload = await self.auth_manager.verify_token(token) if not payload: return None # Check permission has_permission = await self.auth_manager.has_permission( payload['sub'], action, resource ) if not has_permission: return None # Log access await self.audit_logger.log_event( event_type="access", user_id=payload['sub'], tenant_id=payload['tenant_id'], resource=resource, action=action.value, status="success", details={} ) return payload # Example usage: security_config = { 'secret_key': 'your-secret-key', 'token_expiry': 3600, 'password_policy': { 'min_length': 8, 'require_special': True } } security_manager = SecurityManager(security_config, storage_manager)