import asyncio import logging import jwt import ssl import pytest from datetime import datetime, timedelta from typing import Dict, Any from cryptography.fernet import Fernet from dataclasses import dataclass import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), '..')) from src.agents.aethero_agent_bootstrap import BaseAetheroAgent from src.agents.agent_bus import AgentBus, Message from src.monitoring.monitor import AetheroMonitor # Test configuration TEST_SECRET_KEY = Fernet.generate_key() TEST_JWT_SECRET = "test_jwt_secret" SSL_CERT_PATH = "./tests/certs/test_cert.pem" SSL_KEY_PATH = "./tests/certs/test_key.pem" @dataclass class SecurityContext: """Security context for testing authentication and authorization""" user_id: str roles: list permissions: list token: str = None class SecureAgent(BaseAetheroAgent): """Agent implementation with security features""" def __init__(self, agent_id: str, config: Dict[str, Any], logger: logging.Logger, agent_bus: AgentBus, security_context: SecurityContext): super().__init__(agent_id, config, logger, agent_bus) self.security_context = security_context self.encryption_key = Fernet(TEST_SECRET_KEY) async def process_task(self, task_data: Dict[str, Any], asl_context: Dict[str, Any]) -> Dict[str, Any]: """Process task with security checks""" if not self._verify_permissions(task_data.get("required_permissions", [])): raise PermissionError("Insufficient permissions") # Encrypt sensitive data if "sensitive_data" in task_data: task_data["sensitive_data"] = self._encrypt_data(task_data["sensitive_data"]) return await super().process_task(task_data, asl_context) def _verify_permissions(self, required_permissions: list) -> bool: """Verify agent has required permissions""" return all(perm in self.security_context.permissions for perm in required_permissions) def _encrypt_data(self, data: str) -> str: """Encrypt sensitive data""" return self.encryption_key.encrypt(data.encode()).decode() def _decrypt_data(self, encrypted_data: str) -> str: """Decrypt sensitive data""" return self.encryption_key.decrypt(encrypted_data.encode()).decode() def generate_jwt_token(security_context: SecurityContext) -> str: """Generate JWT token for authentication""" payload = { "user_id": security_context.user_id, "roles": security_context.roles, "permissions": security_context.permissions, "exp": datetime.utcnow() + timedelta(hours=1) } return jwt.encode(payload, TEST_JWT_SECRET, algorithm="HS256") async def setup_secure_environment(): """Set up secure testing environment""" # Create SSL context ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(certfile=SSL_CERT_PATH, keyfile=SSL_KEY_PATH) # Create security context security_context = SecurityContext( user_id="test_user", roles=["admin"], permissions=["read", "write", "execute"] ) security_context.token = generate_jwt_token(security_context) return ssl_context, security_context async def test_authentication(): """Test authentication flow""" logger = logging.getLogger("security_test") agent_bus = AgentBus() # Set up secure environment ssl_context, security_context = await setup_secure_environment() # Create secure agent agent = SecureAgent( "secure_agent", {"pipeline_id": "security_test"}, logger, agent_bus, security_context ) # Test valid authentication try: decoded_token = jwt.decode( security_context.token, TEST_JWT_SECRET, algorithms=["HS256"] ) assert decoded_token["user_id"] == security_context.user_id logger.info("Authentication test passed") except Exception as e: logger.error(f"Authentication test failed: {str(e)}") raise async def test_authorization(): """Test authorization boundaries""" logger = logging.getLogger("security_test") agent_bus = AgentBus() # Set up secure environment _, security_context = await setup_secure_environment() # Create secure agent agent = SecureAgent( "secure_agent", {"pipeline_id": "security_test"}, logger, agent_bus, security_context ) # Test permission verification try: # Test with sufficient permissions assert agent._verify_permissions(["read", "write"]) # Test with insufficient permissions assert not agent._verify_permissions(["admin_override"]) logger.info("Authorization test passed") except Exception as e: logger.error(f"Authorization test failed: {str(e)}") raise async def test_data_encryption(): """Test data encryption in transit""" logger = logging.getLogger("security_test") agent_bus = AgentBus() # Set up secure environment _, security_context = await setup_secure_environment() # Create secure agent agent = SecureAgent( "secure_agent", {"pipeline_id": "security_test"}, logger, agent_bus, security_context ) # Test data encryption/decryption try: test_data = "sensitive information" encrypted = agent._encrypt_data(test_data) decrypted = agent._decrypt_data(encrypted) assert test_data == decrypted assert encrypted != test_data logger.info("Encryption test passed") except Exception as e: logger.error(f"Encryption test failed: {str(e)}") raise async def test_secure_message_flow(): """Test end-to-end secure message flow""" logger = logging.getLogger("security_test") agent_bus = AgentBus() # Set up secure environment _, security_context = await setup_secure_environment() # Create secure agent agent = SecureAgent( "secure_agent", {"pipeline_id": "security_test"}, logger, agent_bus, security_context ) try: # Test task with sensitive data task_data = { "task_id": "secure_task", "sensitive_data": "confidential information", "required_permissions": ["read", "write"] } result = await agent.execute_task(task_data, {}) # Verify sensitive data was encrypted assert "sensitive_data" in result assert result["sensitive_data"] != task_data["sensitive_data"] logger.info("Secure message flow test passed") except Exception as e: logger.error(f"Secure message flow test failed: {str(e)}") raise async def run_security_tests(): """Run all security tests""" logging.basicConfig(level=logging.INFO) logger = logging.getLogger("security_tests") try: logger.info("Starting security testing suite...") # Run authentication tests logger.info("\nTesting authentication...") await test_authentication() # Run authorization tests logger.info("\nTesting authorization...") await test_authorization() # Run encryption tests logger.info("\nTesting data encryption...") await test_data_encryption() # Run secure message flow tests logger.info("\nTesting secure message flow...") await test_secure_message_flow() logger.info("\nAll security tests completed successfully!") except Exception as e: logger.error(f"Error in security testing: {str(e)}") raise if __name__ == "__main__": asyncio.run(run_security_tests())