"""Smart Query Engine - Answers ANY question about your data Automatically excludes ID columns and handles statistics properly""" import pandas as pd import re class QueryEngine: def __init__(self, df, schema): self.df = df self.schema = schema # Filter out ID columns from numeric columns self.numeric_columns = [col for col in self.schema['numeric'] if not self._is_id_column(col)] self.id_columns = [col for col in self.schema['numeric'] if self._is_id_column(col)] # Also check text columns that might be IDs for col in self.schema['text']: if self._is_id_column(col): self.id_columns.append(col) # Print warning about excluded ID columns if self.id_columns: print(f"⚠️ Excluded ID columns from calculations: {self.id_columns}") def _is_id_column(self, col_name): """Check if a column is likely an ID (should not be aggregated)""" col_lower = col_name.lower() # Pattern-based detection id_patterns = ['id', '_id', 'id_', 'key', '_key', 'pk', 'sk', 'uuid', 'guid', 'code', 'number', 'nbr', '_nbr', 'patient', 'encounter'] for pattern in id_patterns: if pattern == col_lower or col_lower.endswith(pattern) or col_lower.startswith(pattern): return True # Specific column names exact_id_names = ['id', 'uid', 'uuid', 'row_id', 'record_id', 'encounter_id', 'patient_id', 'customer_id', 'product_id', 'user_id', 'employee_id', 'patient_nbr', 'encounter_nbr', 'member_id'] if col_lower in exact_id_names: return True # Uniqueness-based detection (for columns with enough data) if len(self.df) > 10: try: uniqueness = self.df[col_name].nunique() / len(self.df[col_name]) # If >80% unique values, it's likely an ID if uniqueness > 0.8: return True except: pass return False def _get_meaningful_numeric_columns(self): """Return only meaningful numeric columns (exclude IDs)""" if self.numeric_columns: return self.numeric_columns return [] def answer_question(self, question): """Answer ANY question about the data""" question_lower = question.lower().strip() # ============ STEP 1: FULL SUMMARY FIRST! ============ if any(word in question_lower for word in ['summary statistics', 'summary', 'statistics', 'describe', 'overview', 'tell me about', 'what is in', 'dataset summary']): return self._format_full_summary() # ============ STEP 2: STATISTICS FOR SPECIFIC COLUMN ============ stat_patterns = [ r'(?:statistics|statistic|summary|stats?|describe)\s+(\w+)', r'(\w+)\s+(?:statistics|statistic|summary|stats?|describe)' ] for pattern in stat_patterns: match = re.search(pattern, question_lower) if match: col_candidate = match.group(1) for col in self.df.columns: if col.lower() == col_candidate or col_candidate in col.lower(): return self._handle_column_statistics(col) # ============ STEP 3: CHECK FOR ID COLUMN QUESTIONS ============ for id_col in self.id_columns: if id_col.lower() in question_lower: return self._handle_id_question(id_col) # ============ STEP 4: NUMERIC CALCULATIONS ============ if any(word in question_lower for word in ['total', 'sum', 'add up', 'combined']): result = self._handle_total_question(question_lower) if result: return result if any(word in question_lower for word in ['average', 'mean', 'avg']): result = self._handle_average_question(question_lower) if result: return result if any(word in question_lower for word in ['minimum', 'min', 'lowest', 'smallest', 'least']): result = self._handle_min_question(question_lower) if result: return result if any(word in question_lower for word in ['maximum', 'max', 'highest', 'largest', 'most', 'greatest']): result = self._handle_max_question(question_lower) if result: return result if any(word in question_lower for word in ['top', 'best']): result = self._handle_ranking_question(question_lower) if result: return result # ============ STEP 5: GROUP BY ============ if any(word in question_lower for word in ['by', 'per', 'for each', 'grouped by']): result = self._handle_group_question(question_lower) if result: return result # ============ STEP 6: COUNT ============ if any(word in question_lower for word in ['count', 'how many', 'number of']): result = self._handle_count_question(question_lower) if result: return result # ============ STEP 7: DATA PREVIEW ============ if any(word in question_lower for word in ['show', 'display', 'view', 'preview', 'see', 'list']): result = self._handle_show_question(question_lower) if result: return result # ============ STEP 8: SMART RESPONSE ============ return self._smart_response(question_lower) def _handle_column_statistics(self, col_name): """Provide detailed statistics for a specific column""" # Check if it's an ID column if col_name in self.id_columns: return f"""⚠️ **'{col_name}' is an ID column** Statistics for ID columns are not meaningful because: • IDs are unique identifiers, not measurements • Each ID appears only once typically **What you CAN do:** • Count how many IDs: "{col_name} count" • View the data: "Show {col_name}" • Analyze other columns: {', '.join(self._get_meaningful_numeric_columns()[:3]) if self._get_meaningful_numeric_columns() else 'None found'}""" # Check if it's a meaningful numeric column elif col_name in self._get_meaningful_numeric_columns(): stats = self.df[col_name].describe() output = f"📊 **Statistics for {col_name}**\n\n" output += f"• **Count**: {stats['count']:,.0f}\n" output += f"• **Mean**: {stats['mean']:,.2f}\n" output += f"• **Standard Deviation**: {stats['std']:,.2f}\n" output += f"• **Minimum**: {stats['min']:,.2f}\n" output += f"• **25th Percentile**: {stats['25%']:,.2f}\n" output += f"• **Median (50th)**: {stats['50%']:,.2f}\n" output += f"• **75th Percentile**: {stats['75%']:,.2f}\n" output += f"• **Maximum**: {stats['max']:,.2f}\n" output += f"• **Total**: {self.df[col_name].sum():,.2f}" return output # Check if it's a categorical/text column elif col_name in self.df.columns: output = f"📊 **Statistics for {col_name}**\n\n" output += f"• **Unique values**: {self.df[col_name].nunique():,}\n" output += f"• **Most common**: {self.df[col_name].mode()[0] if len(self.df[col_name].mode()) > 0 else 'N/A'}\n" output += f"• **Missing values**: {self.df[col_name].isnull().sum():,}\n" output += "\n**Top 5 values:**\n" for val, count in self.df[col_name].value_counts().head(5).items(): output += f" • {val}: {count} ({count/len(self.df)*100:.1f}%)\n" return output return f"❌ Column '{col_name}' not found. Available columns: {', '.join(self.df.columns[:10])}..." def _handle_id_question(self, id_col): """Handle questions about ID columns""" unique_count = self.df[id_col].nunique() return f"""⚠️ **'{id_col}' is an ID column** (unique identifier) Averages, sums, or other mathematical calculations on ID values are **not meaningful** because: • IDs are just labels, not measurements • Each ID is typically unique **What you can do instead:** • Count how many unique IDs: {unique_count} unique values • Group data by other columns: "Show [category] by [metric]" • Analyze meaningful numeric columns: {', '.join(self._get_meaningful_numeric_columns()[:3]) if self._get_meaningful_numeric_columns() else 'None found'}""" def _handle_total_question(self, question): """Handle total/sum questions""" for col in self._get_meaningful_numeric_columns(): if col.lower() in question: total = self.df[col].sum() return f"💰 **Total {col}**: {total:,.2f}" if self._get_meaningful_numeric_columns(): col = self._get_meaningful_numeric_columns()[0] total = self.df[col].sum() return f"💰 **Total {col}**: {total:,.2f}" return None def _handle_average_question(self, question): """Handle average/mean questions""" for col in self._get_meaningful_numeric_columns(): if col.lower() in question: avg = self.df[col].mean() return f"📊 **Average {col}**: {avg:,.2f}" if self._get_meaningful_numeric_columns(): col = self._get_meaningful_numeric_columns()[0] avg = self.df[col].mean() return f"📊 **Average {col}**: {avg:,.2f}" return None def _handle_min_question(self, question): """Handle minimum questions""" for col in self._get_meaningful_numeric_columns(): if col.lower() in question: min_val = self.df[col].min() return f"📉 **Minimum {col}**: {min_val:,.2f}" return None def _handle_max_question(self, question): """Handle maximum questions""" for col in self._get_meaningful_numeric_columns(): if col.lower() in question: max_val = self.df[col].max() return f"🏆 **Maximum {col}**: {max_val:,.2f}" return None def _handle_ranking_question(self, question): """Handle top/best questions""" n_match = re.search(r'top\s+(\d+)', question) n = int(n_match.group(1)) if n_match else 5 metric = None for col in self._get_meaningful_numeric_columns(): if col.lower() in question: metric = col break if not metric and self._get_meaningful_numeric_columns(): metric = self._get_meaningful_numeric_columns()[0] category = None for col in self.schema['categorical']: if col.lower() in question: category = col break if not category and self.schema['categorical']: category = self.schema['categorical'][0] if metric and category: result = self.df.groupby(category)[metric].sum().sort_values(ascending=False).head(n) output = f"🏆 **Top {n} {category} by {metric}**\n\n" for idx, (item, val) in enumerate(result.items(), 1): output += f"{idx}. **{item}**: {val:,.2f}\n" return output return None def _handle_group_question(self, question): """Handle group by questions""" metric = None category = None for col in self._get_meaningful_numeric_columns(): if col.lower() in question: metric = col break for col in self.schema['categorical']: if col.lower() in question: category = col break if metric and category: result = self.df.groupby(category)[metric].sum().sort_values(ascending=False) output = f"📊 **{metric} by {category}**\n\n" for idx, (item, val) in enumerate(result.items(), 1): output += f"{idx}. **{item}**: {val:,.2f}\n" output += f"\n**Total**: {result.sum():,.2f}" return output return None def _handle_count_question(self, question): """Handle count questions""" for col in self.df.columns: if col.lower() in question: unique_count = self.df[col].nunique() return f"📊 **{col}**: {unique_count} unique values" if 'rows' in question or 'records' in question: return f"📊 **Total records**: {len(self.df):,} rows" return None def _handle_show_question(self, question): """Handle show/display questions""" n_match = re.search(r'(\d+)', question) n = int(n_match.group(1)) if n_match else 5 output = f"**📊 Data Preview (First {n} rows)**\n\n```\n" output += self.df.head(n).to_string() output += "\n```" return output def _format_full_summary(self): """Provide complete dataset summary""" meaningful_numeric = self._get_meaningful_numeric_columns() output = "📊 **Complete Data Summary**\n\n" output += f"**Dataset Size**: {len(self.df):,} rows × {len(self.df.columns)} columns\n\n" output += "**Column Types:**\n" output += f"• Meaningful numeric columns: {len(meaningful_numeric)}\n" output += f"• ID columns (excluded): {len(self.id_columns)}\n" output += f"• Categorical columns: {len(self.schema['categorical'])}\n" if meaningful_numeric: output += "\n**Key Numeric Statistics:**\n" for col in meaningful_numeric[:5]: output += f"• {col}: Mean={self.df[col].mean():.2f}, Total={self.df[col].sum():,.0f}\n" if self.schema['categorical']: output += "\n**Categorical Columns:**\n" for col in self.schema['categorical'][:3]: output += f"• {col}: {self.df[col].nunique()} unique values\n" return output def _smart_response(self, question): """Generate intelligent response for unrecognized questions""" meaningful_numeric = self._get_meaningful_numeric_columns() output = "💡 **I understand you're asking about your data.**\n\n" output += "📊 **Here's what's available:**\n" output += f"• {len(self.df):,} rows, {len(self.df.columns)} columns\n" if meaningful_numeric: output += f"• Numeric columns to analyze: {', '.join(meaningful_numeric[:5])}\n" if self.schema['categorical']: output += f"• Categories to group by: {', '.join(self.schema['categorical'][:3])}\n" output += "\n📝 **Try these example questions:**\n\n" if meaningful_numeric: example = meaningful_numeric[0] output += f"• 'Statistics {example}'\n" output += f"• 'Total {example}'\n" output += f"• 'Average {example}'\n" if self.schema['categorical'] and meaningful_numeric: output += f"• 'Top 5 {self.schema['categorical'][0]} by {meaningful_numeric[0]}'\n" output += "• 'Summary statistics'\n" output += "• 'Show me the data'" return output