# smart-analytics-copilot / app/query_engine.py
# SamadhiDBS's picture
# Update app/query_engine.py
# c93951d verified
"""Smart Query Engine - Answers ANY question about your data
Automatically excludes ID columns and handles statistics properly"""
import pandas as pd
import re
class QueryEngine:
    """Rule-based natural-language question answering over a DataFrame.

    The engine routes a free-text question to a handler (summary, column
    statistics, total, average, min, max, ranking, group-by, count, preview)
    by looking for keywords and column names in the question.  Columns that
    look like identifiers are kept out of numeric aggregations.

    Args:
        df: The pandas DataFrame to query.
        schema: Mapping with optional keys ``'numeric'``, ``'text'`` and
            ``'categorical'``, each a list of column names.  Missing keys
            are treated as empty lists.

    Attributes:
        numeric_columns: Numeric schema columns that are not ID-like.
        id_columns: Numeric and text schema columns classified as IDs.
    """

    # Name fragments that mark a column as an identifier when they appear as
    # a prefix or suffix of the lower-cased column name.
    # NOTE(review): prefix/suffix matching is deliberately broad and also
    # flags names such as "paid" or "patient_age"; kept as in the original.
    _ID_NAME_MARKERS = (
        'id', '_id', 'id_', 'key', '_key', 'pk', 'sk', 'uuid', 'guid',
        'code', 'number', 'nbr', '_nbr', 'patient', 'encounter',
    )
    _EXACT_ID_NAMES = frozenset({
        'id', 'uid', 'uuid', 'row_id', 'record_id', 'encounter_id',
        'patient_id', 'customer_id', 'product_id', 'user_id', 'employee_id',
        'patient_nbr', 'encounter_nbr', 'member_id',
    })

    # Keywords that ask for statistics about something.
    _STAT_KEYWORDS = (
        'statistics', 'statistic', 'summary', 'stats', 'stat', 'describe',
    )
    # Keywords that ask for the whole-dataset summary.
    _SUMMARY_KEYWORDS = (
        'summary statistics', 'summary', 'statistics', 'describe',
        'overview', 'tell me about', 'what is in', 'dataset summary',
    )
    # "<keyword> <word>" / "<word> <keyword>" patterns used to resolve a
    # partially typed column name (e.g. "stats rev" -> "revenue").
    _STAT_PATTERNS = (
        r'\b(?:statistics|statistic|summary|stats?|describe)\s+(\w+)',
        r'(\w+)\s+(?:statistics|statistic|summary|stats?|describe)\b',
    )
    # Filler words that must never be treated as a partial column name
    # ("show me stats" must not resolve "me" to a column called "name").
    _STOP_WORDS = frozenset({
        'a', 'an', 'the', 'of', 'for', 'me', 'my', 'in', 'on', 'is', 'are',
        'and', 'to', 'all', 'this', 'that', 'some', 'show', 'give', 'get',
    })

    def __init__(self, df, schema):
        self.df = df
        self.schema = schema

        # Classify every numeric column exactly once: the uniqueness check
        # inside _is_id_column scans the whole column.
        self.numeric_columns = []
        self.id_columns = []
        for col in schema.get('numeric', []):
            if self._is_id_column(col):
                self.id_columns.append(col)
            else:
                self.numeric_columns.append(col)

        # Text columns can be identifiers as well.
        for col in schema.get('text', []):
            if col not in self.id_columns and self._is_id_column(col):
                self.id_columns.append(col)

        if self.id_columns:
            print(f"⚠️ Excluded ID columns from calculations: {self.id_columns}")

    # ------------------------------------------------------------------
    # Classification and matching helpers
    # ------------------------------------------------------------------
    def _is_id_column(self, col_name):
        """Return True when the column is likely an ID (not to be aggregated).

        A column is an ID when its name starts or ends with a known marker,
        or, for frames with more than 10 rows, when more than 80% of its
        values are distinct.  Float columns holding fractional values are
        exempt from the uniqueness rule: they are continuous measurements
        (prices, rates, ...) and are naturally almost all distinct.
        """
        name = str(col_name).lower()
        markers = self._ID_NAME_MARKERS
        if name.startswith(markers) or name.endswith(markers):
            return True
        if name in self._EXACT_ID_NAMES:
            return True

        # Uniqueness is only informative with enough rows.
        if len(self.df) <= 10:
            return False
        try:
            series = self.df[col_name]
            if pd.api.types.is_float_dtype(series):
                present = series.dropna()
                if ((present % 1) != 0).any():
                    return False
            return bool(series.nunique() / len(series) > 0.8)
        except (KeyError, TypeError, ValueError, AttributeError):
            # Best effort: missing column, unhashable cell values, or a
            # duplicated column label (which yields a DataFrame here).
            return False

    def _get_meaningful_numeric_columns(self):
        """Return only meaningful numeric columns (exclude IDs)."""
        return self.numeric_columns or []

    def _categorical_columns(self):
        """Return the schema's categorical columns (empty list if absent)."""
        return self.schema.get('categorical', [])

    def _suggested_metrics(self):
        """Return up to three numeric column names as a display string."""
        metrics = self._get_meaningful_numeric_columns()[:3]
        return ', '.join(map(str, metrics)) if metrics else 'None found'

    @staticmethod
    def _mentions(question, keywords):
        """Return True if any keyword occurs in *question* as a whole word.

        A trailing plural ``s``/``es`` is tolerated, but a keyword buried
        inside another word ('sum' in 'consumers', 'count' in 'country')
        does not count.
        """
        alternatives = '|'.join(re.escape(word) for word in keywords)
        pattern = rf'(?<!\w)(?:{alternatives})(?:s|es)?(?!\w)'
        return re.search(pattern, question) is not None

    def _find_column(self, question, columns, allow_partial=True):
        """Return the column from *columns* that *question* refers to.

        Whole-word mentions win over substring hits, and longer names win
        over shorter ones, so "average income" resolves to ``income`` rather
        than ``age``, and ``unit_price`` is preferred over ``price``.
        Underscores and hyphens in a column name may be typed as spaces.

        Args:
            question: Lower-cased question text.
            columns: Candidate column names.
            allow_partial: When True, fall back to plain substring matching
                if no whole-word mention is found.

        Returns:
            The matching column name, or None.
        """
        named = [(col, str(col).lower()) for col in columns if str(col)]
        named.sort(key=lambda pair: len(pair[1]), reverse=True)

        for col, name in named:
            for variant in {name, re.sub(r'[_\-]+', ' ', name)}:
                if not variant.strip():
                    continue
                pattern = rf'(?<!\w){re.escape(variant)}(?:s|es)?(?!\w)'
                if re.search(pattern, question):
                    return col

        if allow_partial:
            for col, name in named:
                if name in question:
                    return col
        return None

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def answer_question(self, question):
        """Answer a free-text question about the data.

        Args:
            question: The user's question.

        Returns:
            A markdown-formatted answer string.  Unrecognised questions get
            a help message listing available columns and example questions.
        """
        question_lower = str(question or '').lower().strip()

        # Step 1: statistics for an explicitly named column.  This must run
        # before the full-summary check, otherwise "Statistics <column>"
        # would always be answered with the whole-dataset summary.
        if self._mentions(question_lower, self._STAT_KEYWORDS):
            named_col = self._find_column(
                question_lower, self.df.columns, allow_partial=False)
            if named_col is not None:
                return self._handle_column_statistics(named_col)

        # Step 2: whole-dataset summary.
        if self._mentions(question_lower, self._SUMMARY_KEYWORDS):
            return self._format_full_summary()

        # Step 3: statistics for a partially typed column name.
        for pattern in self._STAT_PATTERNS:
            match = re.search(pattern, question_lower)
            if not match:
                continue
            candidate = match.group(1)
            if candidate in self._STOP_WORDS:
                continue
            for col in self.df.columns:
                if candidate in str(col).lower():
                    return self._handle_column_statistics(col)

        # Step 4: questions that name an ID column get an explanation
        # instead of a meaningless calculation.
        id_col = self._find_column(
            question_lower, self.id_columns, allow_partial=False)
        if id_col is not None:
            return self._handle_id_question(id_col)

        # Steps 5-8: keyword-routed handlers, tried in priority order.  A
        # handler returning a falsy value passes the question on.
        routes = (
            (('total', 'sum', 'add up', 'combined'),
             self._handle_total_question),
            (('average', 'mean', 'avg'),
             self._handle_average_question),
            (('minimum', 'min', 'lowest', 'smallest', 'least'),
             self._handle_min_question),
            (('maximum', 'max', 'highest', 'largest', 'most', 'greatest'),
             self._handle_max_question),
            (('top', 'best'),
             self._handle_ranking_question),
            (('by', 'per', 'for each', 'grouped by'),
             self._handle_group_question),
            (('count', 'how many', 'number of'),
             self._handle_count_question),
            (('show', 'display', 'view', 'preview', 'see', 'list'),
             self._handle_show_question),
        )
        for keywords, handler in routes:
            if self._mentions(question_lower, keywords):
                result = handler(question_lower)
                if result:
                    return result

        # Step 9: nothing matched.
        return self._smart_response(question_lower)

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------
    def _handle_column_statistics(self, col_name):
        """Provide detailed statistics for a specific column.

        ID columns get an explanation, meaningful numeric columns get
        ``describe()``-style figures plus the total, and any other existing
        column gets unique/most-common/missing counts and its top 5 values.
        """
        if col_name in self.id_columns:
            return "\n".join([
                f"⚠️ **'{col_name}' is an ID column**",
                "Statistics for ID columns are not meaningful because:",
                "• IDs are unique identifiers, not measurements",
                "• Each ID appears only once typically",
                "**What you CAN do:**",
                f"• Count how many IDs: \"{col_name} count\"",
                f"• View the data: \"Show {col_name}\"",
                f"• Analyze other columns: {self._suggested_metrics()}",
            ])

        if col_name in self._get_meaningful_numeric_columns():
            stats = self.df[col_name].describe()
            lines = [
                f"📊 **Statistics for {col_name}**\n",
                f"• **Count**: {stats['count']:,.0f}",
                f"• **Mean**: {stats['mean']:,.2f}",
                f"• **Standard Deviation**: {stats['std']:,.2f}",
                f"• **Minimum**: {stats['min']:,.2f}",
                f"• **25th Percentile**: {stats['25%']:,.2f}",
                f"• **Median (50th)**: {stats['50%']:,.2f}",
                f"• **75th Percentile**: {stats['75%']:,.2f}",
                f"• **Maximum**: {stats['max']:,.2f}",
                f"• **Total**: {self.df[col_name].sum():,.2f}",
            ]
            return "\n".join(lines)

        if col_name in self.df.columns:
            series = self.df[col_name]
            modes = series.mode()
            most_common = modes[0] if len(modes) > 0 else 'N/A'
            output = f"📊 **Statistics for {col_name}**\n\n"
            output += f"• **Unique values**: {series.nunique():,}\n"
            output += f"• **Most common**: {most_common}\n"
            output += f"• **Missing values**: {series.isnull().sum():,}\n"
            output += "\n**Top 5 values:**\n"
            total_rows = len(self.df)
            for val, count in series.value_counts().head(5).items():
                share = count / total_rows * 100
                output += f" • {val}: {count} ({share:.1f}%)\n"
            return output

        available = ', '.join(map(str, self.df.columns[:10]))
        return f"❌ Column '{col_name}' not found. Available columns: {available}..."

    def _handle_id_question(self, id_col):
        """Explain why calculations on an ID column are not meaningful."""
        unique_count = self.df[id_col].nunique()
        return "\n".join([
            f"⚠️ **'{id_col}' is an ID column** (unique identifier)",
            "Averages, sums, or other mathematical calculations on ID values "
            "are **not meaningful** because:",
            "• IDs are just labels, not measurements",
            "• Each ID is typically unique",
            "**What you can do instead:**",
            f"• Count how many unique IDs: {unique_count} unique values",
            "• Group data by other columns: \"Show [category] by [metric]\"",
            f"• Analyze meaningful numeric columns: {self._suggested_metrics()}",
        ])

    def _handle_total_question(self, question):
        """Sum the numeric column named in *question* (default: the first)."""
        metrics = self._get_meaningful_numeric_columns()
        col = self._find_column(question, metrics)
        if col is None and metrics:
            col = metrics[0]
        if col is None:
            return None
        return f"💰 **Total {col}**: {self.df[col].sum():,.2f}"

    def _handle_average_question(self, question):
        """Average the numeric column named in *question* (default: first)."""
        metrics = self._get_meaningful_numeric_columns()
        col = self._find_column(question, metrics)
        if col is None and metrics:
            col = metrics[0]
        if col is None:
            return None
        return f"📊 **Average {col}**: {self.df[col].mean():,.2f}"

    def _handle_min_question(self, question):
        """Return the minimum of the numeric column named in *question*."""
        col = self._find_column(question, self._get_meaningful_numeric_columns())
        if col is None:
            return None
        return f"📉 **Minimum {col}**: {self.df[col].min():,.2f}"

    def _handle_max_question(self, question):
        """Return the maximum of the numeric column named in *question*."""
        col = self._find_column(question, self._get_meaningful_numeric_columns())
        if col is None:
            return None
        return f"🏆 **Maximum {col}**: {self.df[col].max():,.2f}"

    def _handle_ranking_question(self, question):
        """Rank categories by a summed metric ("top N <category> by <metric>").

        N defaults to 5; metric and category default to the first numeric
        and first categorical column when the question names none.
        """
        n_match = re.search(r'top\s+(\d+)', question)
        n = int(n_match.group(1)) if n_match else 5

        metrics = self._get_meaningful_numeric_columns()
        categories = self._categorical_columns()

        metric = self._find_column(question, metrics)
        if metric is None and metrics:
            metric = metrics[0]
        category = self._find_column(question, categories)
        if category is None and categories:
            category = categories[0]
        if metric is None or category is None:
            return None

        ranked = (self.df.groupby(category)[metric].sum()
                  .sort_values(ascending=False).head(n))
        output = f"🏆 **Top {n} {category} by {metric}**\n\n"
        for position, (item, val) in enumerate(ranked.items(), 1):
            output += f"{position}. **{item}**: {val:,.2f}\n"
        return output

    def _handle_group_question(self, question):
        """Sum a metric per category; both must be named in *question*."""
        metric = self._find_column(question, self._get_meaningful_numeric_columns())
        category = self._find_column(question, self._categorical_columns())
        if metric is None or category is None:
            return None

        grouped = (self.df.groupby(category)[metric].sum()
                   .sort_values(ascending=False))
        output = f"📊 **{metric} by {category}**\n\n"
        for position, (item, val) in enumerate(grouped.items(), 1):
            output += f"{position}. **{item}**: {val:,.2f}\n"
        output += f"\n**Total**: {grouped.sum():,.2f}"
        return output

    def _handle_count_question(self, question):
        """Count unique values of a named column, or rows of the frame."""
        col = self._find_column(question, self.df.columns)
        if col is not None:
            return f"📊 **{col}**: {self.df[col].nunique()} unique values"
        if 'rows' in question or 'records' in question:
            return f"📊 **Total records**: {len(self.df):,} rows"
        return None

    def _handle_show_question(self, question):
        """Preview the first N rows (N = first standalone number, default 5)."""
        # Standalone numbers only, so digits inside names such as "q1_sales"
        # are not mistaken for a row count.
        n_match = re.search(r'(?<!\w)(\d+)(?!\w)', question)
        n = int(n_match.group(1)) if n_match else 5
        output = f"**📊 Data Preview (First {n} rows)**\n\n```\n"
        output += self.df.head(n).to_string()
        output += "\n```"
        return output

    def _format_full_summary(self):
        """Provide a complete dataset summary (size, column types, key stats)."""
        meaningful_numeric = self._get_meaningful_numeric_columns()
        categorical = self._categorical_columns()

        output = "📊 **Complete Data Summary**\n\n"
        output += (f"**Dataset Size**: {len(self.df):,} rows × "
                   f"{len(self.df.columns)} columns\n\n")
        output += "**Column Types:**\n"
        output += f"• Meaningful numeric columns: {len(meaningful_numeric)}\n"
        output += f"• ID columns (excluded): {len(self.id_columns)}\n"
        output += f"• Categorical columns: {len(categorical)}\n"
        if meaningful_numeric:
            output += "\n**Key Numeric Statistics:**\n"
            for col in meaningful_numeric[:5]:
                output += (f"• {col}: Mean={self.df[col].mean():.2f}, "
                           f"Total={self.df[col].sum():,.0f}\n")
        if categorical:
            output += "\n**Categorical Columns:**\n"
            for col in categorical[:3]:
                output += f"• {col}: {self.df[col].nunique()} unique values\n"
        return output

    def _smart_response(self, question):
        """Build a help message for questions no handler recognised."""
        meaningful_numeric = self._get_meaningful_numeric_columns()
        categorical = self._categorical_columns()

        output = "💡 **I understand you're asking about your data.**\n\n"
        output += "📊 **Here's what's available:**\n"
        output += f"• {len(self.df):,} rows, {len(self.df.columns)} columns\n"
        if meaningful_numeric:
            names = ', '.join(map(str, meaningful_numeric[:5]))
            output += f"• Numeric columns to analyze: {names}\n"
        if categorical:
            names = ', '.join(map(str, categorical[:3]))
            output += f"• Categories to group by: {names}\n"
        output += "\n📝 **Try these example questions:**\n\n"
        if meaningful_numeric:
            example = meaningful_numeric[0]
            output += f"• 'Statistics {example}'\n"
            output += f"• 'Total {example}'\n"
            output += f"• 'Average {example}'\n"
        if categorical and meaningful_numeric:
            output += f"• 'Top 5 {categorical[0]} by {meaningful_numeric[0]}'\n"
        output += "• 'Summary statistics'\n"
        output += "• 'Show me the data'"
        return output