# Security Framework - BDR Agent Factory

## Overview

This framework covers the protection of sensitive insurance data, secure access to AI capabilities, and compliance with regulatory requirements.

---

## Security Architecture

```
┌────────────────────────────────────────────────────────────────┐
│                        Security Layers                         │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Layer 1: Network Security (Firewall, WAF, DDoS)         │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               ↓                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Layer 2: Authentication & Authorization (OAuth 2.0)     │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               ↓                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Layer 3: API Security (Rate Limiting, Input Validation) │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               ↓                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Layer 4: Data Encryption (TLS, AES-256)                 │  │
│  └──────────────────────────────────────────────────────────┘  │
│                               ↓                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Layer 5: Audit & Monitoring (SIEM, Logging)             │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                │
└────────────────────────────────────────────────────────────────┘
```

---

## 1. Authentication & Authorization

### OAuth 2.0 Implementation

#### Client Credentials Flow

```python
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants

# `app` is the Flask application, created elsewhere

class ClientCredentialsGrant(grants.ClientCredentialsGrant):
    def save_token(self, token):
        # Save token to database
        pass

# Before serving requests, bind the server to the app with
# authorization.init_app(app, query_client=..., save_token=...)
authorization = AuthorizationServer()
authorization.register_grant(ClientCredentialsGrant)

# Token endpoint
@app.route('/oauth/token', methods=['POST'])
def issue_token():
    return authorization.create_token_response()
```

#### Token Structure

```json
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "capability:invoke system:read audit:read",
  "issued_at": "2026-01-03T00:13:00Z"
}
```

#### JWT Token Validation

```python
import jwt
from functools import wraps
from flask import request, jsonify

# PUBLIC_KEY is the PEM-encoded RSA public key used to verify tokens,
# loaded elsewhere (for example from Config.JWT_PUBLIC_KEY)

def require_auth(scopes=None):
    """Decorator to require authentication"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Accept only "Bearer <token>" Authorization headers
            auth_header = request.headers.get('Authorization', '')
            token = auth_header[len('Bearer '):] if auth_header.startswith('Bearer ') else ''

            if not token:
                return jsonify({'error': 'Missing authentication token'}), 401

            try:
                # Verify signature, expiry, and audience
                payload = jwt.decode(
                    token,
                    PUBLIC_KEY,
                    algorithms=['RS256'],
                    audience='bdr-agent-factory'
                )

                # Check scopes (any one of the listed scopes is sufficient)
                if scopes:
                    token_scopes = payload.get('scope', '').split()
                    if not any(scope in token_scopes for scope in scopes):
                        return jsonify({'error': 'Insufficient permissions'}), 403

                # Add user info to request context
                request.user_id = payload.get('sub')
                request.client_id = payload.get('client_id')

                return f(*args, **kwargs)

            except jwt.ExpiredSignatureError:
                return jsonify({'error': 'Token expired'}), 401
            except jwt.InvalidTokenError:
                return jsonify({'error': 'Invalid token'}), 401

        return decorated_function
    return decorator

# Usage
@app.route('/v1/capabilities/<capability_id>/invoke', methods=['POST'])
@require_auth(scopes=['capability:invoke'])
def invoke_capability(capability_id):
    # Capability invocation logic
    pass
```

### Role-Based Access Control (RBAC)

