import logging
from dataclasses import dataclass
from typing import Any

from core.plugin_system import PluginContext, PluginInterface, PluginMetadata

logger = logging.getLogger(__name__)


@dataclass
class TypologyConfig:
    """Tunable parameters for typology matching."""

    similarity_threshold: float = 0.3
    limit: int = 3


class TypologyAnalysisPlugin(PluginInterface):
    """
    AI-powered typology analysis plugin.

    Uses semantic search to match case data against known fraud typologies.
    """

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="typology_analysis",
            version="1.0.0",
            namespace="zenith/intelligence/typology_analysis",
            author="Zenith Team",
            description="Analyzes cases against known fraud typologies using semantic search",
            dependencies={},
            capabilities=["intelligence", "case_analysis"],
            security_level="official",
            api_version="v1",
        )

    async def initialize(self, context: PluginContext) -> bool:
        self.context = context

        # Merge any user-supplied values over the dataclass defaults; pick
        # keys explicitly so a missing or unexpected entry cannot break
        # construction with a TypeError.
        supplied = context.config or {}
        self.config = TypologyConfig(
            similarity_threshold=supplied.get("similarity_threshold", 0.3),
            limit=supplied.get("limit", 3),
        )

        # Dependency injection
        self.ai_service = context.get_service("ai_service")
        if not self.ai_service:
            logger.warning(
                "AI Service not available in context. Typology analysis will fail."
            )
        return True

    async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
        """Run typology analysis. Expects inputs of the form {"case_data": {...}}."""
        case_data = inputs.get("case_data")
        if not case_data:
            return {"error": "No case data provided"}
        if not self.ai_service:
            return {"error": "AI Service unavailable"}
        return await self._analyze_typology_context(case_data)

    async def _analyze_typology_context(
        self, case_data: dict[str, Any]
    ) -> dict[str, Any]:
        """
        RAG: extract context from the case and search the Typology Knowledge
        Base. Copied and adapted from the legacy AIService.
        """
        insights: list[str] = []
        recommendations: list[str] = []
        confidence = 0.0

        # 1. Construct Query from Case Data
        query_parts: list[str] = []

        # Transactions: use descriptions, and flag high-value amounts.
        transactions = case_data.get("transactions", [])
        for t in transactions:
            if t.get("description"):
                query_parts.append(t["description"])
            if t.get("amount", 0) > 5000:
                query_parts.append(f"High value transaction {t.get('amount')}")

        # Entities: their types (e.g. "shell company") are strong signals.
        entities = case_data.get("entities", [])
        for e in entities:
            if e.get("type"):
                query_parts.append(e["type"])

        # Evidence (summaries)
        evidence = case_data.get("evidence", [])
        for ev in evidence:
            if ev.get("summary"):
                query_parts.append(ev["summary"])

        if not query_parts:
            return {
                "insights": ["Insufficient data for typology analysis"],
                "risk_score": 0,
            }

        search_query = " ".join(query_parts)[:1000]  # Limit query length

        # 2. Semantic Search in Knowledge Base
        # We assume ai_service exposes semantic_search(query, limit, filters)
        results = await self.ai_service.semantic_search(
            search_query,
            limit=self.config.limit,
            filters=None,
        )
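        # Each result is assumed (inferred from how it is consumed below) to
        # be a dict of the form:
        #   {"similarity": float, "content": str, "metadata": {"filename": str, ...}}
        # where "content" is the typology document text whose "- " bullet
        # lines enumerate the typology's indicators.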

        # 3. Process Results
        matches = [
            res
            for res in results
            if res["similarity"] > self.config.similarity_threshold
        ]

        if matches:
            # Sort by similarity (descending) so the first element really is
            # the strongest match; the search backend is not guaranteed to
            # return results in sorted order.
            matches.sort(key=lambda m: m["similarity"], reverse=True)
            top_match = matches[0]
            confidence = top_match["similarity"]

            # Derive a display name from the source filename,
            # e.g. "trade_based_laundering.md" -> "Trade Based Laundering".
            typology_name = (
                top_match["metadata"]
                .get("filename", "Unknown")
                .replace(".md", "")
                .replace("_", " ")
                .title()
            )
            insights.append(
                f"Activity matches '{typology_name}' typology patterns "
                f"(Confidence: {confidence:.2f})"
            )

            # Extract up to three markdown bullet lines as indicators.
            # Use lstrip so only the leading bullet marker is removed.
            content_lines = top_match["content"].split("\n")
            indicators = [
                line.strip().lstrip("- ")
                for line in content_lines
                if line.strip().startswith("-")
            ][:3]
            if indicators:
                recommendations.append(
                    f"Check for {typology_name} indicators: {', '.join(indicators)}"
                )

        return {
            "insights": insights,
            "recommendations": recommendations,
            "confidence": confidence,
            "risk_score": int(confidence * 100),
            "typology_matches": matches,
        }

    async def cleanup(self) -> None:
        self.ai_service = None

    def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Return human-readable validation errors; an empty list means valid."""
        errors: list[str] = []
        threshold = config.get("similarity_threshold", 0.3)
        if not isinstance(threshold, (int, float)) or not 0.0 <= threshold <= 1.0:
            errors.append("similarity_threshold must be a number between 0.0 and 1.0")
        limit = config.get("limit", 3)
        if not isinstance(limit, int) or limit < 1:
            errors.append("limit must be a positive integer")
        return errors
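

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). _StubContext and _StubAIService
# are hypothetical stand-ins for the real PluginContext and AI service that
# the host application injects; the canned search result merely demonstrates
# the response shape this plugin expects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    class _StubAIService:
        async def semantic_search(self, query, limit, filters):
            # One canned knowledge-base hit in the shape consumed above.
            return [
                {
                    "similarity": 0.82,
                    "content": (
                        "# Trade Based Laundering\n"
                        "- over-invoicing\n"
                        "- phantom shipments\n"
                        "- round-trip payments"
                    ),
                    "metadata": {"filename": "trade_based_laundering.md"},
                }
            ]

    class _StubContext:
        config = {"similarity_threshold": 0.3, "limit": 3}

        def get_service(self, name):
            return _StubAIService() if name == "ai_service" else None

    async def _demo() -> None:
        plugin = TypologyAnalysisPlugin()
        await plugin.initialize(_StubContext())
        result = await plugin.execute(
            {
                "case_data": {
                    "transactions": [
                        {
                            "description": "Invoice payment to shell company",
                            "amount": 9500,
                        }
                    ],
                    "entities": [{"type": "shell company"}],
                    "evidence": [],
                }
            }
        )
        print(result["insights"])
        print("risk_score:", result["risk_score"])
        await plugin.cleanup()

    asyncio.run(_demo())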