# wanderlust_ai/core/data_cleaning.py
"""
Robust Data Cleaning and Normalization for Travel Data
This module handles messy real-world travel data by providing intelligent
cleaning, normalization, and validation for inconsistent API responses.
"""
import re
import logging
import unicodedata
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import json
import difflib
from decimal import Decimal, InvalidOperation
from .error_categorization import CategorizedError, ErrorContext
class DataQualityLevel(str, Enum):
    """Quality levels for cleaned data, ordered best first.

    The confidence bands mirror the score thresholds configured in
    DataCleaner._quality_thresholds.
    """
    EXCELLENT = "excellent"      # 95-100% confidence
    GOOD = "good"                # 80-94% confidence
    FAIR = "fair"                # 60-79% confidence
    POOR = "poor"                # 40-59% confidence
    UNRELIABLE = "unreliable"    # <40% confidence
class CleaningAction(str, Enum):
    """Types of cleaning actions performed.

    Recorded in CleaningResult.actions_taken so callers can audit what was
    done to a value.
    """
    NORMALIZE = "normalize"        # whitespace/case/format normalization
    VALIDATE = "validate"          # value checked against a pattern or rule
    CORRECT = "correct"            # value repaired or replaced
    DEDUPLICATE = "deduplicate"    # duplicates removed (not used in this file)
    ENRICH = "enrich"              # extra information attached to the value
    FILTER = "filter"              # value dropped as missing/unusable
    STANDARDIZE = "standardize"    # mapped onto a canonical value
@dataclass
class CleaningResult:
    """Result of a data cleaning operation.

    Produced by every DataCleaner.clean_* method; carries both the cleaned
    value and an auditable record of what was done to it.
    """
    # The value as received from the upstream source.
    original_value: Any
    # The normalized/corrected value (None when the input was unusable).
    cleaned_value: Any
    quality_score: float  # 0.0 to 1.0
    quality_level: DataQualityLevel
    # Cleaning steps applied to produce cleaned_value.
    actions_taken: List[CleaningAction]
    confidence: float  # 0.0 to 1.0
    # Human-readable notes about suspicious or corrected values.
    warnings: List[str] = field(default_factory=list)
    # Free-form extra information attached by cleaners.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ValidationRule:
    """A validation rule for data cleaning.

    Combines an optional regex with optional validator/corrector callables;
    see DataCleaner._build_validation_rules for the rule registry.
    """
    name: str
    # Pre-compiled regex (or pattern string); None when validation is purely
    # functional (e.g. dates, which are validated by parsing).
    pattern: Optional[Union[str, re.Pattern]]
    # NOTE(review): the builtin `callable` is used as an annotation here; it
    # works at runtime, but typing.Callable would be more precise.
    validator_func: Optional[callable]
    correction_func: Optional[callable]
    required: bool = True
    error_message: str = "Validation failed"
    # Confidence below this may warrant a warning — appears unused in this file.
    warning_threshold: float = 0.8
class DataCleaner:
    """
    Comprehensive data cleaner for travel data with intelligent
    normalization and quality assessment.
    """

    def __init__(self):
        """Set up logging, pattern tables, canonical standards, and thresholds."""
        self.logger = logging.getLogger(__name__)
        # Pre-compiled regexes, canonical value tables, and validation rules.
        self._patterns = self._build_patterns()
        self._standards = self._build_standards()
        self._validation_rules = self._build_validation_rules()
        # Minimum score for each quality level, ordered best-first; the first
        # threshold a score meets wins (see _get_quality_level).
        self._quality_thresholds = {
            DataQualityLevel.EXCELLENT: 0.95,
            DataQualityLevel.GOOD: 0.80,
            DataQualityLevel.FAIR: 0.60,
            DataQualityLevel.POOR: 0.40,
            DataQualityLevel.UNRELIABLE: 0.0,
        }
