import json
from typing import List, Union

import pandas as pd


class DataIngestionModule:
    """Load, preprocess, index, and export tabular data files.

    Supports CSV, XLSX, and JSON inputs via pandas. The llama_index
    dependency is imported lazily inside :meth:`index_data` so loading,
    preprocessing, and exporting keep working even when llama_index is
    not installed.
    """

    def __init__(self):
        # Maps a lowercase file extension to the pandas reader that parses it.
        self.supported_formats = {
            'csv': pd.read_csv,
            'xlsx': pd.read_excel,
            'json': pd.read_json,
        }

    def load_data(self, file) -> pd.DataFrame:
        """Load data from various file formats.

        Args:
            file: An open file-like object with a ``.name`` attribute
                (e.g. an uploaded file); the name's extension selects
                the pandas reader.

        Returns:
            The parsed DataFrame.

        Raises:
            ValueError: If the extension is not one of the supported formats.
        """
        file_extension = file.name.split('.')[-1].lower()
        if file_extension not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_extension}")
        return self.supported_formats[file_extension](file)

    def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of ``df``; the input frame is not modified.

        Cleaning steps: drop duplicate rows, replace missing values with
        empty strings, and coerce every object column to ``str``.
        """
        # drop_duplicates/fillna both return new frames, so the caller's
        # DataFrame is never mutated.
        df = df.drop_duplicates()
        # NOTE: filling numeric NaNs with '' converts those columns to
        # object dtype; downstream indexing/export treats rows as text,
        # so uniform string content is the intent here.
        df = df.fillna('')
        text_columns = df.select_dtypes(include=['object']).columns
        for col in text_columns:
            df[col] = df[col].astype(str)
        return df

    def index_data(self, df: pd.DataFrame) -> "GPTVectorStoreIndex":
        """Create a LlamaIndex vector index from the dataframe.

        Each preprocessed row becomes one ``Document`` whose text is the
        space-joined ``"column: value"`` pairs.

        Raises:
            ImportError: If llama_index is not installed.
        """
        # Lazy import: llama_index is only needed here, and a missing
        # install should not break load/preprocess/export.
        from llama_index import Document, GPTVectorStoreIndex

        processed_df = self.preprocess_data(df)
        documents = []
        for _, row in processed_df.iterrows():
            text = " ".join(f"{col}: {val}" for col, val in row.items())
            # Pass text by keyword: newer llama_index releases require
            # keyword arguments on Document; older ones accept it too.
            documents.append(Document(text=text))
        return GPTVectorStoreIndex.from_documents(documents)

    def export_processed_data(self, df: pd.DataFrame, format: str, path: str):
        """Preprocess ``df`` and write it to ``path`` in the given format.

        Args:
            df: Frame to clean and export.
            format: One of ``'csv'``, ``'json'``, or ``'xlsx'``.
            path: Destination file path.

        Raises:
            ValueError: If ``format`` is not supported.
        """
        processed_df = self.preprocess_data(df)
        if format == 'csv':
            processed_df.to_csv(path, index=False)
        elif format == 'json':
            processed_df.to_json(path, orient='records')
        elif format == 'xlsx':
            processed_df.to_excel(path, index=False)
        else:
            raise ValueError(f"Unsupported export format: {format}")