|
|
""" |
|
|
AgentAI Integration Module for InklyAI |
|
|
Provides seamless integration between InklyAI signature verification and AgentAI systems. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Union, Any |
|
|
from dataclasses import dataclass |
|
|
from datetime import datetime |
|
|
import json |
|
|
import hashlib |
|
|
import base64 |
|
|
|
|
|
from src.models.siamese_network import SignatureVerifier |
|
|
from src.data.preprocessing import SignaturePreprocessor |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class AgentSignature: |
|
|
"""Represents a signature associated with an AI agent.""" |
|
|
agent_id: str |
|
|
signature_template: str |
|
|
created_at: datetime |
|
|
last_verified: Optional[datetime] = None |
|
|
verification_count: int = 0 |
|
|
is_active: bool = True |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class VerificationResult: |
|
|
"""Result of signature verification.""" |
|
|
is_verified: bool |
|
|
similarity_score: float |
|
|
confidence: float |
|
|
agent_id: str |
|
|
timestamp: datetime |
|
|
verification_id: str |
|
|
|
|
|
|
|
|
class AgentAISignatureManager: |
|
|
""" |
|
|
Manages signature verification for AgentAI systems. |
|
|
""" |
|
|
|
|
|
def __init__(self, |
|
|
model_path: Optional[str] = None, |
|
|
threshold: float = 0.75, |
|
|
device: str = 'auto'): |
|
|
""" |
|
|
Initialize the AgentAI signature manager. |
|
|
|
|
|
Args: |
|
|
model_path: Path to trained model |
|
|
threshold: Verification threshold |
|
|
device: Device to run inference on |
|
|
""" |
|
|
self.verifier = SignatureVerifier( |
|
|
model_path=model_path, |
|
|
device=device |
|
|
) |
|
|
self.threshold = threshold |
|
|
self.preprocessor = SignaturePreprocessor() |
|
|
|
|
|
|
|
|
self.agent_signatures: Dict[str, AgentSignature] = {} |
|
|
|
|
|
|
|
|
self.verification_history: List[VerificationResult] = [] |
|
|
|
|
|
|
|
|
self.config = { |
|
|
'max_verification_attempts': 3, |
|
|
'signature_timeout': 300, |
|
|
'enable_audit_logging': True, |
|
|
'encrypt_signatures': True |
|
|
} |
|
|
|
|
|
|
|
|
self.logger = logging.getLogger('AgentAISignatureManager') |
|
|
self.logger.setLevel(logging.INFO) |
|
|
|
|
|
if not self.logger.handlers: |
|
|
handler = logging.StreamHandler() |
|
|
formatter = logging.Formatter( |
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
|
) |
|
|
handler.setFormatter(formatter) |
|
|
self.logger.addHandler(handler) |
|
|
|
|
|
def register_agent_signature(self, |
|
|
agent_id: str, |
|
|
signature_template_path: str) -> bool: |
|
|
""" |
|
|
Register a signature template for an AI agent. |
|
|
|
|
|
Args: |
|
|
agent_id: Unique identifier for the agent |
|
|
signature_template_path: Path to signature template image |
|
|
|
|
|
Returns: |
|
|
True if registration successful, False otherwise |
|
|
""" |
|
|
try: |
|
|
|
|
|
if not self._validate_signature_template(signature_template_path): |
|
|
self.logger.error(f"Invalid signature template for agent {agent_id}") |
|
|
return False |
|
|
|
|
|
|
|
|
agent_signature = AgentSignature( |
|
|
agent_id=agent_id, |
|
|
signature_template=signature_template_path, |
|
|
created_at=datetime.now(), |
|
|
is_active=True |
|
|
) |
|
|
|
|
|
|
|
|
self.agent_signatures[agent_id] = agent_signature |
|
|
|
|
|
self.logger.info(f"Registered signature for agent {agent_id}") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Failed to register signature for agent {agent_id}: {e}") |
|
|
return False |
|
|
|
|
|
def verify_agent_signature(self, |
|
|
agent_id: str, |
|
|
signature_image: Union[str, bytes], |
|
|
context: Optional[Dict[str, Any]] = None) -> VerificationResult: |
|
|
""" |
|
|
Verify a signature for a specific agent. |
|
|
|
|
|
Args: |
|
|
agent_id: Agent identifier |
|
|
signature_image: Signature image (path or bytes) |
|
|
context: Additional context for verification |
|
|
|
|
|
Returns: |
|
|
VerificationResult object |
|
|
""" |
|
|
verification_id = self._generate_verification_id() |
|
|
|
|
|
try: |
|
|
|
|
|
if agent_id not in self.agent_signatures: |
|
|
return VerificationResult( |
|
|
is_verified=False, |
|
|
similarity_score=0.0, |
|
|
confidence=0.0, |
|
|
agent_id=agent_id, |
|
|
timestamp=datetime.now(), |
|
|
verification_id=verification_id |
|
|
) |
|
|
|
|
|
agent_signature = self.agent_signatures[agent_id] |
|
|
|
|
|
|
|
|
if not agent_signature.is_active: |
|
|
self.logger.warning(f"Agent {agent_id} is not active") |
|
|
return VerificationResult( |
|
|
is_verified=False, |
|
|
similarity_score=0.0, |
|
|
confidence=0.0, |
|
|
agent_id=agent_id, |
|
|
timestamp=datetime.now(), |
|
|
verification_id=verification_id |
|
|
) |
|
|
|
|
|
|
|
|
similarity, is_genuine = self.verifier.verify_signatures( |
|
|
signature_image, |
|
|
agent_signature.signature_template, |
|
|
threshold=self.threshold |
|
|
) |
|
|
|
|
|
|
|
|
confidence = self._calculate_confidence(similarity, context) |
|
|
|
|
|
|
|
|
result = VerificationResult( |
|
|
is_verified=is_genuine, |
|
|
similarity_score=float(similarity), |
|
|
confidence=confidence, |
|
|
agent_id=agent_id, |
|
|
timestamp=datetime.now(), |
|
|
verification_id=verification_id |
|
|
) |
|
|
|
|
|
|
|
|
agent_signature.last_verified = datetime.now() |
|
|
agent_signature.verification_count += 1 |
|
|
|
|
|
|
|
|
if self.config['enable_audit_logging']: |
|
|
self._log_verification(result, context) |
|
|
|
|
|
|
|
|
self.verification_history.append(result) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Verification failed for agent {agent_id}: {e}") |
|
|
return VerificationResult( |
|
|
is_verified=False, |
|
|
similarity_score=0.0, |
|
|
confidence=0.0, |
|
|
agent_id=agent_id, |
|
|
timestamp=datetime.now(), |
|
|
verification_id=verification_id |
|
|
) |
|
|
|
|
|
def batch_verify_agents(self, |
|
|
verification_requests: List[Dict[str, Any]]) -> List[VerificationResult]: |
|
|
""" |
|
|
Verify signatures for multiple agents in batch. |
|
|
|
|
|
Args: |
|
|
verification_requests: List of verification requests |
|
|
|
|
|
Returns: |
|
|
List of verification results |
|
|
""" |
|
|
results = [] |
|
|
|
|
|
for request in verification_requests: |
|
|
agent_id = request['agent_id'] |
|
|
signature_image = request['signature_image'] |
|
|
context = request.get('context', {}) |
|
|
|
|
|
result = self.verify_agent_signature(agent_id, signature_image, context) |
|
|
results.append(result) |
|
|
|
|
|
return results |
|
|
|
|
|
def get_agent_verification_stats(self, agent_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Get verification statistics for an agent. |
|
|
|
|
|
Args: |
|
|
agent_id: Agent identifier |
|
|
|
|
|
Returns: |
|
|
Dictionary with verification statistics |
|
|
""" |
|
|
if agent_id not in self.agent_signatures: |
|
|
return {} |
|
|
|
|
|
agent_history = [ |
|
|
result for result in self.verification_history |
|
|
if result.agent_id == agent_id |
|
|
] |
|
|
|
|
|
if not agent_history: |
|
|
return { |
|
|
'total_verifications': 0, |
|
|
'successful_verifications': 0, |
|
|
'success_rate': 0.0, |
|
|
'average_similarity': 0.0, |
|
|
'last_verification': None |
|
|
} |
|
|
|
|
|
successful = sum(1 for result in agent_history if result.is_verified) |
|
|
total = len(agent_history) |
|
|
avg_similarity = sum(result.similarity_score for result in agent_history) / total |
|
|
|
|
|
return { |
|
|
'total_verifications': total, |
|
|
'successful_verifications': successful, |
|
|
'success_rate': successful / total, |
|
|
'average_similarity': avg_similarity, |
|
|
'last_verification': agent_history[-1].timestamp.isoformat() if agent_history else None |
|
|
} |
|
|
|
|
|
def deactivate_agent(self, agent_id: str) -> bool: |
|
|
""" |
|
|
Deactivate an agent's signature verification. |
|
|
|
|
|
Args: |
|
|
agent_id: Agent identifier |
|
|
|
|
|
Returns: |
|
|
True if successful, False otherwise |
|
|
""" |
|
|
if agent_id in self.agent_signatures: |
|
|
self.agent_signatures[agent_id].is_active = False |
|
|
self.logger.info(f"Deactivated agent {agent_id}") |
|
|
return True |
|
|
return False |
|
|
|
|
|
def reactivate_agent(self, agent_id: str) -> bool: |
|
|
""" |
|
|
Reactivate an agent's signature verification. |
|
|
|
|
|
Args: |
|
|
agent_id: Agent identifier |
|
|
|
|
|
Returns: |
|
|
True if successful, False otherwise |
|
|
""" |
|
|
if agent_id in self.agent_signatures: |
|
|
self.agent_signatures[agent_id].is_active = True |
|
|
self.logger.info(f"Reactivated agent {agent_id}") |
|
|
return True |
|
|
return False |
|
|
|
|
|
def _validate_signature_template(self, template_path: str) -> bool: |
|
|
"""Validate signature template file.""" |
|
|
try: |
|
|
|
|
|
self.preprocessor.preprocess_image(template_path) |
|
|
return True |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
def _calculate_confidence(self, similarity: float, context: Optional[Dict[str, Any]]) -> float: |
|
|
"""Calculate confidence score based on similarity and context.""" |
|
|
base_confidence = similarity |
|
|
|
|
|
|
|
|
if context: |
|
|
|
|
|
if 'time_since_last_verification' in context: |
|
|
time_factor = min(1.0, context['time_since_last_verification'] / 3600) |
|
|
base_confidence *= (0.8 + 0.2 * time_factor) |
|
|
|
|
|
|
|
|
if 'suspicious_activity' in context and context['suspicious_activity']: |
|
|
base_confidence *= 0.5 |
|
|
|
|
|
return min(1.0, max(0.0, base_confidence)) |
|
|
|
|
|
def _generate_verification_id(self) -> str: |
|
|
"""Generate unique verification ID.""" |
|
|
timestamp = datetime.now().isoformat() |
|
|
hash_input = f"{timestamp}_{len(self.verification_history)}" |
|
|
return hashlib.md5(hash_input.encode()).hexdigest()[:12] |
|
|
|
|
|
def _log_verification(self, result: VerificationResult, context: Optional[Dict[str, Any]]): |
|
|
"""Log verification result for audit purposes.""" |
|
|
log_entry = { |
|
|
'verification_id': result.verification_id, |
|
|
'agent_id': result.agent_id, |
|
|
'is_verified': result.is_verified, |
|
|
'similarity_score': result.similarity_score, |
|
|
'confidence': result.confidence, |
|
|
'timestamp': result.timestamp.isoformat(), |
|
|
'context': context or {} |
|
|
} |
|
|
|
|
|
self.logger.info(f"Verification logged: {json.dumps(log_entry)}") |
|
|
|
|
|
|
|
|
class AgentAISignatureAPI: |
|
|
""" |
|
|
REST API wrapper for AgentAI signature verification. |
|
|
""" |
|
|
|
|
|
def __init__(self, signature_manager: AgentAISignatureManager): |
|
|
""" |
|
|
Initialize the API wrapper. |
|
|
|
|
|
Args: |
|
|
signature_manager: AgentAISignatureManager instance |
|
|
""" |
|
|
self.signature_manager = signature_manager |
|
|
self.logger = logging.getLogger('AgentAISignatureAPI') |
|
|
|
|
|
def verify_signature_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
API endpoint for signature verification. |
|
|
|
|
|
Args: |
|
|
request_data: Request data containing agent_id and signature_image |
|
|
|
|
|
Returns: |
|
|
API response dictionary |
|
|
""" |
|
|
try: |
|
|
agent_id = request_data['agent_id'] |
|
|
signature_image = request_data['signature_image'] |
|
|
context = request_data.get('context', {}) |
|
|
|
|
|
result = self.signature_manager.verify_agent_signature( |
|
|
agent_id, signature_image, context |
|
|
) |
|
|
|
|
|
return { |
|
|
'success': True, |
|
|
'verification_id': result.verification_id, |
|
|
'is_verified': result.is_verified, |
|
|
'similarity_score': result.similarity_score, |
|
|
'confidence': result.confidence, |
|
|
'timestamp': result.timestamp.isoformat() |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"API verification failed: {e}") |
|
|
return { |
|
|
'success': False, |
|
|
'error': str(e), |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
def register_agent_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
API endpoint for agent registration. |
|
|
|
|
|
Args: |
|
|
request_data: Request data containing agent_id and signature_template |
|
|
|
|
|
Returns: |
|
|
API response dictionary |
|
|
""" |
|
|
try: |
|
|
agent_id = request_data['agent_id'] |
|
|
signature_template = request_data['signature_template'] |
|
|
|
|
|
success = self.signature_manager.register_agent_signature( |
|
|
agent_id, signature_template |
|
|
) |
|
|
|
|
|
return { |
|
|
'success': success, |
|
|
'agent_id': agent_id, |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"API registration failed: {e}") |
|
|
return { |
|
|
'success': False, |
|
|
'error': str(e), |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
def get_stats_endpoint(self, agent_id: str) -> Dict[str, Any]: |
|
|
""" |
|
|
API endpoint for agent statistics. |
|
|
|
|
|
Args: |
|
|
agent_id: Agent identifier |
|
|
|
|
|
Returns: |
|
|
API response dictionary |
|
|
""" |
|
|
try: |
|
|
stats = self.signature_manager.get_agent_verification_stats(agent_id) |
|
|
|
|
|
return { |
|
|
'success': True, |
|
|
'agent_id': agent_id, |
|
|
'stats': stats, |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"API stats failed: {e}") |
|
|
return { |
|
|
'success': False, |
|
|
'error': str(e), |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def create_agentai_integration_example(): |
|
|
"""Create an example AgentAI integration.""" |
|
|
|
|
|
|
|
|
signature_manager = AgentAISignatureManager( |
|
|
threshold=0.75, |
|
|
device='auto' |
|
|
) |
|
|
|
|
|
|
|
|
signature_manager.register_agent_signature( |
|
|
'agent_001', |
|
|
'data/samples/john_doe_1.png' |
|
|
) |
|
|
|
|
|
signature_manager.register_agent_signature( |
|
|
'agent_002', |
|
|
'data/samples/jane_smith_1.png' |
|
|
) |
|
|
|
|
|
|
|
|
api = AgentAISignatureAPI(signature_manager) |
|
|
|
|
|
return signature_manager, api |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
signature_manager, api = create_agentai_integration_example() |
|
|
|
|
|
|
|
|
result = signature_manager.verify_agent_signature( |
|
|
'agent_001', |
|
|
'data/samples/john_doe_2.png' |
|
|
) |
|
|
|
|
|
print(f"Verification result: {result}") |
|
|
|
|
|
|
|
|
stats = signature_manager.get_agent_verification_stats('agent_001') |
|
|
print(f"Agent stats: {stats}") |
|
|
|