File size: 10,837 Bytes
996387b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
"""
Security Module - HIPAA/GDPR Compliance Features
Implements authentication, authorization, audit logging, and encryption
"""

import logging
import hashlib
import secrets
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from functools import wraps
import jwt
from fastapi import HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# Module-level logger used by the classes and helpers defined below.
logger = logging.getLogger(__name__)

# Security configuration
# NOTE(review): SECRET_KEY is generated afresh each time this module is imported,
# so a token signed by one process presumably cannot be verified by another
# process or after a restart - confirm before running more than one worker.
SECRET_KEY = secrets.token_urlsafe(32)  # In production, load from environment
ALGORITHM = "HS256"  # Algorithm name passed to jwt.encode / jwt.decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Added to the issue time to compute the "exp" claim


class AuditLogger:
    """
    HIPAA-compliant audit logging
    Tracks all access to PHI (Protected Health Information)

    Each entry is serialized to JSON and emitted through the module logger
    with an ``AUDIT:`` prefix. Client IP addresses are truncated before they
    are written.
    """

    def __init__(self):
        # NOTE(review): this path is stored but never read inside this class;
        # entries go to whatever handlers the module logger has configured.
        self.audit_log_path = "logs/audit.log"
        logger.info("Audit Logger initialized")

    def log_access(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        status: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """
        Log access to medical data

        Args:
            user_id: Identifier of the acting user.
            action: Name of the action performed (e.g. "API_ACCESS").
            resource: Resource that was touched (e.g. a URL path).
            ip_address: Client address; it is anonymized before being logged.
            status: Outcome label stored verbatim in the entry.
            details: Optional extra fields; an empty dict is logged when omitted.

        Failures are logged as errors and never propagated, so audit logging
        cannot break the request that triggered it.
        """
        try:
            audit_entry = {
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": user_id,
                "action": action,
                "resource": resource,
                "ip_address": self._anonymize_ip(ip_address),
                "status": status,
                "details": details or {}
            }

            # Emit through the module logger. default=str keeps the entry from
            # being dropped when `details` carries a value json cannot encode
            # natively (e.g. a datetime); such values are logged as their str().
            logger.info(f"AUDIT: {json.dumps(audit_entry, default=str)}")

            # In production, also store in database for long-term retention

        except Exception as e:
            logger.error(f"Audit logging failed: {str(e)}")

    def _anonymize_ip(self, ip_address: str) -> str:
        """
        Anonymize IP address for GDPR compliance

        IPv4: the last octet is replaced by ``xxx``.
        IPv6: only the first four 16-bit groups are kept, followed by ``xxxx``.
        Compressed forms such as ``::1`` are expanded first so the truncation
        always removes the low 64 bits. An IPv6 zone suffix (``%eth0``) is
        discarded. Anything that does not parse as an IP address is reported
        as ``unknown`` rather than being echoed into the log.
        """
        import ipaddress  # local import: only this method needs it

        candidate = (ip_address or "").strip().split("%", 1)[0]
        try:
            parsed = ipaddress.ip_address(candidate)
        except ValueError:
            return "unknown"

        if parsed.version == 4:
            octets = str(parsed).split(".")
            return ".".join(octets[:3]) + ".xxx"

        # IPv6: take the four most significant 16-bit groups of the 128-bit value.
        value = int(parsed)
        groups = [format((value >> shift) & 0xFFFF, "x") for shift in (112, 96, 80, 64)]
        return ":".join(groups) + ":xxxx"

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str
    ):
        """
        Specific logging for PHI access

        Delegates to log_access with the action prefixed by ``PHI_``, the
        resource written as ``document:<document_id>``, status ``SUCCESS`` and
        a ``phi_accessed`` marker in the details.
        """
        self.log_access(
            user_id=user_id,
            action=f"PHI_{action}",
            resource=f"document:{document_id}",
            ip_address=ip_address,
            status="SUCCESS",
            details={"phi_accessed": True}
        )