def _build_patterns(self) -> Dict[str, re.Pattern]:
"""Build regex patterns for common travel data formats."""
return {
# Flight patterns
"flight_number": re.compile(r'^[A-Z]{2,3}\d{1,4}[A-Z]?$'),
"airline_code": re.compile(r'^[A-Z]{2,3}$'),
"airport_code": re.compile(r'^[A-Z]{3}$'),
# Price patterns
"price": re.compile(r'^\$?[\d,]+\.?\d*$'),
"currency": re.compile(r'^[A-Z]{3}$'),
# Date/time patterns
"date_iso": re.compile(r'^\d{4}-\d{2}-\d{2}$'),
"datetime_iso": re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'),
"time_12h": re.compile(r'^(1[0-2]|[1-9]):[0-5]\d\s?(AM|PM)$', re.IGNORECASE),
"time_24h": re.compile(r'^([01]\d|2[0-3]):[0-5]\d$'),
# Location patterns
"city_name": re.compile(r'^[A-Za-z\s\-\'\.]+$'),
"country_code": re.compile(r'^[A-Z]{2,3}$'),
"postal_code": re.compile(r'^[A-Z0-9\s\-]{3,10}$'),
# Contact patterns
"email": re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'),
"phone": re.compile(r'^[\+]?[\d\s\-\(\)]{10,15}$'),
# Duration patterns
"duration": re.compile(r'^(\d+h\s?)?(\d+m)?$', re.IGNORECASE),
"duration_minutes": re.compile(r'^\d+\s*(min|minutes?)?$', re.IGNORECASE)
}
def _build_standards(self) -> Dict[str, Any]:
"""Build standard values for data normalization."""
return {
# Common airline mappings
"airlines": {
"american": "American Airlines",
"aa": "American Airlines",
"delta": "Delta Air Lines",
"dl": "Delta Air Lines",
"united": "United Airlines",
"ua": "United Airlines",
"southwest": "Southwest Airlines",
"wn": "Southwest Airlines",
"jetblue": "JetBlue Airways",
"b6": "JetBlue Airways",
"alaska": "Alaska Airlines",
"as": "Alaska Airlines",
"spirit": "Spirit Airlines",
"nk": "Spirit Airlines",
"frontier": "Frontier Airlines",
"f9": "Frontier Airlines"
},
# Common airport mappings
"airports": {
"nyc": "JFK",
"new york": "JFK",
"manhattan": "JFK",
"los angeles": "LAX",
"la": "LAX",
"chicago": "ORD",
"miami": "MIA",
"boston": "BOS",
"san francisco": "SFO",
"seattle": "SEA",
"atlanta": "ATL",
"denver": "DEN",
"las vegas": "LAS"
},
# Currency standards
"currencies": {
"usd": "USD",
"dollar": "USD",
"$": "USD",
"euro": "EUR",
"€": "EUR",
"pound": "GBP",
"£": "GBP"
},
# Time zone standards
"timezones": {
"est": "America/New_York",
"pst": "America/Los_Angeles",
"cst": "America/Chicago",
"mst": "America/Denver",
"utc": "UTC",
"gmt": "UTC"
}
}
def _build_validation_rules(self) -> Dict[str, ValidationRule]:
    """Assemble the ValidationRule registry for each supported data type."""
    # (name, pattern, validator, corrector, error message)
    specs = [
        ("flight_number", self._patterns["flight_number"],
         self._validate_flight_number, self._correct_flight_number,
         "Invalid flight number format"),
        ("airport_code", self._patterns["airport_code"],
         self._validate_airport_code, self._correct_airport_code,
         "Invalid airport code"),
        ("price", self._patterns["price"],
         self._validate_price, self._correct_price,
         "Invalid price format"),
        # Dates have no single regex; they are validated by parsing.
        ("date", None,
         self._validate_date, self._correct_date,
         "Invalid date format"),
        ("email", self._patterns["email"],
         self._validate_email, self._correct_email,
         "Invalid email format"),
    ]
    return {
        name: ValidationRule(
            name=name,
            pattern=pattern,
            validator_func=validator,
            correction_func=corrector,
            error_message=message,
        )
        for name, pattern, validator, corrector, message in specs
    }
