# Medica_DecisionSupportAI / data_registry.py
# Last updated by Rajan Sharma (commit 5613174).
from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Set, Tuple
import pandas as pd
def saferead(path: str) -> Optional[pd.DataFrame]:
    """Safely read various file formats into DataFrames.

    Dispatches on the file extension (csv, xlsx/xls, tsv, json, parquet).
    Returns None for unknown extensions, and prints a warning and returns
    None when the pandas reader raises.
    """
    # Suffix -> reader, checked in order; first match wins.
    readers = (
        ((".csv",), lambda p: pd.read_csv(p, low_memory=False)),
        ((".xlsx", ".xls"), pd.read_excel),
        ((".tsv",), lambda p: pd.read_csv(p, sep='\t', low_memory=False)),
        ((".json",), pd.read_json),
        ((".parquet",), pd.read_parquet),
    )
    lowered = (path or "").lower()
    for suffixes, reader in readers:
        if lowered.endswith(suffixes):
            try:
                return reader(path)
            except Exception as e:
                # Best-effort loading: report and fall through to None.
                print(f"Warning: Could not read {path}: {e}")
                return None
    return None
def dtypeof_series(s: pd.Series) -> str:
    """Determine the semantic type of a pandas Series.

    Returns one of: "int", "float", "bool", "datetime",
    "numeric_as_string" (an object column whose sampled values are more
    than 70% parseable as numbers), or "string" as the fallback.
    """
    if pd.api.types.is_integer_dtype(s):
        return "int"
    if pd.api.types.is_float_dtype(s):
        return "float"
    if pd.api.types.is_bool_dtype(s):
        return "bool"
    if pd.api.types.is_datetime64_any_dtype(s):
        return "datetime"
    # Object columns may hold numbers stored as text; sample up to 100
    # non-null values and see how many parse.
    if s.dtype == 'object':
        sample = s.dropna().head(100)
        if len(sample) > 0:
            try:
                numeric_sample = pd.to_numeric(sample, errors='coerce')
                # Strictly more than 70% parseable counts as numeric-in-disguise.
                if numeric_sample.notna().sum() > len(sample) * 0.7:
                    return "numeric_as_string"
            except (TypeError, ValueError):
                # to_numeric can still raise on exotic objects (e.g. lists)
                # even with errors='coerce'; treat those columns as strings.
                # (Was a bare `except:`, which also swallowed SystemExit/
                # KeyboardInterrupt.)
                pass
    return "string"
def detect_column_purpose(col_name: str, series: pd.Series) -> str:
    """Detect the likely purpose/semantic meaning of a column."""
    lowered = col_name.lower()

    # ID/Key patterns come first so e.g. "type_code" reads as identifier.
    if re.search(r'\bid\b|identifier|key|code', lowered):
        return "identifier"

    # Time-ish names split into numeric durations vs. true timestamps.
    if re.search(r'\btime\b|date|duration|wait|delay|length', lowered):
        if dtypeof_series(series) in ('int', 'float', 'numeric_as_string'):
            return "time_metric"
        return "temporal"

    # Remaining name-based heuristics, checked in priority order.
    keyword_purposes = (
        (r'\bcost\b|price|budget|fee|expense|revenue|income', "financial_metric"),
        (r'\bzone\b|region|area|district|location|address|city|state', "geographic"),
        (r'\bfacility\b|hospital|clinic|organization|company|department', "entity"),
        (r'\btype\b|category|specialty|service|class|group', "category"),
        (r'\bscore\b|rating|quality|performance|satisfaction|outcome', "performance_metric"),
        (r'\bcount\b|number|quantity|volume|total|sum', "count_metric"),
        (r'\brate\b|ratio|percent|frequency|proportion', "rate_metric"),
        (r'\bcapacity\b|beds|seats|slots|availability|utilization', "capacity_metric"),
    )
    for pattern, purpose in keyword_purposes:
        if re.search(pattern, lowered):
            return purpose

    # Name was uninformative: fall back to the data's characteristics.
    if dtypeof_series(series) in ('int', 'float', 'numeric_as_string'):
        return "numeric_metric"
    unique_ratio = series.nunique() / len(series) if len(series) > 0 else 0
    if unique_ratio < 0.1:
        return "low_cardinality_category"
    if unique_ratio < 0.5:
        return "category"
    return "text"
