"""CSV document generator for RAG system.""" import logging from pathlib import Path from typing import List, Dict, Any import pandas as pd from langchain_core.documents import Document logger = logging.getLogger(__name__) class CSVDocumentGenerator: """Generate documents from CSV data for RAG system.""" def __init__(self, csv_path: Path, sample_size: int = 1050000) -> None: """Initialize CSV document generator. Args: csv_path: Path to the CSV file. sample_size: Number of rows to sample from CSV (to handle large files). """ self.csv_path = Path(csv_path) self.sample_size = sample_size self.df: pd.DataFrame = None def load_data(self) -> None: """Load CSV data with sampling for efficiency.""" if not self.csv_path.exists(): raise FileNotFoundError(f"CSV file not found: {self.csv_path}") try: logger.info(f"Loading CSV data from {self.csv_path}") # Load with sampling to handle large file self.df = pd.read_csv(self.csv_path, nrows=self.sample_size) # Clean merchant names (remove 'fraud_' prefix common in synthetic datasets) if 'merchant' in self.df.columns: self.df['merchant'] = self.df['merchant'].str.replace('fraud_', '', regex=False) logger.info(f"Loaded {len(self.df)} rows from CSV (merchant names cleaned)") except Exception as e: logger.error(f"Error loading CSV: {str(e)}") raise def generate_fraud_pattern_documents(self) -> List[Document]: """Generate documents about fraud patterns by category. Returns: List of documents containing fraud pattern insights. """ if self.df is None: self.load_data() documents = [] # Fraud patterns by category category_fraud = self.df.groupby('category').agg({ 'is_fraud': ['sum', 'mean', 'count'] }).round(4) for category in category_fraud.index: fraud_count = int(category_fraud.loc[category, ('is_fraud', 'sum')]) fraud_rate = float(category_fraud.loc[category, ('is_fraud', 'mean')] * 100) total_txns = int(category_fraud.loc[category, ('is_fraud', 'count')]) content = f"""Fraud Pattern Analysis - Category: {category} Based on historical transaction data analysis: - Total Transactions: {total_txns:,} - Fraud Cases: {fraud_count:,} - Fraud Rate: {fraud_rate:.2f}% - Risk Level: {'HIGH' if fraud_rate > 5 else 'MEDIUM' if fraud_rate > 1 else 'LOW'} This category shows {'significant' if fraud_rate > 5 else 'moderate' if fraud_rate > 1 else 'low'} fraud activity in the historical dataset. """ documents.append(Document( page_content=content, metadata={ "source": "fraudTrain.csv", "type": "fraud_pattern", "category": category, "fraud_rate": fraud_rate } )) logger.info(f"Generated {len(documents)} category fraud pattern documents") return documents def generate_statistical_summaries(self) -> List[Document]: """Generate statistical summary documents. Returns: List of documents containing statistical insights. """ if self.df is None: self.load_data() documents = [] # Overall statistics total_txns = len(self.df) fraud_txns = int(self.df['is_fraud'].sum()) fraud_rate = float(self.df['is_fraud'].mean() * 100) avg_amount = float(self.df['amt'].mean()) fraud_avg_amount = float(self.df[self.df['is_fraud'] == 1]['amt'].mean()) legit_avg_amount = float(self.df[self.df['is_fraud'] == 0]['amt'].mean()) overall_summary = f"""Overall Fraud Detection Statistics Dataset Summary: - Total Transactions Analyzed: {total_txns:,} - Fraudulent Transactions: {fraud_txns:,} - Overall Fraud Rate: {fraud_rate:.2f}% - Average Transaction Amount: ${avg_amount:.2f} - Average Fraud Amount: ${fraud_avg_amount:.2f} - Average Legitimate Amount: ${legit_avg_amount:.2f} Key Insight: Fraudulent transactions have an average amount of ${fraud_avg_amount:.2f} compared to ${legit_avg_amount:.2f} for legitimate transactions. """ documents.append(Document( page_content=overall_summary, metadata={ "source": "fraudTrain.csv", "type": "statistical_summary", "scope": "overall" } )) # Amount range analysis amount_bins = [0, 10, 50, 100, 500, 1000, float('inf')] amount_labels = ['$0-10', '$10-50', '$50-100', '$100-500', '$500-1000', '$1000+'] self.df['amount_range'] = pd.cut(self.df['amt'], bins=amount_bins, labels=amount_labels) amount_fraud = self.df.groupby('amount_range', observed=True).agg({ 'is_fraud': ['sum', 'mean', 'count'] }).round(4) amount_content = "Fraud Patterns by Transaction Amount\n\n" for amt_range in amount_labels: if amt_range in amount_fraud.index: fraud_count = int(amount_fraud.loc[amt_range, ('is_fraud', 'sum')]) fraud_rate = float(amount_fraud.loc[amt_range, ('is_fraud', 'mean')] * 100) total = int(amount_fraud.loc[amt_range, ('is_fraud', 'count')]) amount_content += f""" Amount Range: {amt_range} - Total Transactions: {total:,} - Fraud Cases: {fraud_count:,} - Fraud Rate: {fraud_rate:.2f}% """ documents.append(Document( page_content=amount_content, metadata={ "source": "fraudTrain.csv", "type": "statistical_summary", "scope": "amount_analysis" } )) logger.info(f"Generated {len(documents)} statistical summary documents") return documents def generate_merchant_profiles(self) -> List[Document]: """Generate merchant risk profile documents. Returns: List of documents containing merchant insights. """ if self.df is None: self.load_data() documents = [] # Top merchants by transaction volume merchant_stats = self.df.groupby('merchant').agg({ 'is_fraud': ['sum', 'mean', 'count'], 'amt': 'mean' }).round(4) # Get top 20 merchants by volume top_merchants = merchant_stats.nlargest(20, ('is_fraud', 'count')) for merchant in top_merchants.index: fraud_count = int(top_merchants.loc[merchant, ('is_fraud', 'sum')]) fraud_rate = float(top_merchants.loc[merchant, ('is_fraud', 'mean')] * 100) total_txns = int(top_merchants.loc[merchant, ('is_fraud', 'count')]) avg_amt = float(top_merchants.loc[merchant, ('amt', 'mean')]) content = f"""Merchant Risk Profile: {merchant} Transaction Analysis: - Total Transactions: {total_txns:,} - Fraudulent Transactions: {fraud_count:,} - Fraud Rate: {fraud_rate:.2f}% - Average Transaction Amount: ${avg_amt:.2f} - Risk Assessment: {'HIGH RISK' if fraud_rate > 10 else 'MEDIUM RISK' if fraud_rate > 5 else 'LOW RISK'} This merchant profile is based on historical transaction patterns and can help identify similar fraud patterns. """ documents.append(Document( page_content=content, metadata={ "source": "fraudTrain.csv", "type": "merchant_profile", "merchant": merchant, "fraud_rate": fraud_rate } )) logger.info(f"Generated {len(documents)} merchant profile documents") return documents def generate_location_insights(self) -> List[Document]: """Generate location-based fraud insights. Returns: List of documents containing location insights. """ if self.df is None: self.load_data() documents = [] # State-level analysis state_fraud = self.df.groupby('state').agg({ 'is_fraud': ['sum', 'mean', 'count'] }).round(4) # Get top 15 states by transaction volume top_states = state_fraud.nlargest(15, ('is_fraud', 'count')) for state in top_states.index: fraud_count = int(top_states.loc[state, ('is_fraud', 'sum')]) fraud_rate = float(top_states.loc[state, ('is_fraud', 'mean')] * 100) total_txns = int(top_states.loc[state, ('is_fraud', 'count')]) content = f"""Geographic Fraud Analysis - State: {state} Location-based Fraud Patterns: - Total Transactions: {total_txns:,} - Fraud Cases: {fraud_count:,} - Fraud Rate: {fraud_rate:.2f}% - Geographic Risk Level: {'HIGH' if fraud_rate > 5 else 'MEDIUM' if fraud_rate > 2 else 'LOW'} This geographic area shows {'elevated' if fraud_rate > 5 else 'moderate' if fraud_rate > 2 else 'normal'} fraud activity levels. """ documents.append(Document( page_content=content, metadata={ "source": "fraudTrain.csv", "type": "location_insight", "state": state, "fraud_rate": fraud_rate } )) logger.info(f"Generated {len(documents)} location insight documents") return documents def generate_all_documents(self) -> List[Document]: """Generate all types of documents from CSV data. Returns: List of all generated documents. """ all_documents = [] logger.info("Generating all document types from CSV data...") all_documents.extend(self.generate_fraud_pattern_documents()) all_documents.extend(self.generate_statistical_summaries()) all_documents.extend(self.generate_merchant_profiles()) all_documents.extend(self.generate_location_insights()) logger.info(f"Generated total of {len(all_documents)} documents from CSV data") return all_documents