def clean_flight_data(self, raw_flight: Dict[str, Any]) -> Dict[str, Any]:
    """Clean every recognized field of a raw flight record.

    Fields without a registered cleaner are dropped; a "_quality_metadata"
    entry summarizing scores, actions, and warnings is attached to the result.
    """
    # Field name -> cleaner method.
    cleaners = {
        "airline": self.clean_airline,
        "flight_number": self.clean_flight_number,
        "departure_city": self.clean_location,
        "arrival_city": self.clean_location,
        "departure_airport": self.clean_airport_code,
        "arrival_airport": self.clean_airport_code,
        "departure_time": self.clean_datetime,
        "arrival_time": self.clean_datetime,
        "price": self.clean_price,
        "currency": self.clean_currency,
        "duration_minutes": self.clean_duration,
        "stops": self.clean_stops,
        "aircraft": self.clean_aircraft,
        "booking_class": self.clean_booking_class
    }
    cleaned_flight = {}
    quality_scores = []
    all_actions = []
    warnings = []
    for field_name, clean in cleaners.items():
        if field_name not in raw_flight:
            continue
        result = clean(raw_flight[field_name])
        cleaned_flight[field_name] = result.cleaned_value
        quality_scores.append(result.quality_score)
        all_actions.extend(result.actions_taken)
        warnings.extend(result.warnings)
    # Overall quality is the mean of the per-field scores (0.0 when nothing
    # was cleaned).
    overall_quality = (
        sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
    )
    quality_level = self._get_quality_level(overall_quality)
    cleaned_flight["_quality_metadata"] = {
        "overall_quality_score": overall_quality,
        "quality_level": quality_level.value,
        "cleaning_actions": list(set(all_actions)),
        "warnings": warnings,
        "fields_cleaned": len(quality_scores),
        "timestamp": datetime.now().isoformat()
    }
    return cleaned_flight
def clean_airline(self, value: Any) -> CleaningResult:
    """Clean and standardize an airline name or code.

    Args:
        value: Raw airline value (full name, IATA code, or anything stringable).

    Returns:
        CleaningResult whose cleaned_value is the canonical carrier name when
        the input matches a known airline, otherwise the normalized input.
    """
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing airline information"]
        )
    original = str(value).strip()
    actions = []
    warnings = []
    # Collapse runs of whitespace and upper-case for display consistency.
    normalized = re.sub(r'\s+', ' ', original.upper().strip())
    actions.append(CleaningAction.NORMALIZE)
    standards = self._standards["airlines"]
    cleaned_value = normalized
    # BUG FIX: the standards table is keyed in lower case, so the exact-match
    # lookup must use a lower-cased key (the old upper-cased lookup never hit,
    # leaving exact matches to the fuzzy path with a spurious warning).
    lookup_key = normalized.lower()
    if lookup_key in standards:
        cleaned_value = standards[lookup_key]
        actions.append(CleaningAction.STANDARDIZE)
    else:
        # Fall back to substring/fuzzy matching for misspellings.
        best_match = self._fuzzy_match_airline(normalized)
        if best_match:
            cleaned_value = best_match
            actions.append(CleaningAction.CORRECT)
            warnings.append(f"Airline corrected from '{original}' to '{cleaned_value}'")
    # NORMALIZE is always recorded, so the base score is 0.9; corrections that
    # produced warnings cost 0.2.
    quality_score = 0.9 if actions else 0.5
    if warnings:
        quality_score -= 0.2
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=actions,
        confidence=quality_score,
        warnings=warnings
    )
def clean_flight_number(self, value: Any) -> CleaningResult:
    """Normalize a flight number and score how well it fits the standard format."""
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing flight number"]
        )
    original = str(value).strip().upper()
    actions = []
    warnings = []
    # Drop decorative "FLIGHT"/"FLT" prefixes or suffixes.
    stripped = re.sub(r'^(FLIGHT|FLT)\s*', '', original)
    stripped = re.sub(r'\s*(FLIGHT|FLT)$', '', stripped)
    if stripped != original:
        actions.append(CleaningAction.NORMALIZE)
    cleaned = stripped
    if self._patterns["flight_number"].match(cleaned):
        quality_score = 1.0
    else:
        # Attempt a best-effort repair of common formatting issues.
        corrected = self._correct_flight_number_format(cleaned)
        if corrected == cleaned:
            quality_score = 0.3
            warnings.append(f"Invalid flight number format: '{original}'")
        else:
            cleaned = corrected
            actions.append(CleaningAction.CORRECT)
            warnings.append(f"Flight number corrected from '{original}' to '{cleaned}'")
            quality_score = 0.8
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=actions,
        confidence=quality_score,
        warnings=warnings
    )
