Spaces:
Sleeping
Sleeping
| """ | |
| Robust Data Cleaning and Normalization for Travel Data | |
| This module handles messy real-world travel data by providing intelligent | |
| cleaning, normalization, and validation for inconsistent API responses. | |
| """ | |
| import re | |
| import logging | |
| import unicodedata | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Union, Tuple, Set | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import json | |
| import difflib | |
| from decimal import Decimal, InvalidOperation | |
| from .error_categorization import CategorizedError, ErrorContext | |
class DataQualityLevel(str, Enum):
    """Quality levels for cleaned data, best first.

    The numeric bands below are enforced by the cleaner's threshold table.
    """

    EXCELLENT = "excellent"    # 95-100% confidence
    GOOD = "good"              # 80-94% confidence
    FAIR = "fair"              # 60-79% confidence
    POOR = "poor"              # 40-59% confidence
    UNRELIABLE = "unreliable"  # <40% confidence
class CleaningAction(str, Enum):
    """Types of cleaning actions performed on a value."""

    NORMALIZE = "normalize"          # whitespace/case/unicode tidy-up
    VALIDATE = "validate"            # checked against a rule
    CORRECT = "correct"              # value repaired or substituted
    DEDUPLICATE = "deduplicate"      # duplicates removed
    ENRICH = "enrich"                # extra information attached
    FILTER = "filter"                # value dropped as unusable
    STANDARDIZE = "standardize"      # mapped to a canonical form
@dataclass
class CleaningResult:
    """Result of a data cleaning operation.

    Bundles the original and cleaned values with quality metrics and the
    list of cleaning actions that were applied.

    BUG FIX: the class previously used ``field(default_factory=...)`` without
    the ``@dataclass`` decorator, so no ``__init__`` was generated and every
    ``CleaningResult(original_value=...)`` call site raised TypeError.
    """

    original_value: Any
    cleaned_value: Any
    quality_score: float  # 0.0 to 1.0
    quality_level: DataQualityLevel
    actions_taken: List[CleaningAction]
    confidence: float  # 0.0 to 1.0
    warnings: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ValidationRule:
    """A validation rule for data cleaning.

    A rule may validate via a regex pattern, a callable, or both; the
    optional correction callable can repair values that fail validation.

    BUG FIX: the class declared fields with defaults but lacked the
    ``@dataclass`` decorator, so ``ValidationRule(name=..., ...)`` raised
    TypeError (no generated ``__init__``).
    """

    name: str
    pattern: Optional[Union[str, re.Pattern]]
    validator_func: Optional[callable]
    correction_func: Optional[callable]
    required: bool = True
    error_message: str = "Validation failed"
    warning_threshold: float = 0.8
class DataCleaner:
    """
    Comprehensive data cleaner for travel data with intelligent
    normalization and quality assessment.
    """

    def __init__(self):
        """Build the patterns, canonical tables, rules, and thresholds."""
        self.logger = logging.getLogger(__name__)
        # Compiled regexes for common travel-data formats.
        self._patterns = self._build_patterns()
        # Canonical lookup tables used during normalization.
        self._standards = self._build_standards()
        # Per-field validation rules.
        self._validation_rules = self._build_validation_rules()
        # Minimum score for each quality level; ordered best-first so the
        # first threshold a score meets decides its level.
        self._quality_thresholds = {
            DataQualityLevel.EXCELLENT: 0.95,
            DataQualityLevel.GOOD: 0.80,
            DataQualityLevel.FAIR: 0.60,
            DataQualityLevel.POOR: 0.40,
            DataQualityLevel.UNRELIABLE: 0.0,
        }
