""" Robust Input Validation for Travel Data This module provides comprehensive input validation that handles edge cases, impossible dates, invalid locations, extreme budgets, and other real-world input challenges. """ import re import logging from datetime import datetime, timedelta, date from typing import Dict, List, Optional, Any, Union, Tuple, Set from dataclasses import dataclass, field from enum import Enum import math from decimal import Decimal, InvalidOperation from .error_categorization import CategorizedError, ErrorContext class ValidationSeverity(str, Enum): """Severity levels for validation issues.""" INFO = "info" # Informational, no action needed WARNING = "warning" # Caution, but proceed with care ERROR = "error" # Issue found, correction needed CRITICAL = "critical" # Invalid input, must be corrected class ValidationAction(str, Enum): """Actions to take for validation issues.""" ACCEPT = "accept" # Accept as-is CORRECT = "correct" # Auto-correct the value SUGGEST = "suggest" # Suggest correction to user REJECT = "reject" # Reject the input entirely REQUIRE_CONFIRMATION = "require_confirmation" # Ask user to confirm @dataclass class ValidationIssue: """A validation issue found in input data.""" field_name: str issue_type: str severity: ValidationSeverity message: str suggested_value: Optional[Any] = None suggested_action: ValidationAction = ValidationAction.SUGGEST confidence: float = 0.8 # Confidence in the suggestion metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ValidationResult: """Result of input validation.""" is_valid: bool issues: List[ValidationIssue] corrected_data: Dict[str, Any] confidence_score: float # Overall confidence in the data warnings: List[str] = field(default_factory=list) suggestions: List[str] = field(default_factory=list) metadata: Dict[str, Any] = field(default_factory=dict) class InputValidator: """ Comprehensive input validator for travel data that handles edge cases and provides intelligent suggestions for corrections. """ def __init__(self): self.logger = logging.getLogger(__name__) # Validation rules and thresholds self._rules = self._build_validation_rules() # Common patterns self._patterns = self._build_patterns() # Known locations and airports self._locations = self._build_location_database() # Price thresholds for different travel types self._price_thresholds = self._build_price_thresholds() def _build_validation_rules(self) -> Dict[str, Dict[str, Any]]: """Build validation rules for different data types.""" return { "dates": { "min_future_days": 0, # Minimum days in future for booking "max_future_days": 365, # Maximum days in future "min_stay_days": 1, # Minimum stay duration "max_stay_days": 365, # Maximum stay duration "min_advance_booking": 0, # Minimum advance booking time "max_advance_booking": 365 # Maximum advance booking time }, "locations": { "min_name_length": 2, # Minimum location name length "max_name_length": 100, # Maximum location name length "require_country": False, # Whether country is required "allow_partial": True # Allow partial matches }, "budgets": { "min_flight_price": 50, # Minimum reasonable flight price "max_flight_price": 50000, # Maximum reasonable flight price "min_hotel_price": 20, # Minimum reasonable hotel price per night "max_hotel_price": 5000, # Maximum reasonable hotel price per night "min_total_budget": 100, # Minimum total trip budget "max_total_budget": 100000 # Maximum total trip budget }, "passengers": { "min_adults": 1, # Minimum adults "max_adults": 20, # Maximum adults "min_children": 0, # Minimum children "max_children": 20, # Maximum children "min_infants": 0, # Minimum infants "max_infants": 10, # Maximum infants "max_total_passengers": 30 # Maximum total passengers } } def _build_patterns(self) -> Dict[str, re.Pattern]: """Build regex patterns for validation.""" return { "airport_code": re.compile(r'^[A-Z]{3}$'), "city_name": re.compile(r'^[A-Za-z\s\-\'\.]{2,100}$'), "country_code": re.compile(r'^[A-Z]{2,3}$'), "date_iso": re.compile(r'^\d{4}-\d{2}-\d{2}$'), "datetime_iso": re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'), "price": re.compile(r'^\$?[\d,]+\.?\d*$'), "email": re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'), "phone": re.compile(r'^[\+]?[\d\s\-\(\)]{10,15}$') } def _build_location_database(self) -> Dict[str, Dict[str, Any]]: """Build database of known locations and airports.""" return { # Major US cities "new york": { "airports": ["JFK", "LGA", "EWR"], "country": "US", "timezone": "America/New_York", "alternatives": ["nyc", "manhattan", "brooklyn"] }, "los angeles": { "airports": ["LAX", "BUR", "SNA"], "country": "US", "timezone": "America/Los_Angeles", "alternatives": ["la", "hollywood", "beverly hills"] }, "chicago": { "airports": ["ORD", "MDW"], "country": "US", "timezone": "America/Chicago", "alternatives": ["chi"] }, "miami": { "airports": ["MIA", "FLL"], "country": "US", "timezone": "America/New_York", "alternatives": ["south beach", "miami beach"] }, "boston": { "airports": ["BOS"], "country": "US", "timezone": "America/New_York", "alternatives": ["bos"] }, "san francisco": { "airports": ["SFO", "OAK", "SJC"], "country": "US", "timezone": "America/Los_Angeles", "alternatives": ["sf", "san fran", "bay area"] }, "seattle": { "airports": ["SEA"], "country": "US", "timezone": "America/Los_Angeles", "alternatives": ["sea"] }, "atlanta": { "airports": ["ATL"], "country": "US", "timezone": "America/New_York", "alternatives": ["atl"] }, "denver": { "airports": ["DEN"], "country": "US", "timezone": "America/Denver", "alternatives": ["den"] }, "las vegas": { "airports": ["LAS"], "country": "US", "timezone": "America/Los_Angeles", "alternatives": ["vegas", "las vegas"] }, # International cities "london": { "airports": ["LHR", "LGW", "STN", "LTN"], "country": "GB", "timezone": "Europe/London", "alternatives": ["london uk", "london england"] }, "paris": { "airports": ["CDG", "ORY"], "country": "FR", "timezone": "Europe/Paris", "alternatives": ["paris france"] }, "tokyo": { "airports": ["NRT", "HND"], "country": "JP", "timezone": "Asia/Tokyo", "alternatives": ["tokyo japan"] }, "sydney": { "airports": ["SYD"], "country": "AU", "timezone": "Australia/Sydney", "alternatives": ["sydney australia"] } } def _build_price_thresholds(self) -> Dict[str, Dict[str, float]]: """Build price thresholds for different travel scenarios.""" return { "domestic_flights": { "budget": {"min": 100, "max": 800}, "premium": {"min": 800, "max": 2000}, "luxury": {"min": 2000, "max": 10000} }, "international_flights": { "budget": {"min": 300, "max": 1200}, "premium": {"min": 1200, "max": 5000}, "luxury": {"min": 5000, "max": 25000} }, "hotels": { "budget": {"min": 50, "max": 150}, "mid_range": {"min": 150, "max": 400}, "luxury": {"min": 400, "max": 2000} } } def validate_travel_request(self, request_data: Dict[str, Any]) -> ValidationResult: """Validate a complete travel request.""" issues = [] corrected_data = request_data.copy() warnings = [] suggestions = [] # Validate dates date_issues = self._validate_dates(request_data) issues.extend(date_issues) # Validate locations location_issues = self._validate_locations(request_data) issues.extend(location_issues) # Validate budget budget_issues = self._validate_budget(request_data) issues.extend(budget_issues) # Validate passengers passenger_issues = self._validate_passengers(request_data) issues.extend(passenger_issues) # Validate contact information contact_issues = self._validate_contact_info(request_data) issues.extend(contact_issues) # Apply corrections for issue in issues: if issue.suggested_action == ValidationAction.CORRECT and issue.suggested_value is not None: corrected_data[issue.field_name] = issue.suggested_value # Calculate overall confidence confidence_score = self._calculate_confidence_score(issues, len(request_data)) # Determine if valid critical_issues = [i for i in issues if i.severity == ValidationSeverity.CRITICAL] error_issues = [i for i in issues if i.severity == ValidationSeverity.ERROR] is_valid = len(critical_issues) == 0 and len(error_issues) <= 1 # Allow one error # Generate warnings and suggestions for issue in issues: if issue.severity == ValidationSeverity.WARNING: warnings.append(f"{issue.field_name}: {issue.message}") if issue.suggested_action == ValidationAction.SUGGEST: suggestions.append(f"Consider {issue.message}") return ValidationResult( is_valid=is_valid, issues=issues, corrected_data=corrected_data, confidence_score=confidence_score, warnings=warnings, suggestions=suggestions, metadata={ "total_fields": len(request_data), "issues_found": len(issues), "critical_issues": len(critical_issues), "error_issues": len(error_issues), "warning_issues": len([i for i in issues if i.severity == ValidationSeverity.WARNING]) } ) def _validate_dates(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate date-related fields.""" issues = [] rules = self._rules["dates"] # Validate departure date if "departure_date" in data: departure_issues = self._validate_single_date( "departure_date", data["departure_date"], rules ) issues.extend(departure_issues) # Validate return date if "return_date" in data: return_issues = self._validate_single_date( "return_date", data["return_date"], rules ) issues.extend(return_issues) # Validate date consistency if "departure_date" in data and "return_date" in data: consistency_issues = self._validate_date_consistency( data["departure_date"], data["return_date"] ) issues.extend(consistency_issues) return issues def _validate_single_date(self, field_name: str, value: Any, rules: Dict[str, Any]) -> List[ValidationIssue]: """Validate a single date field.""" issues = [] if not value: issues.append(ValidationIssue( field_name=field_name, issue_type="missing_date", severity=ValidationSeverity.ERROR, message="Date is required", suggested_action=ValidationAction.REJECT )) return issues # Parse the date parsed_date = self._parse_date(value) if not parsed_date: issues.append(ValidationIssue( field_name=field_name, issue_type="invalid_date_format", severity=ValidationSeverity.ERROR, message=f"Invalid date format: {value}", suggested_action=ValidationAction.SUGGEST )) return issues today = date.today() days_from_today = (parsed_date - today).days # Check if date is in the past if days_from_today < rules["min_future_days"]: if days_from_today < 0: severity = ValidationSeverity.CRITICAL message = f"Date is in the past: {value}" suggested_value = today.strftime("%Y-%m-%d") else: severity = ValidationSeverity.WARNING message = f"Date is very soon: {value}" suggested_value = (today + timedelta(days=rules["min_advance_booking"])).strftime("%Y-%m-%d") issues.append(ValidationIssue( field_name=field_name, issue_type="date_too_early", severity=severity, message=message, suggested_value=suggested_value, suggested_action=ValidationAction.CORRECT, confidence=0.9 )) # Check if date is too far in the future if days_from_today > rules["max_future_days"]: issues.append(ValidationIssue( field_name=field_name, issue_type="date_too_far", severity=ValidationSeverity.WARNING, message=f"Date is very far in the future: {value}", suggested_value=(today + timedelta(days=rules["max_advance_booking"])).strftime("%Y-%m-%d"), suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) return issues def _validate_date_consistency(self, departure_date: Any, return_date: Any) -> List[ValidationIssue]: """Validate consistency between departure and return dates.""" issues = [] departure = self._parse_date(departure_date) return_date_parsed = self._parse_date(return_date) if not departure or not return_date_parsed: return issues # Check if return is before departure if return_date_parsed <= departure: issues.append(ValidationIssue( field_name="return_date", issue_type="return_before_departure", severity=ValidationSeverity.CRITICAL, message="Return date must be after departure date", suggested_value=(departure + timedelta(days=7)).strftime("%Y-%m-%d"), suggested_action=ValidationAction.CORRECT, confidence=0.95 )) # Check for unusually short or long trips trip_duration = (return_date_parsed - departure).days rules = self._rules["dates"] if trip_duration < rules["min_stay_days"]: issues.append(ValidationIssue( field_name="trip_duration", issue_type="trip_too_short", severity=ValidationSeverity.WARNING, message=f"Trip duration is very short: {trip_duration} days", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) if trip_duration > rules["max_stay_days"]: issues.append(ValidationIssue( field_name="trip_duration", issue_type="trip_too_long", severity=ValidationSeverity.WARNING, message=f"Trip duration is very long: {trip_duration} days", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) return issues def _validate_locations(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate location-related fields.""" issues = [] rules = self._rules["locations"] # Validate origin if "origin" in data: origin_issues = self._validate_single_location("origin", data["origin"], rules) issues.extend(origin_issues) # Validate destination if "destination" in data: dest_issues = self._validate_single_location("destination", data["destination"], rules) issues.extend(dest_issues) # Validate location consistency if "origin" in data and "destination" in data: consistency_issues = self._validate_location_consistency(data["origin"], data["destination"]) issues.extend(consistency_issues) return issues def _validate_single_location(self, field_name: str, value: Any, rules: Dict[str, Any]) -> List[ValidationIssue]: """Validate a single location field.""" issues = [] if not value: issues.append(ValidationIssue( field_name=field_name, issue_type="missing_location", severity=ValidationSeverity.ERROR, message="Location is required", suggested_action=ValidationAction.REJECT )) return issues location_str = str(value).strip() # Check length if len(location_str) < rules["min_name_length"]: issues.append(ValidationIssue( field_name=field_name, issue_type="location_too_short", severity=ValidationSeverity.ERROR, message=f"Location name too short: {location_str}", suggested_action=ValidationAction.REJECT )) if len(location_str) > rules["max_name_length"]: issues.append(ValidationIssue( field_name=field_name, issue_type="location_too_long", severity=ValidationSeverity.WARNING, message=f"Location name very long: {location_str}", suggested_action=ValidationAction.SUGGEST )) # Check if location is recognized recognized_location = self._find_location(location_str) if not recognized_location: issues.append(ValidationIssue( field_name=field_name, issue_type="unrecognized_location", severity=ValidationSeverity.WARNING, message=f"Location not recognized: {location_str}", suggested_action=ValidationAction.SUGGEST, confidence=0.6 )) else: # Suggest standard name if different if location_str.lower() != recognized_location["name"].lower(): issues.append(ValidationIssue( field_name=field_name, issue_type="location_suggestion", severity=ValidationSeverity.INFO, message=f"Did you mean: {recognized_location['name']}?", suggested_value=recognized_location["name"], suggested_action=ValidationAction.SUGGEST, confidence=0.9 )) return issues def _validate_location_consistency(self, origin: Any, destination: Any) -> List[ValidationIssue]: """Validate consistency between origin and destination.""" issues = [] origin_loc = self._find_location(str(origin)) dest_loc = self._find_location(str(destination)) if origin_loc and dest_loc: # Check if origin and destination are the same if origin_loc["name"].lower() == dest_loc["name"].lower(): issues.append(ValidationIssue( field_name="destination", issue_type="same_origin_destination", severity=ValidationSeverity.CRITICAL, message="Origin and destination cannot be the same", suggested_action=ValidationAction.REJECT )) # Check for unusual travel patterns if origin_loc.get("country") == dest_loc.get("country"): if origin_loc["name"] in ["New York", "Los Angeles"] and dest_loc["name"] in ["New York", "Los Angeles"]: issues.append(ValidationIssue( field_name="travel_pattern", issue_type="domestic_flight_suggestion", severity=ValidationSeverity.INFO, message="Consider domestic flight options", suggested_action=ValidationAction.ACCEPT )) return issues def _validate_budget(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate budget-related fields.""" issues = [] rules = self._rules["budgets"] # Validate flight budget if "flight_budget" in data: flight_issues = self._validate_price_range( "flight_budget", data["flight_budget"], rules["min_flight_price"], rules["max_flight_price"] ) issues.extend(flight_issues) # Validate hotel budget if "hotel_budget" in data: hotel_issues = self._validate_price_range( "hotel_budget", data["hotel_budget"], rules["min_hotel_price"], rules["max_hotel_price"] ) issues.extend(hotel_issues) # Validate total budget if "total_budget" in data: total_issues = self._validate_price_range( "total_budget", data["total_budget"], rules["min_total_budget"], rules["max_total_budget"] ) issues.extend(total_issues) # Validate budget consistency if "flight_budget" in data and "hotel_budget" in data and "total_budget" in data: consistency_issues = self._validate_budget_consistency(data) issues.extend(consistency_issues) return issues def _validate_price_range(self, field_name: str, value: Any, min_price: float, max_price: float) -> List[ValidationIssue]: """Validate a price field against reasonable ranges.""" issues = [] if not value: issues.append(ValidationIssue( field_name=field_name, issue_type="missing_price", severity=ValidationSeverity.WARNING, message="Price information is missing", suggested_action=ValidationAction.SUGGEST )) return issues # Parse price price = self._parse_price(value) if price is None: issues.append(ValidationIssue( field_name=field_name, issue_type="invalid_price_format", severity=ValidationSeverity.ERROR, message=f"Invalid price format: {value}", suggested_action=ValidationAction.SUGGEST )) return issues # Check price range if price < min_price: issues.append(ValidationIssue( field_name=field_name, issue_type="price_too_low", severity=ValidationSeverity.WARNING, message=f"Price seems unusually low: ${price}", suggested_value=min_price, suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) if price > max_price: issues.append(ValidationIssue( field_name=field_name, issue_type="price_too_high", severity=ValidationSeverity.WARNING, message=f"Price seems unusually high: ${price}", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) return issues def _validate_budget_consistency(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate consistency between different budget components.""" issues = [] flight_budget = self._parse_price(data.get("flight_budget")) hotel_budget = self._parse_price(data.get("hotel_budget")) total_budget = self._parse_price(data.get("total_budget")) if flight_budget and hotel_budget and total_budget: # Check if components add up to total estimated_total = flight_budget + hotel_budget if estimated_total > total_budget * 1.5: # Components are much higher than total issues.append(ValidationIssue( field_name="budget_consistency", issue_type="budget_components_high", severity=ValidationSeverity.WARNING, message=f"Flight + hotel budget (${estimated_total:.0f}) is much higher than total budget (${total_budget:.0f})", suggested_action=ValidationAction.SUGGEST, confidence=0.9 )) if estimated_total < total_budget * 0.3: # Components are much lower than total issues.append(ValidationIssue( field_name="budget_consistency", issue_type="budget_components_low", severity=ValidationSeverity.INFO, message=f"Consider adding budget for activities, food, and transportation", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) return issues def _validate_passengers(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate passenger information.""" issues = [] rules = self._rules["passengers"] # Validate adults if "adults" in data: adults = self._parse_int(data["adults"]) if adults is not None: if adults < rules["min_adults"]: issues.append(ValidationIssue( field_name="adults", issue_type="too_few_adults", severity=ValidationSeverity.ERROR, message="At least one adult is required", suggested_value=rules["min_adults"], suggested_action=ValidationAction.CORRECT, confidence=0.95 )) elif adults > rules["max_adults"]: issues.append(ValidationIssue( field_name="adults", issue_type="too_many_adults", severity=ValidationSeverity.WARNING, message=f"Large group booking: {adults} adults", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) # Validate children if "children" in data: children = self._parse_int(data["children"]) if children is not None and children > rules["max_children"]: issues.append(ValidationIssue( field_name="children", issue_type="too_many_children", severity=ValidationSeverity.WARNING, message=f"Large number of children: {children}", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) # Validate infants if "infants" in data: infants = self._parse_int(data["infants"]) if infants is not None and infants > rules["max_infants"]: issues.append(ValidationIssue( field_name="infants", issue_type="too_many_infants", severity=ValidationSeverity.WARNING, message=f"Large number of infants: {infants}", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) # Validate total passengers total_passengers = 0 for field in ["adults", "children", "infants"]: if field in data: count = self._parse_int(data[field]) if count is not None: total_passengers += count if total_passengers > rules["max_total_passengers"]: issues.append(ValidationIssue( field_name="total_passengers", issue_type="too_many_passengers", severity=ValidationSeverity.ERROR, message=f"Total passengers ({total_passengers}) exceeds maximum allowed", suggested_action=ValidationAction.REJECT, confidence=0.95 )) return issues def _validate_contact_info(self, data: Dict[str, Any]) -> List[ValidationIssue]: """Validate contact information.""" issues = [] # Validate email if "email" in data and data["email"]: if not self._patterns["email"].match(str(data["email"])): issues.append(ValidationIssue( field_name="email", issue_type="invalid_email_format", severity=ValidationSeverity.ERROR, message=f"Invalid email format: {data['email']}", suggested_action=ValidationAction.SUGGEST, confidence=0.9 )) # Validate phone if "phone" in data and data["phone"]: if not self._patterns["phone"].match(str(data["phone"])): issues.append(ValidationIssue( field_name="phone", issue_type="invalid_phone_format", severity=ValidationSeverity.WARNING, message=f"Invalid phone format: {data['phone']}", suggested_action=ValidationAction.SUGGEST, confidence=0.8 )) return issues def _calculate_confidence_score(self, issues: List[ValidationIssue], total_fields: int) -> float: """Calculate overall confidence score for the validated data.""" if total_fields == 0: return 0.0 # Base confidence base_confidence = 1.0 # Deduct for issues for issue in issues: if issue.severity == ValidationSeverity.CRITICAL: base_confidence -= 0.3 elif issue.severity == ValidationSeverity.ERROR: base_confidence -= 0.2 elif issue.severity == ValidationSeverity.WARNING: base_confidence -= 0.1 elif issue.severity == ValidationSeverity.INFO: base_confidence -= 0.05 # Ensure confidence is between 0 and 1 return max(0.0, min(1.0, base_confidence)) # Helper methods def _parse_date(self, value: Any) -> Optional[date]: """Parse various date formats.""" if not value: return None # Try to parse as date object first if isinstance(value, date): return value if isinstance(value, datetime): return value.date() # Try string parsing date_str = str(value).strip() formats = [ "%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ" ] for fmt in formats: try: parsed = datetime.strptime(date_str, fmt) return parsed.date() except ValueError: continue return None def _parse_price(self, value: Any) -> Optional[float]: """Parse price from various formats.""" if not value: return None try: # Handle numeric types if isinstance(value, (int, float)): return float(value) # Handle string values price_str = str(value).strip() # Remove currency symbols and commas cleaned = re.sub(r'[^\d.,]', '', price_str) cleaned = cleaned.replace(',', '') return float(cleaned) except (ValueError, TypeError): return None def _parse_int(self, value: Any) -> Optional[int]: """Parse integer from various formats.""" if not value: return None try: return int(float(str(value))) except (ValueError, TypeError): return None def _find_location(self, location_str: str) -> Optional[Dict[str, Any]]: """Find location in the database.""" location_str = location_str.lower().strip() # Direct match if location_str in self._locations: return {"name": self._locations[location_str]["airports"][0], **self._locations[location_str]} # Check alternatives for key, data in self._locations.items(): if location_str in data.get("alternatives", []): return {"name": key.title(), **data} # Fuzzy matching for key in self._locations.keys(): if location_str in key or key in location_str: return {"name": key.title(), **self._locations[key]} return None # Global input validator instance _global_input_validator: Optional[InputValidator] = None def get_global_input_validator() -> InputValidator: """Get the global input validator instance.""" global _global_input_validator if _global_input_validator is None: _global_input_validator = InputValidator() return _global_input_validator def validate_travel_input(data: Dict[str, Any]) -> ValidationResult: """Convenience function to validate travel input data.""" validator = get_global_input_validator() return validator.validate_travel_request(data)