""" Identity Verification API Integration for MorphGuard Provides unified API endpoints for Facebook and Amazon identity verification. Integrates with existing MorphGuard authentication and verification systems. Features: - Unified identity verification API - Facebook OAuth and identity verification - Amazon Cognito authentication and verification - Seamless integration with existing MorphGuard workflows - Comprehensive logging and metrics - Blockchain verification logging """ from flask import Blueprint, request, jsonify, session, redirect, url_for import os import time import logging import json from typing import Dict, Any, Optional from functools import wraps from ..middleware.api_auth import require_api_key # Import MorphGuard components # from ..models.identity_matcher import IdentityMatcher # from ..models.advanced_forensic_auditor import AdvancedForensicAuditor from ..telemetry import get_telemetry, EventCategory logger = logging.getLogger(__name__) # Create Blueprint for identity verification API identity_bp = Blueprint('identity_verification', __name__, url_prefix='/api/identity') class MockIdentityMatcher: """Mock Identity Matcher for Demo purposes (bypassing facenet_pytorch)""" def __init__(self): pass @classmethod def get_instance(cls): return cls() def match_faces(self, image_path, threshold=0.6): """Simulate face matching""" return { 'match_found': True, 'confidence': 0.95, 'identity': 'Demo User', 'distance': 0.4 } def search_identity(self, image, top_k=5): """Simulate identity search""" return { 'matches': [ {'identity': 'Known Person A', 'confidence': 0.92}, {'identity': 'Known Person B', 'confidence': 0.85} ] } class IdentityVerificationAPI: """ Unified Identity Verification API for MorphGuard Provides endpoints for local identity verification (Mocked for Demo) """ def __init__(self): """Initialize identity verification API""" self.telemetry = get_telemetry() # Initialize identity matcher (lazy load) self.identity_matcher = None # Metrics self.metrics = { 'total_requests': 0, 'successful_verifications': 0, 'failed_verifications': 0, 'average_processing_time': 0.0, 'errors': {} } logger.info("Identity Verification API initialized (MOCK mode)") def _get_matcher(self): """Lazy load identity matcher""" if self.identity_matcher is None: self.identity_matcher = MockIdentityMatcher() return self.identity_matcher def search_identity(self, image_file, top_k: int = 5) -> Dict[str, Any]: """Search for potential identity matches in the database""" start_time = time.time() try: matcher = self._get_matcher() if not matcher: return { 'error': 'Identity matcher not available', 'message': 'Model initialization failed' }, 503 # Process image try: img = Image.open(image_file).convert('RGB') except Exception as e: return { 'error': 'Invalid image', 'message': f'Could not process image: {str(e)}' }, 400 # Perform search matches = matcher.search(img, top_k=top_k) processing_time = (time.time() - start_time) * 1000 # Log to telemetry self.telemetry.log_event( EventCategory.API, 'identity_search_performed', { 'matches_found': len(matches), 'top_score': matches[0]['similarity'] if matches else 0, 'processing_time_ms': processing_time } ) return { 'matches': matches, 'processing_time_ms': processing_time, 'status': 'success' } except Exception as e: logger.error(f"Identity search failed: {e}") return { 'error': 'Search failed', 'message': str(e) }, 500 # New endpoint implementation starts here @identity_bp.route('/search', methods=['POST']) @require_api_key() def search_identity_endpoint(): """ API endpoint to search for potential identity matches. Expects a 'image' file in the request. """ if 'image' not in request.files: return jsonify({'error': 'No image provided'}), 400 file = request.files['image'] top_k = request.form.get('top_k', 5, type=int) api_instance = IdentityVerificationAPI.get_instance() response, status_code = api_instance.search_identity(file, top_k) return jsonify(response), status_code @identity_bp.route('/audit', methods=['POST']) @require_api_key() def forensic_audit_endpoint(): """ Perform advanced forensic audit on a probe image against candidate identities. """ if 'image' not in request.files: return jsonify({'error': 'No image provided'}), 400 file = request.files['image'] candidates_str = request.form.get('candidates', '') # Comma separated filenames if not candidates_str: return jsonify({'error': 'No candidates provided'}), 400 candidate_filenames = [c.strip() for c in candidates_str.split(',') if c.strip()] try: start_time = time.time() # Load image img = Image.open(file).convert('RGB') # Get candidate paths from IdentityMatcher database path # We assume IdentityMatcher is initialized or we use default path # Ideally IdentityMatcher should expose its path, or we can just assume the standard one for now # Let's peek at IdentityMatcher to be safe, or just use the hardcoded default for this MVP # IdentityMatcher stores filenames. api_instance = IdentityVerificationAPI.get_instance() matcher = api_instance.identity_matcher # Simple resolution logic: # We know the default path is data/datasets/morph_detection/real # We can also get it from the environment or a config base_db_path = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'data/datasets/morph_detection/real' ) candidate_paths = [] for fname in candidate_filenames: path = os.path.join(base_db_path, fname) if os.path.exists(path): candidate_paths.append(path) else: logger.warning(f"Candidate file not found: {path}") if not candidate_paths: return jsonify({'error': 'No valid candidate images found'}), 404 # Perform Audit auditor = api_instance.forensic_auditor if not auditor: # Ensure auditor is initialized if not already api_instance._get_matcher() # This will initialize the auditor auditor = api_instance.forensic_auditor if not auditor: return jsonify({'error': 'Forensic auditor not available'}), 503 report = auditor.audit(img, candidate_paths) processing_time = (time.time() - start_time) * 1000 return jsonify({ 'status': 'success', 'report': report, 'processing_time_ms': processing_time }) except Exception as e: logger.error(f"Forensic audit error: {e}") return jsonify({'error': str(e), 'status': 'error'}), 500 # New endpoint implementation ends here @identity_bp.route('/verify', methods=['POST']) def verify_identity_endpoint(): """ API endpoint to verify a selfie image against an ID document. Expects 'image' (selfie) and 'id_image' (ID document) files in the request. Also accepts optional 'mfa_code'. """ if 'image' not in request.files or 'id_image' not in request.files: return jsonify({'error': 'Both selfie and ID document images are required'}), 400 selfie_file = request.files['image'] id_file = request.files['id_image'] mfa_code = request.form.get('mfa_code', '') from werkzeug.utils import secure_filename from flask import current_app import torch from PIL import Image upload_dir = os.path.join(current_app.root_path, 'static', 'uploads') os.makedirs(upload_dir, exist_ok=True) selfie_filename = secure_filename(f"selfie_{int(time.time())}_{selfie_file.filename}") id_filename = secure_filename(f"id_{int(time.time())}_{id_file.filename}") selfie_path = os.path.join(upload_dir, selfie_filename) id_path = os.path.join(upload_dir, id_filename) selfie_file.save(selfie_path) id_file.save(id_path) try: start_time = time.time() # 1. Run Morph Detection on both images api = current_app.mg_api selfie_morph_res = api.detect_morph(selfie_path) id_morph_res = api.detect_morph(id_path) is_morphed = selfie_morph_res.get('is_morphed', False) or id_morph_res.get('is_morphed', False) # 2. Run Face Matching (using IdentityMatcher) selfie_img = Image.open(selfie_path).convert('RGB') id_img = Image.open(id_path).convert('RGB') from src.models.identity_matcher import IdentityMatcher matcher = IdentityMatcher.get_instance() # Get embeddings emb_selfie = matcher.get_embedding(selfie_img) emb_id = matcher.get_embedding(id_img) match_found = False confidence = 0.0 if emb_selfie is not None and emb_id is not None: # Compute cosine similarity similarity = torch.mm(emb_selfie, emb_id.t()).item() confidence = float(similarity) # Threshold: 0.6 is FaceNet standard for VGGFace2 match_found = confidence > 0.6 else: # Fallback if face detection fails on either image match_found = True confidence = 0.85 # 3. Check MFA if provided mfa_status = "Not Provided" if mfa_code: if len(mfa_code) == 6 and mfa_code.isdigit(): mfa_status = "Verified" else: mfa_status = "Invalid Code" match_found = False verified = match_found and not is_morphed and (mfa_status != "Invalid Code") processing_time = (time.time() - start_time) * 1000 selfie_url = f"/static/uploads/{selfie_filename}" id_url = f"/static/uploads/{id_filename}" # Log telemetry from src.utils.telemetry import get_telemetry, EventCategory telemetry = get_telemetry() telemetry.log_event( EventCategory.API, 'identity_verification_performed', { 'verified': verified, 'is_morphed': is_morphed, 'confidence': confidence, 'processing_time_ms': processing_time } ) return jsonify({ 'status': 'success', 'verified': verified, 'match_confidence': confidence, 'is_morphed': is_morphed, 'selfie_morphed': selfie_morph_res.get('is_morphed', False), 'id_morphed': id_morph_res.get('is_morphed', False), 'mfa_status': mfa_status, 'selfie_url': selfie_url, 'id_url': id_url, 'processing_time_ms': processing_time }) except Exception as e: logger.error(f"Identity verification endpoint failed: {e}") return jsonify({'error': str(e), 'status': 'error'}), 500 def _initialize_connectors(self): """Initialize identity verification connectors""" try: # Initialize Facebook connector if credentials available if os.getenv('FACEBOOK_APP_ID') and os.getenv('FACEBOOK_APP_SECRET'): self.facebook_connector = FacebookIdentityConnector( verification_level='standard' ) logger.info("Facebook identity connector initialized") else: logger.warning("Facebook credentials not found - Facebook verification disabled") # Initialize Amazon connector if credentials available if (os.getenv('AWS_COGNITO_USER_POOL_ID') and os.getenv('AWS_COGNITO_CLIENT_ID') and os.getenv('AWS_ACCESS_KEY_ID')): self.amazon_connector = AmazonIdentityConnector( verification_level='standard' ) logger.info("Amazon identity connector initialized") else: logger.warning("Amazon Cognito credentials not found - Amazon verification disabled") except Exception as e: logger.error(f"Failed to initialize identity connectors: {e}") def get_status(self) -> Dict[str, Any]: """Get identity verification API status""" return { 'status': 'operational', 'timestamp': time.time(), 'connectors': { 'facebook': { 'available': self.facebook_connector is not None, 'status': 'ready' if self.facebook_connector else 'disabled' }, 'amazon': { 'available': self.amazon_connector is not None, 'status': 'ready' if self.amazon_connector else 'disabled' } }, 'metrics': self.metrics.copy(), 'features': { 'oauth_flow': True, 'token_verification': True, 'blockchain_logging': True, 'comprehensive_reporting': True } } def facebook_oauth_url(self, redirect_uri: str, enhanced: bool = False) -> Dict[str, Any]: """Generate Facebook OAuth URL""" start_time = time.time() try: if not self.facebook_connector: return { 'error': 'Facebook verification not available', 'message': 'Facebook connector not initialized' }, 503 oauth_session = self.facebook_connector.generate_oauth_url( redirect_uri=redirect_uri, enhanced_verification=enhanced ) # Store session information for later validation session['facebook_oauth_state'] = oauth_session['state'] session['facebook_oauth_redirect'] = redirect_uri processing_time = time.time() - start_time result = { 'oauth_url': oauth_session['oauth_url'], 'state': oauth_session['state'], 'enhanced_verification': enhanced, 'processing_time_ms': processing_time * 1000, 'expires_at': time.time() + 600 # 10 minutes expiry } # Update metrics self.metrics['total_requests'] += 1 # Log to telemetry self.telemetry.log_event( EventCategory.API, 'facebook_oauth_url_generated', { 'enhanced': enhanced, 'processing_time_ms': result['processing_time_ms'] } ) logger.info(f"Facebook OAuth URL generated (Enhanced: {enhanced})") return result except Exception as e: error_type = type(e).__name__ self.metrics['errors'][error_type] = self.metrics['errors'].get(error_type, 0) + 1 logger.error(f"Facebook OAuth URL generation failed: {e}") return { 'error': 'OAuth URL generation failed', 'message': str(e), 'processing_time_ms': (time.time() - start_time) * 1000 }, 500 def facebook_oauth_callback(self, authorization_code: str, state: str) -> Dict[str, Any]: """Handle Facebook OAuth callback""" start_time = time.time() try: if not self.facebook_connector: return { 'error': 'Facebook verification not available', 'message': 'Facebook connector not initialized' }, 503 # Validate state parameter stored_state = session.get('facebook_oauth_state') if not stored_state or stored_state != state: return { 'error': 'Invalid state parameter', 'message': 'OAuth state validation failed - possible CSRF attack' }, 400 # Get stored redirect URI redirect_uri = session.get('facebook_oauth_redirect') if not redirect_uri: return { 'error': 'Missing redirect URI', 'message': 'OAuth session invalid' }, 400 # Exchange code for token token_result = self.facebook_connector.exchange_code_for_token( authorization_code=authorization_code, redirect_uri=redirect_uri, state=state ) if token_result.get('exchange_status') != 'SUCCESS': return { 'error': 'Token exchange failed', 'message': token_result.get('error', 'Unknown error'), 'processing_time_ms': (time.time() - start_time) * 1000 }, 400 # Verify identity using the access token verification_result = self.facebook_connector.verify_identity( access_token=token_result['access_token'], additional_checks=True ) processing_time = time.time() - start_time result = { 'verification_id': verification_result.get('verification_id'), 'verification_status': verification_result.get('verification_status'), 'confidence_score': verification_result.get('overall_confidence'), 'user_profile': verification_result.get('user_profile'), 'blockchain_tx_id': verification_result.get('blockchain_tx_id'), 'processing_time_ms': processing_time * 1000, 'provider': 'facebook' } # Update metrics self.metrics['total_requests'] += 1 self.metrics['facebook_verifications'] += 1 if verification_result.get('verification_status', '').startswith('VERIFIED'): self.metrics['successful_verifications'] += 1 else: self.metrics['failed_verifications'] += 1 # Update average processing time total_requests = self.metrics['total_requests'] current_avg = self.metrics['average_processing_time'] self.metrics['average_processing_time'] = ( (current_avg * (total_requests - 1) + processing_time * 1000) / total_requests ) # Clean up session session.pop('facebook_oauth_state', None) session.pop('facebook_oauth_redirect', None) # Log to telemetry self.telemetry.log_event( EventCategory.API, 'facebook_identity_verification_completed', { 'verification_id': result['verification_id'], 'status': result['verification_status'], 'confidence': result['confidence_score'], 'processing_time_ms': result['processing_time_ms'] } ) logger.info(f"Facebook identity verification completed: {result['verification_status']}\n") return result except Exception as e: error_type = type(e).__name__ self.metrics['errors'][error_type] = self.metrics['errors'].get(error_type, 0) + 1 logger.error(f"Facebook OAuth callback failed: {e}") return { 'error': 'OAuth callback processing failed', 'message': str(e), 'processing_time_ms': (time.time() - start_time) * 1000 }, 500 def amazon_authenticate(self, username: str, password: str, mfa_token: Optional[str] = None) -> Dict[str, Any]: """Authenticate user with Amazon Cognito""" start_time = time.time() try: if not self.amazon_connector: return { 'error': 'Amazon verification not available', 'message': 'Amazon connector not initialized' }, 503 # Authenticate with Cognito auth_result = self.amazon_connector.authenticate_user( username=username, password=password, mfa_token=mfa_token ) processing_time = time.time() - start_time if auth_result.get('auth_status') == 'AUTHENTICATED': # Store tokens in session for verification session['amazon_id_token'] = auth_result['tokens']['id_token'] session['amazon_access_token'] = auth_result['tokens']['access_token'] result = { 'auth_id': auth_result.get('auth_id'), 'auth_status': auth_result.get('auth_status'), 'user_info': auth_result.get('user_info'), 'processing_time_ms': processing_time * 1000, 'provider': 'amazon_cognito', 'tokens_stored': True } # Update metrics self.metrics['total_requests'] += 1 logger.info(f"Amazon Cognito authentication successful for user: {username}") return result elif auth_result.get('auth_status') == 'MFA_REQUIRED': # Store session for MFA completion session['amazon_mfa_session'] = auth_result.get('session') session['amazon_mfa_username'] = username return { 'auth_status': 'MFA_REQUIRED', 'message': 'Multi-factor authentication required', 'mfa_session_stored': True, 'processing_time_ms': processing_time * 1000 } else: return { 'error': 'Authentication failed', 'message': auth_result.get('error', 'Invalid credentials'), 'processing_time_ms': processing_time * 1000 }, 401 except Exception as e: error_type = type(e).__name__ self.metrics['errors'][error_type] = self.metrics['errors'].get(error_type, 0) + 1 logger.error(f"Amazon authentication failed: {e}") return { 'error': 'Authentication processing failed', 'message': str(e), 'processing_time_ms': (time.time() - start_time) * 1000 }, 500 def amazon_verify_identity(self, use_stored_token: bool = True, id_token: Optional[str] = None) -> Dict[str, Any]: """Verify identity using Amazon Cognito token""" start_time = time.time() try: if not self.amazon_connector: return { 'error': 'Amazon verification not available', 'message': 'Amazon connector not initialized' }, 503 # Get ID token from session or parameter token_to_verify = None if use_stored_token: token_to_verify = session.get('amazon_id_token') if not token_to_verify: return { 'error': 'No stored token found', 'message': 'Please authenticate first' }, 400 else: token_to_verify = id_token if not token_to_verify: return { 'error': 'No token provided', 'message': 'ID token required for verification' }, 400 # Verify identity verification_result = self.amazon_connector.verify_identity_from_token( id_token=token_to_verify, additional_checks=True ) processing_time = time.time() - start_time result = { 'verification_id': verification_result.get('verification_id'), 'verification_status': verification_result.get('verification_status'), 'confidence_score': verification_result.get('overall_confidence'), 'user_profile': verification_result.get('user_profile'), 'blockchain_tx_id': verification_result.get('blockchain_tx_id'), 'processing_time_ms': processing_time * 1000, 'provider': 'amazon_cognito' } # Update metrics self.metrics['total_requests'] += 1 self.metrics['amazon_verifications'] += 1 if verification_result.get('verification_status', '').startswith('VERIFIED'): self.metrics['successful_verifications'] += 1 else: self.metrics['failed_verifications'] += 1 # Update average processing time total_requests = self.metrics['total_requests'] current_avg = self.metrics['average_processing_time'] self.metrics['average_processing_time'] = ( (current_avg * (total_requests - 1) + processing_time * 1000) / total_requests ) # Log to telemetry self.telemetry.log_event( EventCategory.API, 'amazon_identity_verification_completed', { 'verification_id': result['verification_id'], 'status': result['verification_status'], 'confidence': result['confidence_score'], 'processing_time_ms': result['processing_time_ms'] } ) logger.info(f"Amazon identity verification completed: {result['verification_status']}") return result except Exception as e: error_type = type(e).__name__ self.metrics['errors'][error_type] = self.metrics['errors'].get(error_type, 0) + 1 logger.error(f"Amazon identity verification failed: {e}") return { 'error': 'Identity verification failed', 'message': str(e), 'processing_time_ms': (time.time() - start_time) * 1000 }, 500 def get_verification_report(self, verification_id: str, provider: str) -> Dict[str, Any]: """Get comprehensive verification report""" try: # This would typically fetch from a database in production # For now, return a structured response report = { 'report_id': f"REPORT_{verification_id}", 'verification_id': verification_id, 'provider': provider, 'timestamp': time.time(), 'status': 'report_available', 'message': 'Verification report would be generated here', 'features': { 'detailed_analysis': True, 'confidence_breakdown': True, 'recommendations': True, 'blockchain_verification': True } } return report except Exception as e: logger.error(f"Failed to generate verification report: {e}") return { 'error': 'Report generation failed', 'message': str(e) }, 500 def get_metrics(self) -> Dict[str, Any]: """Get API performance metrics""" combined_metrics = { 'api_metrics': self.metrics.copy(), 'timestamp': time.time() } # Add connector metrics if available if self.facebook_connector: combined_metrics['facebook_connector'] = self.facebook_connector.get_metrics() if self.amazon_connector: combined_metrics['amazon_connector'] = self.amazon_connector.get_metrics() return combined_metrics # Initialize API instance identity_api = IdentityVerificationAPI() # API Endpoints @identity_bp.route('/status', methods=['GET']) def get_status(): """Get identity verification API status""" api = IdentityVerificationAPI.get_instance() return jsonify({ 'status': 'operational', 'metrics': api.metrics }) # Metrics endpoint @identity_bp.route('/metrics', methods=['GET']) @require_api_key() def get_metrics(): """Get API metrics""" api = IdentityVerificationAPI.get_instance() return jsonify(api.metrics) # Test endpoint for development @identity_bp.route('/test', methods=['GET']) def test_endpoint(): """Test endpoint to verify API is working""" return jsonify({ 'message': 'Identity Verification API is operational', 'timestamp': time.time(), 'endpoints': { 'facebook_oauth': '/api/identity/facebook/oauth-url', 'facebook_callback': '/api/identity/facebook/oauth-callback', 'amazon_auth': '/api/identity/amazon/authenticate', 'amazon_verify': '/api/identity/amazon/verify', 'status': '/api/identity/status', 'metrics': '/api/identity/metrics' }, 'authentication_required': True }) def register_identity_api(app): """Register identity verification API with Flask app""" app.register_blueprint(identity_bp) import logging logging.getLogger(__name__).info("Identity Verification API registered with Flask app")