|
|
""" |
|
|
Data Validation Utilities |
|
|
|
|
|
Provides validation functions for schemas and generated data. |
|
|
""" |
|
|
|
|
|
import re |
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
class SchemaValidator:
    """Validates schema definitions.

    A schema is a plain dict with at least a 'name' (str) and a 'fields'
    list; each field is a dict with 'name', 'type' and an optional
    'constraints' dict. All methods are static and return error strings
    rather than raising, so malformed schemas are reported, not crashed on.
    """

    @staticmethod
    def validate_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a complete schema definition.

        Args:
            schema: Schema dict expected to contain 'name' and 'fields'.

        Returns:
            Dict with 'valid' (bool), 'errors' (list of str) and
            'warnings' (list of str).
        """
        errors: List[str] = []
        warnings: List[str] = []

        # Required top-level keys.
        if 'name' not in schema:
            errors.append("Schema must have a 'name' field")
        if 'fields' not in schema:
            errors.append("Schema must have a 'fields' field")

        if 'fields' in schema:
            if not isinstance(schema['fields'], list):
                errors.append("'fields' must be a list")
            else:
                # Validate each field definition individually.
                for i, field in enumerate(schema['fields']):
                    errors.extend(SchemaValidator.validate_field(field, i))

        return {
            'valid': len(errors) == 0,
            'errors': errors,
            'warnings': warnings
        }

    @staticmethod
    def validate_field(field: Dict[str, Any], index: int) -> List[str]:
        """Validate a single field definition.

        Args:
            field: Field dict ('name', 'type', optional 'constraints').
            index: Position of the field in the schema, used in messages.

        Returns:
            List of error strings (empty when the field is valid).
        """
        errors: List[str] = []

        # Every field needs a name and a type.
        required_props = ['name', 'type']
        for prop in required_props:
            if prop not in field:
                errors.append(f"Field {index}: Missing required property '{prop}'")

        if 'name' in field:
            if not isinstance(field['name'], str) or not field['name'].strip():
                errors.append(f"Field {index}: 'name' must be a non-empty string")
            elif not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', field['name']):
                errors.append(f"Field {index}: 'name' must be a valid identifier")

        if 'type' in field:
            valid_types = ['text', 'integer', 'float', 'date', 'boolean', 'categorical']
            if field['type'] not in valid_types:
                errors.append(f"Field {index}: 'type' must be one of {valid_types}")

        if 'constraints' in field:
            # field.get('type') may be None when 'type' is missing; the
            # constraint checks simply skip type-specific rules in that case.
            constraint_errors = SchemaValidator.validate_constraints(field['constraints'], field.get('type'), index)
            errors.extend(constraint_errors)

        return errors

    @staticmethod
    def validate_constraints(constraints: Dict[str, Any], field_type: Optional[str], field_index: int) -> List[str]:
        """Validate field constraints for a given field type.

        Args:
            constraints: Constraint dict for the field.
            field_type: The field's declared type, or None if absent.
            field_index: Position of the field in the schema, for messages.

        Returns:
            List of error strings (empty when the constraints are valid).
        """
        errors: List[str] = []

        # Numeric range constraints.
        if field_type in ['integer', 'float']:
            if 'min_val' in constraints and 'max_val' in constraints:
                min_val = constraints['min_val']
                max_val = constraints['max_val']
                # Guard against non-numeric bounds so the comparison below
                # cannot raise TypeError on a malformed schema. bool is
                # excluded because it is a subclass of int.
                numeric = all(
                    isinstance(v, (int, float)) and not isinstance(v, bool)
                    for v in (min_val, max_val)
                )
                if not numeric:
                    errors.append(f"Field {field_index}: min_val and max_val must be numbers")
                elif min_val > max_val:
                    errors.append(f"Field {field_index}: min_val cannot be greater than max_val")

        # Date range constraints (ISO YYYY-MM-DD strings).
        if field_type == 'date':
            def _parse_date(key: str) -> Optional[datetime]:
                # TypeError is caught too, so non-string values are reported
                # as format errors instead of crashing the validator.
                try:
                    return datetime.strptime(constraints[key], '%Y-%m-%d')
                except (ValueError, TypeError):
                    return None

            start = end = None
            if 'start_date' in constraints:
                start = _parse_date('start_date')
                if start is None:
                    errors.append(f"Field {field_index}: start_date must be in YYYY-MM-DD format")
            if 'end_date' in constraints:
                end = _parse_date('end_date')
                if end is None:
                    errors.append(f"Field {field_index}: end_date must be in YYYY-MM-DD format")
            # Ordering check reuses the already-parsed values instead of
            # re-parsing both dates a second time.
            if start is not None and end is not None and start > end:
                errors.append(f"Field {field_index}: start_date cannot be after end_date")

        # Categorical constraints.
        if field_type == 'categorical':
            if 'categories' in constraints:
                if not isinstance(constraints['categories'], list) or len(constraints['categories']) == 0:
                    errors.append(f"Field {field_index}: categories must be a non-empty list")

        # Null percentage applies to every field type.
        if 'null_percentage' in constraints:
            null_pct = constraints['null_percentage']
            # Excluding bool: True/False would otherwise pass as 1/0.
            if (not isinstance(null_pct, (int, float)) or isinstance(null_pct, bool)
                    or null_pct < 0 or null_pct > 100):
                errors.append(f"Field {field_index}: null_percentage must be between 0 and 100")

        return errors
|
|
|
|
|
|
|
|
class DataValidator:
    """Validates generated data against a schema and reports data quality.

    Records are plain dicts keyed by field name; the schema layout is the
    one accepted by ``SchemaValidator``. All methods are static and return
    results rather than raising on bad data.
    """

    @staticmethod
    def validate_data(data: List[Dict[str, Any]], schema: Dict[str, Any]) -> Dict[str, Any]:
        """Validate generated data against schema.

        Args:
            data: List of record dicts.
            schema: Schema dict with a 'fields' list (assumed to have been
                checked by ``SchemaValidator`` already).

        Returns:
            Dict with 'valid', 'errors', 'warnings' and, for non-empty
            data, 'record_count'.
        """
        errors: List[str] = []
        warnings: List[str] = []

        # An empty dataset is not an error, just worth flagging.
        if not data:
            warnings.append("No data generated")
            return {'valid': True, 'errors': errors, 'warnings': warnings}

        # Compare schema field names against the first record's keys.
        # NOTE(review): assumes all records share the first record's shape;
        # per-record missing fields are still caught by validate_record.
        schema_fields = {field['name'] for field in schema.get('fields', [])}
        data_fields = set(data[0].keys())

        missing_fields = schema_fields - data_fields
        extra_fields = data_fields - schema_fields

        if missing_fields:
            errors.append(f"Missing fields in data: {missing_fields}")
        if extra_fields:
            warnings.append(f"Extra fields in data: {extra_fields}")

        # Validate every record individually.
        for i, record in enumerate(data):
            errors.extend(DataValidator.validate_record(record, schema, i))

        return {
            'valid': len(errors) == 0,
            'errors': errors,
            'warnings': warnings,
            'record_count': len(data)
        }

    @staticmethod
    def validate_record(record: Dict[str, Any], schema: Dict[str, Any], record_index: int) -> List[str]:
        """Validate a single record against schema.

        Checks presence, null-ness, type and constraints for each schema
        field. Returns a list of error strings (empty when valid).
        """
        errors: List[str] = []

        for field in schema.get('fields', []):
            field_name = field['name']
            field_type = field['type']
            constraints = field.get('constraints', {})

            if field_name not in record:
                errors.append(f"Record {record_index}: Missing field '{field_name}'")
                continue

            value = record[field_name]

            # A null value is only acceptable when the schema allows nulls.
            if value is None:
                if constraints.get('null_percentage', 0) == 0:
                    errors.append(f"Record {record_index}: Field '{field_name}' cannot be null")
                continue

            errors.extend(
                DataValidator.validate_value_type(value, field_type, field_name, record_index)
            )
            errors.extend(
                DataValidator.validate_value_constraints(value, constraints, field_name, record_index)
            )

        return errors

    @staticmethod
    def validate_value_type(value: Any, expected_type: str, field_name: str, record_index: int) -> List[str]:
        """Validate that a value matches the expected schema type.

        Note: ``bool`` is a subclass of ``int`` in Python, so booleans are
        explicitly rejected for 'integer' and 'float' fields.
        """
        errors: List[str] = []

        if expected_type == 'integer':
            if not isinstance(value, int) or isinstance(value, bool):
                errors.append(f"Record {record_index}: Field '{field_name}' must be an integer, got {type(value).__name__}")
        elif expected_type == 'float':
            if not isinstance(value, (int, float)) or isinstance(value, bool):
                errors.append(f"Record {record_index}: Field '{field_name}' must be a number, got {type(value).__name__}")
        elif expected_type == 'text':
            if not isinstance(value, str):
                errors.append(f"Record {record_index}: Field '{field_name}' must be a string, got {type(value).__name__}")
        elif expected_type == 'date':
            # Dates may arrive as ISO strings or datetime objects.
            if not isinstance(value, (str, datetime)):
                errors.append(f"Record {record_index}: Field '{field_name}' must be a date, got {type(value).__name__}")
        elif expected_type == 'boolean':
            if not isinstance(value, bool):
                errors.append(f"Record {record_index}: Field '{field_name}' must be a boolean, got {type(value).__name__}")

        return errors

    @staticmethod
    def validate_value_constraints(value: Any, constraints: Dict[str, Any],
                                   field_name: str, record_index: int) -> List[str]:
        """Validate that a value meets the specified constraints.

        Applies only the constraints relevant to the value's runtime type
        (numeric range, string length, category membership, regex pattern).
        """
        errors: List[str] = []

        # Numeric range bounds.
        if isinstance(value, (int, float)):
            if 'min_val' in constraints and value < constraints['min_val']:
                errors.append(f"Record {record_index}: Field '{field_name}' value {value} is below minimum {constraints['min_val']}")
            if 'max_val' in constraints and value > constraints['max_val']:
                errors.append(f"Record {record_index}: Field '{field_name}' value {value} is above maximum {constraints['max_val']}")

        # String length bounds.
        if isinstance(value, str):
            if 'min_length' in constraints and len(value) < constraints['min_length']:
                errors.append(f"Record {record_index}: Field '{field_name}' length {len(value)} is below minimum {constraints['min_length']}")
            if 'max_length' in constraints and len(value) > constraints['max_length']:
                errors.append(f"Record {record_index}: Field '{field_name}' length {len(value)} is above maximum {constraints['max_length']}")

        # Categorical membership.
        if 'categories' in constraints:
            if value not in constraints['categories']:
                errors.append(f"Record {record_index}: Field '{field_name}' value '{value}' is not in allowed categories {constraints['categories']}")

        # Regex pattern (strings only); re.match anchors at the start.
        if 'pattern' in constraints and isinstance(value, str):
            if not re.match(constraints['pattern'], value):
                errors.append(f"Record {record_index}: Field '{field_name}' value '{value}' does not match pattern '{constraints['pattern']}'")

        return errors

    @staticmethod
    def generate_quality_report(data: List[Dict[str, Any]], schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a data quality report.

        Returns:
            Dict with 'total_records', 'total_fields', per-field
            'field_analysis', and 'overall_quality_score' — the mean of
            the per-field scores on a 0-100 scale. For empty data an
            ``{'error': ...}`` dict is returned instead.
        """
        if not data:
            return {'error': 'No data to analyze'}

        report: Dict[str, Any] = {
            'total_records': len(data),
            'total_fields': len(schema.get('fields', [])),
            'field_analysis': {},
            'overall_quality_score': 0.0
        }

        quality_scores = []
        for field in schema.get('fields', []):
            field_name = field['name']
            field_analysis = DataValidator.analyze_field(data, field_name)
            report['field_analysis'][field_name] = field_analysis
            quality_scores.append(field_analysis['quality_score'])

        if quality_scores:
            report['overall_quality_score'] = sum(quality_scores) / len(quality_scores)

        return report

    @staticmethod
    def analyze_field(data: List[Dict[str, Any]], field_name: str) -> Dict[str, Any]:
        """Analyze a specific field in the dataset.

        Computes null statistics, uniqueness and type consistency, and
        combines them into a quality score on a 0-100 scale weighted
        40% completeness, 30% uniqueness, 30% type consistency.
        """
        values = [record.get(field_name) for record in data]

        # Null statistics.
        total_count = len(values)
        null_count = sum(1 for v in values if v is None)
        non_null_count = total_count - null_count
        null_percentage = (null_count / total_count * 100) if total_count > 0 else 0

        non_null_values = [v for v in values if v is not None]

        # Uniqueness among non-null values; fall back to repr() for
        # unhashable values (e.g. lists/dicts) instead of raising.
        try:
            unique_values = len(set(non_null_values))
        except TypeError:
            unique_values = len({repr(v) for v in non_null_values})
        uniqueness_ratio = (unique_values / non_null_count) if non_null_count > 0 else 0

        # Type consistency relative to the first non-null value's type.
        if non_null_values:
            primary_type = type(non_null_values[0]).__name__
            type_consistency = sum(
                1 for v in non_null_values if type(v).__name__ == primary_type
            ) / len(non_null_values)
        else:
            primary_type = 'None'
            type_consistency = 1.0

        # BUG FIX: the previous version multiplied this weighted sum by 100
        # again, producing scores up to 10000. The weights (40 + 30 + 30)
        # already yield the intended 0-100 scale.
        quality_score = (
            (1 - null_percentage / 100) * 40 +
            uniqueness_ratio * 30 +
            type_consistency * 30
        )

        return {
            'total_count': total_count,
            'null_count': null_count,
            'non_null_count': non_null_count,
            'null_percentage': round(null_percentage, 2),
            'unique_values': unique_values,
            'uniqueness_ratio': round(uniqueness_ratio, 3),
            'primary_type': primary_type,
            'type_consistency': round(type_consistency, 3),
            'quality_score': round(quality_score, 2)
        }
|
|
|