# Data registry module: file loading, DataFrame profiling, and analysis-capability detection.
| from __future__ import annotations | |
| import os | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Any, List, Optional, Set, Tuple | |
| import pandas as pd | |
| def saferead(path: str) -> Optional[pd.DataFrame]: | |
| """Safely read various file formats into DataFrames.""" | |
| name = (path or "").lower() | |
| try: | |
| if name.endswith(".csv"): | |
| return pd.read_csv(path, low_memory=False) | |
| if name.endswith(".xlsx") or name.endswith(".xls"): | |
| return pd.read_excel(path) | |
| if name.endswith(".tsv"): | |
| return pd.read_csv(path, sep='\t', low_memory=False) | |
| if name.endswith(".json"): | |
| return pd.read_json(path) | |
| if name.endswith(".parquet"): | |
| return pd.read_parquet(path) | |
| except Exception as e: | |
| print(f"Warning: Could not read {path}: {e}") | |
| return None | |
| return None | |
def dtypeof_series(s: pd.Series) -> str:
    """Determine the semantic type of a pandas Series.

    Returns one of: "int", "float", "bool", "datetime",
    "numeric_as_string" (object column whose sampled values are >70%
    parseable as numbers), or "string" as the fallback.
    """
    if pd.api.types.is_integer_dtype(s):
        return "int"
    if pd.api.types.is_float_dtype(s):
        return "float"
    if pd.api.types.is_bool_dtype(s):
        return "bool"
    if pd.api.types.is_datetime64_any_dtype(s):
        return "datetime"
    # Object columns may hold numbers stored as text: sample up to 100
    # non-null values and check whether most coerce to numeric.
    if s.dtype == 'object':
        sample = s.dropna().head(100)
        if len(sample) > 0:
            try:
                numeric_sample = pd.to_numeric(sample, errors='coerce')
                if numeric_sample.notna().sum() > len(sample) * 0.7:
                    return "numeric_as_string"
            except Exception:
                # Was a bare `except:` — keep the best-effort skip but stop
                # swallowing SystemExit/KeyboardInterrupt.
                pass
    return "string"
def detect_column_purpose(col_name: str, series: pd.Series) -> str:
    """Detect the likely purpose/semantic meaning of a column.

    Name-based regex rules are tried first (in priority order); when no
    rule matches, the decision falls back to the data's own dtype and
    cardinality.
    """
    lowered = col_name.lower()

    # ID/Key names trump everything else.
    if re.search(r'\bid\b|identifier|key|code', lowered):
        return "identifier"

    # Time-related names: numeric ones are duration-style metrics,
    # everything else is treated as a timestamp-like column.
    if re.search(r'\btime\b|date|duration|wait|delay|length', lowered):
        numeric_kinds = ('int', 'float', 'numeric_as_string')
        return "time_metric" if dtypeof_series(series) in numeric_kinds else "temporal"

    # Remaining name-based rules, checked in priority order.
    name_rules = (
        (r'\bcost\b|price|budget|fee|expense|revenue|income', "financial_metric"),
        (r'\bzone\b|region|area|district|location|address|city|state', "geographic"),
        (r'\bfacility\b|hospital|clinic|organization|company|department', "entity"),
        (r'\btype\b|category|specialty|service|class|group', "category"),
        (r'\bscore\b|rating|quality|performance|satisfaction|outcome', "performance_metric"),
        (r'\bcount\b|number|quantity|volume|total|sum', "count_metric"),
        (r'\brate\b|ratio|percent|frequency|proportion', "rate_metric"),
        (r'\bcapacity\b|beds|seats|slots|availability|utilization', "capacity_metric"),
    )
    for pattern, label in name_rules:
        if re.search(pattern, lowered):
            return label

    # No name match: categorize from data characteristics instead.
    if dtypeof_series(series) in ('int', 'float', 'numeric_as_string'):
        return "numeric_metric"
    unique_ratio = series.nunique() / len(series) if len(series) > 0 else 0
    if unique_ratio < 0.1:
        return "low_cardinality_category"
    if unique_ratio < 0.5:
        return "category"
    return "text"
