# Source: MorphGuard / src/security/morphguard_security_integration.py
# Commit 2978bba by juanquy: "Initial clean commit of modular MorphGuard" (18.4 kB)
"""
MorphGuard Security Integration
Integrates the comprehensive security and compliance system with the main Flask application
"""
import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
from functools import wraps
from flask import request, jsonify, session
from .compliance_integration import AsyncComplianceWrapper, BiometricProcessingRequest
logger = logging.getLogger(__name__)
class MorphGuardSecurityIntegration:
    """
    Main integration class for MorphGuard security and compliance.

    Wraps an ``AsyncComplianceWrapper`` and exposes:

    * ``compliance_required`` - a route decorator that gates Flask views on
      user identification and a jurisdiction-specific consent check;
    * ``*_endpoint`` helpers that return plain dicts for route handlers to
      serialise.

    NOTE(review): ``active_sessions`` is an in-process dict keyed by user id;
    it is never pruned and is not shared between worker processes.
    """

    # Operation types whose routes require a consent check before running.
    _CONSENT_GATED_OPERATIONS = frozenset({
        'biometric_enrollment',
        'biometric_verification',
        'biometric_authentication',
    })

    # Jurisdiction codes routed to the GDPR consent branch.
    _GDPR_JURISDICTIONS = frozenset({"EU", "UK", "EEA"})

    def __init__(self):
        self.compliance_wrapper = AsyncComplianceWrapper()
        self.active_sessions = {}  # Track active compliance sessions
        logger.info("MorphGuard Security Integration initialized")

    @staticmethod
    def _normalize_jurisdiction(jurisdiction: Optional[str]) -> str:
        """Return *jurisdiction* trimmed and upper-cased ('' when falsy).

        Used only for local comparisons so that values such as ``"il"`` or
        ``" EU "`` cannot slip past a jurisdiction-specific branch.
        """
        return str(jurisdiction).strip().upper() if jurisdiction else ""

    @staticmethod
    def _request_payload() -> Dict[str, Any]:
        """Return the current request's fields as a dict without raising.

        Prefers a JSON object body; falls back to form fields so multipart
        uploads (which carry no JSON body) can still supply ``user_id`` and
        ``jurisdiction``. Returns ``{}`` when neither is present.
        """
        payload = request.get_json(silent=True)
        if isinstance(payload, dict):
            return payload
        return request.form.to_dict() if request.form else {}

    def compliance_required(self, operation_type: str = "general"):
        """
        Decorator to enforce compliance checks on Flask routes.

        The wrapped view runs only when a user id is available (session first,
        then request payload) and, for consent-gated operation types, when
        ``_check_user_consent`` reports valid consent. On success the context
        is attached to the request as ``request.compliance_context``.

        Responses produced by the decorator itself:
            400 - no user id could be determined
            403 - consent is required but not valid
            500 - any exception raised during the check or by the view

        NOTE(review): ``user_id`` and ``jurisdiction`` may come straight from
        the client payload, so this is not an authentication check.
        """
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                try:
                    # Get user info from session or request
                    payload = self._request_payload()
                    user_id = session.get('user_id') or payload.get('user_id')
                    jurisdiction = (
                        session.get('jurisdiction')
                        or payload.get('jurisdiction')
                        or 'US'
                    )

                    if not user_id:
                        return jsonify({
                            'error': 'User identification required for compliance',
                            'compliance_status': 'user_id_missing'
                        }), 400

                    # Check if user has valid consent for biometric operations
                    if operation_type in self._CONSENT_GATED_OPERATIONS:
                        consent_status = self._check_user_consent(
                            user_id, jurisdiction, operation_type
                        )
                        if not consent_status['valid']:
                            return jsonify({
                                'error': 'Valid consent required',
                                'compliance_status': 'consent_required',
                                'consent_details': consent_status,
                                'action_required': 'collect_consent'
                            }), 403

                    # Add compliance context to request
                    request.compliance_context = {
                        'user_id': user_id,
                        'jurisdiction': jurisdiction,
                        'operation_type': operation_type,
                        'timestamp': datetime.now().isoformat()
                    }

                    # Execute the original function
                    return f(*args, **kwargs)
                except Exception as e:
                    logger.exception("Compliance check failed: %s", e)
                    return jsonify({
                        'error': 'Compliance check failed',
                        'details': str(e)
                    }), 500
            return decorated_function
        return decorator

    def _check_user_consent(self, user_id: str, jurisdiction: str,
                            operation_type: str) -> Dict[str, Any]:
        """Check if user has valid consent for the operation.

        Returns a dict that always contains ``'valid'`` (bool). The
        jurisdiction is compared case-insensitively and ignoring surrounding
        whitespace. Any exception is logged and reported as ``valid: False``
        so a failing lookup never grants access.
        """
        try:
            code = self._normalize_jurisdiction(jurisdiction)

            # For Illinois users, check BIPA compliance
            if code == "IL":
                bipa_status = (
                    self.compliance_wrapper.compliance_manager
                    .bipa_manager.get_bipa_compliance_status(user_id)
                )
                has_consent = bipa_status['compliance_summary']['has_valid_consent']
                return {
                    'valid': has_consent,
                    'type': 'BIPA',
                    'details': bipa_status,
                    'action_required': None if has_consent else 'bipa_consent'
                }

            # For EU users, check GDPR consent
            if code in self._GDPR_JURISDICTIONS:
                # NOTE: placeholder - no GDPR consent records are consulted
                # yet, so these jurisdictions are always reported as lacking
                # consent.
                return {
                    'valid': False,  # Would check actual GDPR consent
                    'type': 'GDPR',
                    'action_required': 'gdpr_consent'
                }

            # For California users, provide CCPA disclosure
            if code == "CA":
                return {
                    'valid': True,  # CCPA doesn't require explicit consent
                    'type': 'CCPA_DISCLOSURE',
                    'disclosure_provided': True
                }

            # Default for other jurisdictions
            return {
                'valid': True,
                'type': 'STANDARD'
            }
        except Exception as e:
            logger.exception("Consent check failed for user %s: %s", user_id, e)
            return {
                'valid': False,
                'error': str(e),
                'action_required': 'consent_verification_failed'
            }

    def process_biometric_with_compliance(
            self, biometric_data: bytes,
            quality_metrics: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
        """
        Process biometric data with full compliance and security checks.

        Must be called inside a view wrapped by ``compliance_required``: the
        user, jurisdiction and operation type are read from
        ``request.compliance_context``. Without that context a
        ``{'success': False, ...}`` dict is returned and nothing is processed.

        Returns whatever ``compliance_wrapper.process_biometric_sync`` returns.
        """
        # Get compliance context from request
        context = getattr(request, 'compliance_context', {})
        if not context:
            return {
                'success': False,
                'error': 'Compliance context missing - use @compliance_required decorator'
            }

        # Prepare processing request
        processing_request = {
            'user_id': context['user_id'],
            'biometric_data': biometric_data,
            'operation_type': context['operation_type'],
            'purpose': self._map_operation_to_purpose(context['operation_type']),
            'user_jurisdiction': context['jurisdiction'],
            'quality_metrics': quality_metrics or {}
        }

        # Process with compliance
        result = self.compliance_wrapper.process_biometric_sync(processing_request)

        # Log the result
        self._log_processing_result(context, result)
        return result

    def _map_operation_to_purpose(self, operation_type: str) -> str:
        """Map operation type to compliance purpose.

        Unknown operation types map to ``'biometric_processing'``.
        """
        purpose_mapping = {
            'biometric_enrollment': 'identity_enrollment',
            'biometric_verification': 'identity_verification',
            'biometric_authentication': 'secure_authentication',
            'face_capture': 'biometric_capture',
            'quality_assessment': 'data_quality_verification'
        }
        return purpose_mapping.get(operation_type, 'biometric_processing')

    def _log_processing_result(self, context: Dict[str, Any],
                               result: Dict[str, Any]) -> None:
        """Log processing result for audit.

        Tolerates a result without a ``'success'`` key (treated as failure)
        and stringifies non-JSON-serialisable values, so that writing the
        audit line cannot abort a request that was already processed.
        """
        succeeded = result.get('success', False)
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': context['user_id'],
            'jurisdiction': context['jurisdiction'],
            'operation_type': context['operation_type'],
            'success': succeeded,
            'compliance_status': result.get('compliance_status'),
            'processing_time_ms': result.get('processing_time_ms'),
            'privacy_measures_applied': result.get('privacy_measures_applied', False),
            'security_verified': result.get('security_verified', False)
        }
        if not succeeded:
            log_entry['error'] = result.get('error')
            log_entry['threat_details'] = result.get('threat_details')

        # default=str is deliberate: this is a log line, not a wire format.
        logger.info("Biometric processing result: %s",
                    json.dumps(log_entry, default=str))

    def collect_user_consent_endpoint(self, user_id: str, jurisdiction: str,
                                      biometric_type: str = "face_biometric",
                                      purpose: str = "authentication") -> Dict[str, Any]:
        """
        Endpoint for collecting user consent based on jurisdiction.

        Delegates to ``compliance_wrapper.collect_consent_sync`` and remembers
        the outcome in ``active_sessions[user_id]`` for the follow-up
        acknowledgment call. Returns ``{'success': False, 'error': ...}`` on
        any exception.
        """
        try:
            consent_result = self.compliance_wrapper.collect_consent_sync(
                user_id=user_id,
                jurisdiction=jurisdiction,
                biometric_type=biometric_type,
                purpose=purpose
            )

            # Store consent session for follow-up
            self.active_sessions[user_id] = {
                'consent_result': consent_result,
                'timestamp': datetime.now(),
                'jurisdiction': jurisdiction
            }

            return {
                'success': True,
                'consent_type': consent_result['consent_type'],
                'next_step': consent_result['next_step'],
                'consent_details': consent_result,
                'session_id': user_id  # In production, use proper session ID
            }
        except Exception as e:
            logger.exception("Consent collection failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def acknowledge_consent_endpoint(self, user_id: str, consent_id: str,
                                     disclosure_id: str, signature: str) -> Dict[str, Any]:
        """
        Endpoint for acknowledging consent (specifically for BIPA).

        Requires an entry in ``active_sessions`` created by
        ``collect_user_consent_endpoint``. Only sessions whose jurisdiction is
        Illinois record an acknowledgment (together with the caller's
        ``request.remote_addr``); other jurisdictions report success without
        recording anything.
        """
        try:
            # Get user's active session
            session_data = self.active_sessions.get(user_id)
            if not session_data:
                return {
                    'success': False,
                    'error': 'No active consent session found'
                }

            if self._normalize_jurisdiction(session_data['jurisdiction']) != "IL":
                return {
                    'success': True,
                    'message': 'Consent acknowledgment not required for this jurisdiction',
                    'status': 'consent_complete'
                }

            # For BIPA users, record acknowledgment
            bipa_manager = self.compliance_wrapper.compliance_manager.bipa_manager
            result = bipa_manager.acknowledge_disclosure_and_consent(
                user_id=user_id,
                consent_id=consent_id,
                disclosure_id=disclosure_id,
                signature=signature,
                ip_address=request.remote_addr
            )
            if not result:
                return {
                    'success': False,
                    'error': 'Failed to record consent acknowledgment'
                }

            # Update session
            session_data['consent_acknowledged'] = True
            session_data['acknowledgment_time'] = datetime.now()
            return {
                'success': True,
                'message': 'BIPA consent acknowledged successfully',
                'status': 'consent_complete'
            }
        except Exception as e:
            logger.exception("Consent acknowledgment failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def handle_user_rights_request_endpoint(
            self, user_id: str, request_type: str,
            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for handling user rights requests (access, deletion, etc.).

        When *jurisdiction* is not given it is taken from the user's entry in
        ``active_sessions``, defaulting to ``'US'``. The value returned by the
        compliance manager is passed back under ``'request_ids'``.
        """
        try:
            # Use jurisdiction from session if not provided
            if not jurisdiction:
                session_data = self.active_sessions.get(user_id)
                jurisdiction = session_data['jurisdiction'] if session_data else 'US'

            result = self.compliance_wrapper.compliance_manager.handle_user_rights_request(
                user_id=user_id,
                request_type=request_type,
                jurisdiction=jurisdiction
            )
            return {
                'success': True,
                'request_type': request_type,
                'jurisdiction': jurisdiction,
                'request_ids': result,
                'status': 'request_submitted'
            }
        except Exception as e:
            logger.exception("User rights request failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def get_compliance_dashboard_endpoint(self) -> Dict[str, Any]:
        """
        Endpoint for compliance dashboard data.

        Returns the wrapper's dashboard data under ``'dashboard_data'``, or
        ``{'success': False, 'error': ...}`` on any exception.
        """
        try:
            dashboard_data = self.compliance_wrapper.get_dashboard_data_sync()
            return {
                'success': True,
                'dashboard_data': dashboard_data
            }
        except Exception as e:
            logger.exception("Dashboard data generation failed: %s", e)
            return {
                'success': False,
                'error': str(e)
            }

    def get_user_compliance_status_endpoint(
            self, user_id: str,
            jurisdiction: Optional[str] = None) -> Dict[str, Any]:
        """
        Endpoint for getting user's compliance status.

        When *jurisdiction* is not given it is taken from the user's entry in
        ``active_sessions``, defaulting to ``'US'``. Only Illinois currently
        contributes data (under the ``'bipa'`` key); for every other
        jurisdiction ``'compliance_status'`` is an empty dict.
        """
        try:
            # Use jurisdiction from session if not provided
            if not jurisdiction:
                session_data = self.active_sessions.get(user_id)
                jurisdiction = session_data['jurisdiction'] if session_data else 'US'

            status_data = {}

            # Get jurisdiction-specific compliance status
            if self._normalize_jurisdiction(jurisdiction) == "IL":
                bipa_status = (
                    self.compliance_wrapper.compliance_manager
                    .bipa_manager.get_bipa_compliance_status(user_id)
                )
                status_data['bipa'] = bipa_status
            # Add other jurisdictions as needed

            return {
                'success': True,
                'user_id': user_id,
                'jurisdiction': jurisdiction,
                'compliance_status': status_data
            }
        except Exception as e:
            logger.exception("Compliance status check failed for user %s: %s", user_id, e)
            return {
                'success': False,
                'error': str(e)
            }
# Flask integration functions
def setup_compliance_routes(app):
    """
    Setup compliance routes in Flask app.

    Creates one ``MorphGuardSecurityIntegration`` instance, registers the
    ``/api/compliance/*`` and ``/api/biometric/process`` routes on *app*, and
    returns the instance so callers can reuse its decorator and helpers.

    JSON routes answer 400 (instead of crashing with an unhandled error) when
    the body is not a JSON object or a required field is missing.
    """
    security_integration = MorphGuardSecurityIntegration()

    def _json_fields(*required):
        """Return ``(data, None)`` or ``(None, error_response)`` for the JSON body."""
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return None, (jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400)
        missing = [name for name in required if name not in data]
        if missing:
            return None, (jsonify({
                'success': False,
                'error': 'Missing required field(s): ' + ', '.join(missing)
            }), 400)
        return data, None

    @app.route('/api/compliance/consent', methods=['POST'])
    def collect_consent():
        """Collect user consent endpoint"""
        data, error = _json_fields('user_id', 'jurisdiction')
        if error:
            return error
        result = security_integration.collect_user_consent_endpoint(
            user_id=data['user_id'],
            jurisdiction=data['jurisdiction'],
            biometric_type=data.get('biometric_type', 'face_biometric'),
            purpose=data.get('purpose', 'authentication')
        )
        return jsonify(result)

    @app.route('/api/compliance/acknowledge', methods=['POST'])
    def acknowledge_consent():
        """Acknowledge consent endpoint"""
        data, error = _json_fields('user_id', 'consent_id', 'disclosure_id', 'signature')
        if error:
            return error
        result = security_integration.acknowledge_consent_endpoint(
            user_id=data['user_id'],
            consent_id=data['consent_id'],
            disclosure_id=data['disclosure_id'],
            signature=data['signature']
        )
        return jsonify(result)

    @app.route('/api/compliance/user-rights', methods=['POST'])
    def handle_user_rights():
        """Handle user rights requests endpoint"""
        data, error = _json_fields('user_id', 'request_type')
        if error:
            return error
        result = security_integration.handle_user_rights_request_endpoint(
            user_id=data['user_id'],
            request_type=data['request_type'],
            jurisdiction=data.get('jurisdiction')
        )
        return jsonify(result)

    @app.route('/api/compliance/dashboard', methods=['GET'])
    def compliance_dashboard():
        """Compliance dashboard endpoint"""
        result = security_integration.get_compliance_dashboard_endpoint()
        return jsonify(result)

    @app.route('/api/compliance/user-status/<user_id>', methods=['GET'])
    def user_compliance_status(user_id):
        """User compliance status endpoint"""
        jurisdiction = request.args.get('jurisdiction')
        result = security_integration.get_user_compliance_status_endpoint(
            user_id=user_id,
            jurisdiction=jurisdiction
        )
        return jsonify(result)

    @app.route('/api/biometric/process', methods=['POST'])
    @security_integration.compliance_required('biometric_authentication')
    def process_biometric():
        """Process biometric data with compliance checks"""
        # Get biometric data from request
        # In real implementation, this would handle file upload or base64 data
        upload = request.files.get('biometric_data')
        if not upload:
            return jsonify({
                'success': False,
                'error': 'No biometric data provided'
            }), 400

        # quality_metrics is an optional JSON-encoded form field; reject a
        # malformed value as a client error rather than letting it surface as
        # a 500 from the compliance decorator.
        quality_metrics = None
        raw_metrics = request.form.get('quality_metrics')
        if raw_metrics:
            try:
                quality_metrics = json.loads(raw_metrics)
            except ValueError:
                return jsonify({
                    'success': False,
                    'error': 'quality_metrics must be valid JSON'
                }), 400

        result = security_integration.process_biometric_with_compliance(
            biometric_data=upload.read(),
            quality_metrics=quality_metrics
        )
        return jsonify(result)

    logger.info("Compliance routes registered with Flask app")
    return security_integration
if __name__ == "__main__":
    # Running the module directly only prints a short usage hint.
    for usage_line in (
        "MorphGuard Security Integration module",
        "Use setup_compliance_routes(app) to integrate with Flask application",
    ):
        print(usage_line)