# data_registry.py
import logging
import os
import re
from typing import Any, Dict, List, Optional

import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataRegistry:
    def __init__(self):
        self.data = {}
        self.metadata = {}

    def add_path(self, file_path: str) -> bool:
        """Add a file to the registry and return success status."""
        try:
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext == '.csv':
                df = pd.read_csv(file_path)
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_ext == '.json':
                df = pd.read_json(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_ext}")
                return False
            # Store with filename as key
            filename = os.path.basename(file_path)
            self.data[filename] = df
            # Store metadata
            self.metadata[filename] = {
                "path": file_path,
                "type": file_ext,
                "shape": df.shape,
                "columns": list(df.columns),
                "data_types": df.dtypes.to_dict(),
                "null_counts": df.isnull().sum().to_dict(),
                "sample_data": df.head(3).to_dict(),
            }
            logger.info(f"Successfully loaded {filename} with shape {df.shape}")
            return True
        except Exception as e:
            logger.error(f"Error loading {file_path}: {e}")
            return False

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Get a dataset by name."""
        return self.data.get(name)

    def names(self) -> List[str]:
        """Get all dataset names."""
        return list(self.data.keys())

    def get_data_by_type(self, data_type: str) -> List[str]:
        """Get dataset names whose filename contains the given type string (case-insensitive)."""
        return [
            name for name in self.metadata
            if data_type.lower() in name.lower()
        ]

    def get_data_summary(self) -> Dict[str, Any]:
        """Generate a summary of all loaded datasets."""
        return self.metadata

    def find_related_datasets(self, keywords: List[str]) -> List[Dict[str, Any]]:
        """Find datasets containing specific keywords in column names or data."""
        related = []
        lowered = [kw.lower() for kw in keywords]
        # Escape keywords so regex metacharacters are matched literally
        pattern = '|'.join(re.escape(kw) for kw in keywords)
        for name in self.names():
            df = self.get(name)
            if df is None:
                continue
            # Check column names (case-insensitive substring match)
            col_matches = [col for col in df.columns if any(kw in col.lower() for kw in lowered)]
            # Check data content of string-typed columns
            data_matches = False
            for col in df.select_dtypes(include=['object']).columns:
                try:
                    # Boolean mask of rows containing any keyword
                    mask = df[col].str.contains(pattern, case=False, na=False)
                    if mask.any():
                        data_matches = True
                        break
                except Exception as e:
                    # Skip columns that cannot be searched (e.g. mixed non-string content)
                    logger.debug(f"Error checking column {col} for keywords: {e}")
                    continue
            if col_matches or data_matches:
                related.append({
                    "name": name,
                    "matching_columns": col_matches,
                    "has_matching_data": data_matches,
                })
        return related

    def clear(self):
        """Clear all data and metadata."""
        self.data.clear()
        self.metadata.clear()
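

# A minimal usage sketch (the "sales.csv" path and the keywords below are
# hypothetical and only illustrate the intended call pattern).
if __name__ == "__main__":
    registry = DataRegistry()
    if registry.add_path("sales.csv"):
        print("Loaded datasets:", registry.names())
        # Look up datasets whose columns or cells mention these keywords
        for match in registry.find_related_datasets(["revenue", "region"]):
            print(match["name"], match["matching_columns"])
    registry.clear()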