def clean_location(self, value: Any) -> CleaningResult:
    """Normalize a city/location name and enrich it with a known airport code.

    Args:
        value: Raw location string.

    Returns:
        CleaningResult with a title-cased, punctuation-stripped name; known
        city aliases are annotated with their airport code, e.g. "Chicago (ORD)".
    """
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing location information"]
        )
    original = str(value).strip()
    actions = []
    warnings = []
    # Decompose accents (NFKD), drop unexpected punctuation, title-case words.
    normalized = unicodedata.normalize('NFKD', original)
    normalized = re.sub(r'[^\w\s\-\'\.]', '', normalized)
    normalized = ' '.join(word.capitalize() for word in normalized.split())
    if normalized != original:
        actions.append(CleaningAction.NORMALIZE)
    standards = self._standards["airports"]
    cleaned_value = normalized
    # BUG FIX: the old substring scan enriched unrelated cities (the alias
    # "la" matched inside "Atlanta"); only enrich when the whole normalized
    # name equals a known alias.
    airport_code = standards.get(normalized.lower())
    if airport_code:
        cleaned_value = f"{normalized} ({airport_code})"
        actions.append(CleaningAction.ENRICH)
    # Multi-word names ("San Francisco") are scored as more trustworthy.
    quality_score = 0.9 if len(normalized.split()) >= 2 else 0.7
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=actions,
        confidence=quality_score,
        warnings=warnings
    )
def clean_airport_code(self, value: Any) -> CleaningResult:
    """Validate a 3-letter airport code, attempting a fuzzy repair when malformed."""
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing airport code"]
        )
    original = str(value).strip().upper()
    actions = []
    warnings = []
    if self._patterns["airport_code"].match(original):
        # Already a well-formed code.
        cleaned_value = original
        quality_score = 1.0
    else:
        suggestion = self._find_similar_airport_code(original)
        if suggestion is None:
            cleaned_value = original
            quality_score = 0.3
            warnings.append(f"Invalid airport code: '{original}'")
        else:
            cleaned_value = suggestion
            actions.append(CleaningAction.CORRECT)
            warnings.append(f"Airport code corrected from '{original}' to '{cleaned_value}'")
            quality_score = 0.8
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=actions,
        confidence=quality_score,
        warnings=warnings
    )
def clean_price(self, value: Any) -> CleaningResult:
    """Parse a price into a float, flagging negatives and implausible amounts.

    Args:
        value: Raw price (number, "$1,234.56", etc.).

    Returns:
        CleaningResult with cleaned_value as a non-negative float, or None
        when the input is missing or unparseable.
    """
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing price information"]
        )
    original = str(value).strip()
    actions = []
    warnings = []
    # BUG FIX: remember the sign before stripping symbols — the old code
    # deleted '-' along with currency symbols, so the negative-price branch
    # could never trigger. A '-' before the first digit marks a negative.
    is_negative = re.match(r'^\s*[^\d]*-', original) is not None
    # Remove currency symbols/letters, then thousands separators.
    cleaned = re.sub(r'[^\d.,]', '', original)
    cleaned = cleaned.replace(',', '')
    try:
        price = Decimal(cleaned)
        if is_negative:
            actions.append(CleaningAction.CORRECT)
            warnings.append("Negative price corrected to positive")
        # Sanity-check the magnitude.
        if price > 10000:
            warnings.append("Unusually high price detected")
        elif price < 10:
            warnings.append("Unusually low price detected")
        quality_score = 0.9
        if warnings:
            quality_score -= 0.2
        return CleaningResult(
            original_value=original,
            cleaned_value=float(price),
            quality_score=quality_score,
            quality_level=self._get_quality_level(quality_score),
            actions_taken=actions,
            confidence=quality_score,
            warnings=warnings
        )
    except (InvalidOperation, ValueError):
        return CleaningResult(
            original_value=original,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=[f"Invalid price format: '{original}'"]
        )
