Spaces:
Sleeping
Sleeping
| """Smart Query Engine - Answers ANY question about your data | |
| Automatically excludes ID columns and handles statistics properly""" | |
| import pandas as pd | |
| import re | |
| class QueryEngine: | |
| def __init__(self, df, schema): | |
| self.df = df | |
| self.schema = schema | |
| # Filter out ID columns from numeric columns | |
| self.numeric_columns = [col for col in self.schema['numeric'] if not self._is_id_column(col)] | |
| self.id_columns = [col for col in self.schema['numeric'] if self._is_id_column(col)] | |
| # Also check text columns that might be IDs | |
| for col in self.schema['text']: | |
| if self._is_id_column(col): | |
| self.id_columns.append(col) | |
| # Print warning about excluded ID columns | |
| if self.id_columns: | |
| print(f"β οΈ Excluded ID columns from calculations: {self.id_columns}") | |
| def _is_id_column(self, col_name): | |
| """Check if a column is likely an ID (should not be aggregated)""" | |
| col_lower = col_name.lower() | |
| # Pattern-based detection | |
| id_patterns = ['id', '_id', 'id_', 'key', '_key', 'pk', 'sk', 'uuid', 'guid', | |
| 'code', 'number', 'nbr', '_nbr', 'patient', 'encounter'] | |
| for pattern in id_patterns: | |
| if pattern == col_lower or col_lower.endswith(pattern) or col_lower.startswith(pattern): | |
| return True | |
| # Specific column names | |
| exact_id_names = ['id', 'uid', 'uuid', 'row_id', 'record_id', 'encounter_id', | |
| 'patient_id', 'customer_id', 'product_id', 'user_id', 'employee_id', | |
| 'patient_nbr', 'encounter_nbr', 'member_id'] | |
| if col_lower in exact_id_names: | |
| return True | |
| # Uniqueness-based detection (for columns with enough data) | |
| if len(self.df) > 10: | |
| try: | |
| uniqueness = self.df[col_name].nunique() / len(self.df[col_name]) | |
| # If >80% unique values, it's likely an ID | |
| if uniqueness > 0.8: | |
| return True | |
| except: | |
| pass | |
| return False | |
| def _get_meaningful_numeric_columns(self): | |
| """Return only meaningful numeric columns (exclude IDs)""" | |
| if self.numeric_columns: | |
| return self.numeric_columns | |
| return [] | |
| def answer_question(self, question): | |
| """Answer ANY question about the data""" | |
| question_lower = question.lower().strip() | |
| # ============ STEP 1: FULL SUMMARY FIRST! ============ | |
| if any(word in question_lower for word in ['summary statistics', 'summary', 'statistics', 'describe', 'overview', 'tell me about', 'what is in', 'dataset summary']): | |
| return self._format_full_summary() | |
| # ============ STEP 2: STATISTICS FOR SPECIFIC COLUMN ============ | |
| stat_patterns = [ | |
| r'(?:statistics|statistic|summary|stats?|describe)\s+(\w+)', | |
| r'(\w+)\s+(?:statistics|statistic|summary|stats?|describe)' | |
| ] | |
| for pattern in stat_patterns: | |
| match = re.search(pattern, question_lower) | |
| if match: | |
| col_candidate = match.group(1) | |
| for col in self.df.columns: | |
| if col.lower() == col_candidate or col_candidate in col.lower(): | |
| return self._handle_column_statistics(col) | |
| # ============ STEP 3: CHECK FOR ID COLUMN QUESTIONS ============ | |
| for id_col in self.id_columns: | |
| if id_col.lower() in question_lower: | |
| return self._handle_id_question(id_col) | |
| # ============ STEP 4: NUMERIC CALCULATIONS ============ | |
| if any(word in question_lower for word in ['total', 'sum', 'add up', 'combined']): | |
| result = self._handle_total_question(question_lower) | |
| if result: | |
| return result | |
| if any(word in question_lower for word in ['average', 'mean', 'avg']): | |
| result = self._handle_average_question(question_lower) | |
| if result: | |
| return result | |
| if any(word in question_lower for word in ['minimum', 'min', 'lowest', 'smallest', 'least']): | |
| result = self._handle_min_question(question_lower) | |
| if result: | |
| return result | |
| if any(word in question_lower for word in ['maximum', 'max', 'highest', 'largest', 'most', 'greatest']): | |
| result = self._handle_max_question(question_lower) | |
| if result: | |
| return result | |
| if any(word in question_lower for word in ['top', 'best']): | |
| result = self._handle_ranking_question(question_lower) | |
| if result: | |
| return result | |
| # ============ STEP 5: GROUP BY ============ | |
| if any(word in question_lower for word in ['by', 'per', 'for each', 'grouped by']): | |
| result = self._handle_group_question(question_lower) | |
| if result: | |
| return result | |
| # ============ STEP 6: COUNT ============ | |
| if any(word in question_lower for word in ['count', 'how many', 'number of']): | |
| result = self._handle_count_question(question_lower) | |
| if result: | |
| return result | |
| # ============ STEP 7: DATA PREVIEW ============ | |
| if any(word in question_lower for word in ['show', 'display', 'view', 'preview', 'see', 'list']): | |
| result = self._handle_show_question(question_lower) | |
| if result: | |
| return result | |
| # ============ STEP 8: SMART RESPONSE ============ | |
| return self._smart_response(question_lower) | |
| def _handle_column_statistics(self, col_name): | |
| """Provide detailed statistics for a specific column""" | |
| # Check if it's an ID column | |
| if col_name in self.id_columns: | |
| return f"""β οΈ **'{col_name}' is an ID column** | |
| Statistics for ID columns are not meaningful because: | |
| β’ IDs are unique identifiers, not measurements | |
| β’ Each ID appears only once typically | |
| **What you CAN do:** | |
| β’ Count how many IDs: "{col_name} count" | |
| β’ View the data: "Show {col_name}" | |
| β’ Analyze other columns: {', '.join(self._get_meaningful_numeric_columns()[:3]) if self._get_meaningful_numeric_columns() else 'None found'}""" | |
| # Check if it's a meaningful numeric column | |
| elif col_name in self._get_meaningful_numeric_columns(): | |
| stats = self.df[col_name].describe() | |
| output = f"π **Statistics for {col_name}**\n\n" | |
| output += f"β’ **Count**: {stats['count']:,.0f}\n" | |
| output += f"β’ **Mean**: {stats['mean']:,.2f}\n" | |
| output += f"β’ **Standard Deviation**: {stats['std']:,.2f}\n" | |
| output += f"β’ **Minimum**: {stats['min']:,.2f}\n" | |
| output += f"β’ **25th Percentile**: {stats['25%']:,.2f}\n" | |
| output += f"β’ **Median (50th)**: {stats['50%']:,.2f}\n" | |
| output += f"β’ **75th Percentile**: {stats['75%']:,.2f}\n" | |
| output += f"β’ **Maximum**: {stats['max']:,.2f}\n" | |
| output += f"β’ **Total**: {self.df[col_name].sum():,.2f}" | |
| return output | |
| # Check if it's a categorical/text column | |
| elif col_name in self.df.columns: | |
| output = f"π **Statistics for {col_name}**\n\n" | |
| output += f"β’ **Unique values**: {self.df[col_name].nunique():,}\n" | |
| output += f"β’ **Most common**: {self.df[col_name].mode()[0] if len(self.df[col_name].mode()) > 0 else 'N/A'}\n" | |
| output += f"β’ **Missing values**: {self.df[col_name].isnull().sum():,}\n" | |
| output += "\n**Top 5 values:**\n" | |
| for val, count in self.df[col_name].value_counts().head(5).items(): | |
| output += f" β’ {val}: {count} ({count/len(self.df)*100:.1f}%)\n" | |
| return output | |
| return f"β Column '{col_name}' not found. Available columns: {', '.join(self.df.columns[:10])}..." | |
| def _handle_id_question(self, id_col): | |
| """Handle questions about ID columns""" | |
| unique_count = self.df[id_col].nunique() | |
| return f"""β οΈ **'{id_col}' is an ID column** (unique identifier) | |
| Averages, sums, or other mathematical calculations on ID values are **not meaningful** because: | |
| β’ IDs are just labels, not measurements | |
| β’ Each ID is typically unique | |
| **What you can do instead:** | |
| β’ Count how many unique IDs: {unique_count} unique values | |
| β’ Group data by other columns: "Show [category] by [metric]" | |
| β’ Analyze meaningful numeric columns: {', '.join(self._get_meaningful_numeric_columns()[:3]) if self._get_meaningful_numeric_columns() else 'None found'}""" | |
| def _handle_total_question(self, question): | |
| """Handle total/sum questions""" | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| total = self.df[col].sum() | |
| return f"π° **Total {col}**: {total:,.2f}" | |
| if self._get_meaningful_numeric_columns(): | |
| col = self._get_meaningful_numeric_columns()[0] | |
| total = self.df[col].sum() | |
| return f"π° **Total {col}**: {total:,.2f}" | |
| return None | |
| def _handle_average_question(self, question): | |
| """Handle average/mean questions""" | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| avg = self.df[col].mean() | |
| return f"π **Average {col}**: {avg:,.2f}" | |
| if self._get_meaningful_numeric_columns(): | |
| col = self._get_meaningful_numeric_columns()[0] | |
| avg = self.df[col].mean() | |
| return f"π **Average {col}**: {avg:,.2f}" | |
| return None | |
| def _handle_min_question(self, question): | |
| """Handle minimum questions""" | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| min_val = self.df[col].min() | |
| return f"π **Minimum {col}**: {min_val:,.2f}" | |
| return None | |
| def _handle_max_question(self, question): | |
| """Handle maximum questions""" | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| max_val = self.df[col].max() | |
| return f"π **Maximum {col}**: {max_val:,.2f}" | |
| return None | |
| def _handle_ranking_question(self, question): | |
| """Handle top/best questions""" | |
| n_match = re.search(r'top\s+(\d+)', question) | |
| n = int(n_match.group(1)) if n_match else 5 | |
| metric = None | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| metric = col | |
| break | |
| if not metric and self._get_meaningful_numeric_columns(): | |
| metric = self._get_meaningful_numeric_columns()[0] | |
| category = None | |
| for col in self.schema['categorical']: | |
| if col.lower() in question: | |
| category = col | |
| break | |
| if not category and self.schema['categorical']: | |
| category = self.schema['categorical'][0] | |
| if metric and category: | |
| result = self.df.groupby(category)[metric].sum().sort_values(ascending=False).head(n) | |
| output = f"π **Top {n} {category} by {metric}**\n\n" | |
| for idx, (item, val) in enumerate(result.items(), 1): | |
| output += f"{idx}. **{item}**: {val:,.2f}\n" | |
| return output | |
| return None | |
| def _handle_group_question(self, question): | |
| """Handle group by questions""" | |
| metric = None | |
| category = None | |
| for col in self._get_meaningful_numeric_columns(): | |
| if col.lower() in question: | |
| metric = col | |
| break | |
| for col in self.schema['categorical']: | |
| if col.lower() in question: | |
| category = col | |
| break | |
| if metric and category: | |
| result = self.df.groupby(category)[metric].sum().sort_values(ascending=False) | |
| output = f"π **{metric} by {category}**\n\n" | |
| for idx, (item, val) in enumerate(result.items(), 1): | |
| output += f"{idx}. **{item}**: {val:,.2f}\n" | |
| output += f"\n**Total**: {result.sum():,.2f}" | |
| return output | |
| return None | |
| def _handle_count_question(self, question): | |
| """Handle count questions""" | |
| for col in self.df.columns: | |
| if col.lower() in question: | |
| unique_count = self.df[col].nunique() | |
| return f"π **{col}**: {unique_count} unique values" | |
| if 'rows' in question or 'records' in question: | |
| return f"π **Total records**: {len(self.df):,} rows" | |
| return None | |
| def _handle_show_question(self, question): | |
| """Handle show/display questions""" | |
| n_match = re.search(r'(\d+)', question) | |
| n = int(n_match.group(1)) if n_match else 5 | |
| output = f"**π Data Preview (First {n} rows)**\n\n```\n" | |
| output += self.df.head(n).to_string() | |
| output += "\n```" | |
| return output | |
| def _format_full_summary(self): | |
| """Provide complete dataset summary""" | |
| meaningful_numeric = self._get_meaningful_numeric_columns() | |
| output = "π **Complete Data Summary**\n\n" | |
| output += f"**Dataset Size**: {len(self.df):,} rows Γ {len(self.df.columns)} columns\n\n" | |
| output += "**Column Types:**\n" | |
| output += f"β’ Meaningful numeric columns: {len(meaningful_numeric)}\n" | |
| output += f"β’ ID columns (excluded): {len(self.id_columns)}\n" | |
| output += f"β’ Categorical columns: {len(self.schema['categorical'])}\n" | |
| if meaningful_numeric: | |
| output += "\n**Key Numeric Statistics:**\n" | |
| for col in meaningful_numeric[:5]: | |
| output += f"β’ {col}: Mean={self.df[col].mean():.2f}, Total={self.df[col].sum():,.0f}\n" | |
| if self.schema['categorical']: | |
| output += "\n**Categorical Columns:**\n" | |
| for col in self.schema['categorical'][:3]: | |
| output += f"β’ {col}: {self.df[col].nunique()} unique values\n" | |
| return output | |
| def _smart_response(self, question): | |
| """Generate intelligent response for unrecognized questions""" | |
| meaningful_numeric = self._get_meaningful_numeric_columns() | |
| output = "π‘ **I understand you're asking about your data.**\n\n" | |
| output += "π **Here's what's available:**\n" | |
| output += f"β’ {len(self.df):,} rows, {len(self.df.columns)} columns\n" | |
| if meaningful_numeric: | |
| output += f"β’ Numeric columns to analyze: {', '.join(meaningful_numeric[:5])}\n" | |
| if self.schema['categorical']: | |
| output += f"β’ Categories to group by: {', '.join(self.schema['categorical'][:3])}\n" | |
| output += "\nπ **Try these example questions:**\n\n" | |
| if meaningful_numeric: | |
| example = meaningful_numeric[0] | |
| output += f"β’ 'Statistics {example}'\n" | |
| output += f"β’ 'Total {example}'\n" | |
| output += f"β’ 'Average {example}'\n" | |
| if self.schema['categorical'] and meaningful_numeric: | |
| output += f"β’ 'Top 5 {self.schema['categorical'][0]} by {meaningful_numeric[0]}'\n" | |
| output += "β’ 'Summary statistics'\n" | |
| output += "β’ 'Show me the data'" | |
| return output |