Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import re | |
| # Question understanding: classify what kind of analysis is being asked for | |
def detect_question_type(question: str):
    """Classify a question by the kind of analysis it asks for.

    The lower-cased question is tested against one keyword list per question
    type, in the order the types are declared below; the first type with a
    hit wins.  Keywords are matched as whole words or phrases (an optional
    trailing "s" is tolerated), so "summary" is not read as "sum",
    "country" as "count", or "laptop" as "top".

    Args:
        question: The user's question text.

    Returns:
        One of "highest", "lowest", "average", "sum", "count",
        "correlation", "distribution", or "unknown" if no keyword matches.
    """
    q = question.lower()
    patterns = {
        "highest": ["highest", "maximum", "max", "most", "top", "best", "largest"],
        "lowest": ["lowest", "minimum", "min", "least", "worst", "cheapest", "smallest"],
        "average": ["average", "mean", "avg"],
        "sum": ["sum", "total", "overall"],
        "count": ["count", "how many", "number of", "frequency"],
        "correlation": ["correlation", "relationship", "relation", "dependency"],
        "distribution": ["distribution", "spread", "breakdown"],
    }
    for key, words in patterns.items():
        # Words of a multi-word keyword may be separated by any whitespace run.
        alternatives = "|".join(
            r"\s+".join(re.escape(part) for part in word.split())
            for word in words
        )
        # \b on both sides prevents hits inside longer, unrelated words.
        if re.search(rf"\b(?:{alternatives})s?\b", q):
            return key
    return "unknown"
| # column match | |
| #def find_matching_columns(question, df): | |
| # q = question.lower() | |
| # matched = [] | |
| # for col in df.columns: | |
| # clean_col = col.lower().replace("_", " ") | |
| # direct match | |
| #if clean_col in q: | |
| # matched.append(col) | |
| # continue | |
| # partial token match | |
| #col_tokens = set(clean_col.split()) | |
| #q_tokens = set(q.split()) | |
| #if len(col_tokens.intersection(q_tokens)) > 0: | |
| # matched.append(col) | |
| # fuzzy keyword match (weak heuristic) | |
| #elif any(token in clean_col for token in q_tokens): | |
| # matched.append(col) | |
| #return list(set(matched)) | |
def find_matching_columns(question, df):
    """Rank the columns of *df* by how well their names match the question.

    Both the question and each column name are lower-cased and split into
    alphanumeric words (underscores and punctuation act as separators), so
    "salary?" and "unit_price" typed in a question still match.  Each column
    is then scored:

    * +10 if the whole column name appears in the question as a run of
      complete words,
    * +3 for every word shared by the question and the column name,
    * +1 for every question word of at least three characters that occurs
      anywhere inside the column name (shorter words such as "a" or "is"
      would otherwise match almost every column).

    Args:
        question: The user's question text.
        df: DataFrame whose column labels are matched; labels are converted
            with ``str()`` so non-string labels are accepted.

    Returns:
        Up to five column labels with a positive score, best match first.
        Columns with equal scores keep their order in ``df.columns``.
    """
    word_re = re.compile(r"[^\W_]+")
    min_partial_len = 3

    # Question-side work is independent of the column, so do it once.
    q_tokens = word_re.findall(str(question).lower())
    q_words = set(q_tokens)
    q_padded = " " + " ".join(q_tokens) + " "
    partial_candidates = [w for w in q_words if len(w) >= min_partial_len]

    scored = []
    for col in df.columns:
        c_tokens = word_re.findall(str(col).lower())
        clean = " ".join(c_tokens)
        score = 0
        # Whole-name match; the padding enforces word boundaries so that a
        # column called "age" is not found inside "average".
        if clean and f" {clean} " in q_padded:
            score += 10
        # Token overlap.
        score += 3 * len(q_words.intersection(c_tokens))
        # Partial match of a question word inside the column name.
        score += sum(1 for w in partial_candidates if w in clean)
        scored.append((col, score))

    # list.sort is stable, so ties stay in original column order.
    scored.sort(key=lambda item: item[1], reverse=True)
    return [col for col, score in scored if score > 0][:5]
| ######### | |
def get_numeric_columns(df):
    """Return the labels of all numeric columns of *df*, in column order.

    Uses pandas' generic "number" selector rather than listing ``int64`` and
    ``float64`` only, so columns stored as other integer or float widths
    (for example ``int32`` or ``float32``) are included as well.  Boolean
    columns are not treated as numeric.

    Args:
        df: The DataFrame to inspect.

    Returns:
        A list of column labels.
    """
    return df.select_dtypes(include="number").columns.tolist()