def clean_datetime(self, value: Any) -> CleaningResult:
    """Parse a datetime string and emit its ISO-8601 form with a range sanity check."""
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing datetime information"]
        )
    original = str(value).strip()
    parsed_dt = self._parse_datetime(original)
    if parsed_dt is None:
        return CleaningResult(
            original_value=original,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=[f"Invalid datetime format: '{original}'"]
        )
    warnings = []
    # Flag dates far outside the plausible booking window (±1 year).
    now = datetime.now()
    one_year = timedelta(days=365)
    if parsed_dt < now - one_year:
        warnings.append("Date is more than a year in the past")
    elif parsed_dt > now + one_year:
        warnings.append("Date is more than a year in the future")
    quality_score = 0.9
    if warnings:
        quality_score -= 0.2
    return CleaningResult(
        original_value=original,
        cleaned_value=parsed_dt.isoformat(),
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=[],
        confidence=quality_score,
        warnings=warnings
    )
def clean_duration(self, value: Any) -> CleaningResult:
    """Convert a duration string to whole minutes, with plausibility warnings."""
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing duration information"]
        )
    original = str(value).strip()
    minutes = self._parse_duration_to_minutes(original)
    if minutes is None:
        return CleaningResult(
            original_value=original,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=[f"Invalid duration format: '{original}'"]
        )
    warnings = []
    if minutes > 1440:
        # Longer than a full day is suspicious for a flight leg.
        warnings.append("Unusually long duration detected")
    elif minutes < 30:
        warnings.append("Unusually short duration detected")
    quality_score = 0.9
    if warnings:
        quality_score -= 0.2
    return CleaningResult(
        original_value=original,
        cleaned_value=minutes,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=[],
        confidence=quality_score,
        warnings=warnings
    )
def clean_stops(self, value: Any) -> CleaningResult:
    """Coerce the stop count to a non-negative int, defaulting missing data to 0."""
    if value is None:
        return CleaningResult(
            original_value=value,
            cleaned_value=0,
            quality_score=0.8,
            quality_level=DataQualityLevel.GOOD,
            actions_taken=[CleaningAction.CORRECT],
            confidence=0.8,
            warnings=["Assumed 0 stops for missing data"]
        )
    try:
        stops = int(str(value))
    except (ValueError, TypeError):
        # Unparseable counts are assumed to mean a direct flight.
        return CleaningResult(
            original_value=value,
            cleaned_value=0,
            quality_score=0.5,
            quality_level=DataQualityLevel.POOR,
            actions_taken=[CleaningAction.CORRECT],
            confidence=0.5,
            warnings=[f"Invalid stops format: '{value}', assumed 0"]
        )
    actions = []
    warnings = []
    if stops < 0:
        stops = 0
        actions = [CleaningAction.CORRECT]
        warnings = ["Negative stops corrected to 0"]
    elif stops > 5:
        warnings = ["Unusually high number of stops"]
    quality_score = 0.9
    if warnings:
        quality_score -= 0.2
    return CleaningResult(
        original_value=value,
        cleaned_value=stops,
        quality_score=quality_score,
        quality_level=self._get_quality_level(quality_score),
        actions_taken=actions,
        confidence=quality_score,
        warnings=warnings
    )
def clean_aircraft(self, value: Any) -> CleaningResult:
    """Map raw aircraft designators onto manufacturer + model names."""
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value=None,
            quality_score=0.0,
            quality_level=DataQualityLevel.UNRELIABLE,
            actions_taken=[CleaningAction.FILTER],
            confidence=0.0,
            warnings=["Missing aircraft information"]
        )
    original = str(value).strip()
    normalized = original.upper()
    # Substring match against well-known Boeing/Airbus designators.
    aircraft_mappings = {
        "737": "Boeing 737",
        "747": "Boeing 747",
        "777": "Boeing 777",
        "787": "Boeing 787",
        "A320": "Airbus A320",
        "A321": "Airbus A321",
        "A330": "Airbus A330",
        "A350": "Airbus A350"
    }
    actions = []
    match = next(
        (full for code, full in aircraft_mappings.items() if code in normalized),
        None,
    )
    if match is None:
        cleaned_value = normalized
    else:
        cleaned_value = match
        actions.append(CleaningAction.STANDARDIZE)
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=0.8,
        quality_level=DataQualityLevel.GOOD,
        actions_taken=actions,
        confidence=0.8
    )