```python
class Role:
    ADMIN = 'admin'
    DEVELOPER = 'developer'
    ANALYST = 'analyst'
    AUDITOR = 'auditor'
    SYSTEM = 'system'

class Permission:
    # Capability permissions
    CAPABILITY_INVOKE = 'capability:invoke'
    CAPABILITY_READ = 'capability:read'
    CAPABILITY_WRITE = 'capability:write'

    # System permissions
    SYSTEM_READ = 'system:read'
    SYSTEM_WRITE = 'system:write'

    # Audit permissions
    AUDIT_READ = 'audit:read'
    AUDIT_WRITE = 'audit:write'

    # Governance permissions
    GOVERNANCE_READ = 'governance:read'
    GOVERNANCE_WRITE = 'governance:write'

ROLE_PERMISSIONS = {
    Role.ADMIN: [
        Permission.CAPABILITY_INVOKE,
        Permission.CAPABILITY_READ,
        Permission.CAPABILITY_WRITE,
        Permission.SYSTEM_READ,
        Permission.SYSTEM_WRITE,
        Permission.AUDIT_READ,
        Permission.AUDIT_WRITE,
        Permission.GOVERNANCE_READ,
        Permission.GOVERNANCE_WRITE,
    ],
    Role.DEVELOPER: [
        Permission.CAPABILITY_INVOKE,
        Permission.CAPABILITY_READ,
        Permission.SYSTEM_READ,
        Permission.AUDIT_READ,
    ],
    Role.ANALYST: [
        Permission.CAPABILITY_INVOKE,
        Permission.CAPABILITY_READ,
        Permission.AUDIT_READ,
        Permission.GOVERNANCE_READ,
    ],
    Role.AUDITOR: [
        Permission.AUDIT_READ,
        Permission.GOVERNANCE_READ,
    ],
    Role.SYSTEM: [
        Permission.CAPABILITY_INVOKE,
        Permission.SYSTEM_READ,
        Permission.AUDIT_WRITE,
    ],
}

def check_permission(user_role, required_permission):
    """Check if user role has required permission"""
    return required_permission in ROLE_PERMISSIONS.get(user_role, [])
```

---

## 2. Data Encryption

### Encryption at Rest

```python
import base64
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

class DataEncryption:
    def __init__(self, key=None):
        """Initialize with AES-256 encryption key"""
        key = key or os.environ.get('ENCRYPTION_KEY')
        if not key:
            raise ValueError('Encryption key not provided')

        # AES-256 needs a 32-byte key; reject shorter keys instead of
        # silently falling back to a weaker key size
        key_bytes = key.encode('utf-8') if isinstance(key, str) else key
        if len(key_bytes) < 32:
            raise ValueError('Encryption key must be at least 32 bytes')
        self.key = key_bytes[:32]

    def _cipher(self, iv):
        # AES in CBC mode; CBC provides confidentiality but no integrity check
        return Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=default_backend())

    def encrypt_field(self, plaintext):
        """Encrypt sensitive field using AES-256"""
        if isinstance(plaintext, str):
            plaintext = plaintext.encode('utf-8')

        # Generate random IV
        iv = os.urandom(16)

        # Pad plaintext to the 16-byte block size (PKCS7)
        padder = padding.PKCS7(128).padder()
        padded_plaintext = padder.update(plaintext) + padder.finalize()

        # Encrypt
        encryptor = self._cipher(iv).encryptor()
        ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()

        # Return IV + ciphertext (base64 encoded)
        return base64.b64encode(iv + ciphertext).decode('utf-8')

    def decrypt_field(self, ciphertext):
        """Decrypt sensitive field"""
        # Decode from base64
        encrypted_data = base64.b64decode(ciphertext)

        # Extract IV and ciphertext
        iv = encrypted_data[:16]
        ciphertext = encrypted_data[16:]

        # Decrypt
        decryptor = self._cipher(iv).decryptor()
        padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

        # Remove and validate padding
        unpadder = padding.PKCS7(128).unpadder()
        plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()

        return plaintext.decode('utf-8')

# Usage
encryption = DataEncryption()

# Encrypt sensitive data before storing
sensitive_data = "Patient medical history"
encrypted_data = encryption.encrypt_field(sensitive_data)

# Store encrypted_data in database
# ...

# Decrypt when needed
decrypted_data = encryption.decrypt_field(encrypted_data)
```

### Encryption in Transit (TLS)

```python
# Flask app with TLS
from flask import Flask
import ssl

app = Flask(__name__)

if __name__ == '__main__':
    # Server-side context that accepts TLS 1.2 and newer
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.load_cert_chain('cert.pem', 'key.pem')

    app.run(
        host='0.0.0.0',
        port=443,
        ssl_context=context
    )
```

### Field-Level Encryption