def get_categorical_columns(df):
    """Return the labels of all text-like or categorical columns of *df*.

    A column qualifies when its dtype is ``object``, a pandas categorical
    dtype, or a dedicated pandas string dtype.  The check is made per column
    dtype instead of through a fixed ``["object", "category"]`` selector so
    that string-dtype columns are not silently skipped.

    Args:
        df: The DataFrame to inspect.

    Returns:
        A list of column labels, in column order.
    """
    is_string = pd.api.types.is_string_dtype
    return [
        col
        for col, dtype in df.dtypes.items()
        if isinstance(dtype, pd.CategoricalDtype) or is_string(dtype)
    ]
def pick_best_group_and_value(df, matched_cols):
    """Choose a grouping column and a value column for a group-by analysis.

    Matched columns are preferred: the first matched categorical column
    becomes the group column and the first matched numeric column the value
    column.  When the matches contain no column of a kind, the first column
    of that kind in the whole DataFrame is used instead.

    Args:
        df: The DataFrame being analysed.
        matched_cols: Column labels matched to the question, best first.

    Returns:
        A ``(group_col, value_col)`` tuple, or ``(None, None)`` when the
        DataFrame has no numeric column or no categorical column.
    """
    # Scan the dtypes once; the lists are reused for filtering and fallback.
    all_numeric = get_numeric_columns(df)
    all_categorical = get_categorical_columns(df)

    numeric = [c for c in matched_cols if c in all_numeric] or all_numeric
    cat = [c for c in matched_cols if c in all_categorical] or all_categorical

    if not numeric or not cat:
        return None, None
    return cat[0], numeric[0]
| # high & low | |
def handle_groupby_question(df, matched_cols, qtype):
    """Answer a "which group has the highest/lowest ..." question.

    The value column is averaged per group and the group with the largest
    (or, for ``qtype == "lowest"``, smallest) average is reported.

    Args:
        df: The DataFrame being analysed.
        matched_cols: Column labels matched to the question, best first.
        qtype: "lowest" selects the smallest group average; any other value
            selects the largest.

    Returns:
        A sentence naming the winning group and its average, or an
        explanatory message when the analysis cannot be computed.
    """
    group_col, value_col = pick_best_group_and_value(df, matched_cols)
    # Compare against None explicitly: a valid column label may be falsy
    # (for example the integer label 0).
    if group_col is None or value_col is None:
        return "Not enough data to compute group analysis."

    # Sort direction and label come from the same flag so they cannot
    # disagree.
    want_lowest = qtype == "lowest"
    label = "lowest" if want_lowest else "highest"

    try:
        # Groups whose average is undefined cannot be ranked.
        grouped = df.groupby(group_col)[value_col].mean().dropna()
        if grouped.empty:
            return "Not enough data to compute group analysis."

        grouped = grouped.sort_values(ascending=want_lowest)
        name = grouped.index[0]
        value = grouped.iloc[0]
        return (
            f"'{name}' has the {label} average {value_col}: "
            f"{value:.2f}."
        )
    except Exception as e:
        # Best-effort boundary: report the failure to the user as text.
        return f"Group analysis failed: {e}"
| # avg , mean , sum , median | |
def handle_numeric_stats(df, matched_cols, mode="average"):
    """Compute a single summary statistic for one numeric column.

    The first matched numeric column is used; if none of the matched
    columns is numeric, the first numeric column of the DataFrame is used.

    Args:
        df: The DataFrame being analysed.
        matched_cols: Column labels matched to the question, best first.
        mode: "average", "sum" or "median".  Any other value is treated as
            "average", and the answer is labelled accordingly.

    Returns:
        A sentence with the statistic formatted to two decimals, or an
        explanatory message when no numeric column exists or the
        computation fails.
    """
    # Scan the dtypes once; reused for filtering and as the fallback.
    all_numeric = get_numeric_columns(df)
    numeric_cols = [c for c in matched_cols if c in all_numeric] or all_numeric
    if not numeric_cols:
        return "No numeric columns found."

    col = numeric_cols[0]
    try:
        if mode == "sum":
            val = df[col].sum()
        elif mode == "median":
            val = df[col].median()
        else:
            # Unknown modes fall back to the mean; rename the mode so the
            # answer is not labelled with a statistic that was not computed.
            mode = "average"
            val = df[col].mean()
        return f"{mode.title()} of '{col}' is {val:.2f}"
    except Exception as e:
        # Best-effort boundary: report the failure to the user as text.
        return f"Error calculating {mode}: {e}"