| def _build_patterns(self) -> Dict[str, re.Pattern]: | |
| """Build regex patterns for common travel data formats.""" | |
| return { | |
| # Flight patterns | |
| "flight_number": re.compile(r'^[A-Z]{2,3}\d{1,4}[A-Z]?$'), | |
| "airline_code": re.compile(r'^[A-Z]{2,3}$'), | |
| "airport_code": re.compile(r'^[A-Z]{3}$'), | |
| # Price patterns | |
| "price": re.compile(r'^\$?[\d,]+\.?\d*$'), | |
| "currency": re.compile(r'^[A-Z]{3}$'), | |
| # Date/time patterns | |
| "date_iso": re.compile(r'^\d{4}-\d{2}-\d{2}$'), | |
| "datetime_iso": re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'), | |
| "time_12h": re.compile(r'^(1[0-2]|[1-9]):[0-5]\d\s?(AM|PM)$', re.IGNORECASE), | |
| "time_24h": re.compile(r'^([01]\d|2[0-3]):[0-5]\d$'), | |
| # Location patterns | |
| "city_name": re.compile(r'^[A-Za-z\s\-\'\.]+$'), | |
| "country_code": re.compile(r'^[A-Z]{2,3}$'), | |
| "postal_code": re.compile(r'^[A-Z0-9\s\-]{3,10}$'), | |
| # Contact patterns | |
| "email": re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'), | |
| "phone": re.compile(r'^[\+]?[\d\s\-\(\)]{10,15}$'), | |
| # Duration patterns | |
| "duration": re.compile(r'^(\d+h\s?)?(\d+m)?$', re.IGNORECASE), | |
| "duration_minutes": re.compile(r'^\d+\s*(min|minutes?)?$', re.IGNORECASE) | |
| } | |
| def _build_standards(self) -> Dict[str, Any]: | |
| """Build standard values for data normalization.""" | |
| return { | |
| # Common airline mappings | |
| "airlines": { | |
| "american": "American Airlines", | |
| "aa": "American Airlines", | |
| "delta": "Delta Air Lines", | |
| "dl": "Delta Air Lines", | |
| "united": "United Airlines", | |
| "ua": "United Airlines", | |
| "southwest": "Southwest Airlines", | |
| "wn": "Southwest Airlines", | |
| "jetblue": "JetBlue Airways", | |
| "b6": "JetBlue Airways", | |
| "alaska": "Alaska Airlines", | |
| "as": "Alaska Airlines", | |
| "spirit": "Spirit Airlines", | |
| "nk": "Spirit Airlines", | |
| "frontier": "Frontier Airlines", | |
| "f9": "Frontier Airlines" | |
| }, | |
| # Common airport mappings | |
| "airports": { | |
| "nyc": "JFK", | |
| "new york": "JFK", | |
| "manhattan": "JFK", | |
| "los angeles": "LAX", | |
| "la": "LAX", | |
| "chicago": "ORD", | |
| "miami": "MIA", | |
| "boston": "BOS", | |
| "san francisco": "SFO", | |
| "seattle": "SEA", | |
| "atlanta": "ATL", | |
| "denver": "DEN", | |
| "las vegas": "LAS" | |
| }, | |
| # Currency standards | |
| "currencies": { | |
| "usd": "USD", | |
| "dollar": "USD", | |
| "$": "USD", | |
| "euro": "EUR", | |
| "€": "EUR", | |
| "pound": "GBP", | |
| "£": "GBP" | |
| }, | |
| # Time zone standards | |
| "timezones": { | |
| "est": "America/New_York", | |
| "pst": "America/Los_Angeles", | |
| "cst": "America/Chicago", | |
| "mst": "America/Denver", | |
| "utc": "UTC", | |
| "gmt": "UTC" | |
| } | |
| } | |
| def _build_validation_rules(self) -> Dict[str, ValidationRule]: | |
| """Build validation rules for different data types.""" | |
| return { | |
| "flight_number": ValidationRule( | |
| name="flight_number", | |
| pattern=self._patterns["flight_number"], | |
| validator_func=self._validate_flight_number, | |
| correction_func=self._correct_flight_number, | |
| error_message="Invalid flight number format" | |
| ), | |
| "airport_code": ValidationRule( | |
| name="airport_code", | |
| pattern=self._patterns["airport_code"], | |
| validator_func=self._validate_airport_code, | |
| correction_func=self._correct_airport_code, | |
| error_message="Invalid airport code" | |
| ), | |
| "price": ValidationRule( | |
| name="price", | |
| pattern=self._patterns["price"], | |
| validator_func=self._validate_price, | |
| correction_func=self._correct_price, | |
| error_message="Invalid price format" | |
| ), | |
| "date": ValidationRule( | |
| name="date", | |
| pattern=None, | |
| validator_func=self._validate_date, | |
| correction_func=self._correct_date, | |
| error_message="Invalid date format" | |
| ), | |
| "email": ValidationRule( | |
| name="email", | |
| pattern=self._patterns["email"], | |
| validator_func=self._validate_email, | |
| correction_func=self._correct_email, | |
| error_message="Invalid email format" | |
| ) | |
| } | |
| def clean_flight_data(self, raw_flight: Dict[str, Any]) -> Dict[str, Any]: | |
| """Clean and normalize flight data.""" | |
| cleaned_flight = {} | |
| quality_scores = [] | |
| all_actions = [] | |
| warnings = [] | |
| # Clean each field | |
| field_mappings = { | |
| "airline": self.clean_airline, | |
| "flight_number": self.clean_flight_number, | |
| "departure_city": self.clean_location, | |
| "arrival_city": self.clean_location, | |
| "departure_airport": self.clean_airport_code, | |
| "arrival_airport": self.clean_airport_code, | |
| "departure_time": self.clean_datetime, | |
| "arrival_time": self.clean_datetime, | |
| "price": self.clean_price, | |
| "currency": self.clean_currency, | |
| "duration_minutes": self.clean_duration, | |
| "stops": self.clean_stops, | |
| "aircraft": self.clean_aircraft, | |
| "booking_class": self.clean_booking_class | |
| } | |
| for field, cleaner_func in field_mappings.items(): | |
| if field in raw_flight: | |
| result = cleaner_func(raw_flight[field]) | |
| cleaned_flight[field] = result.cleaned_value | |
| quality_scores.append(result.quality_score) | |
| all_actions.extend(result.actions_taken) | |
| warnings.extend(result.warnings) | |
| # Calculate overall quality | |
| overall_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0 | |
| quality_level = self._get_quality_level(overall_quality) | |
| # Add metadata | |
| cleaned_flight["_quality_metadata"] = { | |
| "overall_quality_score": overall_quality, | |
| "quality_level": quality_level.value, | |
| "cleaning_actions": list(set(all_actions)), | |
| "warnings": warnings, | |
| "fields_cleaned": len(quality_scores), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| return cleaned_flight | |
| def clean_airline(self, value: Any) -> CleaningResult: | |
| """Clean airline name or code.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing airline information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| warnings = [] | |
| # Normalize case and whitespace | |
| normalized = re.sub(r'\s+', ' ', original.upper().strip()) | |
| actions.append(CleaningAction.NORMALIZE) | |
| # Check if it's a standard airline | |
| standards = self._standards["airlines"] | |
| cleaned_value = normalized | |
| # Try exact match first | |
| if normalized in standards: | |
| cleaned_value = standards[normalized] | |
| actions.append(CleaningAction.STANDARDIZE) | |
| else: | |
| # Try fuzzy matching | |
| best_match = self._fuzzy_match_airline(normalized) | |
| if best_match: | |
| cleaned_value = best_match | |
| actions.append(CleaningAction.CORRECT) | |
| warnings.append(f"Airline corrected from '{original}' to '{cleaned_value}'") | |
| # Calculate quality score | |
| quality_score = 0.9 if actions else 0.5 | |
| if warnings: | |
| quality_score -= 0.2 | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| def clean_flight_number(self, value: Any) -> CleaningResult: | |
| """Clean flight number.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing flight number"] | |
| ) | |
| original = str(value).strip().upper() | |
| actions = [] | |
| warnings = [] | |
| # Remove common prefixes/suffixes | |
| cleaned = re.sub(r'^(FLIGHT|FLT)\s*', '', original) | |
| cleaned = re.sub(r'\s*(FLIGHT|FLT)$', '', cleaned) | |
| if cleaned != original: | |
| actions.append(CleaningAction.NORMALIZE) | |
| # Validate format | |
| if self._patterns["flight_number"].match(cleaned): | |
| quality_score = 1.0 | |
| else: | |
| # Try to correct common issues | |
| corrected = self._correct_flight_number_format(cleaned) | |
| if corrected != cleaned: | |
| cleaned = corrected | |
| actions.append(CleaningAction.CORRECT) | |
| warnings.append(f"Flight number corrected from '{original}' to '{cleaned}'") | |
| quality_score = 0.8 | |
| else: | |
| quality_score = 0.3 | |
| warnings.append(f"Invalid flight number format: '{original}'") | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| def clean_location(self, value: Any) -> CleaningResult: | |
| """Clean location names.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing location information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| warnings = [] | |
| # Normalize Unicode and case | |
| normalized = unicodedata.normalize('NFKD', original) | |
| normalized = re.sub(r'[^\w\s\-\'\.]', '', normalized) | |
| normalized = ' '.join(word.capitalize() for word in normalized.split()) | |
| if normalized != original: | |
| actions.append(CleaningAction.NORMALIZE) | |
| # Check for common city mappings | |
| standards = self._standards["airports"] | |
| cleaned_value = normalized | |
| for key, airport_code in standards.items(): | |
| if key.lower() in normalized.lower(): | |
| cleaned_value = f"{normalized} ({airport_code})" | |
| actions.append(CleaningAction.ENRICH) | |
| break | |
| quality_score = 0.9 if len(normalized.split()) >= 2 else 0.7 | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| def clean_airport_code(self, value: Any) -> CleaningResult: | |
| """Clean airport codes.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing airport code"] | |
| ) | |
| original = str(value).strip().upper() | |
| actions = [] | |
| warnings = [] | |
| # Check format | |
| if self._patterns["airport_code"].match(original): | |
| quality_score = 1.0 | |
| cleaned_value = original | |
| else: | |
| # Try to find similar codes | |
| corrected = self._find_similar_airport_code(original) | |
| if corrected: | |
| cleaned_value = corrected | |
| actions.append(CleaningAction.CORRECT) | |
| warnings.append(f"Airport code corrected from '{original}' to '{cleaned_value}'") | |
| quality_score = 0.8 | |
| else: | |
| cleaned_value = original | |
| quality_score = 0.3 | |
| warnings.append(f"Invalid airport code: '{original}'") | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| def clean_price(self, value: Any) -> CleaningResult: | |
| """Clean price data.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing price information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| warnings = [] | |
| # Remove currency symbols and normalize | |
| cleaned = re.sub(r'[^\d.,]', '', original) | |
| cleaned = cleaned.replace(',', '') | |
| try: | |
| price = Decimal(cleaned) | |
| if price < 0: | |
| price = abs(price) | |
| actions.append(CleaningAction.CORRECT) | |
| warnings.append("Negative price corrected to positive") | |
| # Check for reasonable price range | |
| if price > 10000: | |
| warnings.append("Unusually high price detected") | |
| elif price < 10: | |
| warnings.append("Unusually low price detected") | |
| quality_score = 0.9 | |
| if warnings: | |
| quality_score -= 0.2 | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=float(price), | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| except (InvalidOperation, ValueError): | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=[f"Invalid price format: '{original}'"] | |
| ) | |
| def clean_datetime(self, value: Any) -> CleaningResult: | |
| """Clean datetime data.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing datetime information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| warnings = [] | |
| # Try to parse the datetime | |
| parsed_dt = self._parse_datetime(original) | |
| if parsed_dt: | |
| # Check for reasonable date range | |
| now = datetime.now() | |
| if parsed_dt < now - timedelta(days=365): | |
| warnings.append("Date is more than a year in the past") | |
| elif parsed_dt > now + timedelta(days=365): | |
| warnings.append("Date is more than a year in the future") | |
| quality_score = 0.9 | |
| if warnings: | |
| quality_score -= 0.2 | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=parsed_dt.isoformat(), | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| else: | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=[f"Invalid datetime format: '{original}'"] | |
| ) | |
| def clean_duration(self, value: Any) -> CleaningResult: | |
| """Clean duration data.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing duration information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| warnings = [] | |
| # Parse duration | |
| minutes = self._parse_duration_to_minutes(original) | |
| if minutes is not None: | |
| # Check for reasonable duration | |
| if minutes > 1440: # More than 24 hours | |
| warnings.append("Unusually long duration detected") | |
| elif minutes < 30: | |
| warnings.append("Unusually short duration detected") | |
| quality_score = 0.9 | |
| if warnings: | |
| quality_score -= 0.2 | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=minutes, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| else: | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=[f"Invalid duration format: '{original}'"] | |
| ) | |
| def clean_stops(self, value: Any) -> CleaningResult: | |
| """Clean number of stops.""" | |
| if value is None: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=0, | |
| quality_score=0.8, | |
| quality_level=DataQualityLevel.GOOD, | |
| actions_taken=[CleaningAction.CORRECT], | |
| confidence=0.8, | |
| warnings=["Assumed 0 stops for missing data"] | |
| ) | |
| try: | |
| stops = int(str(value)) | |
| if stops < 0: | |
| stops = 0 | |
| actions = [CleaningAction.CORRECT] | |
| warnings = ["Negative stops corrected to 0"] | |
| elif stops > 5: | |
| actions = [] | |
| warnings = ["Unusually high number of stops"] | |
| else: | |
| actions = [] | |
| warnings = [] | |
| quality_score = 0.9 | |
| if warnings: | |
| quality_score -= 0.2 | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=stops, | |
| quality_score=quality_score, | |
| quality_level=self._get_quality_level(quality_score), | |
| actions_taken=actions, | |
| confidence=quality_score, | |
| warnings=warnings | |
| ) | |
| except (ValueError, TypeError): | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=0, | |
| quality_score=0.5, | |
| quality_level=DataQualityLevel.POOR, | |
| actions_taken=[CleaningAction.CORRECT], | |
| confidence=0.5, | |
| warnings=[f"Invalid stops format: '{value}', assumed 0"] | |
| ) | |
| def clean_aircraft(self, value: Any) -> CleaningResult: | |
| """Clean aircraft information.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value=None, | |
| quality_score=0.0, | |
| quality_level=DataQualityLevel.UNRELIABLE, | |
| actions_taken=[CleaningAction.FILTER], | |
| confidence=0.0, | |
| warnings=["Missing aircraft information"] | |
| ) | |
| original = str(value).strip() | |
| actions = [] | |
| # Normalize aircraft codes | |
| normalized = original.upper() | |
| # Common aircraft mappings | |
| aircraft_mappings = { | |
| "737": "Boeing 737", | |
| "747": "Boeing 747", | |
| "777": "Boeing 777", | |
| "787": "Boeing 787", | |
| "A320": "Airbus A320", | |
| "A321": "Airbus A321", | |
| "A330": "Airbus A330", | |
| "A350": "Airbus A350" | |
| } | |
| cleaned_value = normalized | |
| for code, full_name in aircraft_mappings.items(): | |
| if code in normalized: | |
| cleaned_value = full_name | |
| actions.append(CleaningAction.STANDARDIZE) | |
| break | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=0.8, | |
| quality_level=DataQualityLevel.GOOD, | |
| actions_taken=actions, | |
| confidence=0.8 | |
| ) | |
| def clean_booking_class(self, value: Any) -> CleaningResult: | |
| """Clean booking class information.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value="Economy", | |
| quality_score=0.7, | |
| quality_level=DataQualityLevel.GOOD, | |
| actions_taken=[CleaningAction.CORRECT], | |
| confidence=0.7, | |
| warnings=["Assumed Economy class for missing data"] | |
| ) | |
| original = str(value).strip().upper() | |
| # Standard booking class mappings | |
| class_mappings = { | |
| "Y": "Economy", | |
| "B": "Economy", | |
| "M": "Economy", | |
| "H": "Economy", | |
| "K": "Economy", | |
| "L": "Economy", | |
| "V": "Economy", | |
| "N": "Economy", | |
| "Q": "Economy", | |
| "T": "Economy", | |
| "W": "Premium Economy", | |
| "S": "Premium Economy", | |
| "C": "Business", | |
| "D": "Business", | |
| "I": "Business", | |
| "J": "Business", | |
| "Z": "Business", | |
| "F": "First", | |
| "A": "First", | |
| "P": "First" | |
| } | |
| cleaned_value = class_mappings.get(original, original) | |
| actions = [CleaningAction.STANDARDIZE] if cleaned_value != original else [] | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=0.9, | |
| quality_level=DataQualityLevel.EXCELLENT, | |
| actions_taken=actions, | |
| confidence=0.9 | |
| ) | |
| def clean_currency(self, value: Any) -> CleaningResult: | |
| """Clean currency information.""" | |
| if not value: | |
| return CleaningResult( | |
| original_value=value, | |
| cleaned_value="USD", | |
| quality_score=0.7, | |
| quality_level=DataQualityLevel.GOOD, | |
| actions_taken=[CleaningAction.CORRECT], | |
| confidence=0.7, | |
| warnings=["Assumed USD for missing currency"] | |
| ) | |
| original = str(value).strip().upper() | |
| # Check standards | |
| standards = self._standards["currencies"] | |
| cleaned_value = standards.get(original, original) | |
| actions = [CleaningAction.STANDARDIZE] if cleaned_value != original else [] | |
| return CleaningResult( | |
| original_value=original, | |
| cleaned_value=cleaned_value, | |
| quality_score=0.9, | |
| quality_level=DataQualityLevel.EXCELLENT, | |
| actions_taken=actions, | |
| confidence=0.9 | |
| ) | |
| # Helper methods | |
| def _get_quality_level(self, score: float) -> DataQualityLevel: | |
| """Convert quality score to quality level.""" | |
| for level, threshold in self._quality_thresholds.items(): | |
| if score >= threshold: | |
| return level | |
| return DataQualityLevel.UNRELIABLE | |
| def _fuzzy_match_airline(self, airline: str) -> Optional[str]: | |
| """Find fuzzy match for airline name.""" | |
| standards = self._standards["airlines"] | |
| # Try exact substring match | |
| for key, value in standards.items(): | |
| if key in airline.lower() or airline.lower() in key: | |
| return value | |
| # Try fuzzy string matching | |
| best_match = difflib.get_close_matches(airline, standards.keys(), n=1, cutoff=0.6) | |
| if best_match: | |
| return standards[best_match[0]] | |
| return None | |
| def _correct_flight_number_format(self, flight_number: str) -> str: | |
| """Attempt to correct flight number format.""" | |
| # Remove spaces | |
| corrected = flight_number.replace(' ', '') | |
| # Add missing airline code if just numbers | |
| if corrected.isdigit(): | |
| corrected = f"XX{corrected}" # Generic code | |
| return corrected | |
| def _find_similar_airport_code(self, code: str) -> Optional[str]: | |
| """Find similar airport code.""" | |
| # Common airport codes for fuzzy matching | |
| common_codes = [ | |
| "JFK", "LAX", "ORD", "DFW", "DEN", "SFO", "SEA", "LAS", "MIA", "BOS", | |
| "ATL", "PHX", "MCO", "DTW", "MSP", "CLT", "EWR", "PHL", "LGA", "BWI" | |
| ] | |
| if len(code) == 3 and code.isalpha(): | |
| # Try fuzzy matching | |
| best_match = difflib.get_close_matches(code, common_codes, n=1, cutoff=0.6) | |
| if best_match: | |
| return best_match[0] | |
| return None | |
| def _parse_datetime(self, value: str) -> Optional[datetime]: | |
| """Parse various datetime formats.""" | |
| formats = [ | |
| "%Y-%m-%d", | |
| "%Y-%m-%dT%H:%M:%S", | |
| "%Y-%m-%dT%H:%M:%SZ", | |
| "%m/%d/%Y", | |
| "%d/%m/%Y", | |
| "%Y-%m-%d %H:%M:%S", | |
| "%m/%d/%Y %H:%M", | |
| "%d/%m/%Y %H:%M" | |
| ] | |
| for fmt in formats: | |
| try: | |
| return datetime.strptime(value, fmt) | |
| except ValueError: | |
| continue | |
| return None | |
| def _parse_duration_to_minutes(self, value: str) -> Optional[int]: | |
| """Parse duration string to minutes.""" | |
| # Handle various formats: "2h 30m", "150m", "2.5h", etc. | |
| # Extract hours and minutes | |
| hours = 0 | |
| minutes = 0 | |
| # Look for hours | |
| hour_match = re.search(r'(\d+(?:\.\d+)?)\s*h', value, re.IGNORECASE) | |
| if hour_match: | |
| hours = int(float(hour_match.group(1))) | |
| # Look for minutes | |
| min_match = re.search(r'(\d+)\s*m', value, re.IGNORECASE) | |
| if min_match: | |
| minutes = int(min_match.group(1)) | |
| # If no explicit format, try to parse as total minutes | |
| if not hour_match and not min_match: | |
| try: | |
| # Try to extract just numbers | |
| numbers = re.findall(r'\d+', value) | |
| if numbers: | |
| return int(numbers[0]) | |
| except ValueError: | |
| pass | |
| return hours * 60 + minutes if hours > 0 or minutes > 0 else None | |
| def _validate_flight_number(self, value: str) -> bool: | |
| """Validate flight number format.""" | |
| return bool(self._patterns["flight_number"].match(value)) | |
| def _validate_airport_code(self, value: str) -> bool: | |
| """Validate airport code format.""" | |
| return bool(self._patterns["airport_code"].match(value)) | |
| def _validate_price(self, value: str) -> bool: | |
| """Validate price format.""" | |
| try: | |
| cleaned = re.sub(r'[^\d.,]', '', str(value)) | |
| Decimal(cleaned.replace(',', '')) | |
| return True | |
| except (InvalidOperation, ValueError): | |
| return False | |
| def _validate_date(self, value: str) -> bool: | |
| """Validate date format.""" | |
| return self._parse_datetime(value) is not None | |
| def _validate_email(self, value: str) -> bool: | |
| """Validate email format.""" | |
| return bool(self._patterns["email"].match(value)) | |
| def _correct_flight_number(self, value: str) -> str: | |
| """Correct flight number format.""" | |
| return self._correct_flight_number_format(value) | |
| def _correct_airport_code(self, value: str) -> str: | |
| """Correct airport code.""" | |
| corrected = self._find_similar_airport_code(value) | |
| return corrected if corrected else value | |
| def _correct_price(self, value: str) -> str: | |
| """Correct price format.""" | |
| try: | |
| cleaned = re.sub(r'[^\d.,]', '', str(value)) | |
| price = Decimal(cleaned.replace(',', '')) | |
| return str(price) | |
| except (InvalidOperation, ValueError): | |
| return value | |
| def _correct_date(self, value: str) -> str: | |
| """Correct date format.""" | |
| parsed = self._parse_datetime(value) | |
| return parsed.isoformat() if parsed else value | |
| def _correct_email(self, value: str) -> str: | |
| """Correct email format.""" | |
| return value.lower().strip() | |
# Global data cleaner instance (created lazily by get_global_data_cleaner).
_global_data_cleaner: Optional[DataCleaner] = None


def get_global_data_cleaner() -> DataCleaner:
    """Get the global data cleaner instance."""
    global _global_data_cleaner
    if _global_data_cleaner is None:
        # First call: build and cache the shared instance.
        _global_data_cleaner = DataCleaner()
    return _global_data_cleaner
def clean_travel_data(data: Dict[str, Any], data_type: str = "flight") -> Dict[str, Any]:
    """Convenience function to clean travel data."""
    cleaner = get_global_data_cleaner()
    if data_type != "flight":
        # Other data types are not yet supported; pass through unchanged.
        return data
    return cleaner.clean_flight_data(data)