```python
class EncryptedField:
    """Database field that automatically encrypts/decrypts"""

    def __init__(self, encryption_service):
        self.encryption = encryption_service

    def __set_name__(self, owner, name):
        # Record the attribute name this descriptor is assigned to
        self.name = name

    def __set__(self, instance, value):
        if value is not None:
            encrypted_value = self.encryption.encrypt_field(value)
            instance.__dict__[self.name] = encrypted_value
        else:
            instance.__dict__[self.name] = None

    def __get__(self, instance, owner):
        if instance is None:
            # Accessed on the class itself, not an instance
            return self
        encrypted_value = instance.__dict__.get(self.name)
        if encrypted_value is not None:
            return self.encryption.decrypt_field(encrypted_value)
        return None

# Usage in model
encryption = DataEncryption()

class ClaimRecord:
    # Encrypted fields
    claimant_ssn = EncryptedField(encryption)
    medical_details = EncryptedField(encryption)

    # Non-encrypted fields
    claim_id = None
    claim_amount = None
```

---

## 3. Input Validation & Sanitization

### Input Validation

```python
from flask import request, jsonify
from pydantic import BaseModel, validator, Field
from typing import Optional
import re

# `app`, `require_auth`, and `process_capability` are defined elsewhere

class CapabilityInvokeRequest(BaseModel):
    """Validated request model for capability invocation"""
    input: dict = Field(..., description="Input data for capability")
    options: Optional[dict] = Field(default_factory=dict, description="Optional parameters")

    @validator('input')
    def validate_input(cls, v):
        # Check for required fields based on capability
        if 'text' in v:
            text = v['text']

            # Length validation
            if len(text) > 10000:
                raise ValueError('Text exceeds maximum length of 10000 characters')

            # Check for malicious content
            # NOTE: the original pattern and error handling were lost in extraction;
            # the `<script` pattern and the message below are assumed stand-ins
            if re.search(r'<script', text, re.IGNORECASE):
                raise ValueError('Potentially malicious content detected')

        return v

# Usage in endpoint
@app.route('/v1/capabilities/<capability_id>/invoke', methods=['POST'])
@require_auth(scopes=['capability:invoke'])
def invoke_capability(capability_id):
    try:
        # Validate request
        request_data = CapabilityInvokeRequest(**request.json)

        # Process validated data
        result = process_capability(capability_id, request_data.input, request_data.options)

        return jsonify(result), 200

    except ValueError as e:
        return jsonify({'error': str(e)}), 400
```

### SQL Injection Prevention

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# `session` is a SQLAlchemy Session created elsewhere via sessionmaker

# GOOD: Use parameterized queries
def get_capability_safe(capability_id):
    query = text("SELECT * FROM capabilities WHERE id = :capability_id")
    result = session.execute(query, {"capability_id": capability_id})
    return result.fetchone()

# BAD: Never use string concatenation
def get_capability_unsafe(capability_id):
    # NEVER DO THIS!
    query = f"SELECT * FROM capabilities WHERE id = '{capability_id}'"
    result = session.execute(query)
    return result.fetchone()
```

### XSS Prevention

```python
import bleach
from markupsafe import escape

def sanitize_html(html_content):
    """Sanitize HTML content to prevent XSS"""
    allowed_tags = ['p', 'br', 'strong', 'em', 'u']
    allowed_attributes = {}

    return bleach.clean(
        html_content,
        tags=allowed_tags,
        attributes=allowed_attributes,
        strip=True
    )

def sanitize_text(text):
    """Escape special characters in text"""
    return escape(text)
```

---

## 4. Rate Limiting

### Implementation

```python
from flask import request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Initialize rate limiter (keyword arguments work across Flask-Limiter versions)
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="redis://localhost:6379"
)

# Apply rate limits
@app.route('/v1/capabilities/<capability_id>/invoke', methods=['POST'])
@limiter.limit("100 per minute")
@require_auth(scopes=['capability:invoke'])
def invoke_capability(capability_id):
    # Capability invocation logic
    pass

# Custom rate limit based on user tier
def get_rate_limit():
    """Dynamic rate limit based on user tier"""
    # user_tier must be set on the request before the limiter runs
    # (for example in a before_request hook); otherwise the free tier applies
    user_tier = getattr(request, 'user_tier', 'free')

    limits = {
        'free': '10 per minute',
        'standard': '100 per minute',
        'premium': '1000 per minute',
        'enterprise': '10000 per minute'
    }

    return limits.get(user_tier, '10 per minute')

