"""CSV document generator for RAG system."""
import logging
from pathlib import Path
from typing import List, Dict, Any

import pandas as pd
from langchain_core.documents import Document

logger = logging.getLogger(__name__)
| class CSVDocumentGenerator: | |
| """Generate documents from CSV data for RAG system.""" | |
| def __init__(self, csv_path: Path, sample_size: int = 1050000) -> None: | |
| """Initialize CSV document generator. | |
| Args: | |
| csv_path: Path to the CSV file. | |
| sample_size: Number of rows to sample from CSV (to handle large files). | |
| """ | |
| self.csv_path = Path(csv_path) | |
| self.sample_size = sample_size | |
| self.df: pd.DataFrame = None | |
| def load_data(self) -> None: | |
| """Load CSV data with sampling for efficiency.""" | |
| if not self.csv_path.exists(): | |
| raise FileNotFoundError(f"CSV file not found: {self.csv_path}") | |
| try: | |
| logger.info(f"Loading CSV data from {self.csv_path}") | |
| # Load with sampling to handle large file | |
| self.df = pd.read_csv(self.csv_path, nrows=self.sample_size) | |
| # Clean merchant names (remove 'fraud_' prefix common in synthetic datasets) | |
| if 'merchant' in self.df.columns: | |
| self.df['merchant'] = self.df['merchant'].str.replace('fraud_', '', regex=False) | |
| logger.info(f"Loaded {len(self.df)} rows from CSV (merchant names cleaned)") | |
| except Exception as e: | |
| logger.error(f"Error loading CSV: {str(e)}") | |
| raise | |
| def generate_fraud_pattern_documents(self) -> List[Document]: | |
| """Generate documents about fraud patterns by category. | |
| Returns: | |
| List of documents containing fraud pattern insights. | |
| """ | |
| if self.df is None: | |
| self.load_data() | |
| documents = [] | |
| # Fraud patterns by category | |
| category_fraud = self.df.groupby('category').agg({ | |
| 'is_fraud': ['sum', 'mean', 'count'] | |
| }).round(4) | |
| for category in category_fraud.index: | |
| fraud_count = int(category_fraud.loc[category, ('is_fraud', 'sum')]) | |
| fraud_rate = float(category_fraud.loc[category, ('is_fraud', 'mean')] * 100) | |
| total_txns = int(category_fraud.loc[category, ('is_fraud', 'count')]) | |
| content = f"""Fraud Pattern Analysis - Category: {category} | |
| Based on historical transaction data analysis: | |
| - Total Transactions: {total_txns:,} | |
| - Fraud Cases: {fraud_count:,} | |
| - Fraud Rate: {fraud_rate:.2f}% | |
| - Risk Level: {'HIGH' if fraud_rate > 5 else 'MEDIUM' if fraud_rate > 1 else 'LOW'} | |
| This category shows {'significant' if fraud_rate > 5 else 'moderate' if fraud_rate > 1 else 'low'} fraud activity in the historical dataset. | |
| """ | |
| documents.append(Document( | |
| page_content=content, | |
| metadata={ | |
| "source": "fraudTrain.csv", | |
| "type": "fraud_pattern", | |
| "category": category, | |
| "fraud_rate": fraud_rate | |
| } | |
| )) | |
| logger.info(f"Generated {len(documents)} category fraud pattern documents") | |
| return documents | |
| def generate_statistical_summaries(self) -> List[Document]: | |
| """Generate statistical summary documents. | |
| Returns: | |
| List of documents containing statistical insights. | |
| """ | |
| if self.df is None: | |
| self.load_data() | |
| documents = [] | |
| # Overall statistics | |
| total_txns = len(self.df) | |
| fraud_txns = int(self.df['is_fraud'].sum()) | |
| fraud_rate = float(self.df['is_fraud'].mean() * 100) | |
| avg_amount = float(self.df['amt'].mean()) | |
| fraud_avg_amount = float(self.df[self.df['is_fraud'] == 1]['amt'].mean()) | |
| legit_avg_amount = float(self.df[self.df['is_fraud'] == 0]['amt'].mean()) | |
| overall_summary = f"""Overall Fraud Detection Statistics | |
| Dataset Summary: | |
| - Total Transactions Analyzed: {total_txns:,} | |
| - Fraudulent Transactions: {fraud_txns:,} | |
| - Overall Fraud Rate: {fraud_rate:.2f}% | |
| - Average Transaction Amount: ${avg_amount:.2f} | |
| - Average Fraud Amount: ${fraud_avg_amount:.2f} | |
| - Average Legitimate Amount: ${legit_avg_amount:.2f} | |
| Key Insight: Fraudulent transactions have an average amount of ${fraud_avg_amount:.2f} compared to ${legit_avg_amount:.2f} for legitimate transactions. | |
| """ | |
| documents.append(Document( | |
| page_content=overall_summary, | |
| metadata={ | |
| "source": "fraudTrain.csv", | |
| "type": "statistical_summary", | |
| "scope": "overall" | |
| } | |
| )) | |
| # Amount range analysis | |
| amount_bins = [0, 10, 50, 100, 500, 1000, float('inf')] | |
| amount_labels = ['$0-10', '$10-50', '$50-100', '$100-500', '$500-1000', '$1000+'] | |
| self.df['amount_range'] = pd.cut(self.df['amt'], bins=amount_bins, labels=amount_labels) | |
| amount_fraud = self.df.groupby('amount_range', observed=True).agg({ | |
| 'is_fraud': ['sum', 'mean', 'count'] | |
| }).round(4) | |
| amount_content = "Fraud Patterns by Transaction Amount\n\n" | |
| for amt_range in amount_labels: | |
| if amt_range in amount_fraud.index: | |
| fraud_count = int(amount_fraud.loc[amt_range, ('is_fraud', 'sum')]) | |
| fraud_rate = float(amount_fraud.loc[amt_range, ('is_fraud', 'mean')] * 100) | |
| total = int(amount_fraud.loc[amt_range, ('is_fraud', 'count')]) | |
| amount_content += f""" | |
| Amount Range: {amt_range} | |
| - Total Transactions: {total:,} | |
| - Fraud Cases: {fraud_count:,} | |
| - Fraud Rate: {fraud_rate:.2f}% | |
| """ | |
| documents.append(Document( | |
| page_content=amount_content, | |
| metadata={ | |
| "source": "fraudTrain.csv", | |
| "type": "statistical_summary", | |
| "scope": "amount_analysis" | |
| } | |
| )) | |
| logger.info(f"Generated {len(documents)} statistical summary documents") | |
| return documents | |
| def generate_merchant_profiles(self) -> List[Document]: | |
| """Generate merchant risk profile documents. | |
| Returns: | |
| List of documents containing merchant insights. | |
| """ | |
| if self.df is None: | |
| self.load_data() | |
| documents = [] | |
| # Top merchants by transaction volume | |
| merchant_stats = self.df.groupby('merchant').agg({ | |
| 'is_fraud': ['sum', 'mean', 'count'], | |
| 'amt': 'mean' | |
| }).round(4) | |
| # Get top 20 merchants by volume | |
| top_merchants = merchant_stats.nlargest(20, ('is_fraud', 'count')) | |
| for merchant in top_merchants.index: | |
| fraud_count = int(top_merchants.loc[merchant, ('is_fraud', 'sum')]) | |
| fraud_rate = float(top_merchants.loc[merchant, ('is_fraud', 'mean')] * 100) | |
| total_txns = int(top_merchants.loc[merchant, ('is_fraud', 'count')]) | |
| avg_amt = float(top_merchants.loc[merchant, ('amt', 'mean')]) | |
| content = f"""Merchant Risk Profile: {merchant} | |
| Transaction Analysis: | |
| - Total Transactions: {total_txns:,} | |
| - Fraudulent Transactions: {fraud_count:,} | |
| - Fraud Rate: {fraud_rate:.2f}% | |
| - Average Transaction Amount: ${avg_amt:.2f} | |
| - Risk Assessment: {'HIGH RISK' if fraud_rate > 10 else 'MEDIUM RISK' if fraud_rate > 5 else 'LOW RISK'} | |
| This merchant profile is based on historical transaction patterns and can help identify similar fraud patterns. | |
| """ | |
| documents.append(Document( | |
| page_content=content, | |
| metadata={ | |
| "source": "fraudTrain.csv", | |
| "type": "merchant_profile", | |
| "merchant": merchant, | |
| "fraud_rate": fraud_rate | |
| } | |
| )) | |
| logger.info(f"Generated {len(documents)} merchant profile documents") | |
| return documents | |
| def generate_location_insights(self) -> List[Document]: | |
| """Generate location-based fraud insights. | |
| Returns: | |
| List of documents containing location insights. | |
| """ | |
| if self.df is None: | |
| self.load_data() | |
| documents = [] | |
| # State-level analysis | |
| state_fraud = self.df.groupby('state').agg({ | |
| 'is_fraud': ['sum', 'mean', 'count'] | |
| }).round(4) | |
| # Get top 15 states by transaction volume | |
| top_states = state_fraud.nlargest(15, ('is_fraud', 'count')) | |
| for state in top_states.index: | |
| fraud_count = int(top_states.loc[state, ('is_fraud', 'sum')]) | |
| fraud_rate = float(top_states.loc[state, ('is_fraud', 'mean')] * 100) | |
| total_txns = int(top_states.loc[state, ('is_fraud', 'count')]) | |
| content = f"""Geographic Fraud Analysis - State: {state} | |
| Location-based Fraud Patterns: | |
| - Total Transactions: {total_txns:,} | |
| - Fraud Cases: {fraud_count:,} | |
| - Fraud Rate: {fraud_rate:.2f}% | |
| - Geographic Risk Level: {'HIGH' if fraud_rate > 5 else 'MEDIUM' if fraud_rate > 2 else 'LOW'} | |
| This geographic area shows {'elevated' if fraud_rate > 5 else 'moderate' if fraud_rate > 2 else 'normal'} fraud activity levels. | |
| """ | |
| documents.append(Document( | |
| page_content=content, | |
| metadata={ | |
| "source": "fraudTrain.csv", | |
| "type": "location_insight", | |
| "state": state, | |
| "fraud_rate": fraud_rate | |
| } | |
| )) | |
| logger.info(f"Generated {len(documents)} location insight documents") | |
| return documents | |
| def generate_all_documents(self) -> List[Document]: | |
| """Generate all types of documents from CSV data. | |
| Returns: | |
| List of all generated documents. | |
| """ | |
| all_documents = [] | |
| logger.info("Generating all document types from CSV data...") | |
| all_documents.extend(self.generate_fraud_pattern_documents()) | |
| all_documents.extend(self.generate_statistical_summaries()) | |
| all_documents.extend(self.generate_merchant_profiles()) | |
| all_documents.extend(self.generate_location_insights()) | |
| logger.info(f"Generated total of {len(all_documents)} documents from CSV data") | |
| return all_documents | |