| | import pandas as pd |
| | import numpy as np |
| | from typing import Dict, List, Tuple, Any, Optional |
| | from utils.logger import setup_logger |
| | import io |
| |
|
# Module-level logger used by the code in this file; obtained from the project's setup_logger helper using this module's name.
| | logger = setup_logger(__name__) |
| |
|
class DataProcessor:
    """Core data processing class for handling file operations and data analysis.

    Loads an uploaded CSV/Excel payload into a DataFrame, summarises
    per-column data quality, counts IQR outliers and suggests a dtype for
    every column.
    """

    def __init__(self):
        # Lower-case file extensions (without the dot) accepted by read_file.
        self.supported_formats = ['csv', 'xlsx', 'xls']

    @staticmethod
    def _is_numeric(series: pd.Series) -> bool:
        """Return True when the series holds integers or floats.

        Covers every integer/float width as well as pandas' nullable numeric
        dtypes; booleans, datetimes and objects are not considered numeric.
        """
        return bool(
            pd.api.types.is_integer_dtype(series.dtype)
            or pd.api.types.is_float_dtype(series.dtype)
        )

    def read_file(self, file_content: bytes, filename: str) -> pd.DataFrame:
        """
        Read uploaded file and return pandas DataFrame

        Args:
            file_content: File content as bytes
            filename: Name of the uploaded file; its last dot-separated part
                (case-insensitive) selects the reader

        Returns:
            pandas DataFrame

        Raises:
            ValueError: If file format is not supported or a CSV cannot be
                decoded with any of the attempted encodings
            Exception: Any error raised by pandas while parsing is logged
                and re-raised unchanged
        """
        try:
            file_extension = filename.lower().split('.')[-1]

            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")

            if file_extension == 'csv':
                # latin-1 maps every possible byte, so it can never fail to
                # decode: it has to be the LAST fallback, otherwise cp1252
                # would never be attempted.
                for encoding in ['utf-8', 'cp1252', 'latin-1']:
                    try:
                        df = pd.read_csv(io.BytesIO(file_content), encoding=encoding)
                        logger.info(f"Successfully read CSV file with {encoding} encoding")
                        break
                    except UnicodeDecodeError:
                        continue
                else:
                    raise ValueError("Could not decode CSV file with any supported encoding")

            elif file_extension in ['xlsx', 'xls']:
                df = pd.read_excel(io.BytesIO(file_content))
                logger.info(f"Successfully read Excel file: {filename}")

            else:
                # Only reachable if supported_formats was extended without
                # adding a matching reader; fail clearly instead of hitting
                # an unbound `df` below.
                raise ValueError(f"Unsupported file format: {file_extension}")

            logger.info(f"Loaded dataset with shape: {df.shape}")
            return df

        except Exception as e:
            logger.error(f"Error reading file {filename}: {str(e)}")
            raise

    def analyze_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Analyze data quality and return comprehensive statistics

        Args:
            df: pandas DataFrame to analyze

        Returns:
            Dictionary containing data quality metrics: 'shape', 'columns',
            'dtypes', 'missing_values' (per-column null counts),
            'duplicates' (number of duplicated rows) and 'column_stats'
            (per-column details; numeric columns additionally get
            mean/median/std/min/max/outliers, object columns get
            most_common/avg_length)
        """
        total_rows = len(df)
        analysis = {
            'shape': df.shape,
            'columns': list(df.columns),
            'dtypes': df.dtypes.to_dict(),
            'missing_values': {},
            'duplicates': df.duplicated().sum(),
            'column_stats': {}
        }

        for col in df.columns:
            series = df[col]
            missing_count = series.isnull().sum()

            col_analysis = {
                'dtype': str(series.dtype),
                'missing_count': missing_count,
                # An empty frame has no rows to be missing from; report 0.0
                # rather than dividing by zero.
                'missing_percentage': (missing_count / total_rows) * 100 if total_rows else 0.0,
                'unique_values': series.nunique(),
                'sample_values': series.dropna().head(5).tolist()
            }

            if self._is_numeric(series):
                col_analysis.update({
                    'mean': series.mean(),
                    'median': series.median(),
                    'std': series.std(),
                    'min': series.min(),
                    'max': series.max(),
                    'outliers': self._detect_outliers(series)
                })
            elif series.dtype == 'object':
                col_analysis.update({
                    'most_common': series.value_counts().head(3).to_dict(),
                    # Drop missing values first so they are not measured as
                    # the three-character string "nan".
                    'avg_length': series.dropna().astype(str).str.len().mean()
                })

            analysis['column_stats'][col] = col_analysis
            analysis['missing_values'][col] = col_analysis['missing_count']

        logger.info(f"Data quality analysis completed for {df.shape[0]} rows, {df.shape[1]} columns")
        return analysis

    def _detect_outliers(self, series: pd.Series) -> int:
        """
        Detect outliers using IQR method

        A value is an outlier when it lies more than 1.5 * IQR below the
        first quartile or above the third quartile.

        Args:
            series: pandas Series with numeric data

        Returns:
            Number of outliers detected (0 for non-numeric series)
        """
        if not self._is_numeric(series):
            return 0

        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outlier_mask = (series < lower_bound) | (series > upper_bound)
        # Convert the numpy integer so the result matches the declared type.
        return int(outlier_mask.sum())

    def detect_data_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """
        Detect and suggest optimal data types for each column

        Columns that already have a concrete int64/float64/bool/datetime
        dtype keep it. Other columns are probed in order: numeric, datetime,
        boolean-like text; anything else is reported as 'object'.

        Args:
            df: pandas DataFrame

        Returns:
            Dictionary mapping column names to suggested data types
        """
        suggestions = {}

        for col in df.columns:
            current_type = str(df[col].dtype)

            if current_type in ['int64', 'float64', 'bool', 'datetime64[ns]']:
                suggestions[col] = current_type
                continue

            non_null_series = df[col].dropna()

            if len(non_null_series) == 0:
                suggestions[col] = 'object'
                continue

            # Numeric probe: trust the dtype pandas actually produces rather
            # than searching the text for a '.', which misclassifies values
            # such as "1e5" or "inf" as integers.
            try:
                numeric = pd.to_numeric(non_null_series)
            except (ValueError, TypeError):
                pass
            else:
                if pd.api.types.is_bool_dtype(numeric):
                    suggestions[col] = 'bool'
                elif pd.api.types.is_float_dtype(numeric):
                    suggestions[col] = 'float64'
                else:
                    suggestions[col] = 'int64'
                continue

            # Datetime probe.
            try:
                pd.to_datetime(non_null_series)
                suggestions[col] = 'datetime64[ns]'
                continue
            except (ValueError, TypeError, OverflowError):
                pass

            # Boolean-like text probe (case-insensitive).
            unique_vals = set(non_null_series.astype(str).str.lower())
            if unique_vals.issubset({'true', 'false', '1', '0', 'yes', 'no'}):
                suggestions[col] = 'bool'
                continue

            suggestions[col] = 'object'

        return suggestions
| |
|