| """ |
| agent/nodes/data_profiler.py |
| Natural language data profiling — generates a comprehensive dataset overview |
| including type inference, missing values, cardinality, distributions, and correlations. |
| """ |
|
|
| import math |
| from collections import Counter |
| from typing import Any, Dict, List, Optional |
|
|
| from connectors.base import get_connector |
|
|
|
|
| def _safe_float(val: Any) -> Optional[float]: |
| try: |
| return float(val) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
| def _basic_stats(values: List[float]) -> Dict[str, Any]: |
| """Compute mean, median, min, max, std for a list of numbers.""" |
| if not values: |
| return {} |
| n = len(values) |
| sorted_vals = sorted(values) |
| mean = sum(values) / n |
| median = sorted_vals[n // 2] |
| variance = sum((x - mean) ** 2 for x in values) / n |
| return { |
| "count": n, |
| "mean": round(mean, 2), |
| "median": round(median, 2), |
| "min": round(sorted_vals[0], 2), |
| "max": round(sorted_vals[-1], 2), |
| "std": round(math.sqrt(variance), 2), |
| } |
|
|
|
|
| def _histogram_buckets(values: List[float], n_buckets: int = 8) -> List[Dict[str, Any]]: |
| """Create histogram buckets for visualization.""" |
| if not values: |
| return [] |
| min_val = min(values) |
| max_val = max(values) |
| if min_val == max_val: |
| return [{"range": f"{min_val:.2f}", "count": len(values)}] |
|
|
| step = (max_val - min_val) / n_buckets |
| buckets = [] |
| for i in range(n_buckets): |
| lo = min_val + i * step |
| hi = lo + step |
| count = sum(1 for v in values if lo <= v < hi) if i < n_buckets - 1 else sum(1 for v in values if lo <= v <= hi) |
| buckets.append({ |
| "range": f"{lo:.1f}-{hi:.1f}", |
| "count": count, |
| }) |
| return buckets |
|
|
|
|
| def profile_connector(connector_id: str) -> Dict[str, Any]: |
| """ |
| Generate a full data profile for a connector's data. |
| Returns structured profile data suitable for rendering. |
| """ |
| connector = get_connector(connector_id) |
| schema = connector.get_schema() |
|
|
| profile = { |
| "connector_id": connector_id, |
| "tables": [], |
| "total_tables": len(schema), |
| "total_columns": sum(len(t.get("columns", [])) for t in schema), |
| } |
|
|
| for table_info in schema[:10]: |
| table_name = table_info["table"] |
| columns = table_info.get("columns", []) |
| row_count = table_info.get("row_count", 0) |
|
|
| |
| try: |
| sample_sql = f'SELECT * FROM "{table_name}" LIMIT 1000' |
| sample_data = connector.execute_sql(sample_sql) |
| except Exception: |
| sample_data = [] |
|
|
| col_profiles = [] |
| for col in columns: |
| col_name = col["name"] |
| col_type = col.get("type", "unknown") |
|
|
| if not sample_data: |
| col_profiles.append({ |
| "name": col_name, |
| "type": col_type, |
| "null_count": 0, |
| "null_rate": 0, |
| "unique_count": 0, |
| "cardinality": "unknown", |
| }) |
| continue |
|
|
| raw_values = [row.get(col_name) for row in sample_data] |
| null_count = sum(1 for v in raw_values if v is None) |
| non_null = [v for v in raw_values if v is not None] |
| unique_count = len(set(str(v) for v in non_null)) |
|
|
| col_profile: Dict[str, Any] = { |
| "name": col_name, |
| "type": col_type, |
| "null_count": null_count, |
| "null_rate": round(null_count / max(len(raw_values), 1), 3), |
| "unique_count": unique_count, |
| "cardinality": "high" if unique_count > len(non_null) * 0.8 else ( |
| "medium" if unique_count > 10 else "low" |
| ), |
| } |
|
|
| |
| float_values = [v for v in (_safe_float(x) for x in non_null) if v is not None] |
| if float_values and len(float_values) > len(non_null) * 0.5: |
| col_profile["stats"] = _basic_stats(float_values) |
| col_profile["histogram"] = _histogram_buckets(float_values) |
| col_profile["inferred_type"] = "numeric" |
| else: |
| |
| str_values = [str(v) for v in non_null] |
| top_values = Counter(str_values).most_common(5) |
| col_profile["top_values"] = [ |
| {"value": v, "count": c, "frequency": round(c / max(len(str_values), 1), 3)} |
| for v, c in top_values |
| ] |
| col_profile["inferred_type"] = "categorical" |
|
|
| col_profiles.append(col_profile) |
|
|
| |
| numeric_cols = [c for c in col_profiles if c.get("inferred_type") == "numeric"] |
| correlations = [] |
| if len(numeric_cols) >= 2 and sample_data: |
| for i in range(len(numeric_cols)): |
| for j in range(i + 1, len(numeric_cols)): |
| col_a = numeric_cols[i]["name"] |
| col_b = numeric_cols[j]["name"] |
| vals_a = [_safe_float(r.get(col_a)) for r in sample_data] |
| vals_b = [_safe_float(r.get(col_b)) for r in sample_data] |
| paired = [(a, b) for a, b in zip(vals_a, vals_b) if a is not None and b is not None] |
| if len(paired) > 5: |
| corr = _pearson([p[0] for p in paired], [p[1] for p in paired]) |
| if corr is not None and abs(corr) > 0.3: |
| correlations.append({ |
| "column_a": col_a, |
| "column_b": col_b, |
| "correlation": round(corr, 3), |
| "strength": "strong" if abs(corr) > 0.7 else "moderate", |
| }) |
|
|
| profile["tables"].append({ |
| "name": table_name, |
| "row_count": row_count, |
| "column_count": len(columns), |
| "columns": col_profiles, |
| "correlations": correlations[:10], |
| }) |
|
|
| return profile |
|
|
|
|
| def _pearson(x: List[float], y: List[float]) -> Optional[float]: |
| """Compute Pearson correlation coefficient.""" |
| n = len(x) |
| if n < 3: |
| return None |
| mean_x = sum(x) / n |
| mean_y = sum(y) / n |
| dx = [xi - mean_x for xi in x] |
| dy = [yi - mean_y for yi in y] |
| num = sum(a * b for a, b in zip(dx, dy)) |
| den_x = math.sqrt(sum(a ** 2 for a in dx)) |
| den_y = math.sqrt(sum(b ** 2 for b in dy)) |
| if den_x == 0 or den_y == 0: |
| return None |
| return num / (den_x * den_y) |
|
|