class SecurityManager:
    """
    Manages authentication, authorization, and encryption

    Issues and verifies JWT access tokens, exposes a FastAPI dependency that
    resolves the current user, and provides small helpers for identifier
    hashing and response sanitization.
    """

    def __init__(self):
        self.audit_logger = AuditLogger()
        # NOTE(review): get_current_user builds its own HTTPBearer in its
        # default argument; this instance is not referenced inside the class.
        self.security_bearer = HTTPBearer(auto_error=False)
        logger.info("Security Manager initialized")

    def create_access_token(self, user_id: str, email: str) -> str:
        """
        Create JWT access token

        Args:
            user_id: Stored in the ``sub`` claim.
            email: Stored in the ``email`` claim.

        Returns:
            The encoded token, signed with SECRET_KEY using ALGORITHM and
            expiring ACCESS_TOKEN_EXPIRE_MINUTES after issue.
        """
        # Take one timestamp so "iat" and "exp" are derived from the same instant.
        issued_at = datetime.utcnow()
        expire = issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

        payload = {
            "sub": user_id,
            "email": email,
            "exp": expire,
            "iat": issued_at
        }

        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Verify and decode JWT token

        Returns:
            The decoded payload, or None when the token is expired or
            otherwise invalid (a warning is logged in both cases).
        """
        try:
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # PyJWT has no `JWTError`; InvalidTokenError is its base class for
            # decode/validation failures (bad signature, malformed token, ...).
            logger.warning(f"Token verification failed: {str(e)}")
            return None

    async def get_current_user(
        self,
        request: Request,
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
    ) -> Dict[str, Any]:
        """
        FastAPI dependency for protected routes
        Validates JWT token and returns user info

        Returns:
            A dict with ``user_id``, ``email`` and ``is_anonymous``. When no
            bearer credentials are supplied, a fixed anonymous user is
            returned instead of rejecting the request.

        Raises:
            HTTPException: 401 when credentials are supplied but the token
                fails verification.
        """
        client_ip = request.client.host if request.client else "unknown"

        # For development/demo, allow anonymous access but log it
        if not credentials:
            logger.warning("Anonymous access - should be restricted in production")

            # Log anonymous access
            self.audit_logger.log_access(
                user_id="anonymous",
                action="API_ACCESS",
                resource=request.url.path,
                ip_address=client_ip,
                status="WARNING_ANONYMOUS"
            )

            return {
                "user_id": "anonymous",
                "email": "anonymous@demo.local",
                "is_anonymous": True
            }

        # Verify token
        payload = self.verify_token(credentials.credentials)

        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired authentication token"
            )

        user_info = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "is_anonymous": False
        }

        # Log authenticated access
        self.audit_logger.log_access(
            user_id=user_info["user_id"],
            action="API_ACCESS",
            resource=request.url.path,
            ip_address=client_ip,
            status="SUCCESS"
        )

        return user_info

    def hash_phi_identifier(self, identifier: str) -> str:
        """
        Hash PHI identifiers for pseudonymization
        Required for GDPR compliance

        Returns the hex SHA-256 digest of the UTF-8 encoded identifier.
        NOTE(review): the hash is unsalted and unkeyed, so identical
        identifiers always map to the same digest - confirm this is intended.
        """
        return hashlib.sha256(identifier.encode()).hexdigest()

    def sanitize_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove or redact sensitive information from API responses

        If ``data`` has an ``error`` key its value is replaced by a generic
        message. The dict is modified in place and the same object is returned.
        """
        # In production, implement comprehensive PII/PHI redaction
        # For now, basic sanitization
        if "error" in data:
            # Don't expose internal error details
            data["error"] = "An error occurred during processing"

        return data