def clean_booking_class(self, value: Any) -> CleaningResult:
    """Standardize a booking class (RBD fare letter or cabin name) to a cabin name.

    Args:
        value: Raw booking class ("Y", "business", "Premium Economy", ...).

    Returns:
        CleaningResult whose cleaned_value is one of Economy / Premium Economy /
        Business / First when recognized, otherwise the upper-cased input.
    """
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value="Economy",
            quality_score=0.7,
            quality_level=DataQualityLevel.GOOD,
            actions_taken=[CleaningAction.CORRECT],
            confidence=0.7,
            warnings=["Assumed Economy class for missing data"]
        )
    original = str(value).strip().upper()
    # Single-letter RBD fare codes -> cabin names.
    class_mappings = {
        "Y": "Economy",
        "B": "Economy",
        "M": "Economy",
        "H": "Economy",
        "K": "Economy",
        "L": "Economy",
        "V": "Economy",
        "N": "Economy",
        "Q": "Economy",
        "T": "Economy",
        "W": "Premium Economy",
        "S": "Premium Economy",
        "C": "Business",
        "D": "Business",
        "I": "Business",
        "J": "Business",
        "Z": "Business",
        "F": "First",
        "A": "First",
        "P": "First"
    }
    # BUG FIX: full cabin names used to pass through upper-cased and
    # unstandardized (e.g. "economy" -> "ECONOMY"); map them too.
    cabin_names = {
        "ECONOMY": "Economy",
        "PREMIUM ECONOMY": "Premium Economy",
        "BUSINESS": "Business",
        "FIRST": "First"
    }
    cleaned_value = class_mappings.get(original) or cabin_names.get(original, original)
    actions = [CleaningAction.STANDARDIZE] if cleaned_value != original else []
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=0.9,
        quality_level=DataQualityLevel.EXCELLENT,
        actions_taken=actions,
        confidence=0.9
    )
def clean_currency(self, value: Any) -> CleaningResult:
    """Standardize a currency name/symbol/code to a 3-letter ISO code.

    Args:
        value: Raw currency ("usd", "dollar", "$", "EUR", ...).

    Returns:
        CleaningResult with the ISO code when the alias is known, otherwise
        the upper-cased input; missing input defaults to "USD".
    """
    if not value:
        return CleaningResult(
            original_value=value,
            cleaned_value="USD",
            quality_score=0.7,
            quality_level=DataQualityLevel.GOOD,
            actions_taken=[CleaningAction.CORRECT],
            confidence=0.7,
            warnings=["Assumed USD for missing currency"]
        )
    original = str(value).strip().upper()
    standards = self._standards["currencies"]
    # BUG FIX: the standards table is keyed in lower case ("usd", "dollar",
    # "€"), but `original` has been upper-cased, so the old lookup missed
    # every alias. Look up with the lower-cased key instead.
    cleaned_value = standards.get(original.lower(), original)
    actions = [CleaningAction.STANDARDIZE] if cleaned_value != original else []
    return CleaningResult(
        original_value=original,
        cleaned_value=cleaned_value,
        quality_score=0.9,
        quality_level=DataQualityLevel.EXCELLENT,
        actions_taken=actions,
        confidence=0.9
    )
# Helper methods

def _get_quality_level(self, score: float) -> DataQualityLevel:
    """Map a numeric score to the first quality level whose threshold it meets.

    Iterates _quality_thresholds in declaration order (best first), so a score
    is assigned the highest level it qualifies for.
    """
    return next(
        (level for level, cutoff in self._quality_thresholds.items() if score >= cutoff),
        DataQualityLevel.UNRELIABLE,
    )
