File size: 3,885 Bytes
4aa0277 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
"""
Data Loader Agent - Handles loading data from various sources
"""
import pandas as pd
import numpy as np
import json
import sqlite3
import requests
from io import StringIO
class DataLoaderAgent:
    """Agent responsible for loading tabular data from various sources.

    Every loader returns a ``pandas.DataFrame``. ``load_data`` is the public
    entry point: it dispatches on ``source_type`` and wraps the outcome in a
    status dictionary instead of raising.
    """

    # Seconds to wait for an HTTP response when the caller gives no
    # ``timeout``; without one ``requests`` blocks forever on a stalled server.
    DEFAULT_TIMEOUT = 30

    def __init__(self):
        self.supported_formats = ['csv', 'json', 'txt', 'sql', 'api', 'excel']

    def load_data(self, source, source_type='csv', **kwargs):
        """
        Load data from various sources.

        Args:
            source: Path to file, file-like object, URL, or database table name.
            source_type: Type of source
                ('csv', 'txt', 'json', 'sql', 'api', 'excel').
            **kwargs: Additional parameters forwarded to the specific loader.

        Returns:
            On success, a dictionary with ``status='success'``, the loaded
            DataFrame under ``data`` and summary metadata under ``info``.
            On any failure, a dictionary with ``status='error'``, the
            exception message under ``error`` and ``data=None``.
        """
        # 'txt' is advertised in supported_formats; it is treated as
        # delimited text, so callers pass e.g. sep='\t' through kwargs.
        loaders = {
            'csv': self._load_csv,
            'txt': self._load_csv,
            'excel': self._load_excel,
            'json': self._load_json,
            'sql': self._load_sql,
            'api': self._load_api,
        }
        # This method is the error boundary of the agent: callers inspect
        # 'status' rather than catching exceptions, hence the broad except.
        try:
            loader = loaders.get(source_type)
            if loader is None:
                raise ValueError(f"Unsupported source type: {source_type}")
            data = loader(source, **kwargs)
            return {
                'status': 'success',
                'data': data,
                'info': {
                    'shape': data.shape,
                    'columns': list(data.columns),
                    'dtypes': data.dtypes.to_dict(),
                    'memory_usage': f"{data.memory_usage(deep=True).sum() / 1024**2:.2f} MB"
                }
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e),
                'data': None
            }

    @staticmethod
    def _is_url(source):
        """Return True when *source* is an http(s) URL string."""
        # Match the full scheme so a local file such as 'http_dump.json'
        # is not mistaken for a URL.
        return isinstance(source, str) and source.lower().startswith(
            ('http://', 'https://')
        )

    def _load_csv(self, source, **kwargs):
        """Load delimited text from a file, file-like object or URL.

        ``pd.read_csv`` resolves URLs itself, so no branching is needed.
        """
        return pd.read_csv(source, **kwargs)

    def _load_excel(self, source, **kwargs):
        """Load Excel data from a file, file-like object or URL.

        ``pd.read_excel`` resolves URLs itself, so no branching is needed.
        """
        return pd.read_excel(source, **kwargs)

    def _load_json(self, source, **kwargs):
        """Load JSON from a URL, file path or file-like object.

        The parsed document is flattened with ``pd.json_normalize``.
        Only the ``timeout`` keyword (URL sources) is consulted; other
        keyword arguments are ignored.

        Raises:
            requests.HTTPError: The URL answered with an error status.
        """
        if self._is_url(source):
            response = requests.get(
                source, timeout=kwargs.get('timeout', self.DEFAULT_TIMEOUT)
            )
            # Fail loudly instead of normalising an HTTP error body.
            response.raise_for_status()
            json_data = response.json()
        elif hasattr(source, 'read'):
            json_data = json.load(source)
        else:
            # utf-8-sig reads plain UTF-8 and also strips a leading BOM,
            # independent of the platform's default encoding.
            with open(source, 'r', encoding='utf-8-sig') as f:
                json_data = json.load(f)
        return pd.json_normalize(json_data)

    def _load_sql(self, source, **kwargs):
        """Load data from a SQLite database.

        Args:
            source: Table name, used only when no ``query`` is supplied.
            **kwargs: ``database`` (path, default ``'database.db'``) and
                ``query`` (SQL text, default ``SELECT * FROM <source>``).
        """
        database = kwargs.get('database', 'database.db')
        query = kwargs.get('query')
        if query is None:
            # NOTE(security): the table name is interpolated verbatim.
            # Never pass untrusted input as `source`; supply a vetted
            # `query` instead.
            query = f'SELECT * FROM {source}'
        conn = sqlite3.connect(database)
        try:
            return pd.read_sql_query(query, conn)
        finally:
            # Close even when the query fails so the handle is not leaked.
            conn.close()

    def _load_api(self, source, **kwargs):
        """Load data from a JSON API endpoint.

        Args:
            source: Endpoint URL.
            **kwargs: Optional ``headers``, ``params`` and ``timeout``
                passed to ``requests.get``.

        Raises:
            requests.HTTPError: The endpoint answered with an error status.
        """
        response = requests.get(
            source,
            headers=kwargs.get('headers', {}),
            params=kwargs.get('params', {}),
            timeout=kwargs.get('timeout', self.DEFAULT_TIMEOUT),
        )
        response.raise_for_status()
        return pd.json_normalize(response.json())

    def get_sample(self, data, n=5):
        """Get a sample of the data for quick inspection.

        Args:
            data: DataFrame to sample.
            n: Number of rows for each of the three views.

        Returns:
            Dictionary with ``head``, ``tail`` and ``random_sample`` lists
            of row records; the random sample is capped at ``len(data)``.
        """
        return {
            'head': data.head(n).to_dict('records'),
            'tail': data.tail(n).to_dict('records'),
            'random_sample': data.sample(min(n, len(data))).to_dict('records')
        }
|