"""
MorphGuard Security Integration

Integrates the comprehensive security and compliance system with the main
Flask application.
"""

import json
import logging
from datetime import datetime
from functools import wraps
from typing import Any, Dict, Iterable, Optional

from flask import jsonify, request, session

from .compliance_integration import AsyncComplianceWrapper, BiometricProcessingRequest

logger = logging.getLogger(__name__)


def _json_payload() -> Dict[str, Any]:
    """Return the current request's JSON body as a dict, or ``{}``.

    ``silent=True`` makes Flask return ``None`` instead of raising when the
    body is absent, has a non-JSON content type, or fails to parse, so this
    is safe to call on multipart/form requests as well.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _request_value(key: str, default: Optional[str] = None) -> Optional[str]:
    """Look up ``key`` in the session, then the JSON body, then form/query data.

    The form/query fallback (``request.values``) lets file-upload routes,
    which cannot carry a JSON body, still supply ``user_id``/``jurisdiction``.
    Returns ``default`` when no source has a truthy value.
    """
    value = (
        session.get(key)
        or _json_payload().get(key)
        or request.values.get(key)
    )
    return value or default


def _missing_fields_response(data: Dict[str, Any], required: Iterable[str]):
    """Return a ``(response, 400)`` tuple naming absent fields, or ``None``."""
    missing = [name for name in required if data.get(name) is None]
    if not missing:
        return None
    return jsonify({
        'success': False,
        'error': f"Missing required field(s): {', '.join(missing)}"
    }), 400


class MorphGuardSecurityIntegration:
    """
    Main integration class for MorphGuard security and compliance.

    Holds the compliance wrapper and an in-memory map of consent sessions
    keyed by user id.
    """

    # Operation types for which the decorator performs a consent check.
    _CONSENT_GATED_OPERATIONS = frozenset({
        'biometric_enrollment',
        'biometric_verification',
        'biometric_authentication',
    })

    def __init__(self):
        self.compliance_wrapper = AsyncComplianceWrapper()
        self.active_sessions = {}  # Track active compliance sessions
        logger.info("MorphGuard Security Integration initialized")

    def compliance_required(self, operation_type: str = "general"):
        """
        Decorator to enforce compliance checks on Flask routes.

        The wrapped view runs only if a user id can be determined and, for
        consent-gated operation types, ``_check_user_consent`` reports valid
        consent. On success ``request.compliance_context`` is populated with
        the user id, jurisdiction, operation type and an ISO timestamp.

        Responses produced by the decorator itself:
            400 - no user id in session, JSON body, or form/query data
            403 - consent check returned ``valid: False``
            500 - any exception raised by the check or by the wrapped view
        """
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                try:
                    # NOTE(review): a user_id taken from the request body is
                    # caller-asserted, not authenticated - confirm that an
                    # upstream layer verifies identity.
                    user_id = _request_value('user_id')
                    jurisdiction = _request_value('jurisdiction', 'US')

                    if not user_id:
                        return jsonify({
                            'error': 'User identification required for compliance',
                            'compliance_status': 'user_id_missing'
                        }), 400

                    # Check if user has valid consent for biometric operations
                    if operation_type in self._CONSENT_GATED_OPERATIONS:
                        consent_status = self._check_user_consent(
                            user_id, jurisdiction, operation_type
                        )
                        if not consent_status['valid']:
                            return jsonify({
                                'error': 'Valid consent required',
                                'compliance_status': 'consent_required',
                                'consent_details': consent_status,
                                'action_required': 'collect_consent'
                            }), 403

                    # Add compliance context to request
                    request.compliance_context = {
                        'user_id': user_id,
                        'jurisdiction': jurisdiction,
                        'operation_type': operation_type,
                        'timestamp': datetime.now().isoformat()
                    }

                    # Execute the original function
                    return f(*args, **kwargs)

                except Exception as e:
                    # The wrapped view runs inside this try, so its failures
                    # are reported through the same 500 response.
                    # NOTE(review): 'details' exposes the exception text to
                    # the client - confirm this is acceptable.
                    logger.exception("Compliance check failed: %s", e)
                    return jsonify({
                        'error': 'Compliance check failed',
                        'details': str(e)
                    }), 500

            return decorated_function
        return decorator

    def _check_user_consent(self, user_id: str, jurisdiction: str,
                            operation_type: str) -> Dict[str, Any]:
        """
        Check if user has valid consent for the operation.

        Returns a dict that always contains ``'valid'`` (bool). Other keys
        depend on the jurisdiction branch taken. Any exception is logged and
        converted to ``valid: False`` so the caller denies by default.
        ``operation_type`` is accepted for interface symmetry but is not
        consulted by the current rules.
        """
        try:
            # For Illinois users, check BIPA compliance
            if jurisdiction == "IL":
                bipa_manager = self.compliance_wrapper.compliance_manager.bipa_manager
                bipa_status = bipa_manager.get_bipa_compliance_status(user_id)
                has_consent = bipa_status['compliance_summary']['has_valid_consent']
                return {
                    'valid': has_consent,
                    'type': 'BIPA',
                    'details': bipa_status,
                    'action_required': None if has_consent else 'bipa_consent'
                }

            # For EU users, check GDPR consent
            if jurisdiction in ("EU", "UK", "EEA"):
                # This would check GDPR consent records
                # For now, assume consent is needed for biometric operations
                return {
                    'valid': False,  # Would check actual GDPR consent
                    'type': 'GDPR',
                    'action_required': 'gdpr_consent'
                }

            # For California users, provide CCPA disclosure
            if jurisdiction == "CA":
                return {
                    'valid': True,  # CCPA doesn't require explicit consent
                    'type': 'CCPA_DISCLOSURE',
                    'disclosure_provided': True
                }

            # Default for other jurisdictions
            return {
                'valid': True,
                'type': 'STANDARD'
            }

        except Exception as e:
            logger.exception("Consent check failed for user %s: %s", user_id, e)
            return {
                'valid': False,
                'error': str(e),
                'action_required': 'consent_verification_failed'
            }

    def process_biometric_with_compliance(
        self,
        biometric_data: bytes,
        quality_metrics: Optional[Dict[str, float]] = None
    ) -> Dict[str, Any]:
        """
        Process biometric data with full compliance and security checks.

        Must be called from a view wrapped by ``compliance_required``, which
        supplies ``request.compliance_context``. Without that context a
        failure dict is returned and nothing is processed.

        Args:
            biometric_data: Raw biometric payload.
            quality_metrics: Optional metrics; ``None`` is sent as ``{}``.

        Returns:
            The dict returned by the compliance wrapper's
            ``process_biometric_sync``, or a ``success: False`` dict when the
            compliance context is missing.
        """
        # Get compliance context from request
        context = getattr(request, 'compliance_context', {})
        if not context:
            return {
                'success': False,
                'error': 'Compliance context missing - use @compliance_required decorator'
            }

        # Prepare processing request
        processing_request = {
            'user_id': context['user_id'],
            'biometric_data': biometric_data,
            'operation_type': context['operation_type'],
            'purpose': self._map_operation_to_purpose(context['operation_type']),
            'user_jurisdiction': context['jurisdiction'],
            'quality_metrics': quality_metrics or {}
        }

        # Process with compliance
        result = self.compliance_wrapper.process_biometric_sync(processing_request)

        # Log the result
        self._log_processing_result(context, result)

        return result

    def _map_operation_to_purpose(self, operation_type: str) -> str:
        """Map operation type to compliance purpose.

        Unknown operation types map to ``'biometric_processing'``.
        """
        purpose_mapping = {
            'biometric_enrollment': 'identity_enrollment',
            'biometric_verification': 'identity_verification',
            'biometric_authentication': 'secure_authentication',
            'face_capture': 'biometric_capture',
            'quality_assessment': 'data_quality_verification'
        }
        return purpose_mapping.get(operation_type, 'biometric_processing')

    def _log_processing_result(self, context: Dict[str, Any],
                               result: Dict[str, Any]) -> None:
        """Log processing result for audit.

        Tolerates a result without a ``'success'`` key (treated as failure)
        and non-JSON-serializable values (stringified), so that an audit
        logging problem cannot fail a request that already finished
        processing.
        """
        succeeded = result.get('success', False)
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': context['user_id'],
            'jurisdiction': context['jurisdiction'],
            'operation_type': context['operation_type'],
            'success': succeeded,
            'compliance_status': result.get('compliance_status'),
            'processing_time_ms': result.get('processing_time_ms'),
            'privacy_measures_applied': result.get('privacy_measures_applied', False),
            'security_verified': result.get('security_verified', False)
        }

        if not succeeded:
            log_entry['error'] = result.get('error')
            log_entry['threat_details'] = result.get('threat_details')

        # default=str keeps logging alive if a value is not JSON-native.
        logger.info("Biometric processing result: %s",
                    json.dumps(log_entry, default=str))

    def _resolve_jurisdiction(self, user_id: str,
                              jurisdiction: Optional[str]) -> str:
        """Return ``jurisdiction``, else the stored session's, else ``'US'``."""
        if jurisdiction:
            return jurisdiction
        session_data = self.active_sessions.get(user_id)
        return session_data['jurisdiction'] if session_data else 'US'

    def collect_user_consent_endpoint(self, user_id: str, jurisdiction: str,
                                      biometric_type: str = "face_biometric",
                                      purpose: str = "authentication") -> Dict[str, Any]:
        """
        Endpoint for collecting user consent based on jurisdiction.

        Delegates to the wrapper's ``collect_consent_sync`` and remembers the
        outcome in ``active_sessions[user_id]`` for the later acknowledgment
        step. Any exception is logged and returned as
        ``{'success': False, 'error': ...}``.
        """
        try:
            consent_result = self.compliance_wrapper.collect_consent_sync(
                user_id=user_id,
                jurisdiction=jurisdiction,
                biometric_type=biometric_type,
                purpose=purpose
            )

            # Store consent session for follow-up
            self.active_sessions[user_id] = {
                'consent_result': consent_result,
                'timestamp': datetime.now(),
                'jurisdiction': jurisdiction
            }

            return {
                'success': True,
                'consent_type': consent_result['consent_type'],
                'next_step': consent_result['next_step'],
                'consent_details': consent_result,
                'session_id': user_id  # In production, use proper session ID
            }

        except Exception as e:
            logger.exception("Consent collection failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def acknowledge_consent_endpoint(self, user_id: str, consent_id: str,
                                     disclosure_id: str, signature: str) -> Dict[str, Any]:
        """
        Endpoint for acknowledging consent (specifically for BIPA).

        Requires a prior ``collect_user_consent_endpoint`` call for the same
        user. For sessions whose jurisdiction is ``"IL"`` the acknowledgment
        is recorded through the BIPA manager together with the caller's
        remote address; for every other jurisdiction it is a no-op success.
        """
        try:
            # Get user's active session
            session_data = self.active_sessions.get(user_id)
            if not session_data:
                return {
                    'success': False,
                    'error': 'No active consent session found'
                }

            if session_data['jurisdiction'] != "IL":
                return {
                    'success': True,
                    'message': 'Consent acknowledgment not required for this jurisdiction',
                    'status': 'consent_complete'
                }

            # For BIPA users, record acknowledgment
            bipa_manager = self.compliance_wrapper.compliance_manager.bipa_manager
            recorded = bipa_manager.acknowledge_disclosure_and_consent(
                user_id=user_id,
                consent_id=consent_id,
                disclosure_id=disclosure_id,
                signature=signature,
                ip_address=request.remote_addr
            )

            if not recorded:
                return {
                    'success': False,
                    'error': 'Failed to record consent acknowledgment'
                }

            # Update session
            session_data['consent_acknowledged'] = True
            session_data['acknowledgment_time'] = datetime.now()

            return {
                'success': True,
                'message': 'BIPA consent acknowledged successfully',
                'status': 'consent_complete'
            }

        except Exception as e:
            logger.exception("Consent acknowledgment failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def handle_user_rights_request_endpoint(self, user_id: str, request_type: str,
                                            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for handling user rights requests (access, deletion, etc.).

        When ``jurisdiction`` is not given, the one stored in the user's
        consent session is used, falling back to ``'US'``.
        """
        try:
            # Use jurisdiction from session if not provided
            jurisdiction = self._resolve_jurisdiction(user_id, jurisdiction)

            result = self.compliance_wrapper.compliance_manager.handle_user_rights_request(
                user_id=user_id,
                request_type=request_type,
                jurisdiction=jurisdiction
            )

            return {
                'success': True,
                'request_type': request_type,
                'jurisdiction': jurisdiction,
                'request_ids': result,
                'status': 'request_submitted'
            }

        except Exception as e:
            logger.exception("User rights request failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def get_compliance_dashboard_endpoint(self) -> Dict[str, Any]:
        """
        Endpoint for compliance dashboard data.

        Wraps the compliance wrapper's ``get_dashboard_data_sync`` result in
        a success envelope; failures are logged and returned as an error dict.
        """
        try:
            dashboard_data = self.compliance_wrapper.get_dashboard_data_sync()
            return {
                'success': True,
                'dashboard_data': dashboard_data
            }

        except Exception as e:
            logger.exception("Dashboard data generation failed: %s", e)
            return {
                'success': False,
                'error': str(e)
            }

    def get_user_compliance_status_endpoint(self, user_id: str,
                                            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for getting user's compliance status.

        Only the ``"IL"`` jurisdiction currently contributes data (under the
        ``'bipa'`` key); for others ``compliance_status`` is an empty dict.
        """
        try:
            # Use jurisdiction from session if not provided
            jurisdiction = self._resolve_jurisdiction(user_id, jurisdiction)

            status_data = {}

            # Get jurisdiction-specific compliance status
            if jurisdiction == "IL":
                bipa_manager = self.compliance_wrapper.compliance_manager.bipa_manager
                status_data['bipa'] = bipa_manager.get_bipa_compliance_status(user_id)

            # Add other jurisdictions as needed

            return {
                'success': True,
                'user_id': user_id,
                'jurisdiction': jurisdiction,
                'compliance_status': status_data
            }

        except Exception as e:
            logger.exception("Compliance status check failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }


# Flask integration functions
def setup_compliance_routes(app):
    """
    Setup compliance routes in Flask app.

    Creates one ``MorphGuardSecurityIntegration`` instance, registers the
    compliance and biometric routes on ``app``, and returns the instance.
    POST routes answer 400 with the missing field names when the JSON body
    is absent or incomplete.
    """
    security_integration = MorphGuardSecurityIntegration()

    @app.route('/api/compliance/consent', methods=['POST'])
    def collect_consent():
        """Collect user consent endpoint"""
        data = _json_payload()
        error_response = _missing_fields_response(data, ('user_id', 'jurisdiction'))
        if error_response is not None:
            return error_response

        result = security_integration.collect_user_consent_endpoint(
            user_id=data['user_id'],
            jurisdiction=data['jurisdiction'],
            biometric_type=data.get('biometric_type', 'face_biometric'),
            purpose=data.get('purpose', 'authentication')
        )
        return jsonify(result)

    @app.route('/api/compliance/acknowledge', methods=['POST'])
    def acknowledge_consent():
        """Acknowledge consent endpoint"""
        data = _json_payload()
        error_response = _missing_fields_response(
            data, ('user_id', 'consent_id', 'disclosure_id', 'signature')
        )
        if error_response is not None:
            return error_response

        result = security_integration.acknowledge_consent_endpoint(
            user_id=data['user_id'],
            consent_id=data['consent_id'],
            disclosure_id=data['disclosure_id'],
            signature=data['signature']
        )
        return jsonify(result)

    @app.route('/api/compliance/user-rights', methods=['POST'])
    def handle_user_rights():
        """Handle user rights requests endpoint"""
        data = _json_payload()
        error_response = _missing_fields_response(data, ('user_id', 'request_type'))
        if error_response is not None:
            return error_response

        result = security_integration.handle_user_rights_request_endpoint(
            user_id=data['user_id'],
            request_type=data['request_type'],
            jurisdiction=data.get('jurisdiction')
        )
        return jsonify(result)

    @app.route('/api/compliance/dashboard', methods=['GET'])
    def compliance_dashboard():
        """Compliance dashboard endpoint"""
        result = security_integration.get_compliance_dashboard_endpoint()
        return jsonify(result)

    # The <user_id> URL variable supplies the view's ``user_id`` argument.
    @app.route('/api/compliance/user-status/<user_id>', methods=['GET'])
    def user_compliance_status(user_id):
        """User compliance status endpoint"""
        jurisdiction = request.args.get('jurisdiction')
        result = security_integration.get_user_compliance_status_endpoint(
            user_id=user_id,
            jurisdiction=jurisdiction
        )
        return jsonify(result)

    @app.route('/api/biometric/process', methods=['POST'])
    @security_integration.compliance_required('biometric_authentication')
    def process_biometric():
        """Process biometric data with compliance checks"""
        # Get biometric data from request
        # In real implementation, this would handle file upload or base64 data
        biometric_data = request.files.get('biometric_data')
        quality_metrics = request.form.get('quality_metrics')

        if not biometric_data:
            return jsonify({
                'success': False,
                'error': 'No biometric data provided'
            }), 400

        data_bytes = biometric_data.read()

        if quality_metrics:
            try:
                quality_metrics = json.loads(quality_metrics)
            except ValueError:
                # Malformed client input is a 400, not a compliance failure.
                return jsonify({
                    'success': False,
                    'error': 'quality_metrics must be valid JSON'
                }), 400

        result = security_integration.process_biometric_with_compliance(
            biometric_data=data_bytes,
            quality_metrics=quality_metrics
        )
        return jsonify(result)

    logger.info("Compliance routes registered with Flask app")
    return security_integration


if __name__ == "__main__":
    # Example usage
    print("MorphGuard Security Integration module")
    print("Use setup_compliance_routes(app) to integrate with Flask application")