def profiledf(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Generate a comprehensive profile of a DataFrame.

    Args:
        df: the frame to profile.
        max_examples: max number of example values stored per column.

    Returns a dict with row/column counts, per-column profiles (dtype,
    purpose, null/unique counts, examples, missing ratio, and min/max/
    mean/std for numeric columns), the names of numeric and categorical
    columns, and an "analysis_potential" assessment.
    """
    cols = []
    numeric_cols = []
    categorical_cols = []
    for c in df.columns:
        s = df[c]
        dtype = dtypeof_series(s)
        purpose = detect_column_purpose(str(c), s)
        ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else []
        col_profile = {
            "name": str(c),
            "dtype": dtype,
            "purpose": purpose,
            "n_non_null": int(s.notna().sum()),
            "n_unique": int(s.nunique(dropna=True)),
            "examples": ex_vals,
            "missing_ratio": round(s.isna().sum() / len(s), 3) if len(s) > 0 else 0,
        }
        # Summary statistics for numeric (or string-encoded numeric) columns.
        if dtype in ('int', 'float', 'numeric_as_string'):
            try:
                if dtype == 'numeric_as_string':
                    numeric_series = pd.to_numeric(s, errors='coerce')
                else:
                    numeric_series = s
                # Hoist the all-missing check instead of repeating it per stat.
                all_missing = numeric_series.isna().all()
                col_profile.update({
                    "min": None if all_missing else float(numeric_series.min()),
                    "max": None if all_missing else float(numeric_series.max()),
                    "mean": None if all_missing else float(numeric_series.mean()),
                    "std": None if all_missing else float(numeric_series.std()),
                })
                numeric_cols.append(str(c))
            except (TypeError, ValueError):
                # Stats are best-effort: skip columns whose values do not
                # reduce to floats. (Was a bare `except:`, which also
                # swallowed SystemExit/KeyboardInterrupt.)
                pass
        # Track categorical columns for downstream grouping analysis.
        if purpose in ('category', 'entity', 'geographic', 'low_cardinality_category'):
            categorical_cols.append(str(c))
        cols.append(col_profile)
    return {
        "n_rows": int(len(df)),
        "n_cols": int(df.shape[1]),
        "columns": cols,
        "numeric_columns": numeric_cols,
        "categorical_columns": categorical_cols,
        "analysis_potential": _assess_analysis_potential(cols),
    }
def _assess_analysis_potential(column_profiles: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Assess what types of analysis are possible with this data."""
potential = {
"can_rank": False,
"can_compare_groups": False,
"can_analyze_trends": False,
"has_entities": False,
"has_metrics": False,
"suggested_grouping_cols": [],
"suggested_metric_cols": []
}
entity_cols = []
metric_cols = []
for col in column_profiles:
purpose = col.get("purpose", "")
# Identify grouping/entity columns
if purpose in ["entity", "category", "geographic", "low_cardinality_category"]:
entity_cols.append(col["name"])
if col.get("n_unique", 0) >= 2: # At least 2 groups needed
potential["suggested_grouping_cols"].append(col["name"])
# Identify metric columns
if purpose.endswith("_metric") or purpose in ["numeric_metric"]:
metric_cols.append(col["name"])
if col.get("n_non_null", 0) > 0: # Has actual data
potential["suggested_metric_cols"].append(col["name"])
# Assess capabilities
potential["has_entities"] = len(entity_cols) > 0
potential["has_metrics"] = len(metric_cols) > 0
potential["can_rank"] = len(potential["suggested_grouping_cols"]) > 0 and len(potential["suggested_metric_cols"]) > 0
potential["can_compare_groups"] = potential["can_rank"]
potential["can_analyze_trends"] = any(col.get("purpose") == "temporal" for col in column_profiles)
return potential
@dataclass
class TableEntry:
    """A registered table: its source path, DataFrame, and computed profile."""
    name: str
    path: str
    df: pd.DataFrame
    profile: Dict[str, Any] = field(default_factory=dict)

    def _potential(self) -> Dict[str, Any]:
        # Shared accessor for the nested analysis-potential mapping.
        return self.profile.get("analysis_potential", {})

    def get_grouping_columns(self) -> List[str]:
        """Get columns suitable for grouping analysis."""
        return self._potential().get("suggested_grouping_cols", [])

    def get_metric_columns(self) -> List[str]:
        """Get columns suitable as metrics."""
        return self._potential().get("suggested_metric_cols", [])

    def can_support_ranking(self) -> bool:
        """Check if this table can support ranking analysis."""
        return self._potential().get("can_rank", False)
class DataRegistry:
    """Registry for managing multiple data tables with analysis capabilities."""

    def __init__(self):
        # Keyed by unique table name; insertion order preserved.
        self._tables: Dict[str, TableEntry] = {}

    def clear(self) -> None:
        """Clear all tables from the registry."""
        self._tables.clear()

    def _unique_key(self, base: str) -> str:
        """Return *base*, suffixed with _2, _3, ... until it is unused.

        Shared by add_path/add_dataframe (the loop was duplicated before).
        """
        key = base
        i = 2
        while key in self._tables:
            key = f"{base}_{i}"
            i += 1
        return key

    def add_path(self, path: str) -> Optional[str]:
        """Read, profile, and register a data file.

        Returns the registry key, or None when the path is missing/empty
        or the file cannot be parsed.
        """
        if not path or not os.path.exists(path):
            return None
        df = saferead(path)
        if df is None:
            return None
        # Strip directory and extension for a cleaner table name.
        base = os.path.splitext(os.path.basename(path))[0]
        key = self._unique_key(base)
        self._tables[key] = TableEntry(name=key, path=path, df=df, profile=profiledf(df))
        return key

    def add_dataframe(self, df: pd.DataFrame, name: str) -> str:
        """Register an in-memory DataFrame under a (uniquified) name."""
        key = self._unique_key(name)
        self._tables[key] = TableEntry(name=key, path="", df=df, profile=profiledf(df))
        return key

    def names(self) -> List[str]:
        """Get names of all tables."""
        return list(self._tables)

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Get a DataFrame by name (None if absent)."""
        # Single lookup instead of `in` check followed by .get().
        entry = self._tables.get(name)
        return entry.df if entry is not None else None

    def get_table(self, name: str) -> Optional[TableEntry]:
        """Get a TableEntry by name (None if absent)."""
        return self._tables.get(name)

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Get the profile of a table ({} if absent)."""
        entry = self._tables.get(name)
        return entry.profile if entry is not None else {}

    def iter_tables(self) -> List[TableEntry]:
        """Snapshot list of all table entries, in insertion order."""
        return list(self._tables.values())

    def get_analysis_ready_tables(self) -> List[TableEntry]:
        """Tables with both grouping and metric columns (i.e. rankable)."""
        return [t for t in self._tables.values() if t.can_support_ranking()]

    def find_tables_with_column_purpose(self, purpose: str) -> List[Tuple[str, str]]:
        """(table name, column name) pairs whose detected purpose matches."""
        return [
            (table.name, col["name"])
            for table in self._tables.values()
            for col in table.profile.get("columns", [])
            if col.get("purpose") == purpose
        ]

    def get_all_numeric_columns(self) -> Dict[str, List[str]]:
        """Map of table name -> its numeric column names."""
        return {t.name: t.profile.get("numeric_columns", []) for t in self._tables.values()}

    def get_all_categorical_columns(self) -> Dict[str, List[str]]:
        """Map of table name -> its categorical column names."""
        return {t.name: t.profile.get("categorical_columns", []) for t in self._tables.values()}

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Generate a summary suitable for LLM prompts.

        Column lists longer than *col_cap* characters are truncated with
        an ellipsis.
        """
        if not self._tables:
            return "No data tables available."
        lines = []
        for t in self.iter_tables():
            n_rows = t.profile.get('n_rows', 0)
            n_cols = t.profile.get('n_cols', 0)
            # Annotate each column with its detected purpose; plain text
            # columns stay bare for brevity.
            cols_with_purpose = []
            for col in t.profile.get("columns", []):
                name = col["name"]
                purpose = col.get("purpose", "unknown")
                cols_with_purpose.append(name if purpose == "text" else f"{name}({purpose})")
            cols_str = ", ".join(cols_with_purpose)
            if len(cols_str) > col_cap:
                cols_str = cols_str[:col_cap] + "…"
            # Advertise analysis capabilities, e.g. "[can_rank,can_compare]".
            potential = t.profile.get("analysis_potential", {})
            capabilities = []
            if potential.get("can_rank"):
                capabilities.append("can_rank")
            if potential.get("can_compare_groups"):
                capabilities.append("can_compare")
            if potential.get("can_analyze_trends"):
                capabilities.append("can_trend")
            cap_str = f" [{','.join(capabilities)}]" if capabilities else ""
            lines.append(f"- {t.name}: {n_rows} rows, {n_cols} cols{cap_str}")
            lines.append(f" Columns: {cols_str}")
        return "\n".join(lines)

    def get_analysis_suggestions(self) -> Dict[str, List[str]]:
        """Get suggestions for possible analyses based on available data."""
        suggestions: Dict[str, List[str]] = {
            "rankings": [],
            "comparisons": [],
            "trends": [],
        }
        for table in self._tables.values():
            # Cap at 2x2 combinations per table to avoid overwhelming output.
            grouping_cols = table.get_grouping_columns()[:2]
            metric_cols = table.get_metric_columns()[:2]
            for group_col in grouping_cols:
                for metric_col in metric_cols:
                    suggestions["rankings"].append(f"Rank {group_col} by {metric_col} (table: {table.name})")
                    suggestions["comparisons"].append(f"Compare {metric_col} across {group_col} (table: {table.name})")
        return suggestions