"""
Data validation utilities for AI-Based Data Cleaner
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional
import re
from utils.logger import setup_logger
logger = setup_logger(__name__)
class DataValidator:
"""Comprehensive data validation class"""
def __init__(self):
self.validation_rules = {
'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'phone': r'^[\+]?[1-9][\d]{0,15}$',
'url': r'^https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)$',
'date': r'^\d{4}-\d{2}-\d{2}$|^\d{2}\/\d{2}\/\d{4}$|^\d{2}-\d{2}-\d{4}$'
}
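        # Illustrative matches for these patterns: 'user@example.com' (email),
        # '+14155550123' (phone, after separators are stripped in
        # _detect_data_patterns), and '2024-01-31' (date).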
    def validate_dataframe(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Run a comprehensive validation of a DataFrame.

        Args:
            df: pandas DataFrame to validate

        Returns:
            Dictionary containing validation results
        """
        validation_report = {
            'is_valid': True,
            'issues': [],
            'warnings': [],
            'column_validations': {},
            'data_quality_score': 0.0
        }

        try:
            # Basic structure validation
            self._validate_structure(df, validation_report)

            # Column-wise validation
            for column in df.columns:
                column_validation = self._validate_column(df[column], column)
                validation_report['column_validations'][column] = column_validation

                # Aggregate per-column issues and warnings into the top-level report
                if column_validation['issues']:
                    validation_report['issues'].extend(column_validation['issues'])
                if column_validation['warnings']:
                    validation_report['warnings'].extend(column_validation['warnings'])

            # Calculate overall data quality score
            validation_report['data_quality_score'] = self._calculate_quality_score(df, validation_report)

            # Data is valid only if no blocking issues were found
            validation_report['is_valid'] = len(validation_report['issues']) == 0

            logger.info(f"Data validation completed. Quality score: {validation_report['data_quality_score']:.2f}")
        except Exception as e:
            logger.error(f"Error during data validation: {str(e)}")
            validation_report['issues'].append(f"Validation error: {str(e)}")
            validation_report['is_valid'] = False

        return validation_report
    def _validate_structure(self, df: pd.DataFrame, report: Dict):
        """Validate basic DataFrame structure"""
        # Check if DataFrame is empty
        if df.empty:
            report['issues'].append("DataFrame is empty")
            return

        # Check for unnamed columns (typically an artifact of CSV exports)
        unnamed_cols = [col for col in df.columns if str(col).startswith('Unnamed:')]
        if unnamed_cols:
            report['warnings'].append(f"Found {len(unnamed_cols)} unnamed columns: {unnamed_cols}")

        # Check for duplicate column names
        duplicate_cols = df.columns[df.columns.duplicated()].tolist()
        if duplicate_cols:
            report['issues'].append(f"Duplicate column names found: {duplicate_cols}")

        # Warn on extremely wide or extremely long datasets
        if df.shape[1] > 1000:
            report['warnings'].append(f"Dataset has {df.shape[1]} columns, which may impact performance")
        if df.shape[0] > 1000000:
            report['warnings'].append(f"Dataset has {df.shape[0]} rows, which may impact performance")
    def _validate_column(self, series: pd.Series, column_name: str) -> Dict[str, Any]:
        """Validate an individual column"""
        validation = {
            'column_name': column_name,
            'data_type': str(series.dtype),
            'issues': [],
            'warnings': [],
            'quality_metrics': {}
        }

        # Basic metrics
        total_count = len(series)
        null_count = int(series.isnull().sum())
        null_percentage = (null_count / total_count) * 100 if total_count > 0 else 0
        unique_count = series.nunique()

        validation['quality_metrics'] = {
            'total_count': total_count,
            'null_count': null_count,
            'null_percentage': null_percentage,
            'unique_count': unique_count,
            'completeness_score': 100 - null_percentage
        }

        # Flag high proportions of missing data
        if null_percentage > 50:
            validation['issues'].append(f"Column '{column_name}' has {null_percentage:.1f}% missing values")
        elif null_percentage > 20:
            validation['warnings'].append(f"Column '{column_name}' has {null_percentage:.1f}% missing values")

        # Flag constant (single-value) columns
        if unique_count == 1 and null_count == 0:
            validation['warnings'].append(f"Column '{column_name}' contains only one unique value")

        # Type-specific validations; is_numeric_dtype also covers int32/float32
        # and other numeric widths that a bare dtype comparison would miss.
        if series.dtype == 'object':
            self._validate_text_column(series, column_name, validation)
        elif pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
            self._validate_numeric_column(series, column_name, validation)

        return validation
    def _validate_text_column(self, series: pd.Series, column_name: str, validation: Dict):
        """Validate text/object columns"""
        non_null_series = series.dropna()
        if len(non_null_series) == 0:
            return

        # Check for mixed data types in the object column (sampled for performance)
        type_counts = {}
        for value in non_null_series.head(100):
            value_type = type(value).__name__
            type_counts[value_type] = type_counts.get(value_type, 0) + 1
        if len(type_counts) > 1:
            validation['warnings'].append(f"Column '{column_name}' contains mixed data types: {type_counts}")
        # Check string lengths for consistency
        if all(isinstance(x, str) for x in non_null_series.head(100)):
            lengths = non_null_series.astype(str).str.len()
            avg_length = lengths.mean()
            std_length = lengths.std()
            validation['quality_metrics']['avg_string_length'] = avg_length
            validation['quality_metrics']['string_length_std'] = std_length

            # Flag extremely long strings
            max_length = lengths.max()
            if max_length > 1000:
                validation['warnings'].append(f"Column '{column_name}' contains very long strings (max: {max_length} chars)")

        # Check for common data format patterns (email, phone, URL, date)
        self._detect_data_patterns(non_null_series, column_name, validation)
    def _validate_numeric_column(self, series: pd.Series, column_name: str, validation: Dict):
        """Validate numeric columns"""
        non_null_series = series.dropna()
        if len(non_null_series) == 0:
            return

        # Basic numeric statistics
        validation['quality_metrics'].update({
            'mean': non_null_series.mean(),
            'median': non_null_series.median(),
            'std': non_null_series.std(),
            'min': non_null_series.min(),
            'max': non_null_series.max()
        })

        # Check for outliers using Tukey's 1.5 * IQR fences
        Q1 = non_null_series.quantile(0.25)
        Q3 = non_null_series.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
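        # Worked example of the fence arithmetic: for Q1 = 10 and Q3 = 30,
        # IQR = 20, so the fences are 10 - 1.5 * 20 = -20 and 30 + 1.5 * 20 = 60;
        # any value outside [-20, 60] is counted as an outlier below.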
        outliers = ((non_null_series < lower_bound) | (non_null_series > upper_bound)).sum()
        outlier_percentage = (outliers / len(non_null_series)) * 100

        validation['quality_metrics']['outlier_count'] = outliers
        validation['quality_metrics']['outlier_percentage'] = outlier_percentage

        if outlier_percentage > 10:
            validation['warnings'].append(f"Column '{column_name}' has {outlier_percentage:.1f}% outliers")

        # Check for negative values where they are unlikely to make sense
        if (non_null_series < 0).any():
            negative_count = (non_null_series < 0).sum()
            validation['quality_metrics']['negative_count'] = negative_count

            # Column names that usually should not contain negative values
            if any(keyword in column_name.lower() for keyword in ['age', 'price', 'cost', 'amount', 'quantity', 'count']):
                validation['warnings'].append(f"Column '{column_name}' contains {negative_count} negative values")
    def _detect_data_patterns(self, series: pd.Series, column_name: str, validation: Dict):
        """Detect common data patterns in text columns"""
        sample_values = series.head(50).astype(str)

        # Check for email pattern
        email_matches = sum(1 for val in sample_values if re.match(self.validation_rules['email'], val))
        if email_matches > len(sample_values) * 0.8:
            validation['quality_metrics']['detected_pattern'] = 'email'
            # Validate all non-null values, not just the sample
            invalid_emails = sum(1 for val in series.dropna().astype(str)
                                 if not re.match(self.validation_rules['email'], val))
            if invalid_emails > 0:
                validation['warnings'].append(f"Column '{column_name}' appears to be emails but has {invalid_emails} invalid entries")

        # Check for phone pattern (separators stripped before matching)
        phone_matches = sum(1 for val in sample_values
                            if re.match(self.validation_rules['phone'],
                                        val.replace(' ', '').replace('-', '').replace('(', '').replace(')', '')))
        if phone_matches > len(sample_values) * 0.8:
            validation['quality_metrics']['detected_pattern'] = 'phone'

        # Check for URL pattern
        url_matches = sum(1 for val in sample_values if re.match(self.validation_rules['url'], val))
        if url_matches > len(sample_values) * 0.8:
            validation['quality_metrics']['detected_pattern'] = 'url'

        # Check for date pattern (ISO 8601, US slash, and dash formats)
        date_matches = sum(1 for val in sample_values if re.match(self.validation_rules['date'], val))
        if date_matches > len(sample_values) * 0.8:
            validation['quality_metrics']['detected_pattern'] = 'date'
    def _calculate_quality_score(self, df: pd.DataFrame, validation_report: Dict) -> float:
        """Calculate overall data quality score (0-100)"""
        if df.empty:
            return 0.0

        weights = {
            'completeness': 0.4,  # 40% weight for data completeness
            'consistency': 0.3,   # 30% weight for data consistency
            'validity': 0.2,      # 20% weight for data validity
            'uniqueness': 0.1     # 10% weight for uniqueness
        }
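        # Worked example of the weighting: completeness 90, consistency 100,
        # validity 80, uniqueness 100 gives
        # 90 * 0.4 + 100 * 0.3 + 80 * 0.2 + 100 * 0.1 = 36 + 30 + 16 + 10 = 92.0.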
        # Completeness score (share of non-missing cells)
        total_cells = df.shape[0] * df.shape[1]
        missing_cells = df.isnull().sum().sum()
        completeness_score = ((total_cells - missing_cells) / total_cells) * 100 if total_cells > 0 else 0

        # Consistency score (10-point penalty per column flagged with mixed data
        # types, clamped so many flagged columns cannot push it below zero)
        consistency_score = 100.0
        for col_validation in validation_report['column_validations'].values():
            if 'mixed data types' in str(col_validation.get('warnings', [])):
                consistency_score -= 10
        consistency_score = max(0, consistency_score)

        # Validity score (10-point penalty per detected issue)
        validity_score = max(0, 100.0 - len(validation_report['issues']) * 10)

        # Uniqueness score (based on the share of duplicate rows)
        duplicate_percentage = (df.duplicated().sum() / len(df)) * 100 if len(df) > 0 else 0
        uniqueness_score = max(0, 100 - duplicate_percentage)

        # Weighted average of the four component scores
        total_score = (
            completeness_score * weights['completeness'] +
            consistency_score * weights['consistency'] +
            validity_score * weights['validity'] +
            uniqueness_score * weights['uniqueness']
        )

        return round(total_score, 2)
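

# Minimal usage sketch: builds a small DataFrame with a few deliberate quality
# problems and prints the resulting report. Assumes the module is run from the
# project root so that utils.logger resolves; the column names below are
# illustrative and not part of the validator's API.
if __name__ == "__main__":
    demo_df = pd.DataFrame({
        'email': ['a@example.com', 'b@example.com', 'not-an-email', None],
        'age': [25, 32, -4, 41],           # negative age should draw a warning
        'constant': ['x', 'x', 'x', 'x'],  # single-value column
    })
    report = DataValidator().validate_dataframe(demo_df)
    print(f"valid={report['is_valid']} score={report['data_quality_score']}")
    for warning in report['warnings']:
        print("warning:", warning)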