File size: 3,885 Bytes
4aa0277
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
Data Loader Agent - Handles loading data from various sources
"""

import pandas as pd
import numpy as np
import json
import sqlite3
import requests
from io import StringIO


class DataLoaderAgent:
    """Agent responsible for loading data from various sources.

    Each private ``_load_*`` helper returns a pandas DataFrame (and raises on
    failure); ``load_data`` is the public entry point and converts both
    outcomes into a status dictionary.
    """

    def __init__(self):
        # NOTE(review): 'txt' is listed here but load_data has no branch for
        # it, so requesting it produces an "Unsupported source type" error.
        self.supported_formats = ['csv', 'json', 'txt', 'sql', 'api', 'excel']

    def load_data(self, source, source_type='csv', **kwargs):
        """
        Load data from various sources

        Args:
            source: Path to file, URL, or database table name
            source_type: Type of source ('csv', 'json', 'sql', 'api', 'excel')
            **kwargs: Additional parameters for specific loaders
                - csv / excel: forwarded to ``pd.read_csv`` / ``pd.read_excel``
                - json: ``timeout`` (only used when ``source`` is a URL)
                - sql: ``database``, ``query``, ``params``
                - api: ``headers``, ``params``, ``timeout``

        Returns:
            Dictionary with status, data, and metadata. On success:
            ``{'status': 'success', 'data': DataFrame, 'info': {...}}``.
            On any failure: ``{'status': 'error', 'error': str, 'data': None}``.
            Exceptions are never propagated to the caller.
        """
        try:
            if source_type == 'csv':
                data = self._load_csv(source, **kwargs)
            elif source_type == 'excel':
                data = self._load_excel(source, **kwargs)
            elif source_type == 'json':
                data = self._load_json(source, **kwargs)
            elif source_type == 'sql':
                data = self._load_sql(source, **kwargs)
            elif source_type == 'api':
                data = self._load_api(source, **kwargs)
            else:
                raise ValueError(f"Unsupported source type: {source_type}")

            return {
                'status': 'success',
                'data': data,
                'info': {
                    'shape': data.shape,
                    'columns': list(data.columns),
                    'dtypes': data.dtypes.to_dict(),
                    'memory_usage': f"{data.memory_usage(deep=True).sum() / 1024**2:.2f} MB"
                }
            }

        except Exception as e:
            # Deliberate boundary: callers rely on a status dict rather than
            # on exceptions, so every failure is reported as text here.
            return {
                'status': 'error',
                'error': str(e),
                'data': None
            }

    def _load_csv(self, source, **kwargs):
        """Load CSV data from file or URL.

        ``pd.read_csv`` accepts local paths, file-like objects and URLs, so
        no separate URL branch is needed. ``kwargs`` are forwarded unchanged.
        """
        return pd.read_csv(source, **kwargs)

    def _load_excel(self, source, **kwargs):
        """Load Excel data from file or URL.

        ``pd.read_excel`` accepts local paths and URLs alike. ``kwargs`` are
        forwarded unchanged.
        """
        return pd.read_excel(source, **kwargs)

    def _load_json(self, source, **kwargs):
        """Load JSON data from file or URL and flatten it into a DataFrame.

        Args:
            source: Local file path, or a string starting with ``http``.
            **kwargs: ``timeout`` (seconds) is honoured for URL sources;
                the default ``None`` means no timeout.

        Raises:
            requests.HTTPError: If a URL source answers with an error status.
            OSError / ValueError: If the file cannot be read or parsed.
        """
        if isinstance(source, str) and source.startswith('http'):
            response = requests.get(source, timeout=kwargs.get('timeout'))
            # Fail loudly on 4xx/5xx instead of normalizing an error payload
            # as if it were data (same behaviour as _load_api).
            response.raise_for_status()
            payload = response.json()
        else:
            # Read bytes so json can detect UTF-8/16/32 (including a BOM)
            # itself rather than depending on the platform's locale encoding.
            with open(source, 'rb') as f:
                payload = json.load(f)
        return pd.json_normalize(payload)

    def _load_sql(self, source, **kwargs):
        """Load data from a SQLite database.

        Args:
            source: Table name, used only to build the default query.
            **kwargs:
                database: Path of the SQLite file (default ``'database.db'``).
                query: SQL to run (default ``SELECT * FROM <source>``).
                params: Optional bound parameters for ``query``.

        Returns:
            DataFrame holding the query result.
        """
        database = kwargs.get('database', 'database.db')
        # NOTE(review): `source` is interpolated verbatim into the default
        # query, so it must come from a trusted caller. Pass an explicit
        # `query` with `params` when any part is user-controlled.
        query = kwargs.get('query', f'SELECT * FROM {source}')
        params = kwargs.get('params')

        conn = sqlite3.connect(database)
        try:
            return pd.read_sql_query(query, conn, params=params)
        finally:
            # Close even when the query fails, so the connection is not leaked.
            conn.close()

    def _load_api(self, source, **kwargs):
        """Load data from an API endpoint that returns JSON.

        Args:
            source: Endpoint URL.
            **kwargs:
                headers: HTTP headers (default ``{}``).
                params: Query-string parameters (default ``{}``).
                timeout: Seconds to wait; default ``None`` means no timeout.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
        """
        headers = kwargs.get('headers', {})
        params = kwargs.get('params', {})

        response = requests.get(
            source,
            headers=headers,
            params=params,
            timeout=kwargs.get('timeout'),
        )
        response.raise_for_status()

        return pd.json_normalize(response.json())

    def get_sample(self, data, n=5):
        """Get a sample of the data for quick inspection.

        Args:
            data: DataFrame to inspect.
            n: Number of rows per section.

        Returns:
            Dictionary with ``head``, ``tail`` and ``random_sample`` keys,
            each a list of row dictionaries. The random sample is capped at
            the number of rows available and is not seeded.
        """
        sample_size = min(n, len(data))
        return {
            'head': data.head(n).to_dict('records'),
            'tail': data.tail(n).to_dict('records'),
            'random_sample': data.sample(sample_size).to_dict('records')
        }