# Llama_Developer_Aid / modules / data_ingestion.py
# Upload metadata (Hugging Face): author Ashar086, commit e1dd5ab ("Create modules/data_ingestion.py")
import pandas as pd
from llama_index import GPTVectorStoreIndex, Document
from typing import Union, List
import json
class DataIngestionModule:
    """Load, clean, index, and export tabular data files.

    Reads CSV/XLSX/JSON via the matching pandas reader, normalizes the
    frame (dedup, fill missing, stringify text columns), can build a
    LlamaIndex vector index from the rows, and can export the cleaned
    frame back out in any supported format.
    """

    def __init__(self):
        # Extension -> pandas reader. Single source of truth for what
        # load_data() accepts.
        self.supported_formats = {
            'csv': pd.read_csv,
            'xlsx': pd.read_excel,
            'json': pd.read_json,
        }
        # Format -> writer, mirroring supported_formats so import and
        # export dispatch stay consistent (replaces an if/elif chain).
        self._exporters = {
            'csv': lambda df, path: df.to_csv(path, index=False),
            'json': lambda df, path: df.to_json(path, orient='records'),
            'xlsx': lambda df, path: df.to_excel(path, index=False),
        }

    def load_data(self, file) -> pd.DataFrame:
        """Load data from a file-like object into a DataFrame.

        Args:
            file: an open file-like object with a ``name`` attribute; the
                reader is chosen from the name's final extension.

        Returns:
            The parsed DataFrame.

        Raises:
            ValueError: if the extension is not csv/xlsx/json.
        """
        # rsplit with maxsplit=1 isolates only the final extension.
        file_extension = file.name.rsplit('.', 1)[-1].lower()
        if file_extension not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_extension}")
        return self.supported_formats[file_extension](file)

    def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of *df*.

        Drops duplicate rows, fills missing values with the empty string
        (note: this turns NaN in numeric columns into '' and the column
        to object dtype — existing behavior, kept), and casts all
        object-dtype columns to ``str``.
        """
        df = df.drop_duplicates()
        df = df.fillna('')
        # Stringify text columns so downstream joins never see mixed types.
        text_columns = df.select_dtypes(include=['object']).columns
        for col in text_columns:
            df[col] = df[col].astype(str)
        return df

    def index_data(self, df: pd.DataFrame) -> "GPTVectorStoreIndex":
        """Create a LlamaIndex vector index from the dataframe.

        Each row becomes one Document whose text is "col: val" pairs
        joined with spaces. The return annotation is a string so the
        class can be defined without llama_index importable.
        """
        processed_df = self.preprocess_data(df)
        documents = []
        for _, row in processed_df.iterrows():
            text = " ".join(f"{col}: {val}" for col, val in row.items())
            # Keyword argument: positional text is deprecated in newer
            # llama_index releases and accepted by older ones.
            documents.append(Document(text=text))
        return GPTVectorStoreIndex.from_documents(documents)

    def export_processed_data(self, df: pd.DataFrame, format: str, path: str):
        """Preprocess *df* and write it to *path* in *format*.

        Args:
            df: frame to clean and export.
            format: one of 'csv', 'json', 'xlsx'.
            path: destination file path.

        Raises:
            ValueError: if *format* is not supported.
        """
        processed_df = self.preprocess_data(df)
        try:
            writer = self._exporters[format]
        except KeyError:
            # from None: the KeyError is an implementation detail.
            raise ValueError(f"Unsupported export format: {format}") from None
        writer(processed_df, path)