import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

import jwt
from passlib.hash import bcrypt
from pydantic import BaseModel


class UserRole(Enum):
    """Closed set of roles a user can hold.

    The string values are what gets embedded in issued JWTs (the ``role``
    claim is written from ``user.role.value``).
    """

    ADMIN = "admin"
    USER = "user"
    VIEWER = "viewer"


class Permission(Enum):
    """Closed set of permissions that can be granted to a user.

    The string values are what gets embedded in issued JWTs (the
    ``permissions`` claim) and in audit events (the ``action`` field).
    """

    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    MANAGE = "manage"


class User(BaseModel):
    """User model."""

    id: str  # AuthManager.create_user fills this with str(uuid.uuid4())
    username: str
    email: str
    hashed_password: str  # bcrypt hash; the plaintext password is never stored
    role: UserRole
    tenant_id: str  # id of the tenant this user belongs to
    permissions: List[Permission]  # explicit grants; ADMIN bypasses these
    metadata: Dict[str, Any]  # free-form; create_user initialises it to {}
    created_at: datetime
    # Explicit default: without it pydantic v2 treats an Optional field as
    # *required*, and create_user() never passes last_login, so constructing
    # a new user would fail validation. None means "has never logged in".
    last_login: Optional[datetime] = None
    is_active: bool = True  # inactive users fail authentication and checks


class Tenant(BaseModel):
    """Tenant model for multi-tenancy."""

    id: str  # SecurityManager.initialize_tenant fills this with a UUID4 string
    name: str
    config: Dict[str, Any]  # free-form per-tenant settings
    created_at: datetime
    features: List[str]  # feature identifiers as supplied by the caller
    is_active: bool = True