# Alternative to the fixed limit above; register only one handler per route
@app.route('/v1/capabilities/<capability_id>/invoke', methods=['POST'])
@limiter.limit(get_rate_limit)
@require_auth(scopes=['capability:invoke'])
def invoke_capability_tiered(capability_id):
    # Capability invocation logic
    pass
```

---

## 5. Security Headers

### HTTP Security Headers

```python
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)

# Configure security headers
Talisman(app,
    force_https=True,
    strict_transport_security=True,
    strict_transport_security_max_age=31536000,
    content_security_policy={
        'default-src': "'self'",
        'script-src': "'self'",
        'style-src': "'self'",
        'img-src': "'self' data:",
        'font-src': "'self'",
    },
    content_security_policy_nonce_in=['script-src'],
    referrer_policy='strict-origin-when-cross-origin',
    feature_policy={
        'geolocation': "'none'",
        'microphone': "'none'",
        'camera': "'none'",
    }
)

# Additional headers
@app.after_request
def set_security_headers(response):
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
    return response
```

---

## 6. Secrets Management

### Using Environment Variables

```python
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class Config:
    # Database
    DATABASE_URL = os.getenv('DATABASE_URL')

    # Encryption
    ENCRYPTION_KEY = os.getenv('ENCRYPTION_KEY')

    # OAuth
    OAUTH_CLIENT_ID = os.getenv('OAUTH_CLIENT_ID')
    OAUTH_CLIENT_SECRET = os.getenv('OAUTH_CLIENT_SECRET')

    # API Keys
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

    # JWT
    JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY')
    JWT_PUBLIC_KEY = os.getenv('JWT_PUBLIC_KEY')
```

### Using AWS Secrets Manager

```python
import boto3
import json

class SecretsManager:
    def __init__(self):
        self.client = boto3.client('secretsmanager', region_name='us-east-1')

    def get_secret(self, secret_name):
        """Retrieve secret from AWS Secrets Manager"""
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            return json.loads(response['SecretString'])
        except Exception as e:
            # Chain the original error so the root cause stays visible
            raise RuntimeError(f"Failed to retrieve secret: {str(e)}") from e

# Usage
secrets = SecretsManager()
db_credentials = secrets.get_secret('bdr-agent-factory/database')
```

---

## 7. Audit Logging

### Security Audit Logs

```python
from datetime import datetime, timezone

# StructuredLogger is the project's structured logging wrapper (not shown here);
# it is assumed to accept a message plus arbitrary keyword fields

