File size: 2,289 Bytes
e1dd5ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import pandas as pd
from llama_index import GPTVectorStoreIndex, Document
from typing import Union, List
import json

class DataIngestionModule:
    """Load tabular files, clean them, and expose them to LlamaIndex.

    Supports CSV, XLSX, and JSON input via pandas readers, a light
    preprocessing pass (dedupe, fill missing, stringify text columns),
    indexing into a ``GPTVectorStoreIndex``, and re-export.
    """

    def __init__(self):
        # Maps a lowercase file extension to the pandas reader for it.
        self.supported_formats = {
            'csv': pd.read_csv,
            'xlsx': pd.read_excel,
            'json': pd.read_json
        }

    def load_data(self, file) -> pd.DataFrame:
        """Load data from various file formats.

        Args:
            file: A file-like object exposing a ``name`` attribute
                (e.g. an uploaded file); the name's extension selects
                the pandas reader.

        Returns:
            The parsed DataFrame.

        Raises:
            ValueError: If the extension is not in ``supported_formats``.
        """
        file_extension = file.name.split('.')[-1].lower()
        # Single dict lookup instead of membership test + second lookup.
        reader = self.supported_formats.get(file_extension)
        if reader is None:
            raise ValueError(f"Unsupported file format: {file_extension}")
        return reader(file)

    def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of *df*.

        Removes duplicate rows, replaces missing values with empty
        strings, and normalizes all object (text) columns to ``str``.
        The caller's DataFrame is not modified.
        """
        df = df.drop_duplicates()
        df = df.fillna('')
        # Vectorized cast of all text columns at once instead of a
        # per-column Python loop.
        text_columns = df.select_dtypes(include=['object']).columns
        if len(text_columns):
            df[text_columns] = df[text_columns].astype(str)
        return df

    def index_data(self, df: pd.DataFrame) -> "GPTVectorStoreIndex":
        """Create a LlamaIndex vector index from the dataframe.

        Each row becomes one document whose text is the space-joined
        ``"column: value"`` pairs of that row.
        """
        processed_df = self.preprocess_data(df)
        # Pass text as a keyword: newer llama_index versions require
        # Document(text=...), and the keyword form also works on older
        # releases where it was the first positional parameter.
        documents = [
            Document(text=" ".join(f"{col}: {val}" for col, val in row.items()))
            for _, row in processed_df.iterrows()
        ]
        return GPTVectorStoreIndex.from_documents(documents)

    def export_processed_data(self, df: pd.DataFrame, format: str, path: str):
        """Preprocess *df* and write it to *path* in the given format.

        Args:
            df: Raw input DataFrame; it is preprocessed before export.
            format: One of ``'csv'``, ``'json'``, or ``'xlsx'``.
            path: Destination file path.

        Raises:
            ValueError: If *format* is not supported.
        """
        processed_df = self.preprocess_data(df)
        # Dispatch table mirrors the reader table in __init__ rather
        # than an if/elif chain.
        writers = {
            'csv': lambda: processed_df.to_csv(path, index=False),
            'json': lambda: processed_df.to_json(path, orient='records'),
            'xlsx': lambda: processed_df.to_excel(path, index=False),
        }
        if format not in writers:
            raise ValueError(f"Unsupported export format: {format}")
        writers[format]()