"""Fraud analyzer service using LLM and RAG.""" import logging from typing import Dict, List, Optional from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from src.data.processor import FraudDataProcessor from src.llm.groq_client import GroqClient from src.rag.vector_store import VectorStore logger = logging.getLogger(__name__) class FraudAnalyzer: """Service for analyzing fraud using LLM and RAG.""" def __init__( self, groq_client: Optional[GroqClient] = None, vector_store: Optional[VectorStore] = None, data_processor: Optional[FraudDataProcessor] = None, ) -> None: """Initialize fraud analyzer. Args: groq_client: Groq LLM client. If None, creates a new one. vector_store: Vector store for RAG. If None, creates a new one. data_processor: Data processor. If None, creates a new one. """ self.groq_client = groq_client or GroqClient() self.vector_store = vector_store self.data_processor = data_processor or FraudDataProcessor() # Initialize RAG chain if vector store is available self.rag_chain = None if self.vector_store and self.vector_store.retriever: self._setup_rag_chain() def _setup_rag_chain(self) -> None: """Setup RAG chain for document retrieval.""" if not self.vector_store or not self.vector_store.retriever: return template = """You are an expert fraud detection analyst. Use the following context from fraud detection research papers and reports to analyze transactions. Context: {context} Question: {question} IMPORTANT CITATION RULES: - When using information from the context, add an inline citation immediately after the relevant sentence. - Format citations as: [Source X] where X corresponds to the source number in the context. - Place citations at the end of sentences that use information from sources. - You can cite multiple sources if needed: [Source 1, Source 2] Provide a detailed analysis with: 1. Risk assessment (Low/Medium/High) with inline citations. (Note: Ignore 'fraud_' prefix in merchant names as it's a synthetic data artifact.) 2. Key indicators of potential fraud with inline citations 3. Recommendations with inline citations 4. Confidence level (0-100%) Example: "This transaction shows high risk indicators based on the merchant category. [Source 1]" """ prompt = ChatPromptTemplate.from_template(template) from langchain_core.documents import Document def format_docs(docs: List[Document]) -> str: # Format docs with source numbers formatted = [] for i, doc in enumerate(docs, 1): formatted.append(f"[Source {i}]\n{doc.page_content}") return "\n\n".join(formatted) self.rag_chain = ( { "context": self.vector_store.retriever | format_docs, "question": RunnablePassthrough(), } | prompt | self.groq_client.llm ) logger.info("RAG chain initialized") def analyze_transaction( self, transaction_id: Optional[int] = None, transaction_data: Optional[Dict] = None, use_rag: bool = True, ) -> Dict: """Analyze a transaction for fraud. Args: transaction_id: Transaction ID from dataset. transaction_data: Direct transaction data dictionary. use_rag: Whether to use RAG for context. Returns: Analysis results dictionary. """ # Get transaction data if transaction_data: transaction = transaction_data elif transaction_id is not None: transaction = self.data_processor.get_transaction_summary(transaction_id) else: raise ValueError("Either transaction_id or transaction_data must be provided") # Format transaction for LLM formatted_transaction = self.data_processor.format_transaction_for_llm(transaction) # Collect sources sources = [] # Create analysis prompt if use_rag and self.rag_chain: # Use RAG chain query = f"Analyze this transaction for fraud indicators:\n\n{formatted_transaction}" try: # Get relevant documents first to collect sources if self.vector_store: docs = self.vector_store.similarity_search(query, k=5) # Collect source information for doc in docs: source_file = doc.metadata.get('source', 'Unknown') page_num = doc.metadata.get('page', 'N/A') doc_type = doc.metadata.get('type', 'document') # Format source info with numbers if doc_type == 'fraud_pattern': category = doc.metadata.get('category', 'N/A') sources.append(f"Source {len(sources)+1}: CSV Data - Fraud Pattern Analysis ({category})") elif doc_type == 'statistical_summary': scope = doc.metadata.get('scope', 'N/A') sources.append(f"Source {len(sources)+1}: CSV Data - Statistical Summary ({scope})") elif doc_type == 'merchant_profile': merchant = doc.metadata.get('merchant', 'N/A') sources.append(f"Source {len(sources)+1}: CSV Data - Merchant Profile ({merchant})") elif doc_type == 'location_insight': state = doc.metadata.get('state', 'N/A') sources.append(f"Source {len(sources)+1}: CSV Data - Location Analysis ({state})") else: # PDF document if page_num != 'N/A': sources.append(f"Source {len(sources)+1}: {source_file}, Page {page_num}") else: sources.append(f"Source {len(sources)+1}: {source_file}") response = self.rag_chain.invoke(query) # Extract content from response if hasattr(response, "content"): analysis_text = response.content elif isinstance(response, str): analysis_text = response else: analysis_text = str(response) except Exception as e: logger.warning(f"RAG chain failed, falling back to direct LLM: {str(e)}") analysis_text = self._direct_analysis(formatted_transaction) sources = [] # Clear sources on fallback else: # Direct LLM analysis analysis_text = self._direct_analysis(formatted_transaction) # Add source reference list at the end if available if sources: analysis_text += "\n\n---\n\n**📚 Source References:**\n" for source in sources: analysis_text += f"\n- {source}" return { "transaction": transaction, "analysis": analysis_text, "formatted_transaction": formatted_transaction, "sources": sources, } def _direct_analysis(self, formatted_transaction: str) -> str: """Perform direct LLM analysis without RAG. Args: formatted_transaction: Formatted transaction string. Returns: Analysis text. """ system_message = """You are an expert fraud detection analyst with deep knowledge of payment fraud patterns, transaction anomalies, and risk indicators. Analyze transactions carefully and provide detailed assessments.""" prompt = f"""Analyze the following transaction for fraud indicators: {formatted_transaction} Provide a detailed analysis with: 1. Risk assessment (Low/Medium/High) 2. Key indicators of potential fraud (if any) 3. Specific red flags or suspicious patterns 4. Recommendations 5. Confidence level (0-100%) Be specific and cite patterns from the transaction data.""" try: response = self.groq_client.invoke(prompt, system_message=system_message) return response except Exception as e: logger.error(f"Error in direct analysis: {str(e)}") raise def batch_analyze( self, transaction_ids: List[int], use_rag: bool = True, ) -> List[Dict]: """Analyze multiple transactions. Args: transaction_ids: List of transaction IDs. use_rag: Whether to use RAG for context. Returns: List of analysis results. """ results = [] for transaction_id in transaction_ids: try: result = self.analyze_transaction(transaction_id=transaction_id, use_rag=use_rag) results.append(result) except Exception as e: logger.error(f"Error analyzing transaction {transaction_id}: {str(e)}") results.append({"transaction_id": transaction_id, "error": str(e)}) return results