| """ | |
| Data Profiling Tools | |
| Tools for analyzing and understanding dataset characteristics. | |
| """ | |
| import polars as pl | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional | |
| from pathlib import Path | |
| import sys | |
| import os | |
| # Add parent directory to path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from ..utils.polars_helpers import ( | |
| load_dataframe, | |
| get_numeric_columns, | |
| get_categorical_columns, | |
| get_datetime_columns, | |
| get_column_info, | |
| calculate_memory_usage, | |
| detect_id_columns, | |
| ) | |
| from ..utils.validation import ( | |
| validate_file_exists, | |
| validate_file_format, | |
| validate_dataframe, | |
| ) | |


def profile_dataset(file_path: str) -> Dict[str, Any]:
    """
    Get comprehensive statistics about a dataset.

    Args:
        file_path: Path to CSV or Parquet file

    Returns:
        Dictionary with dataset profile including:
            - shape (rows, columns)
            - column types
            - memory usage
            - null counts
            - unique values
            - missing value percentage per column
            - unique value counts per column
            - basic statistics for each column
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Basic info
    profile = {
        "file_path": file_path,
        "shape": {
            "rows": len(df),
            "columns": len(df.columns)
        },
        "memory_usage": calculate_memory_usage(df),
        "column_types": {
            "numeric": get_numeric_columns(df),
            "categorical": get_categorical_columns(df),
            "datetime": get_datetime_columns(df),
            "id_columns": detect_id_columns(df),
        },
        "columns": {},
        "missing_values_per_column": {},  # Per-column missing count and %
        "unique_counts_per_column": {}    # Per-column unique counts
    }

    # Per-column statistics with missing % and unique counts
    for col in df.columns:
        # Existing column info
        profile["columns"][col] = get_column_info(df, col)

        # Missing value percentage for this column
        null_count = df[col].null_count()
        missing_pct = round((null_count / len(df)) * 100, 2) if len(df) > 0 else 0
        profile["missing_values_per_column"][col] = {
            "count": int(null_count),
            "percentage": missing_pct
        }

        # Unique value counts (with fallback for unhashable types)
        try:
            unique_count = df[col].n_unique()
            profile["unique_counts_per_column"][col] = int(unique_count)
        except Exception:
            # Column may contain unhashable types (dicts, lists):
            # cast to string before counting unique values
            try:
                unique_count = df[col].cast(pl.Utf8).n_unique()
                profile["unique_counts_per_column"][col] = int(unique_count)
            except Exception:
                profile["unique_counts_per_column"][col] = "N/A (unhashable type)"

    # Overall statistics
    total_nulls = sum(df[col].null_count() for col in df.columns)
    total_cells = len(df) * len(df.columns)
    duplicate_rows = df.is_duplicated().sum()
    profile["overall_stats"] = {
        "total_cells": total_cells,
        "total_nulls": total_nulls,
        "null_percentage": round(total_nulls / total_cells * 100, 2) if total_cells > 0 else 0,
        "duplicate_rows": int(duplicate_rows),
        "duplicate_percentage": round(duplicate_rows / len(df) * 100, 2) if len(df) > 0 else 0,
    }
    return profile
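

# A minimal illustrative sketch (not part of the original module): the same
# per-column missing-percentage and unique-count logic as profile_dataset(),
# applied to a tiny in-memory frame. The helper name and toy data are
# hypothetical and exist only to show the expected shape of the output.
def _example_column_profile() -> Dict[str, Any]:
    toy = pl.DataFrame({"age": [34, None, 29, None], "city": ["NY", "NY", None, "LA"]})
    out: Dict[str, Any] = {}
    for col in toy.columns:
        null_count = toy[col].null_count()
        out[col] = {
            "missing_pct": round((null_count / len(toy)) * 100, 2),  # "age" -> 50.0
            "n_unique": int(toy[col].n_unique()),
        }
    return out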


def get_smart_summary(file_path: str, n_samples: int = 30) -> Dict[str, Any]:
    """
    Enhanced data summary with missing %, unique counts, and safe dict handling.

    This function provides a smarter, more LLM-friendly summary than profile_dataset().
    It includes per-column missing percentages, unique value counts, and handles
    dictionary columns gracefully (converting them to strings to avoid hashing errors).

    Args:
        file_path: Path to CSV or Parquet file
        n_samples: Number of sample rows to include (default: 30)

    Returns:
        Dictionary with a comprehensive smart summary including:
            - Basic shape info
            - Column data types
            - Missing value percentage by column (sorted by % descending)
            - Unique value counts by column
            - First N sample rows
            - Descriptive statistics for numeric columns
            - Safe handling of dictionary/unhashable columns

    Example:
        >>> summary = get_smart_summary("data.csv")
        >>> summary["missing_summary"][0]
        {"column": "col_A", "count": ..., "percentage": 45.2}
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    # Convert dictionary/list columns to strings (prevents unhashable-type errors)
    for col in df.columns:
        try:
            # Peek at a few non-null values to detect complex types
            sample = df[col].drop_nulls().head(5)
            if len(sample) > 0:
                first_val = sample[0]
                if isinstance(first_val, (dict, list)):
                    df = df.with_columns(pl.col(col).cast(pl.Utf8).alias(col))
        except Exception:
            # If detection or casting fails, leave the column untouched
            pass

    # Missing value statistics, sorted by percentage descending
    missing_stats = []
    for col in df.columns:
        null_count = df[col].null_count()
        null_pct = round((null_count / len(df)) * 100, 2) if len(df) > 0 else 0
        missing_stats.append({
            "column": col,
            "count": int(null_count),
            "percentage": null_pct
        })
    missing_stats.sort(key=lambda x: x["percentage"], reverse=True)

    # Unique value counts
    unique_counts = {}
    for col in df.columns:
        try:
            unique_counts[col] = int(df[col].n_unique())
        except Exception:
            # Fallback for unhashable types: cast to string first
            try:
                unique_counts[col] = int(df[col].cast(pl.Utf8).n_unique())
            except Exception:
                unique_counts[col] = "N/A"

    # Column data types
    column_types = {col: str(df[col].dtype) for col in df.columns}

    # Sample rows (first n_samples)
    sample_data = df.head(n_samples).to_dicts()

    # Descriptive statistics for numeric columns
    numeric_cols = get_numeric_columns(df)
    numeric_stats = {}
    if numeric_cols:
        df_numeric = df.select(numeric_cols)
        # Use pandas describe() for a per-column statistics dict
        df_pd = df_numeric.to_pandas()
        numeric_stats = df_pd.describe().to_dict()

    # Build comprehensive summary
    summary = {
        "file_path": file_path,
        "shape": {
            "rows": len(df),
            "columns": len(df.columns)
        },
        "column_types": column_types,
        "missing_summary": missing_stats,  # Sorted by % descending
        "unique_counts": unique_counts,
        "sample_data": sample_data,
        "numeric_statistics": numeric_stats,
        "memory_usage_mb": calculate_memory_usage(df),
        "summary_notes": []
    }

    # Add helpful notes for downstream (LLM) consumers
    high_missing_cols = [item for item in missing_stats if item["percentage"] > 40]
    if high_missing_cols:
        summary["summary_notes"].append(
            f"{len(high_missing_cols)} column(s) have >40% missing values (consider dropping)"
        )

    high_cardinality_cols = [
        col for col, count in unique_counts.items()
        if isinstance(count, int) and count > len(df) * 0.5
    ]
    if high_cardinality_cols:
        summary["summary_notes"].append(
            f"{len(high_cardinality_cols)} column(s) have very high cardinality (>50% unique values)"
        )

    return summary
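

# Hypothetical usage sketch (not in the original module): how calling code
# might consume get_smart_summary(), surfacing the worst columns by missing
# percentage along with the generated notes. The helper name, the worst_n
# default, and the report layout are assumptions.
def _example_smart_summary_report(file_path: str, worst_n: int = 5) -> Dict[str, Any]:
    summary = get_smart_summary(file_path)
    return {
        "shape": summary["shape"],
        # missing_summary is already sorted by percentage, descending
        "worst_missing": summary["missing_summary"][:worst_n],
        "notes": summary["summary_notes"],
    }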


def detect_data_quality_issues(file_path: str) -> Dict[str, Any]:
    """
    Detect data quality issues in the dataset.

    Args:
        file_path: Path to CSV or Parquet file

    Returns:
        Dictionary with detected issues organized by severity:
            - critical: Issues that will break model training
            - warning: Issues that may affect model performance
            - info: Observations that may be relevant
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    issues = {
        "critical": [],
        "warning": [],
        "info": []
    }

    # Check for completely or mostly null columns
    for col in df.columns:
        null_count = df[col].null_count()
        null_pct = (null_count / len(df)) * 100
        if null_count == len(df):
            issues["critical"].append({
                "type": "all_null_column",
                "column": col,
                "message": f"Column '{col}' has all null values"
            })
        elif null_pct > 50:
            issues["warning"].append({
                "type": "high_null_percentage",
                "column": col,
                "null_percentage": round(null_pct, 2),
                "message": f"Column '{col}' has {round(null_pct, 2)}% null values"
            })
        elif null_pct > 10:
            issues["info"].append({
                "type": "moderate_null_percentage",
                "column": col,
                "null_percentage": round(null_pct, 2),
                "message": f"Column '{col}' has {round(null_pct, 2)}% null values"
            })

    # Check for duplicate rows
    dup_count = df.is_duplicated().sum()
    if dup_count > 0:
        dup_pct = (dup_count / len(df)) * 100
        severity = "warning" if dup_pct > 10 else "info"
        issues[severity].append({
            "type": "duplicate_rows",
            "count": int(dup_count),
            "percentage": round(dup_pct, 2),
            "message": f"Dataset has {dup_count} duplicate rows ({round(dup_pct, 2)}%)"
        })

    # Check for outliers in numeric columns using the IQR method
    numeric_cols = get_numeric_columns(df)
    for col in numeric_cols:
        col_data = df[col].drop_nulls()
        if len(col_data) == 0:
            continue

        q1 = col_data.quantile(0.25)
        q3 = col_data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        outliers = ((col_data < lower_bound) | (col_data > upper_bound)).sum()

        if outliers > 0:
            outlier_pct = (outliers / len(col_data)) * 100
            if outlier_pct > 10:
                issues["warning"].append({
                    "type": "outliers",
                    "column": col,
                    "count": int(outliers),
                    "percentage": round(outlier_pct, 2),
                    "bounds": {"lower": float(lower_bound), "upper": float(upper_bound)},
                    "message": f"Column '{col}' has {outliers} outliers ({round(outlier_pct, 2)}%)"
                })
            elif outlier_pct > 1:
                issues["info"].append({
                    "type": "outliers",
                    "column": col,
                    "count": int(outliers),
                    "percentage": round(outlier_pct, 2),
                    "bounds": {"lower": float(lower_bound), "upper": float(upper_bound)},
                    "message": f"Column '{col}' has {outliers} outliers ({round(outlier_pct, 2)}%)"
                })

    # Check for high cardinality in categorical columns
    categorical_cols = get_categorical_columns(df)
    for col in categorical_cols:
        n_unique = df[col].n_unique()
        cardinality_pct = (n_unique / len(df)) * 100
        if n_unique > 100 and cardinality_pct > 50:
            issues["warning"].append({
                "type": "high_cardinality",
                "column": col,
                "unique_values": int(n_unique),
                "percentage": round(cardinality_pct, 2),
                "message": f"Column '{col}' has very high cardinality ({n_unique} unique values, {round(cardinality_pct, 2)}%)"
            })

    # Check for constant columns (single unique value)
    for col in df.columns:
        n_unique = df[col].n_unique()
        if n_unique == 1:
            issues["warning"].append({
                "type": "constant_column",
                "column": col,
                "message": f"Column '{col}' has only one unique value (constant)"
            })

    # Check for class imbalance in potential target columns
    for col in df.columns:
        col_data = df[col]
        n_unique = col_data.n_unique()
        # A column with 2-20 unique values could be a target column
        if 2 <= n_unique <= 20:
            value_counts = col_data.value_counts()
            if len(value_counts) >= 2:
                # value_counts() is not sorted, so take the maximum count explicitly
                max_count = value_counts[value_counts.columns[1]].max()
                max_pct = (max_count / len(df)) * 100
                if max_pct > 90:
                    issues["warning"].append({
                        "type": "class_imbalance",
                        "column": col,
                        "dominant_class_percentage": round(max_pct, 2),
                        "message": f"Column '{col}' may be imbalanced (dominant class: {round(max_pct, 2)}%)"
                    })

    # Summary
    issues["summary"] = {
        "total_issues": len(issues["critical"]) + len(issues["warning"]) + len(issues["info"]),
        "critical_count": len(issues["critical"]),
        "warning_count": len(issues["warning"]),
        "info_count": len(issues["info"])
    }
    return issues
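

# Illustrative sketch (not part of the original module) of the IQR rule used
# in detect_data_quality_issues(): values outside [q1 - 1.5*IQR, q3 + 1.5*IQR]
# are flagged as outliers. The helper name and sample values are hypothetical.
def _example_iqr_bounds() -> Dict[str, float]:
    s = pl.Series("amount", [10.0, 12.0, 11.0, 13.0, 400.0])
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    n_outliers = int(((s < lower) | (s > upper)).sum())  # only 400.0 falls outside
    return {"lower": float(lower), "upper": float(upper), "outliers": float(n_outliers)}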


def analyze_correlations(file_path: str, target: Optional[str] = None) -> Dict[str, Any]:
    """
    Analyze correlations between features.

    Args:
        file_path: Path to CSV or Parquet file
        target: Optional target column to analyze correlations with

    Returns:
        Dictionary with correlation analysis including:
            - correlation matrix (for numeric columns)
            - top correlations with target (if specified)
            - highly correlated feature pairs
    """
    # Validation
    validate_file_exists(file_path)
    validate_file_format(file_path)

    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)

    numeric_cols = get_numeric_columns(df)
    if len(numeric_cols) < 2:
        return {
            "error": "Dataset must have at least 2 numeric columns for correlation analysis",
            "numeric_columns_found": len(numeric_cols)
        }

    # Select only numeric columns for correlation
    df_numeric = df.select(numeric_cols)

    # Compute the correlation matrix via pandas (labelled rows/columns, easy dict export)
    df_pd = df_numeric.to_pandas()
    corr_matrix = df_pd.corr()

    result = {
        "numeric_columns": numeric_cols,
        "correlation_matrix": corr_matrix.to_dict()
    }

    # Find highly correlated pairs (excluding the diagonal)
    high_corr_pairs = []
    for i in range(len(corr_matrix.columns)):
        for j in range(i + 1, len(corr_matrix.columns)):
            col1 = corr_matrix.columns[i]
            col2 = corr_matrix.columns[j]
            corr_value = corr_matrix.iloc[i, j]
            if abs(corr_value) > 0.7:  # High correlation threshold
                high_corr_pairs.append({
                    "feature_1": col1,
                    "feature_2": col2,
                    "correlation": round(float(corr_value), 4)
                })

    # Sort by absolute correlation
    high_corr_pairs.sort(key=lambda x: abs(x["correlation"]), reverse=True)
    result["high_correlations"] = high_corr_pairs

    # If a target is specified, report top correlations with the target
    if target:
        if target not in df.columns:
            result["target_correlations_error"] = f"Target column '{target}' not found"
        elif target not in numeric_cols:
            result["target_correlations_error"] = f"Target column '{target}' is not numeric"
        else:
            target_corrs = []
            for col in numeric_cols:
                if col != target:
                    corr_value = corr_matrix.loc[target, col]
                    target_corrs.append({
                        "feature": col,
                        "correlation": round(float(corr_value), 4)
                    })
            # Sort by absolute correlation
            target_corrs.sort(key=lambda x: abs(x["correlation"]), reverse=True)
            result["target_correlations"] = {
                "target": target,
                "top_features": target_corrs[:20]  # Top 20
            }

    return result
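

# Hypothetical end-to-end sketch (not part of the original module): combine the
# four profiling tools into a single report dictionary for one dataset. The
# function name and report layout are assumptions about how a caller might use
# this module.
def _example_full_report(file_path: str, target: Optional[str] = None) -> Dict[str, Any]:
    return {
        "profile": profile_dataset(file_path),
        "smart_summary": get_smart_summary(file_path),
        "quality_issues": detect_data_quality_issues(file_path),
        "correlations": analyze_correlations(file_path, target=target),
    }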