"""
Security Module - HIPAA/GDPR Compliance Features

Implements authentication, authorization, audit logging, and encryption
"""
|
|
|
|
|
import hashlib
import inspect
import ipaddress
import json
import logging
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, List, Any, Optional

import jwt
from fastapi import HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
|
|
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)

# JWT signing configuration.
# NOTE(review): the key is generated afresh each time this module is imported,
# so a token signed by one process cannot be verified by another process or
# after a restart. Presumably acceptable for a demo - confirm before deploying
# with several workers.
SECRET_KEY = secrets.token_urlsafe(32)
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by this module.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
|
|
|
|
|
|
|
class AuditLogger:
    """
    HIPAA-compliant audit logging
    Tracks all access to PHI (Protected Health Information)

    Every entry is serialized to JSON and emitted through the module logger
    with an ``AUDIT:`` prefix.
    """

    def __init__(self):
        # NOTE(review): this path is stored but never read by this class;
        # entries currently go only to the module logger.
        self.audit_log_path = "logs/audit.log"
        logger.info("Audit Logger initialized")

    def log_access(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        status: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log access to medical data.

        Args:
            user_id: Identifier of the acting user.
            action: Name of the action performed.
            resource: Resource that was touched.
            ip_address: Client address; anonymized before it is written.
            status: Outcome label recorded with the entry.
            details: Optional extra fields; an empty dict is recorded when
                omitted.

        Logging is best-effort: any failure is reported through the module
        logger and never propagates to the caller.
        """
        try:
            audit_entry = {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": user_id,
                "action": action,
                "resource": resource,
                "ip_address": self._anonymize_ip(ip_address),
                "status": status,
                "details": details or {},
            }
            # default=str is deliberate: a value json cannot encode natively
            # (e.g. a datetime inside details) is stringified rather than
            # causing the whole audit entry to be lost.
            logger.info("AUDIT: %s", json.dumps(audit_entry, default=str))
        except Exception as e:
            logger.error("Audit logging failed: %s", e)

    def _anonymize_ip(self, ip_address: str) -> str:
        """Anonymize IP address for GDPR compliance.

        IPv4 addresses keep their first three octets (``a.b.c.xxx``). IPv6
        addresses are expanded first and keep their first four groups
        (``a:b:c:d:xxxx``), so compressed notation such as ``2001:db8::1``
        cannot leak the host part. IPv4-mapped IPv6 addresses are treated as
        the IPv4 address they embed.

        Returns:
            The masked address, or ``"unknown"`` when the input is not a
            valid IP address (the raw value is never echoed back).
        """
        try:
            parsed = ipaddress.ip_address(ip_address.strip())
        except (ValueError, AttributeError):
            return "unknown"

        if parsed.version == 6 and parsed.ipv4_mapped is not None:
            parsed = parsed.ipv4_mapped

        if parsed.version == 4:
            octets = str(parsed).split(".")
            return ".".join(octets[:3]) + ".xxx"

        # exploded always yields eight groups; re-format each kept group
        # without zero padding.
        leading = parsed.exploded.split(":")[:4]
        return ":".join(format(int(group, 16), "x") for group in leading) + ":xxxx"

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str
    ):
        """Specific logging for PHI access.

        Records the action prefixed with ``PHI_`` against the resource
        ``document:<document_id>``, always with status ``SUCCESS``.
        """
        self.log_access(
            user_id=user_id,
            action=f"PHI_{action}",
            resource=f"document:{document_id}",
            ip_address=ip_address,
            status="SUCCESS",
            details={"phi_accessed": True},
        )
|
|
|
|
|
|
|
|
class SecurityManager:
    """
    Manages authentication, authorization, and encryption

    Issues and verifies JWT access tokens, exposes a FastAPI dependency that
    resolves the current user, and offers small helpers for hashing
    identifiers and sanitizing responses.
    """

    def __init__(self):
        self.audit_logger = AuditLogger()
        # NOTE(review): not referenced elsewhere in this class;
        # get_current_user builds its own HTTPBearer in its default argument.
        self.security_bearer = HTTPBearer(auto_error=False)
        logger.info("Security Manager initialized")

    def create_access_token(self, user_id: str, email: str) -> str:
        """Create JWT access token.

        Args:
            user_id: Stored in the ``sub`` claim.
            email: Stored in the ``email`` claim.

        Returns:
            The token signed with SECRET_KEY, expiring
            ACCESS_TOKEN_EXPIRE_MINUTES after it is issued.
        """
        # One clock reading so that "iat" and "exp" are exactly the
        # configured lifetime apart.
        issued_at = datetime.utcnow()
        payload = {
            "sub": user_id,
            "email": email,
            "exp": issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "iat": issued_at,
        }
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify and decode JWT token.

        Returns:
            The decoded claims, or ``None`` when the token is expired or
            otherwise invalid.
        """
        try:
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            # Must precede the InvalidTokenError handler: it is a subclass.
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # PyJWT's base class for decode/validation failures.
            logger.warning(f"Token verification failed: {str(e)}")
            return None

    def _record_api_access(self, request: Request, user_id: str, status: str) -> None:
        """Write one API_ACCESS audit entry for the request's path and client."""
        client_ip = request.client.host if request.client else "unknown"
        self.audit_logger.log_access(
            user_id=user_id,
            action="API_ACCESS",
            resource=request.url.path,
            ip_address=client_ip,
            status=status,
        )

    async def get_current_user(
        self,
        request: Request,
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
    ) -> Dict[str, Any]:
        """
        FastAPI dependency for protected routes
        Validates JWT token and returns user info

        Requests without bearer credentials are NOT rejected: they are
        audited with status ``WARNING_ANONYMOUS`` and resolved to a fixed
        anonymous user.

        Returns:
            A dict with ``user_id``, ``email`` and ``is_anonymous``.

        Raises:
            HTTPException: 401 when credentials are supplied but the token
                does not verify.
        """
        if not credentials:
            logger.warning("Anonymous access - should be restricted in production")
            self._record_api_access(request, "anonymous", "WARNING_ANONYMOUS")
            return {
                "user_id": "anonymous",
                "email": "anonymous@demo.local",
                "is_anonymous": True,
            }

        payload = self.verify_token(credentials.credentials)
        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired authentication token"
            )

        user_info = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "is_anonymous": False,
        }
        self._record_api_access(request, user_info["user_id"], "SUCCESS")
        return user_info

    def hash_phi_identifier(self, identifier: str) -> str:
        """
        Hash PHI identifiers for pseudonymization
        Required for GDPR compliance

        Returns the hex SHA-256 digest of the encoded identifier.
        NOTE(review): the hash is unsalted and unkeyed, so equal identifiers
        always map to equal digests - confirm this meets the
        pseudonymization requirement.
        """
        return hashlib.sha256(identifier.encode()).hexdigest()

    def sanitize_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove or redact sensitive information from API responses

        When an ``"error"`` key is present its value is replaced by a generic
        message. The dict is modified in place and the same object is
        returned.
        """
        if "error" in data:
            data["error"] = "An error occurred during processing"
        return data
|
|
|
|
|
|
|
|
class DataEncryption:
    """
    Handles encryption of data at rest and in transit
    Required for HIPAA/GDPR compliance

    NOTE: encrypt_data and decrypt_data are placeholders that return their
    input unchanged; only secure_delete does real work.
    """

    # Upper bound on the random data held in memory while overwriting a file.
    _WIPE_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        self.encryption_key = self._load_or_generate_key()
        logger.info("Data Encryption initialized")

    def _load_or_generate_key(self) -> bytes:
        """Return the encryption key.

        Currently nothing is loaded: a fresh random 32-byte key is generated
        on every call, so the key differs between instances.
        """
        return secrets.token_bytes(32)

    def encrypt_data(self, data: bytes) -> bytes:
        """
        Placeholder for encryption of sensitive data.

        No encryption is performed: a warning is logged and ``data`` is
        returned unchanged.
        """
        logger.warning("Encryption not fully implemented - add cryptography library")
        return data

    def decrypt_data(self, encrypted_data: bytes) -> bytes:
        """Placeholder for decryption.

        A warning is logged and ``encrypted_data`` is returned unchanged.
        """
        logger.warning("Decryption not fully implemented - add cryptography library")
        return encrypted_data

    def secure_delete(self, file_path: str):
        """
        Securely delete files containing PHI
        HIPAA requires secure deletion

        The file's bytes are overwritten in place with random data, the write
        is flushed and fsynced, and only then is the file removed. A missing
        path is a silent no-op. Any failure is logged and swallowed; nothing
        is raised and nothing is returned.

        NOTE(review): overwriting in place may not reach the original blocks
        on some storage (e.g. copy-on-write filesystems or SSDs) - confirm
        against the deployment target.
        """
        try:
            if os.path.exists(file_path):
                remaining = os.path.getsize(file_path)
                # "r+b" rather than "wb": "wb" truncates first, which
                # releases the original contents before they are overwritten.
                with open(file_path, "r+b") as handle:
                    while remaining > 0:
                        step = min(remaining, self._WIPE_CHUNK_SIZE)
                        handle.write(secrets.token_bytes(step))
                        remaining -= step
                    # Force the overwrite to disk before the file is unlinked.
                    handle.flush()
                    os.fsync(handle.fileno())

                os.remove(file_path)
                logger.info(f"Securely deleted file: {file_path}")
        except Exception as e:
            logger.error(f"Secure deletion failed: {str(e)}")
|
|
|
|
|
|
|
|
class ComplianceValidator:
    """
    Validates compliance with HIPAA and GDPR requirements

    Keeps a static checklist mapping feature name to an "implemented" flag
    and derives a score and recommendations from it.
    """

    def __init__(self):
        # Hard-coded checklist; True marks a feature as implemented.
        self.required_features = {
            "encryption_at_rest": False,
            "encryption_in_transit": True,
            "access_logging": True,
            "user_authentication": True,
            "data_retention_policy": False,
            "right_to_erasure": False,
            "consent_management": False,
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Check current compliance status.

        Returns:
            A dict with ``compliance_score`` ("implemented/total"),
            ``percentage`` (rounded to one decimal), ``features`` (a copy of
            the checklist), ``status`` ("COMPLIANT" only when every feature
            is implemented, otherwise "DEMO_MODE") and ``recommendations``.
        """
        total_features = len(self.required_features)
        implemented_features = sum(1 for done in self.required_features.values() if done)

        # An empty checklist has nothing outstanding; report 100% instead of
        # dividing by zero, consistent with the "COMPLIANT" status below.
        if total_features:
            percentage = round((implemented_features / total_features) * 100, 1)
        else:
            percentage = 100.0

        return {
            "compliance_score": f"{implemented_features}/{total_features}",
            "percentage": percentage,
            # Copy so callers cannot alter the validator's state via the report.
            "features": dict(self.required_features),
            "status": "DEMO_MODE" if implemented_features < total_features else "COMPLIANT",
            "recommendations": self._get_recommendations(),
        }

    def _get_recommendations(self) -> List[str]:
        """Get compliance recommendations.

        Returns one "Implement <Feature Name>" string per feature that is not
        implemented, in checklist order.
        """
        return [
            f"Implement {feature.replace('_', ' ').title()}"
            for feature, implemented in self.required_features.items()
            if not implemented
        ]
|
|
|
|
|
|
|
|
|
|
|
# Lazily created, process-wide SecurityManager; stays None until first use.
_security_manager = None


def get_security_manager() -> SecurityManager:
    """Get singleton security manager instance.

    The instance is built on the first call and the same object is returned
    on every later call.
    """
    global _security_manager
    if _security_manager is not None:
        return _security_manager
    _security_manager = SecurityManager()
    return _security_manager
|
|
|
|
|
|
|
|
|
|
|
def require_auth(func):
    """Decorator to protect endpoints with authentication.

    NOTE: no credential check is performed here. The wrapper only logs a
    warning naming the wrapped function and then calls it.

    Both coroutine functions and plain functions may be decorated; the
    returned wrapper is always a coroutine function.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        logger.warning(f"Protected endpoint accessed: {func.__name__}")
        outcome = func(*args, **kwargs)
        # Await only when there is something to await, so a synchronous
        # endpoint does not fail with "object can't be used in 'await'".
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome
    return wrapper
|
|
|