| | from typing import Dict, List, Any, Optional, Tuple
|
| | import pandas as pd
|
| | import numpy as np
|
| | from pathlib import Path
|
| | import os
|
| | import chardet
|
| | import csv
|
| |
|
class CSVHelpers:
    """Helper utilities for CSV preprocessing and analysis."""

    @staticmethod
    def detect_encoding(file_path: str, sample_size: int = 10000) -> str:
        """Detect the character encoding of a file.

        Reads up to ``sample_size`` bytes and asks chardet for its best
        guess.

        Args:
            file_path: Path to the file to inspect.
            sample_size: Maximum number of bytes to sample.

        Returns:
            An encoding name usable with ``open(..., encoding=...)``.
            Falls back to ``'utf-8'`` when chardet cannot decide (it
            reports ``{'encoding': None}`` for empty/ambiguous samples).
        """
        with open(file_path, 'rb') as f:
            raw_data = f.read(sample_size)
        result = chardet.detect(raw_data)
        # chardet returns None for undecidable samples; never propagate it.
        return result['encoding'] or 'utf-8'

    @staticmethod
    def detect_delimiter(file_path: str, encoding: str = 'utf-8') -> str:
        """Detect the delimiter used in a CSV file.

        Candidate delimiters are tried in priority order; the first one
        that both occurs in the sample and satisfies ``csv.Sniffer``
        wins.

        Args:
            file_path: Path to the CSV file.
            encoding: Text encoding used to read the sample.

        Returns:
            The detected delimiter character, or ``','`` when no
            candidate can be confirmed.
        """
        with open(file_path, 'r', encoding=encoding) as csvfile:
            sample = csvfile.read(4096)

        sniffer = csv.Sniffer()  # stateless; one instance is enough
        for delimiter in (',', ';', '\t', '|'):
            if delimiter not in sample:
                continue
            try:
                dialect = sniffer.sniff(sample, delimiters=delimiter)
            except csv.Error:
                # Sniffer could not confirm this candidate; try the next.
                continue
            return dialect.delimiter

        return ','

    @staticmethod
    def preprocess_csv(file_path: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """Load a CSV with automatic encoding and delimiter detection.

        Args:
            file_path: Path to the CSV file.

        Returns:
            ``(df, metadata)`` where ``metadata`` records the detected
            encoding/delimiter, original shape, columns, dtypes,
            per-column missing-value counts (non-zero only), and the
            number of fully duplicated rows.

        Raises:
            Propagates ``pd.read_csv`` errors (e.g. ``FileNotFoundError``,
            ``pd.errors.ParserError``); detection failures are absorbed
            and replaced with defaults.
        """
        # Detection is best-effort: any failure (unreadable file, chardet
        # unavailable, ...) falls back to sensible defaults rather than
        # blocking the load.
        try:
            encoding = CSVHelpers.detect_encoding(file_path)
        except Exception:
            encoding = 'utf-8'

        try:
            delimiter = CSVHelpers.detect_delimiter(file_path, encoding)
        except Exception:
            delimiter = ','

        df = pd.read_csv(file_path, encoding=encoding, delimiter=delimiter,
                         low_memory=False)

        metadata: Dict[str, Any] = {
            "original_shape": df.shape,
            "encoding": encoding,
            "delimiter": delimiter,
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        }

        # Only report columns that actually have missing values.
        missing_counts = df.isna().sum()
        metadata["missing_values"] = {
            col: int(count) for col, count in missing_counts.items() if count > 0
        }

        metadata["duplicate_rows"] = int(df.duplicated().sum())

        return df, metadata

    @staticmethod
    def infer_column_types(df: pd.DataFrame) -> Dict[str, str]:
        """Infer semantic types of columns (beyond pandas dtypes).

        Labels: ``datetime``, ``potential_datetime``, ``boolean``,
        ``categorical``, ``text``, ``id``, ``binary``, ``integer``,
        ``float``, ``unknown``.

        Args:
            df: DataFrame whose columns are classified.

        Returns:
            Mapping of column name to inferred semantic type label.
        """
        column_types: Dict[str, str] = {}
        n_rows = len(df)

        for column in df.columns:
            series = df[column]

            # All-missing columns carry no signal.
            if series.isna().all():
                column_types[column] = "unknown"
                continue

            dtype = series.dtype

            if pd.api.types.is_datetime64_any_dtype(series):
                # *_any_dtype also catches tz-aware datetime columns,
                # which the plain is_datetime64_dtype check missed.
                column_types[column] = "datetime"
            elif pd.api.types.is_bool_dtype(dtype):
                # Must be tested BEFORE is_numeric_dtype: pandas counts
                # bool as numeric, which previously made this branch
                # unreachable (bool columns were reported as "binary").
                column_types[column] = "boolean"
            elif dtype == object:
                try:
                    # Probe a small sample to see if the strings parse
                    # as dates.
                    sample = series.dropna().head(10)
                    pd.to_datetime(sample)
                    column_types[column] = "potential_datetime"
                except Exception:
                    # Not date-like; classify by cardinality instead.
                    unique_ratio = series.nunique() / n_rows if n_rows else 0.0
                    if unique_ratio < 0.1:
                        column_types[column] = "categorical"
                    else:
                        column_types[column] = "text"
            elif pd.api.types.is_numeric_dtype(dtype):
                n_unique = series.nunique()
                is_integer = pd.api.types.is_integer_dtype(dtype)
                if is_integer and n_unique == n_rows and series.min() >= 0:
                    # All-unique non-negative integers look like row ids.
                    # Restricted to integer dtypes: all-unique floats are
                    # measurements, not identifiers.
                    column_types[column] = "id"
                elif n_unique <= 2:
                    column_types[column] = "binary"
                elif is_integer:
                    column_types[column] = "integer"
                else:
                    column_types[column] = "float"
            else:
                column_types[column] = "unknown"

        return column_types

    @staticmethod
    def suggest_visualizations(df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Suggest appropriate visualizations based on inferred types.

        Args:
            df: DataFrame to suggest charts for.

        Returns:
            A list of suggestion dicts (``chart_type``, column keys, and
            a human-readable ``title``).
        """
        suggestions: List[Dict[str, Any]] = []
        column_types = CSVHelpers.infer_column_types(df)

        # 'ctype' rather than 'type': the original shadowed the builtin.
        numeric_columns = [col for col, ctype in column_types.items()
                           if ctype in ("integer", "float")]
        categorical_columns = [col for col, ctype in column_types.items()
                               if ctype in ("categorical", "binary")]
        datetime_columns = [col for col, ctype in column_types.items()
                            if ctype in ("datetime", "potential_datetime")]

        # Distributions for up to three numeric columns.
        for col in numeric_columns[:3]:
            suggestions.append({
                "chart_type": "histogram",
                "column": col,
                "title": f"Distribution of {col}"
            })

        # Category counts for up to three categorical columns.
        for col in categorical_columns[:3]:
            suggestions.append({
                "chart_type": "bar",
                "x_column": col,
                "y_column": "count",
                "title": f"Count by {col}"
            })

        # One time-series line when both a datetime and a numeric exist.
        if datetime_columns and numeric_columns:
            suggestions.append({
                "chart_type": "line",
                "x_column": datetime_columns[0],
                "y_column": numeric_columns[0],
                "title": f"{numeric_columns[0]} over Time"
            })

        # One scatter when at least two numeric columns exist.
        if len(numeric_columns) >= 2:
            suggestions.append({
                "chart_type": "scatter",
                "x_column": numeric_columns[0],
                "y_column": numeric_columns[1],
                "title": f"{numeric_columns[1]} vs {numeric_columns[0]}"
            })

        return suggestions
|