class AuthManager:
    """Handles authentication and authorization.

    Creates users with bcrypt-hashed passwords, issues and verifies
    HS256-signed JWTs, and answers permission checks.

    Persistence is delegated to four coroutine hooks -- ``store_user``,
    ``update_user``, ``get_user`` and ``get_user_by_username``.  The default
    implementations raise ``NotImplementedError``; a subclass must override
    them with a real storage backend before ``create_user``,
    ``authenticate`` or ``has_permission`` can be used.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read signing settings from ``config``.

        Args:
            config: Must contain ``secret_key`` (the HS256 signing secret).
                ``token_expiry`` (token lifetime in seconds) is optional and
                defaults to 3600.

        Raises:
            KeyError: If ``secret_key`` is missing from ``config``.
        """
        self.config = config
        self.secret_key = config['secret_key']
        self.token_expiry = config.get('token_expiry', 3600)

    # ------------------------------------------------------------------
    # Persistence hooks -- override in a subclass.
    # ------------------------------------------------------------------

    async def store_user(self, user: User) -> None:
        """Persist a newly created user."""
        raise NotImplementedError(
            "AuthManager.store_user must be overridden with a storage backend"
        )

    async def update_user(self, user: User) -> None:
        """Persist changes made to an existing user."""
        raise NotImplementedError(
            "AuthManager.update_user must be overridden with a storage backend"
        )

    async def get_user(self, user_id: str) -> Optional[User]:
        """Return the user with the given id, or a falsy value if unknown."""
        raise NotImplementedError(
            "AuthManager.get_user must be overridden with a storage backend"
        )

    async def get_user_by_username(self, username: str) -> Optional[User]:
        """Return the user with the given username, or a falsy value if unknown."""
        raise NotImplementedError(
            "AuthManager.get_user_by_username must be overridden with a "
            "storage backend"
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def create_user(
        self,
        username: str,
        email: str,
        password: str,
        role: UserRole,
        tenant_id: str,
        permissions: List[Permission]
    ) -> User:
        """Create a new user, persist it via ``store_user`` and return it.

        The plaintext ``password`` is hashed with bcrypt; only the hash is
        kept on the returned model.  The user gets a fresh UUID4 id and an
        empty ``metadata`` dict.
        """
        hashed_password = bcrypt.hash(password)

        user = User(
            id=str(uuid.uuid4()),
            username=username,
            email=email,
            hashed_password=hashed_password,
            role=role,
            tenant_id=tenant_id,
            permissions=permissions,
            metadata={},
            # NOTE(review): naive local time, kept as-is so stored values do
            # not change shape; consider migrating to aware UTC timestamps.
            created_at=datetime.now()
        )

        await self.store_user(user)

        return user

    async def authenticate(self, username: str, password: str) -> Optional[str]:
        """Authenticate a user and return a signed JWT.

        Returns:
            The encoded token, or ``None`` when the user is unknown,
            inactive, or the password does not match.

        Side effects:
            On success ``user.last_login`` is set and the user is persisted
            through ``update_user`` before the token is built.
        """
        user = await self.get_user_by_username(username)
        if not user or not user.is_active:
            return None

        if not bcrypt.verify(password, user.hashed_password):
            return None

        # NOTE(review): naive local time, same convention as created_at.
        user.last_login = datetime.now()
        await self.update_user(user)

        token_data = {
            'sub': user.id,
            'username': user.username,
            'role': user.role.value,
            'tenant_id': user.tenant_id,
            'permissions': [p.value for p in user.permissions],
            # Timezone-aware UTC: datetime.utcnow() is deprecated and returns
            # a naive value; the encoded expiry instant is the same.
            'exp': datetime.now(timezone.utc) + timedelta(seconds=self.token_expiry)
        }

        return jwt.encode(token_data, self.secret_key, algorithm='HS256')

    async def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify a JWT and return its payload.

        Returns:
            The decoded claims, or ``None`` if the token is expired, badly
            signed, malformed or otherwise invalid.
        """
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so a
            # single handler covers both the expired and the invalid case.
            return None

    async def has_permission(
        self,
        user_id: str,
        permission: Permission,
        resource: str
    ) -> bool:
        """Check whether a user holds a specific permission.

        The user is re-loaded through ``get_user`` so the decision reflects
        the stored state rather than the claims baked into a token.

        Args:
            user_id: Id of the user to check.
            permission: The permission being requested.
            resource: Accepted for interface compatibility; not consulted by
                the current implementation.

        Returns:
            ``False`` for unknown or inactive users, ``True`` for any active
            ``ADMIN``, otherwise whether ``permission`` is in the user's
            explicit grants.
        """
        user = await self.get_user(user_id)
        if not user or not user.is_active:
            return False

        if user.role == UserRole.ADMIN:
            return True

        return permission in user.permissions


class AuditLogger:
    """Handles audit logging by writing event records to a storage manager."""

    def __init__(self, storage_manager: Any):
        """Keep a reference to the storage backend.

        Args:
            storage_manager: Object exposing an awaitable
                ``store(type=..., data=..., tenant_id=...)`` method.
        """
        self.storage = storage_manager

    async def log_event(
        self,
        event_type: str,
        user_id: str,
        tenant_id: str,
        resource: str,
        action: str,
        status: str,
        details: Dict[str, Any]
    ) -> None:
        """Build an audit event and hand it to the storage manager.

        The event is a plain dict of the arguments plus a ``timestamp`` key
        holding the current time as an ISO-8601 string.  It is stored under
        ``StorageType.AUDIT``, scoped to ``tenant_id``.
        """
        event = {
            'event_type': event_type,
            'user_id': user_id,
            'tenant_id': tenant_id,
            'resource': resource,
            'action': action,
            'status': status,
            'details': details,
            # NOTE(review): naive local time -- no UTC offset in the string.
            'timestamp': datetime.now().isoformat()
        }

        # NOTE(review): StorageType is not imported or defined in the visible
        # part of this file; it must be brought into scope for this call to
        # work -- confirm which module provides it.
        await self.storage.store(
            type=StorageType.AUDIT,
            data=event,
            tenant_id=tenant_id
        )


class SecurityManager:
    """Main security manager.

    Wires an ``AuthManager`` and an ``AuditLogger`` together and exposes the
    request-level entry point ``handle_request``.

    Tenant persistence is delegated to the ``store_tenant`` coroutine hook,
    whose default implementation raises ``NotImplementedError``; override it
    in a subclass before calling ``initialize_tenant``.
    """

    def __init__(self, config: Dict[str, Any], storage_manager: Any):
        """Build the auth manager from ``config`` and the audit logger from
        ``storage_manager``.

        Raises:
            KeyError: If ``config`` lacks ``secret_key`` (raised by
                ``AuthManager``).
        """
        self.config = config
        self.auth_manager = AuthManager(config)
        self.audit_logger = AuditLogger(storage_manager)

    async def store_tenant(self, tenant: Tenant) -> None:
        """Persist a newly created tenant.  Override in a subclass."""
        raise NotImplementedError(
            "SecurityManager.store_tenant must be overridden with a storage "
            "backend"
        )

    async def initialize_tenant(
        self,
        name: str,
        features: List[str],
        config: Dict[str, Any]
    ) -> Tenant:
        """Create a tenant with a fresh UUID4 id, persist it via
        ``store_tenant`` and return it."""
        tenant = Tenant(
            id=str(uuid.uuid4()),
            name=name,
            config=config,
            features=features,
            # NOTE(review): naive local time.
            created_at=datetime.now()
        )

        await self.store_tenant(tenant)

        return tenant

    async def handle_request(
        self,
        token: str,
        resource: str,
        action: Permission
    ) -> Optional[Dict[str, Any]]:
        """Authenticate and authorize a request, auditing successful access.

        Args:
            token: Encoded JWT presented by the caller.
            resource: Identifier of the resource being accessed.
            action: Permission required for the access.

        Returns:
            The token payload when the token is valid and the user holds
            ``action``; ``None`` when the token is invalid, lacks the ``sub``
            or ``tenant_id`` claim, or the permission check fails.

        Only successful accesses are written to the audit log.
        """
        payload = await self.auth_manager.verify_token(token)
        if not payload:
            return None

        # A correctly signed token that lacks the claims needed below is
        # treated as unauthenticated instead of surfacing as a KeyError.
        user_id = payload.get('sub')
        tenant_id = payload.get('tenant_id')
        if user_id is None or tenant_id is None:
            return None

        has_permission = await self.auth_manager.has_permission(
            user_id,
            action,
            resource
        )

        if not has_permission:
            return None

        await self.audit_logger.log_event(
            event_type="access",
            user_id=user_id,
            tenant_id=tenant_id,
            resource=resource,
            action=action.value,
            status="success",
            details={}
        )

        return payload


# Module-level default configuration and manager instance.
#
# NOTE(review): 'secret_key' below is a hard-coded placeholder.  Anyone who
# knows it can forge tokens, so it must be replaced with a real secret
# supplied from outside the source tree before deployment.
security_config = {
    'secret_key': 'your-secret-key',
    'token_expiry': 3600,
    # Not read by any code visible in this file.
    'password_policy': {
        'min_length': 8,
        'require_special': True
    }
}

# NOTE(review): `storage_manager` is not defined or imported in the visible
# part of this file; unless it is provided elsewhere, importing this module
# raises NameError here.
security_manager = SecurityManager(security_config, storage_manager)