Spaces:
Paused
Paused
| """ | |
| Domain-Driven Microservices Architecture for Zenith | |
| Implements clean architecture with bounded contexts | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Any | |
| from fastapi import FastAPI, HTTPException | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Development flag - set via environment variable | |
| IS_DEVELOPMENT = os.getenv("ENVIRONMENT", "development").lower() == "development" | |
| class BoundedContext(Enum): | |
| """Domain bounded contexts""" | |
| CASE_MANAGEMENT = "case_management" | |
| FRAUD_DETECTION = "fraud_detection" | |
| EVIDENCE_PROCESSING = "evidence_processing" | |
| USER_MANAGEMENT = "user_management" | |
| REPORTING_ANALYTICS = "reporting_analytics" | |
| INTEGRATION_HUB = "integration_hub" | |
| class DomainEvent: | |
| """Base domain event""" | |
| event_id: str | |
| event_type: str | |
| aggregate_id: str | |
| event_data: dict[str, Any] | |
| timestamp: float | |
| version: int = 1 | |
| class DomainEventBus: | |
| """In-memory domain event bus for inter-service communication""" | |
| def __init__(self): | |
| self._handlers: dict[str, list[callable]] = {} | |
| self._published_events: list[DomainEvent] = [] | |
| def subscribe(self, event_type: str, handler: callable): | |
| """Subscribe to domain events""" | |
| if event_type not in self._handlers: | |
| self._handlers[event_type] = [] | |
| self._handlers[event_type].append(handler) | |
| async def publish(self, event: DomainEvent): | |
| """Publish domain event to all subscribers""" | |
| logger.info(f"Publishing event: {event.event_type}") | |
| self._published_events.append(event) | |
| if event.event_type in self._handlers: | |
| tasks = [] | |
| for handler in self._handlers[event.event_type]: | |
| tasks.append(handler(event)) | |
| await asyncio.gather(*tasks, return_exceptions=True) | |
| def get_published_events(self, event_type: str | None = None) -> list[DomainEvent]: | |
| """Get published events, optionally filtered by type""" | |
| if event_type: | |
| return [e for e in self._published_events if e.event_type == event_type] | |
| return self._published_events.copy() | |
| # Global event bus instance | |
| event_bus = DomainEventBus() | |
| class Microservice(ABC): | |
| """Base microservice class""" | |
| def __init__(self, name: str, bounded_context: BoundedContext): | |
| self.name = name | |
| self.bounded_context = bounded_context | |
| self.app = FastAPI(title=f"{name} Service") | |
| self.dependencies = {} | |
| self._setup_routes() | |
| self._setup_event_handlers() | |
| def _setup_routes(self): | |
| """Setup FastAPI routes""" | |
| def _setup_event_handlers(self): | |
| """Setup domain event handlers""" | |
| def add_dependency(self, name: str, dependency: Any): | |
| """Add dependency injection""" | |
| self.dependencies[name] = dependency | |
| def get_dependency(self, name: str) -> Any: | |
| """Get dependency""" | |
| return self.dependencies.get(name) | |
| class CaseManagementService(Microservice): | |
| """Case Management Bounded Context""" | |
| def __init__(self): | |
| super().__init__("Case Management", BoundedContext.CASE_MANAGEMENT) | |
| self.cases = {} # In-memory storage (would be database in production) | |
| def _setup_routes(self): | |
| async def create_case(case_data: dict[str, Any]): | |
| case_id = f"case_{len(self.cases) + 1}" | |
| self.cases[case_id] = case_data | |
| # Publish domain event | |
| event = DomainEvent( | |
| event_id=f"evt_{case_id}", | |
| event_type="case.created", | |
| aggregate_id=case_id, | |
| event_data=case_data, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| return {"case_id": case_id, "status": "created"} | |
| async def get_case(case_id: str): | |
| if case_id not in self.cases: | |
| raise HTTPException(status_code=404, detail="Case not found") | |
| return self.cases[case_id] | |
| async def update_case_status(case_id: str, status: str): | |
| if case_id not in self.cases: | |
| raise HTTPException(status_code=404, detail="Case not found") | |
| old_status = self.cases[case_id].get("status") | |
| self.cases[case_id]["status"] = status | |
| # Publish status change event | |
| event = DomainEvent( | |
| event_id=f"evt_{case_id}_status", | |
| event_type="case.status_changed", | |
| aggregate_id=case_id, | |
| event_data={"old_status": old_status, "new_status": status}, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| return {"status": "updated"} | |
| def _setup_event_handlers(self): | |
| # Handle fraud detection events | |
| async def handle_fraud_detected(event: DomainEvent): | |
| case_id = event.aggregate_id | |
| logger.info(f"Case {case_id} marked as potential fraud") | |
| event_bus.subscribe("fraud.detected", handle_fraud_detected) | |
| class FraudDetectionService(Microservice): | |
| """Fraud Detection Bounded Context""" | |
| def __init__(self): | |
| super().__init__("Fraud Detection", BoundedContext.FRAUD_DETECTION) | |
| self.fraud_rules = {} | |
| self.active_scans = set() | |
| def _setup_routes(self): | |
| async def scan_transaction(transaction: dict[str, Any]): | |
| transaction_id = transaction.get("id", "unknown") | |
| # Simulate fraud detection logic | |
| is_fraudulent = self._detect_fraud(transaction) | |
| result = { | |
| "transaction_id": transaction_id, | |
| "is_fraudulent": is_fraudulent, | |
| "risk_score": 0.8 if is_fraudulent else 0.2, | |
| "flags": ["high_amount", "unusual_location"] if is_fraudulent else [], | |
| } | |
| if is_fraudulent: | |
| # Publish fraud detected event | |
| event = DomainEvent( | |
| event_id=f"fraud_{transaction_id}", | |
| event_type="fraud.detected", | |
| aggregate_id=transaction.get("case_id", "unknown"), | |
| event_data=result, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| return result | |
| async def create_fraud_rule(rule: dict[str, Any]): | |
| rule_id = f"rule_{len(self.fraud_rules) + 1}" | |
| self.fraud_rules[rule_id] = rule | |
| return {"rule_id": rule_id, "status": "created"} | |
| async def get_fraud_rules(): | |
| return list(self.fraud_rules.values()) | |
| def _detect_fraud(self, transaction: dict[str, Any]) -> bool: | |
| """Simple fraud detection logic""" | |
| amount = transaction.get("amount", 0) | |
| location = transaction.get("location", "") | |
| # Simple rules | |
| if amount > 10000: | |
| return True | |
| return location in ["high_risk_country_1", "high_risk_country_2"] | |
| def _setup_event_handlers(self): | |
| # Handle case status changes | |
| async def handle_case_status_changed(event: DomainEvent): | |
| case_id = event.aggregate_id | |
| new_status = event.event_data.get("new_status") | |
| if new_status == "INVESTIGATING": | |
| logger.info(f"Initiating fraud scan for case {case_id}") | |
| self.active_scans.add(case_id) | |
| event_bus.subscribe("case.status_changed", handle_case_status_changed) | |
| class EvidenceProcessingService(Microservice): | |
| """Evidence Processing Bounded Context""" | |
| def __init__(self): | |
| super().__init__("Evidence Processing", BoundedContext.EVIDENCE_PROCESSING) | |
| self.processing_queue = asyncio.Queue() | |
| self.processed_evidence = {} | |
| def _setup_routes(self): | |
| async def process_evidence(evidence_data: dict[str, Any]): | |
| evidence_id = evidence_data.get( | |
| "id", f"evidence_{len(self.processed_evidence) + 1}" | |
| ) | |
| # Add to processing queue | |
| await self.processing_queue.put(evidence_data) | |
| # Start background processing | |
| task = asyncio.create_task( | |
| self._process_evidence(evidence_id, evidence_data) | |
| ) | |
| self._background_tasks.append(task) | |
| return {"evidence_id": evidence_id, "status": "processing"} | |
| async def get_evidence(evidence_id: str): | |
| if evidence_id not in self.processed_evidence: | |
| raise HTTPException(status_code=404, detail="Evidence not found") | |
| return self.processed_evidence[evidence_id] | |
| async def _process_evidence(self, evidence_id: str, evidence_data: dict[str, Any]): | |
| """Background evidence processing""" | |
| try: | |
| logger.info(f"Processing evidence {evidence_id}") | |
| # Simulate processing time only in development | |
| if IS_DEVELOPMENT: | |
| await asyncio.sleep(2) | |
| # Mock OCR and analysis | |
| processed_data = { | |
| **evidence_data, | |
| "ocr_text": "Extracted text from evidence document...", | |
| "sentiment_score": 0.7, | |
| "entities_extracted": ["John Doe", "ABC Bank", "$50,000"], | |
| "processing_status": "completed", | |
| "processed_at": asyncio.get_event_loop().time(), | |
| } | |
| self.processed_evidence[evidence_id] = processed_data | |
| # Publish processing completed event | |
| event = DomainEvent( | |
| event_id=f"evidence_processed_{evidence_id}", | |
| event_type="evidence.processed", | |
| aggregate_id=evidence_id, | |
| event_data=processed_data, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| logger.info(f"Evidence {evidence_id} processing completed") | |
| except Exception as e: | |
| logger.error(f"Evidence processing failed for {evidence_id}: {e}") | |
| def _setup_event_handlers(self): | |
| # Handle case creation events | |
| async def handle_case_created(event: DomainEvent): | |
| case_id = event.aggregate_id | |
| logger.info(f"Evidence processing service notified of new case {case_id}") | |
| event_bus.subscribe("case.created", handle_case_created) | |
| class UserManagementService(Microservice): | |
| """User Management Bounded Context""" | |
| def __init__(self): | |
| super().__init__("User Management", BoundedContext.USER_MANAGEMENT) | |
| self.users = {} | |
| self.roles = { | |
| "admin": ["read", "write", "delete", "manage_users"], | |
| "investigator": ["read", "write", "investigate"], | |
| "analyst": ["read", "analyze"], | |
| "viewer": ["read"], | |
| } | |
| def _setup_routes(self): | |
| async def create_user(user_data: dict[str, Any]): | |
| user_id = user_data.get("email", f"user_{len(self.users) + 1}") | |
| user_data["permissions"] = self.roles.get( | |
| user_data.get("role", "viewer"), [] | |
| ) | |
| self.users[user_id] = user_data | |
| # Publish user created event | |
| event = DomainEvent( | |
| event_id=f"user_{user_id}", | |
| event_type="user.created", | |
| aggregate_id=user_id, | |
| event_data=user_data, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| return {"user_id": user_id, "status": "created"} | |
| async def get_user(user_id: str): | |
| if user_id not in self.users: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return self.users[user_id] | |
| async def get_user_permissions(user_id: str): | |
| if user_id not in self.users: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return {"permissions": self.users[user_id].get("permissions", [])} | |
| def _setup_event_handlers(self): | |
| # Handle case assignment events | |
| async def handle_case_assigned(event: DomainEvent): | |
| user_id = event.event_data.get("assignee_id") | |
| case_id = event.aggregate_id | |
| if user_id in self.users: | |
| logger.info(f"User {user_id} assigned to case {case_id}") | |
| event_bus.subscribe("case.assigned", handle_case_assigned) | |
| class ReportingAnalyticsService(Microservice): | |
| """Reporting & Analytics Bounded Context""" | |
| def __init__(self): | |
| super().__init__("Reporting & Analytics", BoundedContext.REPORTING_ANALYTICS) | |
| self.reports = {} | |
| self.analytics_cache = {} | |
| def _setup_routes(self): | |
| async def generate_report(report_config: dict[str, Any]): | |
| report_id = f"report_{len(self.reports) + 1}" | |
| report_type = report_config.get("type", "case_summary") | |
| # Generate mock report data | |
| report_data = await self._generate_report_data(report_type, report_config) | |
| self.reports[report_id] = { | |
| "id": report_id, | |
| "type": report_type, | |
| "data": report_data, | |
| "generated_at": asyncio.get_event_loop().time(), | |
| "config": report_config, | |
| } | |
| return {"report_id": report_id, "status": "generated", "data": report_data} | |
| async def get_dashboard_analytics(): | |
| # Aggregate analytics from all services | |
| analytics = { | |
| "total_cases": await self._get_total_cases(), | |
| "fraud_detected": await self._get_fraud_count(), | |
| "evidence_processed": await self._get_evidence_count(), | |
| "active_users": await self._get_active_users(), | |
| "system_health": "healthy", | |
| } | |
| return analytics | |
| async def _generate_report_data( | |
| self, report_type: str, config: dict[str, Any] | |
| ) -> dict[str, Any]: | |
| """Generate report data based on type""" | |
| if report_type == "case_summary": | |
| return { | |
| "total_cases": 150, | |
| "open_cases": 45, | |
| "closed_cases": 105, | |
| "high_priority": 12, | |
| "fraud_cases": 28, | |
| } | |
| elif report_type == "fraud_trends": | |
| return { | |
| "monthly_fraud_count": [5, 8, 12, 15, 9, 11], | |
| "fraud_amounts": [50000, 75000, 120000, 150000, 90000, 110000], | |
| "detection_accuracy": 0.94, | |
| } | |
| else: | |
| return {"message": f"Report type {report_type} generated"} | |
| async def _get_total_cases(self) -> int: | |
| # Would query case management service | |
| return 150 | |
| async def _get_fraud_count(self) -> int: | |
| # Would query fraud detection service | |
| return 28 | |
| async def _get_evidence_count(self) -> int: | |
| # Would query evidence processing service | |
| return 320 | |
| async def _get_active_users(self) -> int: | |
| # Would query user management service | |
| return 45 | |
| def _setup_event_handlers(self): | |
| # Listen to all domain events for analytics | |
| async def handle_any_event(event: DomainEvent): | |
| # Update analytics cache | |
| event_type = event.event_type | |
| if event_type not in self.analytics_cache: | |
| self.analytics_cache[event_type] = 0 | |
| self.analytics_cache[event_type] += 1 | |
| # Subscribe to multiple event types | |
| for event_type in [ | |
| "case.created", | |
| "fraud.detected", | |
| "evidence.processed", | |
| "user.created", | |
| ]: | |
| event_bus.subscribe(event_type, handle_any_event) | |
| class IntegrationHubService(Microservice): | |
| """Integration Hub Bounded Context""" | |
| def __init__(self): | |
| super().__init__("Integration Hub", BoundedContext.INTEGRATION_HUB) | |
| self.integrations = {} | |
| self.webhooks = {} | |
| def _setup_routes(self): | |
| async def create_integration(integration_config: dict[str, Any]): | |
| integration_id = f"integration_{len(self.integrations) + 1}" | |
| self.integrations[integration_id] = integration_config | |
| return {"integration_id": integration_id, "status": "created"} | |
| async def handle_webhook(webhook_id: str, payload: dict[str, Any]): | |
| if webhook_id not in self.webhooks: | |
| raise HTTPException(status_code=404, detail="Webhook not found") | |
| webhook_config = self.webhooks[webhook_id] | |
| # Process webhook based on configuration | |
| event_type = webhook_config.get("event_type", "external.event") | |
| event = DomainEvent( | |
| event_id=f"webhook_{webhook_id}_{asyncio.get_event_loop().time()}", | |
| event_type=event_type, | |
| aggregate_id=webhook_id, | |
| event_data=payload, | |
| timestamp=asyncio.get_event_loop().time(), | |
| ) | |
| await event_bus.publish(event) | |
| return {"status": "processed"} | |
| async def get_integration_status(): | |
| # Return status of all integrations | |
| return { | |
| integration_id: { | |
| "status": "active", | |
| "last_sync": asyncio.get_event_loop().time(), | |
| "health": "good", | |
| } | |
| for integration_id in self.integrations | |
| } | |
| def _setup_event_handlers(self): | |
| # Handle external events | |
| async def handle_external_event(event: DomainEvent): | |
| logger.info(f"Processing external event: {event.event_type}") | |
| event_bus.subscribe("external.event", handle_external_event) | |
| class MicroservicesOrchestrator: | |
| """Orchestrates all microservices""" | |
| def __init__(self): | |
| self.services: dict[BoundedContext, Microservice] = {} | |
| self.service_ports = { | |
| BoundedContext.CASE_MANAGEMENT: 8001, | |
| BoundedContext.FRAUD_DETECTION: 8002, | |
| BoundedContext.EVIDENCE_PROCESSING: 8003, | |
| BoundedContext.USER_MANAGEMENT: 8004, | |
| BoundedContext.REPORTING_ANALYTICS: 8005, | |
| BoundedContext.INTEGRATION_HUB: 8006, | |
| } | |
| def register_service(self, service: Microservice): | |
| """Register a microservice""" | |
| self.services[service.bounded_context] = service | |
| logger.info(f"Registered service: {service.name}") | |
| def get_service(self, bounded_context: BoundedContext) -> Microservice | None: | |
| """Get a registered service""" | |
| return self.services.get(bounded_context) | |
| async def start_all_services(self): | |
| """Start all registered microservices""" | |
| logger.info("Starting all microservices...") | |
| tasks = [] | |
| for bounded_context, service in self.services.items(): | |
| port = self.service_ports[bounded_context] | |
| task = asyncio.create_task(self._start_service(service, port)) | |
| tasks.append(task) | |
| self._service_tasks.append(task) | |
| await asyncio.gather(*tasks) | |
| logger.info("All microservices started") | |
| async def _start_service(self, service: Microservice, port: int): | |
| """Start a single service on specified port""" | |
| try: | |
| import uvicorn | |
| config = uvicorn.Config( | |
| service.app, host="0.0.0.0", port=port, log_level="info" | |
| ) | |
| server = uvicorn.Server(config) | |
| await server.serve() | |
| except Exception as e: | |
| logger.error(f"Failed to start {service.name} on port {port}: {e}") | |
| async def health_check_all(self) -> dict[str, bool]: | |
| """Perform health check on all services""" | |
| health_status = {} | |
| for service in self.services.values(): | |
| try: | |
| # Simple health check - would be more sophisticated in production | |
| health_status[service.name] = True | |
| except Exception as e: | |
| logger.error(f"Health check failed for {service.name}: {e}") | |
| health_status[service.name] = False | |
| return health_status | |
| def get_service_discovery_info(self) -> dict[str, dict[str, Any]]: | |
| """Get service discovery information""" | |
| discovery_info = {} | |
| for bounded_context, service in self.services.items(): | |
| port = self.service_ports[bounded_context] | |
| discovery_info[service.name] = { | |
| "bounded_context": bounded_context.value, | |
| "port": port, | |
| "endpoint": f"http://localhost:{port}", | |
| "status": "active", | |
| } | |
| return discovery_info | |
| # Create and configure microservices | |
| def create_microservices_orchestrator() -> MicroservicesOrchestrator: | |
| """Create and configure the microservices orchestrator""" | |
| orchestrator = MicroservicesOrchestrator() | |
| # Register all microservices | |
| orchestrator.register_service(CaseManagementService()) | |
| orchestrator.register_service(FraudDetectionService()) | |
| orchestrator.register_service(EvidenceProcessingService()) | |
| orchestrator.register_service(UserManagementService()) | |
| orchestrator.register_service(ReportingAnalyticsService()) | |
| orchestrator.register_service(IntegrationHubService()) | |
| return orchestrator | |
| # Export for use | |
| __all__ = [ | |
| "BoundedContext", | |
| "CaseManagementService", | |
| "DomainEvent", | |
| "DomainEventBus", | |
| "EvidenceProcessingService", | |
| "FraudDetectionService", | |
| "IntegrationHubService", | |
| "Microservice", | |
| "MicroservicesOrchestrator", | |
| "ReportingAnalyticsService", | |
| "UserManagementService", | |
| "create_microservices_orchestrator", | |
| "event_bus", | |
| ] | |