Spaces:
Sleeping
Sleeping
| """Fraud analyzer service using LLM and RAG.""" | |
| import logging | |
| from typing import Dict, List, Optional | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.runnables import RunnablePassthrough | |
| from src.data.processor import FraudDataProcessor | |
| from src.llm.groq_client import GroqClient | |
| from src.rag.vector_store import VectorStore | |
# Module-level logger named after this module; FraudAnalyzer uses it for the
# RAG-initialization message, the RAG-fallback warning, and per-transaction
# error messages.
logger = logging.getLogger(__name__)
class FraudAnalyzer:
    """Service for analyzing fraud using LLM and RAG.

    When a vector store with a retriever is available, a transaction is
    analyzed against retrieved document context and the answer is followed by
    a numbered source list whose numbers match the ``[Source i]`` labels the
    LLM saw. Without a retriever (or if the RAG path fails) the LLM is queried
    directly.
    """

    # Number of documents retrieved per RAG analysis. The same documents are
    # both given to the LLM as context and listed as source references.
    _RAG_TOP_K = 5

    # Prompt for RAG analysis; expects ``context`` and ``question`` variables.
    _RAG_TEMPLATE = """You are an expert fraud detection analyst. Use the following context from fraud detection research papers and reports to analyze transactions.
Context:
{context}
Question: {question}
IMPORTANT CITATION RULES:
- When using information from the context, add an inline citation immediately after the relevant sentence.
- Format citations as: [Source X] where X corresponds to the source number in the context.
- Place citations at the end of sentences that use information from sources.
- You can cite multiple sources if needed: [Source 1, Source 2]
Provide a detailed analysis with:
1. Risk assessment (Low/Medium/High) with inline citations. (Note: Ignore 'fraud_' prefix in merchant names as it's a synthetic data artifact.)
2. Key indicators of potential fraud with inline citations
3. Recommendations with inline citations
4. Confidence level (0-100%)
Example: "This transaction shows high risk indicators based on the merchant category. [Source 1]"
"""

    # CSV-derived document ``type`` -> (label used in the reference list,
    # metadata key holding the qualifier shown in parentheses). Any other
    # type is treated as a file-based document (e.g. a PDF).
    _CSV_SOURCE_LABELS = {
        "fraud_pattern": ("Fraud Pattern Analysis", "category"),
        "statistical_summary": ("Statistical Summary", "scope"),
        "merchant_profile": ("Merchant Profile", "merchant"),
        "location_insight": ("Location Analysis", "state"),
    }

    def __init__(
        self,
        groq_client: Optional[GroqClient] = None,
        vector_store: Optional[VectorStore] = None,
        data_processor: Optional[FraudDataProcessor] = None,
    ) -> None:
        """Initialize fraud analyzer.

        Args:
            groq_client: Groq LLM client. If None, creates a new one.
            vector_store: Vector store for RAG. If None (or if it has no
                retriever), RAG is disabled and analyses use the LLM directly.
            data_processor: Data processor. If None, creates a new one.
        """
        self.groq_client = groq_client or GroqClient()
        self.vector_store = vector_store
        self.data_processor = data_processor or FraudDataProcessor()

        # Both chains are built together by _setup_rag_chain and stay None
        # when no retriever is available.
        self.rag_chain = None
        self._answer_chain = None
        if self.vector_store and self.vector_store.retriever:
            self._setup_rag_chain()

    def _setup_rag_chain(self) -> None:
        """Build the RAG chains on top of the vector store's retriever.

        ``self.rag_chain`` is the self-contained chain (question string in,
        LLM response out), kept as a public attribute for backward
        compatibility. ``self._answer_chain`` takes already-retrieved context,
        which lets ``analyze_transaction`` list exactly the documents the LLM
        was shown.
        """
        if not self.vector_store or not self.vector_store.retriever:
            return

        prompt = ChatPromptTemplate.from_template(self._RAG_TEMPLATE)
        self._answer_chain = prompt | self.groq_client.llm
        self.rag_chain = (
            {
                "context": self.vector_store.retriever | self._format_context,
                "question": RunnablePassthrough(),
            }
            | prompt
            | self.groq_client.llm
        )
        logger.info("RAG chain initialized")

    @staticmethod
    def _format_context(docs: List) -> str:
        """Join document contents, each prefixed by a 1-based ``[Source i]`` label."""
        return "\n\n".join(
            f"[Source {number}]\n{doc.page_content}"
            for number, doc in enumerate(docs, 1)
        )

    @classmethod
    def _describe_source(cls, doc) -> str:
        """Return a human-readable reference built from a document's metadata."""
        metadata = doc.metadata
        doc_type = metadata.get("type", "document")
        csv_label = cls._CSV_SOURCE_LABELS.get(doc_type)
        if csv_label is not None:
            title, key = csv_label
            return f"CSV Data - {title} ({metadata.get(key, 'N/A')})"
        # File-based document (e.g. PDF): include the page number when known.
        source_file = metadata.get("source", "Unknown")
        page_num = metadata.get("page", "N/A")
        if page_num != "N/A":
            return f"{source_file}, Page {page_num}"
        return f"{source_file}"

    @staticmethod
    def _response_text(response) -> str:
        """Extract text from an LLM response (message with ``.content``, str, or other)."""
        if hasattr(response, "content"):
            return response.content
        if isinstance(response, str):
            return response
        return str(response)

    def _rag_analysis(self, query: str) -> tuple:
        """Retrieve documents once and answer the query from exactly those documents.

        Args:
            query: Question text sent to the vector store and the LLM.

        Returns:
            ``(analysis_text, sources)`` where ``sources[i - 1]`` describes the
            document labelled ``[Source i]`` in the LLM's context.
        """
        docs = self.vector_store.similarity_search(query, k=self._RAG_TOP_K)
        sources = [
            f"Source {number}: {self._describe_source(doc)}"
            for number, doc in enumerate(docs, 1)
        ]
        response = self._answer_chain.invoke(
            {"context": self._format_context(docs), "question": query}
        )
        return self._response_text(response), sources

    def analyze_transaction(
        self,
        transaction_id: Optional[int] = None,
        transaction_data: Optional[Dict] = None,
        use_rag: bool = True,
    ) -> Dict:
        """Analyze a transaction for fraud.

        Args:
            transaction_id: Transaction ID from dataset.
            transaction_data: Direct transaction data dictionary. Takes
                precedence over ``transaction_id``; an empty dict counts as
                not provided.
            use_rag: Whether to use RAG for context (ignored when no RAG chain
                is configured).

        Returns:
            Dictionary with keys ``transaction``, ``analysis`` (LLM text, plus
            a source reference list when RAG succeeded), ``formatted_transaction``
            and ``sources``.

        Raises:
            ValueError: If neither ``transaction_id`` nor ``transaction_data``
                is provided.
        """
        if transaction_data:
            transaction = transaction_data
        elif transaction_id is not None:
            transaction = self.data_processor.get_transaction_summary(transaction_id)
        else:
            raise ValueError("Either transaction_id or transaction_data must be provided")

        formatted_transaction = self.data_processor.format_transaction_for_llm(transaction)

        sources: List[str] = []
        if use_rag and self.rag_chain is not None:
            query = f"Analyze this transaction for fraud indicators:\n\n{formatted_transaction}"
            try:
                analysis_text, sources = self._rag_analysis(query)
            except Exception as e:
                # RAG is best-effort: any retrieval/LLM failure degrades to a
                # direct analysis, which has no sources to cite.
                logger.warning("RAG chain failed, falling back to direct LLM: %s", e)
                analysis_text = self._direct_analysis(formatted_transaction)
                sources = []
        else:
            analysis_text = self._direct_analysis(formatted_transaction)

        if sources:
            analysis_text += "\n\n---\n\n**📚 Source References:**\n" + "".join(
                f"\n- {source}" for source in sources
            )

        return {
            "transaction": transaction,
            "analysis": analysis_text,
            "formatted_transaction": formatted_transaction,
            "sources": sources,
        }

    def _direct_analysis(self, formatted_transaction: str) -> str:
        """Perform direct LLM analysis without RAG.

        Args:
            formatted_transaction: Formatted transaction string.

        Returns:
            Analysis text returned by ``groq_client.invoke``.

        Raises:
            Exception: Any error from the LLM client is logged and re-raised.
        """
        system_message = """You are an expert fraud detection analyst with deep knowledge of payment fraud patterns,
transaction anomalies, and risk indicators. Analyze transactions carefully and provide detailed assessments."""
        prompt = f"""Analyze the following transaction for fraud indicators:
{formatted_transaction}
Provide a detailed analysis with:
1. Risk assessment (Low/Medium/High)
2. Key indicators of potential fraud (if any)
3. Specific red flags or suspicious patterns
4. Recommendations
5. Confidence level (0-100%)
Be specific and cite patterns from the transaction data."""
        try:
            return self.groq_client.invoke(prompt, system_message=system_message)
        except Exception as e:
            logger.error("Error in direct analysis: %s", e)
            raise

    def batch_analyze(
        self,
        transaction_ids: List[int],
        use_rag: bool = True,
    ) -> List[Dict]:
        """Analyze multiple transactions.

        A failure on one transaction does not stop the batch: it is logged and
        recorded as ``{"transaction_id": ..., "error": ...}`` in its slot.

        Args:
            transaction_ids: List of transaction IDs.
            use_rag: Whether to use RAG for context.

        Returns:
            List of analysis results, in the same order as ``transaction_ids``.
        """
        results = []
        for transaction_id in transaction_ids:
            try:
                result = self.analyze_transaction(transaction_id=transaction_id, use_rag=use_rag)
            except Exception as e:
                logger.error("Error analyzing transaction %s: %s", transaction_id, e)
                result = {"transaction_id": transaction_id, "error": str(e)}
            results.append(result)
        return results