""" AgentAI Integration Module for InklyAI Provides seamless integration between InklyAI signature verification and AgentAI systems. """ import asyncio import logging from typing import Dict, List, Optional, Union, Any from dataclasses import dataclass from datetime import datetime import json import hashlib import base64 from src.models.siamese_network import SignatureVerifier from src.data.preprocessing import SignaturePreprocessor @dataclass class AgentSignature: """Represents a signature associated with an AI agent.""" agent_id: str signature_template: str # Path to signature template created_at: datetime last_verified: Optional[datetime] = None verification_count: int = 0 is_active: bool = True @dataclass class VerificationResult: """Result of signature verification.""" is_verified: bool similarity_score: float confidence: float agent_id: str timestamp: datetime verification_id: str class AgentAISignatureManager: """ Manages signature verification for AgentAI systems. """ def __init__(self, model_path: Optional[str] = None, threshold: float = 0.75, device: str = 'auto'): """ Initialize the AgentAI signature manager. Args: model_path: Path to trained model threshold: Verification threshold device: Device to run inference on """ self.verifier = SignatureVerifier( model_path=model_path, device=device ) self.threshold = threshold self.preprocessor = SignaturePreprocessor() # Agent signature registry self.agent_signatures: Dict[str, AgentSignature] = {} # Verification history self.verification_history: List[VerificationResult] = [] # Configuration self.config = { 'max_verification_attempts': 3, 'signature_timeout': 300, # seconds 'enable_audit_logging': True, 'encrypt_signatures': True } # Setup logging self.logger = logging.getLogger('AgentAISignatureManager') self.logger.setLevel(logging.INFO) if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) def register_agent_signature(self, agent_id: str, signature_template_path: str) -> bool: """ Register a signature template for an AI agent. Args: agent_id: Unique identifier for the agent signature_template_path: Path to signature template image Returns: True if registration successful, False otherwise """ try: # Validate signature template if not self._validate_signature_template(signature_template_path): self.logger.error(f"Invalid signature template for agent {agent_id}") return False # Create agent signature record agent_signature = AgentSignature( agent_id=agent_id, signature_template=signature_template_path, created_at=datetime.now(), is_active=True ) # Store in registry self.agent_signatures[agent_id] = agent_signature self.logger.info(f"Registered signature for agent {agent_id}") return True except Exception as e: self.logger.error(f"Failed to register signature for agent {agent_id}: {e}") return False def verify_agent_signature(self, agent_id: str, signature_image: Union[str, bytes], context: Optional[Dict[str, Any]] = None) -> VerificationResult: """ Verify a signature for a specific agent. Args: agent_id: Agent identifier signature_image: Signature image (path or bytes) context: Additional context for verification Returns: VerificationResult object """ verification_id = self._generate_verification_id() try: # Check if agent is registered if agent_id not in self.agent_signatures: return VerificationResult( is_verified=False, similarity_score=0.0, confidence=0.0, agent_id=agent_id, timestamp=datetime.now(), verification_id=verification_id ) agent_signature = self.agent_signatures[agent_id] # Check if agent is active if not agent_signature.is_active: self.logger.warning(f"Agent {agent_id} is not active") return VerificationResult( is_verified=False, similarity_score=0.0, confidence=0.0, agent_id=agent_id, timestamp=datetime.now(), verification_id=verification_id ) # Perform signature verification similarity, is_genuine = self.verifier.verify_signatures( signature_image, agent_signature.signature_template, threshold=self.threshold ) # Calculate confidence based on similarity and context confidence = self._calculate_confidence(similarity, context) # Create verification result result = VerificationResult( is_verified=is_genuine, similarity_score=float(similarity), confidence=confidence, agent_id=agent_id, timestamp=datetime.now(), verification_id=verification_id ) # Update agent signature record agent_signature.last_verified = datetime.now() agent_signature.verification_count += 1 # Log verification if self.config['enable_audit_logging']: self._log_verification(result, context) # Store in history self.verification_history.append(result) return result except Exception as e: self.logger.error(f"Verification failed for agent {agent_id}: {e}") return VerificationResult( is_verified=False, similarity_score=0.0, confidence=0.0, agent_id=agent_id, timestamp=datetime.now(), verification_id=verification_id ) def batch_verify_agents(self, verification_requests: List[Dict[str, Any]]) -> List[VerificationResult]: """ Verify signatures for multiple agents in batch. Args: verification_requests: List of verification requests Returns: List of verification results """ results = [] for request in verification_requests: agent_id = request['agent_id'] signature_image = request['signature_image'] context = request.get('context', {}) result = self.verify_agent_signature(agent_id, signature_image, context) results.append(result) return results def get_agent_verification_stats(self, agent_id: str) -> Dict[str, Any]: """ Get verification statistics for an agent. Args: agent_id: Agent identifier Returns: Dictionary with verification statistics """ if agent_id not in self.agent_signatures: return {} agent_history = [ result for result in self.verification_history if result.agent_id == agent_id ] if not agent_history: return { 'total_verifications': 0, 'successful_verifications': 0, 'success_rate': 0.0, 'average_similarity': 0.0, 'last_verification': None } successful = sum(1 for result in agent_history if result.is_verified) total = len(agent_history) avg_similarity = sum(result.similarity_score for result in agent_history) / total return { 'total_verifications': total, 'successful_verifications': successful, 'success_rate': successful / total, 'average_similarity': avg_similarity, 'last_verification': agent_history[-1].timestamp.isoformat() if agent_history else None } def deactivate_agent(self, agent_id: str) -> bool: """ Deactivate an agent's signature verification. Args: agent_id: Agent identifier Returns: True if successful, False otherwise """ if agent_id in self.agent_signatures: self.agent_signatures[agent_id].is_active = False self.logger.info(f"Deactivated agent {agent_id}") return True return False def reactivate_agent(self, agent_id: str) -> bool: """ Reactivate an agent's signature verification. Args: agent_id: Agent identifier Returns: True if successful, False otherwise """ if agent_id in self.agent_signatures: self.agent_signatures[agent_id].is_active = True self.logger.info(f"Reactivated agent {agent_id}") return True return False def _validate_signature_template(self, template_path: str) -> bool: """Validate signature template file.""" try: # Try to load and preprocess the template self.preprocessor.preprocess_image(template_path) return True except Exception: return False def _calculate_confidence(self, similarity: float, context: Optional[Dict[str, Any]]) -> float: """Calculate confidence score based on similarity and context.""" base_confidence = similarity # Adjust confidence based on context if context: # Higher confidence for recent signatures if 'time_since_last_verification' in context: time_factor = min(1.0, context['time_since_last_verification'] / 3600) # 1 hour base_confidence *= (0.8 + 0.2 * time_factor) # Lower confidence for suspicious patterns if 'suspicious_activity' in context and context['suspicious_activity']: base_confidence *= 0.5 return min(1.0, max(0.0, base_confidence)) def _generate_verification_id(self) -> str: """Generate unique verification ID.""" timestamp = datetime.now().isoformat() hash_input = f"{timestamp}_{len(self.verification_history)}" return hashlib.md5(hash_input.encode()).hexdigest()[:12] def _log_verification(self, result: VerificationResult, context: Optional[Dict[str, Any]]): """Log verification result for audit purposes.""" log_entry = { 'verification_id': result.verification_id, 'agent_id': result.agent_id, 'is_verified': result.is_verified, 'similarity_score': result.similarity_score, 'confidence': result.confidence, 'timestamp': result.timestamp.isoformat(), 'context': context or {} } self.logger.info(f"Verification logged: {json.dumps(log_entry)}") class AgentAISignatureAPI: """ REST API wrapper for AgentAI signature verification. """ def __init__(self, signature_manager: AgentAISignatureManager): """ Initialize the API wrapper. Args: signature_manager: AgentAISignatureManager instance """ self.signature_manager = signature_manager self.logger = logging.getLogger('AgentAISignatureAPI') def verify_signature_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]: """ API endpoint for signature verification. Args: request_data: Request data containing agent_id and signature_image Returns: API response dictionary """ try: agent_id = request_data['agent_id'] signature_image = request_data['signature_image'] context = request_data.get('context', {}) result = self.signature_manager.verify_agent_signature( agent_id, signature_image, context ) return { 'success': True, 'verification_id': result.verification_id, 'is_verified': result.is_verified, 'similarity_score': result.similarity_score, 'confidence': result.confidence, 'timestamp': result.timestamp.isoformat() } except Exception as e: self.logger.error(f"API verification failed: {e}") return { 'success': False, 'error': str(e), 'timestamp': datetime.now().isoformat() } def register_agent_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]: """ API endpoint for agent registration. Args: request_data: Request data containing agent_id and signature_template Returns: API response dictionary """ try: agent_id = request_data['agent_id'] signature_template = request_data['signature_template'] success = self.signature_manager.register_agent_signature( agent_id, signature_template ) return { 'success': success, 'agent_id': agent_id, 'timestamp': datetime.now().isoformat() } except Exception as e: self.logger.error(f"API registration failed: {e}") return { 'success': False, 'error': str(e), 'timestamp': datetime.now().isoformat() } def get_stats_endpoint(self, agent_id: str) -> Dict[str, Any]: """ API endpoint for agent statistics. Args: agent_id: Agent identifier Returns: API response dictionary """ try: stats = self.signature_manager.get_agent_verification_stats(agent_id) return { 'success': True, 'agent_id': agent_id, 'stats': stats, 'timestamp': datetime.now().isoformat() } except Exception as e: self.logger.error(f"API stats failed: {e}") return { 'success': False, 'error': str(e), 'timestamp': datetime.now().isoformat() } # Example usage and integration patterns def create_agentai_integration_example(): """Create an example AgentAI integration.""" # Initialize signature manager signature_manager = AgentAISignatureManager( threshold=0.75, device='auto' ) # Register some example agents signature_manager.register_agent_signature( 'agent_001', 'data/samples/john_doe_1.png' ) signature_manager.register_agent_signature( 'agent_002', 'data/samples/jane_smith_1.png' ) # Create API wrapper api = AgentAISignatureAPI(signature_manager) return signature_manager, api if __name__ == "__main__": # Example usage signature_manager, api = create_agentai_integration_example() # Test verification result = signature_manager.verify_agent_signature( 'agent_001', 'data/samples/john_doe_2.png' ) print(f"Verification result: {result}") # Get stats stats = signature_manager.get_agent_verification_stats('agent_001') print(f"Agent stats: {stats}")