from __future__ import annotations import os import re from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Set, Tuple import pandas as pd def saferead(path: str) -> Optional[pd.DataFrame]: """Safely read various file formats into DataFrames.""" name = (path or "").lower() try: if name.endswith(".csv"): return pd.read_csv(path, low_memory=False) if name.endswith(".xlsx") or name.endswith(".xls"): return pd.read_excel(path) if name.endswith(".tsv"): return pd.read_csv(path, sep='\t', low_memory=False) if name.endswith(".json"): return pd.read_json(path) if name.endswith(".parquet"): return pd.read_parquet(path) except Exception as e: print(f"Warning: Could not read {path}: {e}") return None return None def dtypeof_series(s: pd.Series) -> str: """Determine the semantic type of a pandas Series.""" if pd.api.types.is_integer_dtype(s): return "int" if pd.api.types.is_float_dtype(s): return "float" if pd.api.types.is_bool_dtype(s): return "bool" if pd.api.types.is_datetime64_any_dtype(s): return "datetime" # Check if string column could be numeric if s.dtype == 'object': sample = s.dropna().head(100) if len(sample) > 0: try: numeric_sample = pd.to_numeric(sample, errors='coerce') if numeric_sample.notna().sum() > len(sample) * 0.7: return "numeric_as_string" except: pass return "string" def detect_column_purpose(col_name: str, series: pd.Series) -> str: """Detect the likely purpose/semantic meaning of a column.""" col_lower = col_name.lower() # ID/Key patterns if re.search(r'\bid\b|identifier|key|code', col_lower): return "identifier" # Time/Date patterns if re.search(r'\btime\b|date|duration|wait|delay|length', col_lower): if dtypeof_series(series) in ['int', 'float', 'numeric_as_string']: return "time_metric" else: return "temporal" # Financial patterns if re.search(r'\bcost\b|price|budget|fee|expense|revenue|income', col_lower): return "financial_metric" # Location/Geographic patterns if re.search(r'\bzone\b|region|area|district|location|address|city|state', col_lower): return "geographic" # Entity/Organization patterns if re.search(r'\bfacility\b|hospital|clinic|organization|company|department', col_lower): return "entity" # Category/Classification patterns if re.search(r'\btype\b|category|specialty|service|class|group', col_lower): return "category" # Performance/Quality patterns if re.search(r'\bscore\b|rating|quality|performance|satisfaction|outcome', col_lower): return "performance_metric" # Count/Volume patterns if re.search(r'\bcount\b|number|quantity|volume|total|sum', col_lower): return "count_metric" # Rate/Percentage patterns if re.search(r'\brate\b|ratio|percent|frequency|proportion', col_lower): return "rate_metric" # Capacity patterns if re.search(r'\bcapacity\b|beds|seats|slots|availability|utilization', col_lower): return "capacity_metric" # Generic categorization based on data characteristics unique_ratio = series.nunique() / len(series) if len(series) > 0 else 0 if dtypeof_series(series) in ['int', 'float', 'numeric_as_string']: return "numeric_metric" elif unique_ratio < 0.1: return "low_cardinality_category" elif unique_ratio < 0.5: return "category" else: return "text" def profiledf(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]: """Generate a comprehensive profile of a DataFrame.""" cols = [] numeric_cols = [] categorical_cols = [] for c in df.columns: s = df[c] dtype = dtypeof_series(s) purpose = detect_column_purpose(str(c), s) ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else [] col_profile = { "name": str(c), "dtype": dtype, "purpose": purpose, "n_non_null": int(s.notna().sum()), "n_unique": int(s.nunique(dropna=True)), "examples": ex_vals, "missing_ratio": round(s.isna().sum() / len(s), 3) if len(s) > 0 else 0 } # Add statistics for numeric columns if dtype in ['int', 'float', 'numeric_as_string']: try: if dtype == 'numeric_as_string': numeric_series = pd.to_numeric(s, errors='coerce') else: numeric_series = s col_profile.update({ "min": float(numeric_series.min()) if not numeric_series.isna().all() else None, "max": float(numeric_series.max()) if not numeric_series.isna().all() else None, "mean": float(numeric_series.mean()) if not numeric_series.isna().all() else None, "std": float(numeric_series.std()) if not numeric_series.isna().all() else None }) numeric_cols.append(str(c)) except: pass # Track categorical columns if purpose in ['category', 'entity', 'geographic', 'low_cardinality_category']: categorical_cols.append(str(c)) cols.append(col_profile) return { "n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols, "numeric_columns": numeric_cols, "categorical_columns": categorical_cols, "analysis_potential": _assess_analysis_potential(cols) } def _assess_analysis_potential(column_profiles: List[Dict[str, Any]]) -> Dict[str, Any]: """Assess what types of analysis are possible with this data.""" potential = { "can_rank": False, "can_compare_groups": False, "can_analyze_trends": False, "has_entities": False, "has_metrics": False, "suggested_grouping_cols": [], "suggested_metric_cols": [] } entity_cols = [] metric_cols = [] for col in column_profiles: purpose = col.get("purpose", "") # Identify grouping/entity columns if purpose in ["entity", "category", "geographic", "low_cardinality_category"]: entity_cols.append(col["name"]) if col.get("n_unique", 0) >= 2: # At least 2 groups needed potential["suggested_grouping_cols"].append(col["name"]) # Identify metric columns if purpose.endswith("_metric") or purpose in ["numeric_metric"]: metric_cols.append(col["name"]) if col.get("n_non_null", 0) > 0: # Has actual data potential["suggested_metric_cols"].append(col["name"]) # Assess capabilities potential["has_entities"] = len(entity_cols) > 0 potential["has_metrics"] = len(metric_cols) > 0 potential["can_rank"] = len(potential["suggested_grouping_cols"]) > 0 and len(potential["suggested_metric_cols"]) > 0 potential["can_compare_groups"] = potential["can_rank"] potential["can_analyze_trends"] = any(col.get("purpose") == "temporal" for col in column_profiles) return potential @dataclass class TableEntry: name: str path: str df: pd.DataFrame profile: Dict[str, Any] = field(default_factory=dict) def get_grouping_columns(self) -> List[str]: """Get columns suitable for grouping analysis.""" return self.profile.get("analysis_potential", {}).get("suggested_grouping_cols", []) def get_metric_columns(self) -> List[str]: """Get columns suitable as metrics.""" return self.profile.get("analysis_potential", {}).get("suggested_metric_cols", []) def can_support_ranking(self) -> bool: """Check if this table can support ranking analysis.""" return self.profile.get("analysis_potential", {}).get("can_rank", False) class DataRegistry: """Registry for managing multiple data tables with analysis capabilities.""" def __init__(self): self._tables: Dict[str, TableEntry] = {} def clear(self) -> None: """Clear all tables from the registry.""" self._tables.clear() def add_path(self, path: str) -> Optional[str]: """Add a data file to the registry.""" if not path or not os.path.exists(path): return None df = saferead(path) if df is None: return None # Generate unique name base = os.path.splitext(os.path.basename(path))[0] # Remove extension for cleaner names key = base i = 2 while key in self._tables: key = f"{base}_{i}" i += 1 # Profile the dataframe prof = profiledf(df) self._tables[key] = TableEntry(name=key, path=path, df=df, profile=prof) return key def add_dataframe(self, df: pd.DataFrame, name: str) -> str: """Add a DataFrame directly to the registry.""" # Ensure unique name key = name i = 2 while key in self._tables: key = f"{name}_{i}" i += 1 prof = profiledf(df) self._tables[key] = TableEntry(name=key, path="", df=df, profile=prof) return key def names(self) -> List[str]: """Get names of all tables.""" return list(self._tables.keys()) def get(self, name: str) -> Optional[pd.DataFrame]: """Get a DataFrame by name.""" return self._tables.get(name).df if name in self._tables else None def get_table(self, name: str) -> Optional[TableEntry]: """Get a TableEntry by name.""" return self._tables.get(name) def get_profile(self, name: str) -> Dict[str, Any]: """Get the profile of a table.""" return self._tables.get(name).profile if name in self._tables else {} def iter_tables(self) -> List[TableEntry]: """Iterate over all table entries.""" return list(self._tables.values()) def get_analysis_ready_tables(self) -> List[TableEntry]: """Get tables that are ready for analysis (have both grouping and metric columns).""" return [t for t in self._tables.values() if t.can_support_ranking()] def find_tables_with_column_purpose(self, purpose: str) -> List[Tuple[str, str]]: """Find tables and columns that match a specific purpose.""" matches = [] for table in self._tables.values(): for col in table.profile.get("columns", []): if col.get("purpose") == purpose: matches.append((table.name, col["name"])) return matches def get_all_numeric_columns(self) -> Dict[str, List[str]]: """Get all numeric columns across all tables.""" numeric_cols = {} for table in self._tables.values(): numeric_cols[table.name] = table.profile.get("numeric_columns", []) return numeric_cols def get_all_categorical_columns(self) -> Dict[str, List[str]]: """Get all categorical columns across all tables.""" categorical_cols = {} for table in self._tables.values(): categorical_cols[table.name] = table.profile.get("categorical_columns", []) return categorical_cols def summarize_for_prompt(self, col_cap: int = 600) -> str: """Generate a summary suitable for LLM prompts.""" if not self._tables: return "No data tables available." lines = [] for t in self.iter_tables(): # Basic info n_rows = t.profile.get('n_rows', 0) n_cols = t.profile.get('n_cols', 0) # Column info with purposes cols_with_purpose = [] for col in t.profile.get("columns", []): name = col["name"] purpose = col.get("purpose", "unknown") if purpose != "text": # Skip generic text columns for brevity cols_with_purpose.append(f"{name}({purpose})") else: cols_with_purpose.append(name) cols_str = ", ".join(cols_with_purpose) if len(cols_str) > col_cap: cols_str = cols_str[:col_cap] + "…" # Analysis potential potential = t.profile.get("analysis_potential", {}) capabilities = [] if potential.get("can_rank"): capabilities.append("can_rank") if potential.get("can_compare_groups"): capabilities.append("can_compare") if potential.get("can_analyze_trends"): capabilities.append("can_trend") cap_str = f" [{','.join(capabilities)}]" if capabilities else "" lines.append(f"- {t.name}: {n_rows} rows, {n_cols} cols{cap_str}") lines.append(f" Columns: {cols_str}") return "\n".join(lines) def get_analysis_suggestions(self) -> Dict[str, List[str]]: """Get suggestions for possible analyses based on available data.""" suggestions = { "rankings": [], "comparisons": [], "trends": [] } for table in self._tables.values(): grouping_cols = table.get_grouping_columns() metric_cols = table.get_metric_columns() # Ranking suggestions for group_col in grouping_cols[:2]: # Limit to avoid overwhelming for metric_col in metric_cols[:2]: suggestions["rankings"].append(f"Rank {group_col} by {metric_col} (table: {table.name})") # Comparison suggestions for group_col in grouping_cols[:2]: for metric_col in metric_cols[:2]: suggestions["comparisons"].append(f"Compare {metric_col} across {group_col} (table: {table.name})") return suggestions