class DataEncryption:
    """
    Handles encryption of data at rest and in transit
    Required for HIPAA/GDPR compliance

    NOTE: encrypt_data and decrypt_data are placeholders that return their
    input unchanged; only key generation and secure_delete do real work.
    """

    def __init__(self):
        # In production, use proper key management (e.g., AWS KMS, Azure Key Vault)
        self.encryption_key = self._load_or_generate_key()
        logger.info("Data Encryption initialized")

    def _load_or_generate_key(self) -> bytes:
        """
        Load encryption key from secure storage

        Currently always generates 32 fresh random bytes; nothing is loaded
        or persisted.
        """
        # In production, load from secure key management system
        # For demo, generate a key
        return secrets.token_bytes(32)

    def encrypt_data(self, data: bytes) -> bytes:
        """
        Encrypt sensitive data using AES-256

        Not implemented yet: logs a warning and returns ``data`` unchanged.
        """
        # In production, implement proper AES-256 encryption
        # For now, return as-is (encryption would require cryptography library)
        logger.warning("Encryption not fully implemented - add cryptography library")
        return data

    def decrypt_data(self, encrypted_data: bytes) -> bytes:
        """
        Decrypt data

        Not implemented yet: logs a warning and returns the input unchanged.
        """
        logger.warning("Decryption not fully implemented - add cryptography library")
        return encrypted_data

    def secure_delete(self, file_path: str):
        """
        Securely delete files containing PHI
        HIPAA requires secure deletion

        Overwrites the file's current contents with random bytes, forces the
        write to disk, then removes the file. A missing path is a no-op.
        Errors are logged and not raised (best effort).
        """
        import os

        chunk_size = 1024 * 1024  # overwrite in 1 MiB pieces to bound memory use
        try:
            # In production, overwrite file multiple times before deletion
            if os.path.exists(file_path):
                remaining = os.path.getsize(file_path)

                # 'r+b' writes over the existing bytes; 'wb' would truncate the
                # file first, after which the new data need not land on the
                # same storage the old contents occupied.
                with open(file_path, 'r+b') as f:
                    while remaining > 0:
                        step = min(remaining, chunk_size)
                        f.write(secrets.token_bytes(step))
                        remaining -= step
                    # Push the overwrite to disk before the file is unlinked.
                    f.flush()
                    os.fsync(f.fileno())

                # Delete file
                os.remove(file_path)
                logger.info(f"Securely deleted file: {file_path}")

        except Exception as e:
            logger.error(f"Secure deletion failed: {str(e)}")


class ComplianceValidator:
    """
    Reports which HIPAA/GDPR safeguards are currently switched on
    """

    def __init__(self):
        # Feature name -> whether it is considered implemented.
        self.required_features = dict(
            encryption_at_rest=False,  # Would be True in production
            encryption_in_transit=True,  # HTTPS enforced
            access_logging=True,
            user_authentication=True,  # Available but not enforced in demo
            data_retention_policy=False,  # Would implement in production
            right_to_erasure=False,  # GDPR - would implement in production
            consent_management=False,  # Would implement in production
        )

    def check_compliance(self) -> Dict[str, Any]:
        """Summarize the feature flags as a score, percentage, status and to-do list"""
        flags = self.required_features
        total_count = len(flags)
        done_count = len([name for name, enabled in flags.items() if enabled])

        if done_count < total_count:
            status = "DEMO_MODE"
        else:
            status = "COMPLIANT"

        report = {}
        report["compliance_score"] = f"{done_count}/{total_count}"
        report["percentage"] = round((done_count / total_count) * 100, 1)
        report["features"] = flags
        report["status"] = status
        report["recommendations"] = self._get_recommendations()
        return report

    def _get_recommendations(self) -> List[str]:
        """List an "Implement ..." suggestion for every feature that is switched off"""
        return [
            f"Implement {name.replace('_', ' ').title()}"
            for name, enabled in self.required_features.items()
            if not enabled
        ]


# Shared SecurityManager for the process; stays None until first requested
_security_manager = None


def get_security_manager() -> SecurityManager:
    """Return the shared SecurityManager, creating it on the first call"""
    global _security_manager
    if _security_manager is not None:
        return _security_manager
    _security_manager = SecurityManager()
    return _security_manager


# Decorator for protected routes
def require_auth(func):
    """
    Decorator to protect endpoints with authentication

    In its current demo form it does NOT reject anything: it logs a warning
    naming the wrapped function and then calls it. The returned wrapper is a
    coroutine function. The wrapped callable may be ``async def`` or a plain
    function; its result is awaited only when it is awaitable.
    NOTE: a plain (sync) function is executed directly inside the wrapper
    coroutine, not handed off to a worker thread.
    """
    import inspect  # local import: only this decorator needs it

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # In production, enforce authentication
        # For demo, log warning and allow access
        logger.warning(f"Protected endpoint accessed: {func.__name__}")
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result
    return wrapper