Spaces:
Running
Running
| """ | |
| MorphGuard Security Integration | |
| Integrates the comprehensive security and compliance system with the main Flask application | |
| """ | |
| import logging | |
| import json | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional | |
| from functools import wraps | |
| from flask import request, jsonify, session | |
| from .compliance_integration import AsyncComplianceWrapper, BiometricProcessingRequest | |
| logger = logging.getLogger(__name__) | |
class MorphGuardSecurityIntegration:
    """
    Main integration class for MorphGuard security and compliance.

    Holds an ``AsyncComplianceWrapper`` and offers two kinds of entry points:

    * ``compliance_required`` - a decorator factory for Flask view functions
      that resolves the user/jurisdiction, performs a consent check for
      biometric operations and attaches ``request.compliance_context``.
    * ``*_endpoint`` methods - helpers that return plain result dicts carrying
      a ``'success'`` flag; failures are logged and reported in the dict
      instead of being raised.
    """

    # Operation types for which compliance_required performs a consent check.
    _CONSENT_GATED_OPERATIONS = frozenset({
        'biometric_enrollment',
        'biometric_verification',
        'biometric_authentication',
    })

    def __init__(self):
        self.compliance_wrapper = AsyncComplianceWrapper()
        # Track active compliance sessions, keyed by user_id.
        # NOTE(review): entries are added by collect_user_consent_endpoint and
        # never removed in this class - confirm whether expiry is needed.
        self.active_sessions = {}
        logger.info("MorphGuard Security Integration initialized")

    def compliance_required(self, operation_type: str = "general"):
        """
        Decorator to enforce compliance checks on Flask routes.

        Args:
            operation_type: Label stored in the compliance context. The values
                in ``_CONSENT_GATED_OPERATIONS`` additionally trigger a consent
                check before the view runs.

        Returns:
            A decorator. The wrapped view answers 400 when no user id can be
            determined, 403 when the consent check is not valid, 500 when any
            exception is raised (including by the view itself), and otherwise
            returns whatever the view returns.
        """
        def decorator(f):
            # wraps() keeps the view's __name__/__doc__. Flask derives the
            # endpoint name from __name__, so without it every decorated view
            # would be called "decorated_function" and collide on registration.
            @wraps(f)
            def decorated_function(*args, **kwargs):
                try:
                    # silent=True yields None instead of raising when the body
                    # is absent or not JSON (e.g. multipart uploads), so the
                    # lookups below degrade to "not provided".
                    payload = request.get_json(silent=True)
                    if not isinstance(payload, dict):
                        payload = {}

                    # Session values take precedence over the request body.
                    # NOTE(review): the body fallback is client-supplied and
                    # unauthenticated here - confirm callers verify identity.
                    user_id = session.get('user_id') or payload.get('user_id')
                    jurisdiction = session.get('jurisdiction') or payload.get('jurisdiction', 'US')

                    if not user_id:
                        return jsonify({
                            'error': 'User identification required for compliance',
                            'compliance_status': 'user_id_missing'
                        }), 400

                    # Biometric operations need a valid consent record first.
                    if operation_type in self._CONSENT_GATED_OPERATIONS:
                        consent_status = self._check_user_consent(user_id, jurisdiction, operation_type)
                        if not consent_status['valid']:
                            return jsonify({
                                'error': 'Valid consent required',
                                'compliance_status': 'consent_required',
                                'consent_details': consent_status,
                                'action_required': 'collect_consent'
                            }), 403

                    # Expose the resolved context to the view and to
                    # process_biometric_with_compliance.
                    request.compliance_context = {
                        'user_id': user_id,
                        'jurisdiction': jurisdiction,
                        'operation_type': operation_type,
                        'timestamp': datetime.now().isoformat()
                    }

                    # The view runs inside the try block on purpose: its
                    # exceptions are reported through the same 500 response.
                    return f(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Compliance check failed: {e}")
                    return jsonify({
                        'error': 'Compliance check failed',
                        'details': str(e)
                    }), 500
            return decorated_function
        return decorator

    def _check_user_consent(self, user_id: str, jurisdiction: str, operation_type: str) -> Dict[str, Any]:
        """
        Check if the user has valid consent for the operation.

        Args:
            user_id: Identifier passed to the BIPA manager for "IL".
            jurisdiction: Jurisdiction code selecting the rule set.
            operation_type: Accepted for symmetry with the caller; not used by
                the current rules.

        Returns:
            A dict that always contains ``'valid'``. Any exception is logged
            and converted to ``{'valid': False, 'error': ..., ...}``.
        """
        try:
            # For Illinois users, check BIPA compliance.
            if jurisdiction == "IL":
                bipa_status = self.compliance_wrapper.compliance_manager.bipa_manager.get_bipa_compliance_status(user_id)
                has_consent = bipa_status['compliance_summary']['has_valid_consent']
                return {
                    'valid': has_consent,
                    'type': 'BIPA',
                    'details': bipa_status,
                    'action_required': None if has_consent else 'bipa_consent'
                }

            # For EU users, check GDPR consent. No consent records are
            # consulted yet, so this branch always reports consent as missing.
            if jurisdiction in ("EU", "UK", "EEA"):
                return {
                    'valid': False,  # Would check actual GDPR consent
                    'type': 'GDPR',
                    'action_required': 'gdpr_consent'
                }

            # For California users, provide CCPA disclosure.
            if jurisdiction == "CA":
                return {
                    'valid': True,  # CCPA doesn't require explicit consent
                    'type': 'CCPA_DISCLOSURE',
                    'disclosure_provided': True
                }

            # Default for other jurisdictions.
            return {
                'valid': True,
                'type': 'STANDARD'
            }
        except Exception as e:
            logger.error(f"Consent check failed for user {user_id}: {e}")
            return {
                'valid': False,
                'error': str(e),
                'action_required': 'consent_verification_failed'
            }

    def process_biometric_with_compliance(self, biometric_data: bytes,
                                          quality_metrics: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
        """
        Process biometric data with full compliance and security checks.

        Must run inside a request handled by a ``compliance_required`` view,
        because the user, jurisdiction and operation type are read from
        ``request.compliance_context``.

        Args:
            biometric_data: Raw bytes forwarded to the compliance wrapper.
            quality_metrics: Optional metrics; an empty dict is sent when
                omitted.

        Returns:
            The wrapper's result, or a ``'success': False`` dict when the
            compliance context is missing.
        """
        context = getattr(request, 'compliance_context', {})
        if not context:
            return {
                'success': False,
                'error': 'Compliance context missing - use @compliance_required decorator'
            }

        processing_request = {
            'user_id': context['user_id'],
            'biometric_data': biometric_data,
            'operation_type': context['operation_type'],
            'purpose': self._map_operation_to_purpose(context['operation_type']),
            'user_jurisdiction': context['jurisdiction'],
            'quality_metrics': quality_metrics or {}
        }

        result = self.compliance_wrapper.process_biometric_sync(processing_request)
        self._log_processing_result(context, result)
        return result

    def _map_operation_to_purpose(self, operation_type: str) -> str:
        """Map operation type to compliance purpose ('biometric_processing' if unknown)."""
        purpose_mapping = {
            'biometric_enrollment': 'identity_enrollment',
            'biometric_verification': 'identity_verification',
            'biometric_authentication': 'secure_authentication',
            'face_capture': 'biometric_capture',
            'quality_assessment': 'data_quality_verification'
        }
        return purpose_mapping.get(operation_type, 'biometric_processing')

    def _log_processing_result(self, context: Dict[str, Any], result: Dict[str, Any]):
        """
        Log processing result for audit.

        Writes one INFO line containing a JSON summary of the outcome. Error
        and threat details are included only for unsuccessful results.
        """
        # A result without a 'success' key is treated as a failure rather than
        # raising KeyError after processing has already happened.
        succeeded = result.get('success', False)
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': context['user_id'],
            'jurisdiction': context['jurisdiction'],
            'operation_type': context['operation_type'],
            'success': succeeded,
            'compliance_status': result.get('compliance_status'),
            'processing_time_ms': result.get('processing_time_ms'),
            'privacy_measures_applied': result.get('privacy_measures_applied', False),
            'security_verified': result.get('security_verified', False)
        }
        if not succeeded:
            log_entry['error'] = result.get('error')
            log_entry['threat_details'] = result.get('threat_details')

        # default=str: result values come from the wrapper and may not be
        # JSON-native; the audit line must not abort the request.
        logger.info(f"Biometric processing result: {json.dumps(log_entry, default=str)}")

    def _resolve_jurisdiction(self, user_id: str, jurisdiction: Optional[str]) -> str:
        """Return the given jurisdiction, else the active session's, else 'US'."""
        if jurisdiction:
            return jurisdiction
        session_data = self.active_sessions.get(user_id)
        return session_data['jurisdiction'] if session_data else 'US'

    def collect_user_consent_endpoint(self, user_id: str, jurisdiction: str,
                                      biometric_type: str = "face_biometric",
                                      purpose: str = "authentication") -> Dict[str, Any]:
        """
        Endpoint for collecting user consent based on jurisdiction.

        Calls the wrapper's ``collect_consent_sync`` and remembers the outcome
        in ``active_sessions[user_id]`` for later acknowledgment.

        Returns:
            On success a dict with ``consent_type``, ``next_step``,
            ``consent_details`` and ``session_id``; on any exception
            ``{'success': False, 'error': ...}``.
        """
        try:
            consent_result = self.compliance_wrapper.collect_consent_sync(
                user_id=user_id,
                jurisdiction=jurisdiction,
                biometric_type=biometric_type,
                purpose=purpose
            )

            # Store consent session for follow-up.
            self.active_sessions[user_id] = {
                'consent_result': consent_result,
                'timestamp': datetime.now(),
                'jurisdiction': jurisdiction
            }

            return {
                'success': True,
                'consent_type': consent_result['consent_type'],
                'next_step': consent_result['next_step'],
                'consent_details': consent_result,
                'session_id': user_id  # In production, use proper session ID
            }
        except Exception as e:
            logger.error(f"Consent collection failed for user {user_id}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def acknowledge_consent_endpoint(self, user_id: str, consent_id: str,
                                     disclosure_id: str, signature: str) -> Dict[str, Any]:
        """
        Endpoint for acknowledging consent (specifically for BIPA).

        Requires an entry in ``active_sessions`` for the user. Only sessions
        with jurisdiction "IL" record an acknowledgment (together with
        ``request.remote_addr``); other jurisdictions succeed immediately.

        Returns:
            A dict with ``'success'`` plus either ``message``/``status`` or
            ``error``.
        """
        try:
            session_data = self.active_sessions.get(user_id)
            if not session_data:
                return {
                    'success': False,
                    'error': 'No active consent session found'
                }

            if session_data['jurisdiction'] != "IL":
                return {
                    'success': True,
                    'message': 'Consent acknowledgment not required for this jurisdiction',
                    'status': 'consent_complete'
                }

            # For BIPA users, record acknowledgment.
            recorded = self.compliance_wrapper.compliance_manager.bipa_manager.acknowledge_disclosure_and_consent(
                user_id=user_id,
                consent_id=consent_id,
                disclosure_id=disclosure_id,
                signature=signature,
                ip_address=request.remote_addr
            )
            if not recorded:
                return {
                    'success': False,
                    'error': 'Failed to record consent acknowledgment'
                }

            # Update session.
            session_data['consent_acknowledged'] = True
            session_data['acknowledgment_time'] = datetime.now()
            return {
                'success': True,
                'message': 'BIPA consent acknowledged successfully',
                'status': 'consent_complete'
            }
        except Exception as e:
            logger.error(f"Consent acknowledgment failed for user {user_id}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def handle_user_rights_request_endpoint(self, user_id: str, request_type: str,
                                            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for handling user rights requests (access, deletion, etc.).

        Args:
            user_id: User the request is filed for.
            request_type: Passed through to the compliance manager.
            jurisdiction: Optional; falls back to the user's active session and
                then to 'US'.

        Returns:
            On success a dict whose ``request_ids`` holds the manager's return
            value; on any exception ``{'success': False, 'error': ...}``.
        """
        try:
            jurisdiction = self._resolve_jurisdiction(user_id, jurisdiction)

            result = self.compliance_wrapper.compliance_manager.handle_user_rights_request(
                user_id=user_id,
                request_type=request_type,
                jurisdiction=jurisdiction
            )

            return {
                'success': True,
                'request_type': request_type,
                'jurisdiction': jurisdiction,
                'request_ids': result,
                'status': 'request_submitted'
            }
        except Exception as e:
            logger.error(f"User rights request failed for user {user_id}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_compliance_dashboard_endpoint(self) -> Dict[str, Any]:
        """
        Endpoint for compliance dashboard data.

        Returns:
            ``{'success': True, 'dashboard_data': ...}`` with the wrapper's
            ``get_dashboard_data_sync`` result, or a failure dict.
        """
        try:
            dashboard_data = self.compliance_wrapper.get_dashboard_data_sync()
            return {
                'success': True,
                'dashboard_data': dashboard_data
            }
        except Exception as e:
            logger.error(f"Dashboard data generation failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_user_compliance_status_endpoint(self, user_id: str,
                                            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for getting user's compliance status.

        Only jurisdiction "IL" currently contributes data (under the ``'bipa'``
        key); for every other jurisdiction ``compliance_status`` is empty.

        Args:
            user_id: User whose status is requested.
            jurisdiction: Optional; falls back to the user's active session and
                then to 'US'.
        """
        try:
            jurisdiction = self._resolve_jurisdiction(user_id, jurisdiction)

            status_data = {}
            # Get jurisdiction-specific compliance status.
            if jurisdiction == "IL":
                bipa_status = self.compliance_wrapper.compliance_manager.bipa_manager.get_bipa_compliance_status(user_id)
                status_data['bipa'] = bipa_status
            # Add other jurisdictions as needed.

            return {
                'success': True,
                'user_id': user_id,
                'jurisdiction': jurisdiction,
                'compliance_status': status_data
            }
        except Exception as e:
            logger.error(f"Compliance status check failed for user {user_id}: {e}")
            return {
                'success': False,
                'error': str(e)
            }
| # Flask integration functions | |
def setup_compliance_routes(app, url_prefix: str = "/api/compliance"):
    """
    Setup compliance routes in Flask app.

    Creates one ``MorphGuardSecurityIntegration`` and registers six views on
    ``app`` that delegate to it.

    NOTE(review): the URL paths, the HTTP methods and the operation type used
    for the biometric route are choices made here; confirm them against the
    API contract the clients expect.

    Args:
        app: Flask application (or any object offering ``route``) to register
            the views on.
        url_prefix: Path prefix shared by all registered routes.

    Returns:
        The ``MorphGuardSecurityIntegration`` instance backing the routes.
    """
    security_integration = MorphGuardSecurityIntegration()
    prefix = url_prefix.rstrip("/")

    def _json_body(*required):
        """Return ``(payload, None)`` or ``(None, error_response)`` for the request body."""
        # silent=True: a missing or non-JSON body becomes None instead of raising.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return None, (jsonify({
                'success': False,
                'error': 'JSON object body required'
            }), 400)
        missing = [field for field in required if field not in payload]
        if missing:
            return None, (jsonify({
                'success': False,
                'error': f"Missing required field(s): {', '.join(missing)}"
            }), 400)
        return payload, None

    @app.route(f"{prefix}/consent/collect", methods=["POST"], endpoint="collect_consent")
    def collect_consent():
        """Collect user consent endpoint"""
        data, error = _json_body('user_id', 'jurisdiction')
        if error:
            return error
        result = security_integration.collect_user_consent_endpoint(
            user_id=data['user_id'],
            jurisdiction=data['jurisdiction'],
            biometric_type=data.get('biometric_type', 'face_biometric'),
            purpose=data.get('purpose', 'authentication')
        )
        return jsonify(result)

    @app.route(f"{prefix}/consent/acknowledge", methods=["POST"], endpoint="acknowledge_consent")
    def acknowledge_consent():
        """Acknowledge consent endpoint"""
        data, error = _json_body('user_id', 'consent_id', 'disclosure_id', 'signature')
        if error:
            return error
        result = security_integration.acknowledge_consent_endpoint(
            user_id=data['user_id'],
            consent_id=data['consent_id'],
            disclosure_id=data['disclosure_id'],
            signature=data['signature']
        )
        return jsonify(result)

    @app.route(f"{prefix}/user-rights", methods=["POST"], endpoint="handle_user_rights")
    def handle_user_rights():
        """Handle user rights requests endpoint"""
        data, error = _json_body('user_id', 'request_type')
        if error:
            return error
        result = security_integration.handle_user_rights_request_endpoint(
            user_id=data['user_id'],
            request_type=data['request_type'],
            jurisdiction=data.get('jurisdiction')
        )
        return jsonify(result)

    # NOTE(review): the two GET routes below perform no authentication of their
    # own - confirm access control is applied elsewhere.
    @app.route(f"{prefix}/dashboard", methods=["GET"], endpoint="compliance_dashboard")
    def compliance_dashboard():
        """Compliance dashboard endpoint"""
        result = security_integration.get_compliance_dashboard_endpoint()
        return jsonify(result)

    @app.route(f"{prefix}/status/<user_id>", methods=["GET"], endpoint="user_compliance_status")
    def user_compliance_status(user_id):
        """User compliance status endpoint"""
        jurisdiction = request.args.get('jurisdiction')
        result = security_integration.get_user_compliance_status_endpoint(
            user_id=user_id,
            jurisdiction=jurisdiction
        )
        return jsonify(result)

    # compliance_required must wrap this view: process_biometric_with_compliance
    # reads request.compliance_context, which only that decorator sets.
    # The explicit endpoint keeps the registration independent of the wrapper's
    # function name.
    # NOTE(review): 'biometric_verification' is assumed as the operation type.
    @app.route(f"{prefix}/biometric/process", methods=["POST"], endpoint="process_biometric")
    @security_integration.compliance_required("biometric_verification")
    def process_biometric():
        """Process biometric data with compliance checks"""
        # In real implementation, this would handle file upload or base64 data
        biometric_data = request.files.get('biometric_data')
        if not biometric_data:
            return jsonify({
                'success': False,
                'error': 'No biometric data provided'
            }), 400

        quality_metrics = None
        raw_metrics = request.form.get('quality_metrics')
        if raw_metrics:
            try:
                quality_metrics = json.loads(raw_metrics)
            except ValueError:  # json.JSONDecodeError is a ValueError
                return jsonify({
                    'success': False,
                    'error': 'quality_metrics must be valid JSON'
                }), 400

        result = security_integration.process_biometric_with_compliance(
            biometric_data=biometric_data.read(),
            quality_metrics=quality_metrics
        )
        return jsonify(result)

    logger.info("Compliance routes registered with Flask app")
    return security_integration
if __name__ == "__main__":
    # Direct execution only prints a usage hint; integration happens through
    # setup_compliance_routes(app).
    for usage_line in (
        "MorphGuard Security Integration module",
        "Use setup_compliance_routes(app) to integrate with Flask application",
    ):
        print(usage_line)