def _fuzzy_match_airline(self, airline: str) -> Optional[str]:
"""Find fuzzy match for airline name."""
standards = self._standards["airlines"]
# Try exact substring match
for key, value in standards.items():
if key in airline.lower() or airline.lower() in key:
return value
# Try fuzzy string matching
best_match = difflib.get_close_matches(airline, standards.keys(), n=1, cutoff=0.6)
if best_match:
return standards[best_match[0]]
return None
def _correct_flight_number_format(self, flight_number: str) -> str:
"""Attempt to correct flight number format."""
# Remove spaces
corrected = flight_number.replace(' ', '')
# Add missing airline code if just numbers
if corrected.isdigit():
corrected = f"XX{corrected}" # Generic code
return corrected
def _find_similar_airport_code(self, code: str) -> Optional[str]:
"""Find similar airport code."""
# Common airport codes for fuzzy matching
common_codes = [
"JFK", "LAX", "ORD", "DFW", "DEN", "SFO", "SEA", "LAS", "MIA", "BOS",
"ATL", "PHX", "MCO", "DTW", "MSP", "CLT", "EWR", "PHL", "LGA", "BWI"
]
if len(code) == 3 and code.isalpha():
# Try fuzzy matching
best_match = difflib.get_close_matches(code, common_codes, n=1, cutoff=0.6)
if best_match:
return best_match[0]
return None
def _parse_datetime(self, value: str) -> Optional[datetime]:
"""Parse various datetime formats."""
formats = [
"%Y-%m-%d",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
"%m/%d/%Y",
"%d/%m/%Y",
"%Y-%m-%d %H:%M:%S",
"%m/%d/%Y %H:%M",
"%d/%m/%Y %H:%M"
]
for fmt in formats:
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
return None
def _parse_duration_to_minutes(self, value: str) -> Optional[int]:
"""Parse duration string to minutes."""
# Handle various formats: "2h 30m", "150m", "2.5h", etc.
# Extract hours and minutes
hours = 0
minutes = 0
# Look for hours
hour_match = re.search(r'(\d+(?:\.\d+)?)\s*h', value, re.IGNORECASE)
if hour_match:
hours = int(float(hour_match.group(1)))
# Look for minutes
min_match = re.search(r'(\d+)\s*m', value, re.IGNORECASE)
if min_match:
minutes = int(min_match.group(1))
# If no explicit format, try to parse as total minutes
if not hour_match and not min_match:
try:
# Try to extract just numbers
numbers = re.findall(r'\d+', value)
if numbers:
return int(numbers[0])
except ValueError:
pass
return hours * 60 + minutes if hours > 0 or minutes > 0 else None
def _validate_flight_number(self, value: str) -> bool:
"""Validate flight number format."""
return bool(self._patterns["flight_number"].match(value))
def _validate_airport_code(self, value: str) -> bool:
"""Validate airport code format."""
return bool(self._patterns["airport_code"].match(value))
def _validate_price(self, value: str) -> bool:
"""Validate price format."""
try:
cleaned = re.sub(r'[^\d.,]', '', str(value))
Decimal(cleaned.replace(',', ''))
return True
except (InvalidOperation, ValueError):
return False
def _validate_date(self, value: str) -> bool:
"""Validate date format."""
return self._parse_datetime(value) is not None
def _validate_email(self, value: str) -> bool:
"""Validate email format."""
return bool(self._patterns["email"].match(value))
def _correct_flight_number(self, value: str) -> str:
"""Correct flight number format."""
return self._correct_flight_number_format(value)
def _correct_airport_code(self, value: str) -> str:
"""Correct airport code."""
corrected = self._find_similar_airport_code(value)
return corrected if corrected else value
def _correct_price(self, value: str) -> str:
"""Correct price format."""
try:
cleaned = re.sub(r'[^\d.,]', '', str(value))
price = Decimal(cleaned.replace(',', ''))
return str(price)
except (InvalidOperation, ValueError):
return value
def _correct_date(self, value: str) -> str:
"""Correct date format."""
parsed = self._parse_datetime(value)
return parsed.isoformat() if parsed else value
def _correct_email(self, value: str) -> str:
"""Correct email format."""
return value.lower().strip()
# Module-level singleton, created lazily by get_global_data_cleaner().
_global_data_cleaner: Optional[DataCleaner] = None

def get_global_data_cleaner() -> DataCleaner:
    """Return the shared DataCleaner instance, constructing it on first use."""
    global _global_data_cleaner
    if _global_data_cleaner is not None:
        return _global_data_cleaner
    _global_data_cleaner = DataCleaner()
    return _global_data_cleaner
def clean_travel_data(data: Dict[str, Any], data_type: str = "flight") -> Dict[str, Any]:
    """Clean a travel-data record using the shared DataCleaner.

    Only "flight" records have a cleaner today; any other data_type is
    returned unchanged.
    """
    if data_type != "flight":
        # Pass-through until cleaners exist for other record types.
        return data
    return get_global_data_cleaner().clean_flight_data(data)