class SecurityAuditLogger:
    def __init__(self):
        self.logger = StructuredLogger('security_audit')

    def log_authentication_attempt(self, user_id, success, ip_address, reason=None):
        """Log authentication attempt"""
        self.logger.info(
            'Authentication attempt',
            event_type='authentication',
            user_id=user_id,
            success=success,
            ip_address=ip_address,
            reason=reason,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    def log_authorization_failure(self, user_id, resource, required_permission):
        """Log authorization failure"""
        self.logger.warning(
            'Authorization failed',
            event_type='authorization_failure',
            user_id=user_id,
            resource=resource,
            required_permission=required_permission,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    def log_data_access(self, user_id, resource_type, resource_id, action):
        """Log data access"""
        self.logger.info(
            'Data access',
            event_type='data_access',
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action=action,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    def log_security_event(self, event_type, severity, description, **kwargs):
        """Log general security event"""
        # Map incident severities that are not logger method names (e.g. HIGH)
        # onto log levels; unknown values fall back to warning
        level = {'HIGH': 'error', 'MEDIUM': 'warning', 'LOW': 'info'}.get(
            severity.upper(), severity.lower()
        )
        log_method = getattr(self.logger, level, self.logger.warning)
        log_method(
            description,
            event_type=event_type,
            severity=severity,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **kwargs
        )

# Usage
audit_logger = SecurityAuditLogger()

# Log authentication
audit_logger.log_authentication_attempt(
    user_id='user_123',
    success=True,
    ip_address='192.168.1.1'
)

# Log suspicious activity
audit_logger.log_security_event(
    event_type='suspicious_activity',
    severity='WARNING',
    description='Multiple failed login attempts',
    user_id='user_123',
    ip_address='192.168.1.1',
    attempt_count=5
)
```

---

## 8. Incident Response

### Incident Response Plan

#### Phase 1: Detection

- Monitor security alerts
- Analyze anomalous behavior
- Validate security incidents

#### Phase 2: Containment

- Isolate affected systems
- Revoke compromised credentials
- Block malicious IP addresses

#### Phase 3: Eradication

- Remove malicious code
- Patch vulnerabilities
- Update security rules

#### Phase 4: Recovery

- Restore systems from backups
- Verify system integrity
- Resume normal operations

#### Phase 5: Post-Incident

- Document incident
- Conduct root cause analysis
- Update security procedures

### Incident Response Procedures

```python
class IncidentResponse:
    # create_incident, revoke_all_tokens, block_suspicious_ips, and
    # isolate_systems are environment-specific and not shown here

    def __init__(self):
        self.logger = SecurityAuditLogger()

    def detect_incident(self, incident_type, severity, details):
        """Detect and log security incident"""
        incident_id = self.create_incident(
            incident_type=incident_type,
            severity=severity,
            details=details
        )

        # Log incident
        self.logger.log_security_event(
            event_type='incident_detected',
            severity=severity,
            description=f'Security incident detected: {incident_type}',
            incident_id=incident_id,
            **details
        )

        # Trigger alerts
        if severity in ['HIGH', 'CRITICAL']:
            self.trigger_alert(incident_id, severity)

        return incident_id

    def contain_incident(self, incident_id):
        """Contain security incident"""
        # Revoke tokens
        self.revoke_all_tokens()

        # Block suspicious IPs
        self.block_suspicious_ips()

        # Isolate affected systems
        self.isolate_systems()

        self.logger.log_security_event(
            event_type='incident_contained',
            severity='INFO',
            description='Incident containment measures applied',
            incident_id=incident_id
        )

    def trigger_alert(self, incident_id, severity):
        """Trigger incident alert"""
        # Send to PagerDuty, Slack, email, etc.
        pass
```

---

## 9. Vulnerability Management

### Dependency Scanning

```bash
# Scan Python dependencies
pip install safety
safety check

# Scan with Snyk
snyk test

# Scan Docker images
# (newer Docker releases replace `docker scan` with `docker scout cves`)
docker scan bdr-agent-factory:latest
```

### Security Testing

```bash
# Static analysis
bandit -r bdr_agent_factory/

# Dependency vulnerabilities
pip-audit

# SAST (Static Application Security Testing)
semgrep --config=auto .

# DAST (Dynamic Application Security Testing)
zap-cli quick-scan http://localhost:8000
```

---

## 10. Compliance

### GDPR Compliance

- **Data minimization**: Collect only necessary data
- **Right to erasure**: Implement data deletion
- **Data portability**: Export user data
- **Consent management**: Track user consent
- **Breach notification**: Notify the supervisory authority within 72 hours of becoming aware of a breach

### HIPAA Compliance

- **Access controls**: Role-based access
- **Audit trails**: Complete logging
- **Encryption**: Data at rest and in transit
- **Breach notification**: Notify without unreasonable delay, no later than 60 days after discovery
- **Business associate agreements**: Vendor contracts

### SOC 2 Compliance

- **Security**: Access controls, encryption
- **Availability**: Uptime monitoring
- **Processing integrity**: Data validation
- **Confidentiality**: Data protection
- **Privacy**: Privacy controls

---

## Security Checklist

- [ ] OAuth 2.0 authentication implemented
- [ ] JWT token validation configured
- [ ] RBAC permissions defined
- [ ] TLS/SSL certificates installed
- [ ] Data encryption at rest enabled
- [ ] Field-level encryption for sensitive data
- [ ] Input validation on all endpoints
- [ ] SQL injection prevention verified
- [ ] XSS prevention implemented
- [ ] Rate limiting configured
- [ ] Security headers set
- [ ] Secrets management configured
- [ ] Audit logging enabled
- [ ] Incident response plan documented
- [ ] Vulnerability scanning automated
- [ ] Dependency updates scheduled
- [ ] Security testing in CI/CD
- [ ] Compliance requirements met
- [ ] Penetration testing scheduled
- [ ] Security training completed

---

## Support

For security issues:

- Email: security@bdragentfactory.com
- Bug Bounty: https://bdragentfactory.com/security/bug-bounty
- Security Policy: See SECURITY.md