def profiledf(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Generate a comprehensive profile of a DataFrame.

    Args:
        df: The DataFrame to profile.
        max_examples: Maximum number of example values recorded per column.

    Returns:
        A dict with "n_rows"/"n_cols", per-column profiles under
        "columns" (dtype, inferred purpose, null/unique counts, example
        values and, for numeric columns, min/max/mean/std), plus
        "numeric_columns", "categorical_columns" and an
        "analysis_potential" assessment.
    """
    cols: List[Dict[str, Any]] = []
    numeric_cols: List[str] = []
    categorical_cols: List[str] = []
    for c in df.columns:
        s = df[c]
        dtype = dtypeof_series(s)
        purpose = detect_column_purpose(str(c), s)
        ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else []
        col_profile = {
            "name": str(c),
            "dtype": dtype,
            "purpose": purpose,
            "n_non_null": int(s.notna().sum()),
            "n_unique": int(s.nunique(dropna=True)),
            "examples": ex_vals,
            "missing_ratio": round(s.isna().sum() / len(s), 3) if len(s) > 0 else 0
        }
        # Add summary statistics for numeric columns.
        if dtype in ['int', 'float', 'numeric_as_string']:
            try:
                if dtype == 'numeric_as_string':
                    numeric_series = pd.to_numeric(s, errors='coerce')
                else:
                    numeric_series = s
                # Hoisted: the original recomputed isna().all() once per statistic.
                all_nan = numeric_series.isna().all()
                col_profile.update({
                    "min": float(numeric_series.min()) if not all_nan else None,
                    "max": float(numeric_series.max()) if not all_nan else None,
                    "mean": float(numeric_series.mean()) if not all_nan else None,
                    "std": float(numeric_series.std()) if not all_nan else None
                })
                numeric_cols.append(str(c))
            except Exception:
                # Was a bare `except:` — keep the best-effort skip but stop
                # swallowing SystemExit/KeyboardInterrupt.
                pass
        # Track columns usable as grouping/categorical dimensions.
        if purpose in ['category', 'entity', 'geographic', 'low_cardinality_category']:
            categorical_cols.append(str(c))
        cols.append(col_profile)
    return {
        "n_rows": int(len(df)),
        "n_cols": int(df.shape[1]),
        "columns": cols,
        "numeric_columns": numeric_cols,
        "categorical_columns": categorical_cols,
        "analysis_potential": _assess_analysis_potential(cols)
    }
def _assess_analysis_potential(column_profiles: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assess what types of analysis are possible with this data.

    Scans the per-column profiles for grouping-capable columns and
    metric columns, then derives capability flags from what was found.
    """
    grouping_purposes = {"entity", "category", "geographic", "low_cardinality_category"}
    entity_cols: List[str] = []
    metric_cols: List[str] = []
    grouping_suggestions: List[str] = []
    metric_suggestions: List[str] = []

    for profile in column_profiles:
        purpose = profile.get("purpose", "")
        if purpose in grouping_purposes:
            entity_cols.append(profile["name"])
            # Grouping only makes sense with at least two distinct values.
            if profile.get("n_unique", 0) >= 2:
                grouping_suggestions.append(profile["name"])
        if purpose.endswith("_metric") or purpose == "numeric_metric":
            metric_cols.append(profile["name"])
            # Only suggest metrics that actually contain data.
            if profile.get("n_non_null", 0) > 0:
                metric_suggestions.append(profile["name"])

    # Ranking (and therefore group comparison) needs both a grouping
    # column and a metric column.
    can_rank = len(grouping_suggestions) > 0 and len(metric_suggestions) > 0
    return {
        "can_rank": can_rank,
        "can_compare_groups": can_rank,
        "can_analyze_trends": any(p.get("purpose") == "temporal" for p in column_profiles),
        "has_entities": len(entity_cols) > 0,
        "has_metrics": len(metric_cols) > 0,
        "suggested_grouping_cols": grouping_suggestions,
        "suggested_metric_cols": metric_suggestions,
    }
@dataclass
class TableEntry:
    """A registered table: its source path, data, and computed profile.

    Bug fix: the original class annotated its fields and used
    field(default_factory=dict) but was missing the @dataclass
    decorator, so TableEntry(name=..., path=..., df=..., profile=...)
    — as called by DataRegistry — raised TypeError, and `profile`
    defaulted to a dataclasses.Field object instead of a dict.
    """

    name: str   # registry key / display name
    path: str   # source file path ("" for in-memory DataFrames)
    df: pd.DataFrame  # the table's data
    profile: Dict[str, Any] = field(default_factory=dict)  # profiledf() output

    def get_grouping_columns(self) -> List[str]:
        """Get columns suitable for grouping analysis."""
        return self.profile.get("analysis_potential", {}).get("suggested_grouping_cols", [])

    def get_metric_columns(self) -> List[str]:
        """Get columns suitable as metrics."""
        return self.profile.get("analysis_potential", {}).get("suggested_metric_cols", [])

    def can_support_ranking(self) -> bool:
        """Check if this table can support ranking analysis."""
        return self.profile.get("analysis_potential", {}).get("can_rank", False)
class DataRegistry:
    """Registry for managing multiple data tables with analysis capabilities.

    Tables are keyed by a unique name derived from the file/DataFrame
    name; collisions get a numeric suffix (_2, _3, ...).
    """

    def __init__(self):
        self._tables: Dict[str, TableEntry] = {}

    def clear(self) -> None:
        """Clear all tables from the registry."""
        self._tables.clear()

    def _unique_key(self, base: str) -> str:
        """Return *base*, suffixed with _2, _3, ... if already taken.

        Extracted: this loop was duplicated in add_path and add_dataframe.
        """
        key = base
        i = 2
        while key in self._tables:
            key = f"{base}_{i}"
            i += 1
        return key

    def add_path(self, path: str) -> Optional[str]:
        """Add a data file to the registry.

        Returns the registry key on success, or None when the path is
        empty/missing or the file cannot be read.
        """
        if not path or not os.path.exists(path):
            return None
        df = saferead(path)
        if df is None:
            return None
        # Base the key on the file name without its extension.
        key = self._unique_key(os.path.splitext(os.path.basename(path))[0])
        self._tables[key] = TableEntry(name=key, path=path, df=df, profile=profiledf(df))
        return key

    def add_dataframe(self, df: pd.DataFrame, name: str) -> str:
        """Add a DataFrame directly to the registry; returns its (unique) key."""
        key = self._unique_key(name)
        self._tables[key] = TableEntry(name=key, path="", df=df, profile=profiledf(df))
        return key

    def names(self) -> List[str]:
        """Get names of all tables."""
        return list(self._tables.keys())

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Get a DataFrame by name (None if not registered).

        Fixed: the original did two dict lookups (`in` then `.get`).
        """
        entry = self._tables.get(name)
        return entry.df if entry is not None else None

    def get_table(self, name: str) -> Optional[TableEntry]:
        """Get a TableEntry by name."""
        return self._tables.get(name)

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Get the profile of a table ({} if not registered)."""
        entry = self._tables.get(name)
        return entry.profile if entry is not None else {}

    def iter_tables(self) -> List[TableEntry]:
        """Iterate over all table entries."""
        return list(self._tables.values())

    def get_analysis_ready_tables(self) -> List[TableEntry]:
        """Get tables that are ready for analysis (have both grouping and metric columns)."""
        return [t for t in self._tables.values() if t.can_support_ranking()]

    def find_tables_with_column_purpose(self, purpose: str) -> List[Tuple[str, str]]:
        """Find (table_name, column_name) pairs matching a specific purpose."""
        return [
            (table.name, col["name"])
            for table in self._tables.values()
            for col in table.profile.get("columns", [])
            if col.get("purpose") == purpose
        ]

    def get_all_numeric_columns(self) -> Dict[str, List[str]]:
        """Get all numeric columns across all tables."""
        return {t.name: t.profile.get("numeric_columns", []) for t in self._tables.values()}

    def get_all_categorical_columns(self) -> Dict[str, List[str]]:
        """Get all categorical columns across all tables."""
        return {t.name: t.profile.get("categorical_columns", []) for t in self._tables.values()}

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Generate a summary suitable for LLM prompts.

        Args:
            col_cap: Maximum length of each table's column listing
                before it is truncated with an ellipsis.
        """
        if not self._tables:
            return "No data tables available."
        lines: List[str] = []
        for t in self.iter_tables():
            profile = t.profile
            # Annotate each column with its inferred purpose; plain text
            # columns are listed bare for brevity.
            col_bits = []
            for col in profile.get("columns", []):
                purpose = col.get("purpose", "unknown")
                col_bits.append(col["name"] if purpose == "text" else f"{col['name']}({purpose})")
            cols_str = ", ".join(col_bits)
            if len(cols_str) > col_cap:
                cols_str = cols_str[:col_cap] + "…"
            # Summarize analysis capabilities as short flags.
            potential = profile.get("analysis_potential", {})
            capabilities = [
                label
                for flag, label in (
                    ("can_rank", "can_rank"),
                    ("can_compare_groups", "can_compare"),
                    ("can_analyze_trends", "can_trend"),
                )
                if potential.get(flag)
            ]
            cap_str = f" [{','.join(capabilities)}]" if capabilities else ""
            lines.append(f"- {t.name}: {profile.get('n_rows', 0)} rows, {profile.get('n_cols', 0)} cols{cap_str}")
            lines.append(f" Columns: {cols_str}")
        return "\n".join(lines)

    def get_analysis_suggestions(self) -> Dict[str, List[str]]:
        """Get suggestions for possible analyses based on available data.

        NOTE(review): "trends" is returned but never populated — kept
        as-is to preserve the original contract; confirm before relying on it.
        """
        suggestions: Dict[str, List[str]] = {"rankings": [], "comparisons": [], "trends": []}
        for table in self._tables.values():
            # Cap the cross-product at 2x2 per table to avoid overwhelming output.
            grouping_cols = table.get_grouping_columns()[:2]
            metric_cols = table.get_metric_columns()[:2]
            for group_col in grouping_cols:
                for metric_col in metric_cols:
                    suggestions["rankings"].append(f"Rank {group_col} by {metric_col} (table: {table.name})")
                    suggestions["comparisons"].append(f"Compare {metric_col} across {group_col} (table: {table.name})")
        return suggestions