| # count , number of | |
def handle_count_question(df, matched_cols):
    """Answer a "how many" question with row or value counts.

    Args:
        df: The DataFrame being analysed.
        matched_cols: Column labels matched to the question, best first.

    Returns:
        The number of rows when no column was matched; otherwise a listing
        of up to ten of the most frequent values of the first matched
        column, one "- value: count" line each.  If counting fails, a
        "Count error: ..." message is returned instead.
    """
    if not matched_cols:
        return f"Dataset contains {len(df)} rows."

    target = matched_cols[0]
    try:
        top_counts = df[target].value_counts().head(10)
        header = f"Top values in '{target}':\n"
        lines = [f"- {value}: {n}\n" for value, n in top_counts.items()]
        return header + "".join(lines)
    except Exception as err:
        return f"Count error: {err}"
| # correlation , relationship | |
def handle_correlation_question(df, matched_cols):
    """Report the most strongly correlated pair of numeric columns.

    Pairs are drawn from the matched numeric columns; if fewer than two of
    the matched columns are numeric, all numeric columns of the DataFrame
    are considered instead.  Pairs whose correlation is undefined (NaN) are
    ignored.

    Args:
        df: The DataFrame being analysed.
        matched_cols: Column labels matched to the question, best first.

    Returns:
        A sentence naming the pair with the largest absolute correlation,
        its coefficient, and a strength/direction description ("strong"
        above 0.7, "moderate" above 0.4, otherwise "weak"); or an
        explanatory message when no pair can be evaluated.
    """
    # Scan the dtypes once; reused for filtering and as the fallback.
    all_numeric = get_numeric_columns(df)
    numeric_cols = [c for c in matched_cols if c in all_numeric]
    if len(numeric_cols) < 2:
        numeric_cols = all_numeric
    if len(numeric_cols) < 2:
        return "Need at least 2 numeric columns for correlation."

    # Track the best pair by |corr| only.  Column labels are never compared,
    # so tied correlations with mixed-type labels cannot raise TypeError;
    # on a tie the first pair encountered is kept.
    best = None
    for i, c1 in enumerate(numeric_cols):
        for c2 in numeric_cols[i + 1:]:
            corr = df[c1].corr(df[c2])
            if pd.notna(corr) and (best is None or abs(corr) > abs(best[2])):
                best = (c1, c2, corr)

    if best is None:
        return "No correlation found."

    c1, c2, corr = best
    strength = "weak"
    if abs(corr) > 0.7:
        strength = "strong"
    elif abs(corr) > 0.4:
        strength = "moderate"
    direction = "positive" if corr > 0 else "negative"
    return (
        f"Strongest correlation is between '{c1}' and '{c2}': "
        f"{corr:.3f} ({strength} {direction})."
    )
| ##### main | |
def answer_data_question(df, question):
    """Answer a natural-language question about *df*.

    The question is classified with ``detect_question_type``, relevant
    columns are found with ``find_matching_columns``, and the work is
    handed to the handler for that question type.

    Args:
        df: The DataFrame being analysed.
        question: The user's question text.

    Returns:
        The handler's answer as a string; a prompt to enter a valid
        question when *question* is not a non-blank string; or a hint
        about supported questions when the type is not recognised.
    """
    # Reject None, non-string and blank input before calling string methods.
    if not isinstance(question, str) or not question.strip():
        return "Please enter a valid question."

    qtype = detect_question_type(question)
    matched_cols = find_matching_columns(question, df)

    # Highest / lowest group.
    if qtype in ("highest", "lowest"):
        return handle_groupby_question(df, matched_cols, qtype)
    # Average / sum: the question type doubles as the statistic name.
    if qtype in ("average", "sum"):
        return handle_numeric_stats(df, matched_cols, qtype)
    # Counts.  "distribution" questions are detected by the classifier but
    # had no handler; a value-frequency breakdown is the closest answer
    # available here, so they are served by the count handler too.
    if qtype in ("count", "distribution"):
        return handle_count_question(df, matched_cols)
    # Correlation.
    if qtype == "correlation":
        return handle_correlation_question(df, matched_cols)

    # Unknown question type: explain what can be asked.
    return (
        "I couldn't fully understand the question. "
        "Try asking about averages, highest/lowest values, counts, or relationships."
    )