# Fraud-Chatbot/src/services/fraud_analyzer.py
# (Hugging Face Hub page residue: uploaded by ahmzakif, commit fd99b61, "feat: add new project")
"""Fraud analyzer service using LLM and RAG."""
import logging
from typing import Dict, List, Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from src.data.processor import FraudDataProcessor
from src.llm.groq_client import GroqClient
from src.rag.vector_store import VectorStore
logger = logging.getLogger(__name__)
class FraudAnalyzer:
    """Service for analyzing fraud using LLM and RAG."""

    def __init__(
        self,
        groq_client: Optional[GroqClient] = None,
        vector_store: Optional[VectorStore] = None,
        data_processor: Optional[FraudDataProcessor] = None,
    ) -> None:
        """Initialize fraud analyzer.

        Args:
            groq_client: Groq LLM client. If None, creates a new one.
            vector_store: Vector store for RAG. If None, RAG is disabled
                (no default store is created).
            data_processor: Data processor. If None, creates a new one.
        """
        self.groq_client = groq_client or GroqClient()
        # Unlike the other collaborators, a missing vector store is NOT
        # replaced with a default: without one there is nothing to retrieve
        # from, so RAG is simply unavailable for this instance.
        self.vector_store = vector_store
        self.data_processor = data_processor or FraudDataProcessor()

        # Build the RAG chain only when a retriever is actually available.
        self.rag_chain = None
        if self.vector_store and self.vector_store.retriever:
            self._setup_rag_chain()

    def _setup_rag_chain(self) -> None:
        """Setup RAG chain for document retrieval.

        No-op when the vector store or its retriever is unavailable.
        """
        if not self.vector_store or not self.vector_store.retriever:
            return

        template = """You are an expert fraud detection analyst. Use the following context from fraud detection research papers and reports to analyze transactions.

Context:
{context}

Question: {question}

IMPORTANT CITATION RULES:
- When using information from the context, add an inline citation immediately after the relevant sentence.
- Format citations as: [Source X] where X corresponds to the source number in the context.
- Place citations at the end of sentences that use information from sources.
- You can cite multiple sources if needed: [Source 1, Source 2]

Provide a detailed analysis with:
1. Risk assessment (Low/Medium/High) with inline citations. (Note: Ignore 'fraud_' prefix in merchant names as it's a synthetic data artifact.)
2. Key indicators of potential fraud with inline citations
3. Recommendations with inline citations
4. Confidence level (0-100%)

Example: "This transaction shows high risk indicators based on the merchant category. [Source 1]"
"""
        prompt = ChatPromptTemplate.from_template(template)

        # Imported locally to keep the Document dependency scoped to RAG setup.
        from langchain_core.documents import Document

        def format_docs(docs: List[Document]) -> str:
            # Number each document so the LLM's [Source X] citations line up
            # with the reference list built in analyze_transaction().
            formatted = [
                f"[Source {i}]\n{doc.page_content}"
                for i, doc in enumerate(docs, 1)
            ]
            return "\n\n".join(formatted)

        self.rag_chain = (
            {
                "context": self.vector_store.retriever | format_docs,
                "question": RunnablePassthrough(),
            }
            | prompt
            | self.groq_client.llm
        )
        logger.info("RAG chain initialized")

    @staticmethod
    def _describe_source(index: int, doc) -> str:
        """Return one human-readable citation line for a retrieved document.

        Args:
            index: 1-based source number (must match the [Source X] numbering
                used in the prompt context).
            doc: Retrieved document exposing a ``metadata`` dict.

        Returns:
            A "Source N: ..." line describing where the document came from.
        """
        metadata = doc.metadata
        doc_type = metadata.get('type', 'document')
        if doc_type == 'fraud_pattern':
            detail = f"CSV Data - Fraud Pattern Analysis ({metadata.get('category', 'N/A')})"
        elif doc_type == 'statistical_summary':
            detail = f"CSV Data - Statistical Summary ({metadata.get('scope', 'N/A')})"
        elif doc_type == 'merchant_profile':
            detail = f"CSV Data - Merchant Profile ({metadata.get('merchant', 'N/A')})"
        elif doc_type == 'location_insight':
            detail = f"CSV Data - Location Analysis ({metadata.get('state', 'N/A')})"
        else:
            # Anything else is treated as a PDF document: cite the file name
            # plus the page number when one is present.
            source_file = metadata.get('source', 'Unknown')
            page_num = metadata.get('page', 'N/A')
            if page_num != 'N/A':
                detail = f"{source_file}, Page {page_num}"
            else:
                detail = source_file
        return f"Source {index}: {detail}"

    def analyze_transaction(
        self,
        transaction_id: Optional[int] = None,
        transaction_data: Optional[Dict] = None,
        use_rag: bool = True,
    ) -> Dict:
        """Analyze a transaction for fraud.

        Args:
            transaction_id: Transaction ID from dataset.
            transaction_data: Direct transaction data dictionary. Takes
                precedence over ``transaction_id`` when both are given.
            use_rag: Whether to use RAG for context.

        Returns:
            Analysis results dictionary with keys ``transaction``,
            ``analysis``, ``formatted_transaction`` and ``sources``.

        Raises:
            ValueError: If neither ``transaction_id`` nor ``transaction_data``
                is provided.
        """
        # Resolve the transaction. NOTE: truthiness is kept intentionally --
        # an empty transaction_data dict falls through to the id lookup,
        # matching the original behavior.
        if transaction_data:
            transaction = transaction_data
        elif transaction_id is not None:
            transaction = self.data_processor.get_transaction_summary(transaction_id)
        else:
            raise ValueError("Either transaction_id or transaction_data must be provided")

        # Format transaction for LLM
        formatted_transaction = self.data_processor.format_transaction_for_llm(transaction)

        sources: List[str] = []

        if use_rag and self.rag_chain:
            query = f"Analyze this transaction for fraud indicators:\n\n{formatted_transaction}"
            try:
                # Retrieve documents separately so we can show the user a
                # source reference list.
                # NOTE(review): the RAG chain below runs its own retrieval, so
                # this cited list may diverge from the documents the chain
                # actually saw (e.g. different k) -- consider retrieving once
                # and feeding the same docs to both. TODO confirm retriever
                # settings match similarity_search(k=5).
                if self.vector_store:
                    docs = self.vector_store.similarity_search(query, k=5)
                    sources = [
                        self._describe_source(i, doc)
                        for i, doc in enumerate(docs, 1)
                    ]

                response = self.rag_chain.invoke(query)
                # LangChain chat models return a message object; tolerate
                # plain strings and anything else as well.
                if hasattr(response, "content"):
                    analysis_text = response.content
                elif isinstance(response, str):
                    analysis_text = response
                else:
                    analysis_text = str(response)
            except Exception as e:
                # Best-effort fallback: degrade to a plain LLM analysis
                # rather than failing the request.
                logger.warning("RAG chain failed, falling back to direct LLM: %s", e)
                analysis_text = self._direct_analysis(formatted_transaction)
                sources = []  # Citations no longer apply after the fallback.
        else:
            # Direct LLM analysis (RAG disabled or unavailable).
            analysis_text = self._direct_analysis(formatted_transaction)

        # Append a reference list so the inline [Source X] citations resolve.
        if sources:
            analysis_text += "\n\n---\n\n**📚 Source References:**\n"
            for source in sources:
                analysis_text += f"\n- {source}"

        return {
            "transaction": transaction,
            "analysis": analysis_text,
            "formatted_transaction": formatted_transaction,
            "sources": sources,
        }

    def _direct_analysis(self, formatted_transaction: str) -> str:
        """Perform direct LLM analysis without RAG.

        Args:
            formatted_transaction: Formatted transaction string.

        Returns:
            Analysis text.

        Raises:
            Exception: Re-raises whatever the LLM client raises, after logging.
        """
        system_message = """You are an expert fraud detection analyst with deep knowledge of payment fraud patterns,
transaction anomalies, and risk indicators. Analyze transactions carefully and provide detailed assessments."""

        prompt = f"""Analyze the following transaction for fraud indicators:

{formatted_transaction}

Provide a detailed analysis with:
1. Risk assessment (Low/Medium/High)
2. Key indicators of potential fraud (if any)
3. Specific red flags or suspicious patterns
4. Recommendations
5. Confidence level (0-100%)

Be specific and cite patterns from the transaction data."""

        try:
            return self.groq_client.invoke(prompt, system_message=system_message)
        except Exception as e:
            logger.error("Error in direct analysis: %s", e)
            raise

    def batch_analyze(
        self,
        transaction_ids: List[int],
        use_rag: bool = True,
    ) -> List[Dict]:
        """Analyze multiple transactions.

        Per-transaction failures are captured in the result list rather than
        aborting the whole batch.

        Args:
            transaction_ids: List of transaction IDs.
            use_rag: Whether to use RAG for context.

        Returns:
            List of analysis results; a failed entry is a dict with
            ``transaction_id`` and ``error`` keys instead.
        """
        results = []
        for transaction_id in transaction_ids:
            try:
                results.append(
                    self.analyze_transaction(transaction_id=transaction_id, use_rag=use_rag)
                )
            except Exception as e:
                logger.error("Error analyzing transaction %s: %s", transaction_id, e)
                results.append({"transaction_id": transaction_id, "error": str(e)})
        return results