wanderlust.ai / src /wanderlust_ai /core /input_validation.py
BlakeL's picture
Upload 115 files
3f9f85b verified
"""
Robust Input Validation for Travel Data
This module provides comprehensive input validation that handles edge cases,
impossible dates, invalid locations, extreme budgets, and other real-world
input challenges.
"""
import re
import logging
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional, Any, Union, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import math
from decimal import Decimal, InvalidOperation
from .error_categorization import CategorizedError, ErrorContext
class ValidationSeverity(str, Enum):
"""Severity levels for validation issues."""
INFO = "info" # Informational, no action needed
WARNING = "warning" # Caution, but proceed with care
ERROR = "error" # Issue found, correction needed
CRITICAL = "critical" # Invalid input, must be corrected
class ValidationAction(str, Enum):
"""Actions to take for validation issues."""
ACCEPT = "accept" # Accept as-is
CORRECT = "correct" # Auto-correct the value
SUGGEST = "suggest" # Suggest correction to user
REJECT = "reject" # Reject the input entirely
REQUIRE_CONFIRMATION = "require_confirmation" # Ask user to confirm
@dataclass
class ValidationIssue:
"""A validation issue found in input data."""
field_name: str
issue_type: str
severity: ValidationSeverity
message: str
suggested_value: Optional[Any] = None
suggested_action: ValidationAction = ValidationAction.SUGGEST
confidence: float = 0.8 # Confidence in the suggestion
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ValidationResult:
"""Result of input validation."""
is_valid: bool
issues: List[ValidationIssue]
corrected_data: Dict[str, Any]
confidence_score: float # Overall confidence in the data
warnings: List[str] = field(default_factory=list)
suggestions: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
class InputValidator:
"""
Comprehensive input validator for travel data that handles edge cases
and provides intelligent suggestions for corrections.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Validation rules and thresholds
self._rules = self._build_validation_rules()
# Common patterns
self._patterns = self._build_patterns()
# Known locations and airports
self._locations = self._build_location_database()
# Price thresholds for different travel types
self._price_thresholds = self._build_price_thresholds()
def _build_validation_rules(self) -> Dict[str, Dict[str, Any]]:
"""Build validation rules for different data types."""
return {
"dates": {
"min_future_days": 0, # Minimum days in future for booking
"max_future_days": 365, # Maximum days in future
"min_stay_days": 1, # Minimum stay duration
"max_stay_days": 365, # Maximum stay duration
"min_advance_booking": 0, # Minimum advance booking time
"max_advance_booking": 365 # Maximum advance booking time
},
"locations": {
"min_name_length": 2, # Minimum location name length
"max_name_length": 100, # Maximum location name length
"require_country": False, # Whether country is required
"allow_partial": True # Allow partial matches
},
"budgets": {
"min_flight_price": 50, # Minimum reasonable flight price
"max_flight_price": 50000, # Maximum reasonable flight price
"min_hotel_price": 20, # Minimum reasonable hotel price per night
"max_hotel_price": 5000, # Maximum reasonable hotel price per night
"min_total_budget": 100, # Minimum total trip budget
"max_total_budget": 100000 # Maximum total trip budget
},
"passengers": {
"min_adults": 1, # Minimum adults
"max_adults": 20, # Maximum adults
"min_children": 0, # Minimum children
"max_children": 20, # Maximum children
"min_infants": 0, # Minimum infants
"max_infants": 10, # Maximum infants
"max_total_passengers": 30 # Maximum total passengers
}
}
def _build_patterns(self) -> Dict[str, re.Pattern]:
"""Build regex patterns for validation."""
return {
"airport_code": re.compile(r'^[A-Z]{3}$'),
"city_name": re.compile(r'^[A-Za-z\s\-\'\.]{2,100}$'),
"country_code": re.compile(r'^[A-Z]{2,3}$'),
"date_iso": re.compile(r'^\d{4}-\d{2}-\d{2}$'),
"datetime_iso": re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'),
"price": re.compile(r'^\$?[\d,]+\.?\d*$'),
"email": re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'),
"phone": re.compile(r'^[\+]?[\d\s\-\(\)]{10,15}$')
}
def _build_location_database(self) -> Dict[str, Dict[str, Any]]:
"""Build database of known locations and airports."""
return {
# Major US cities
"new york": {
"airports": ["JFK", "LGA", "EWR"],
"country": "US",
"timezone": "America/New_York",
"alternatives": ["nyc", "manhattan", "brooklyn"]
},
"los angeles": {
"airports": ["LAX", "BUR", "SNA"],
"country": "US",
"timezone": "America/Los_Angeles",
"alternatives": ["la", "hollywood", "beverly hills"]
},
"chicago": {
"airports": ["ORD", "MDW"],
"country": "US",
"timezone": "America/Chicago",
"alternatives": ["chi"]
},
"miami": {
"airports": ["MIA", "FLL"],
"country": "US",
"timezone": "America/New_York",
"alternatives": ["south beach", "miami beach"]
},
"boston": {
"airports": ["BOS"],
"country": "US",
"timezone": "America/New_York",
"alternatives": ["bos"]
},
"san francisco": {
"airports": ["SFO", "OAK", "SJC"],
"country": "US",
"timezone": "America/Los_Angeles",
"alternatives": ["sf", "san fran", "bay area"]
},
"seattle": {
"airports": ["SEA"],
"country": "US",
"timezone": "America/Los_Angeles",
"alternatives": ["sea"]
},
"atlanta": {
"airports": ["ATL"],
"country": "US",
"timezone": "America/New_York",
"alternatives": ["atl"]
},
"denver": {
"airports": ["DEN"],
"country": "US",
"timezone": "America/Denver",
"alternatives": ["den"]
},
"las vegas": {
"airports": ["LAS"],
"country": "US",
"timezone": "America/Los_Angeles",
"alternatives": ["vegas", "las vegas"]
},
# International cities
"london": {
"airports": ["LHR", "LGW", "STN", "LTN"],
"country": "GB",
"timezone": "Europe/London",
"alternatives": ["london uk", "london england"]
},
"paris": {
"airports": ["CDG", "ORY"],
"country": "FR",
"timezone": "Europe/Paris",
"alternatives": ["paris france"]
},
"tokyo": {
"airports": ["NRT", "HND"],
"country": "JP",
"timezone": "Asia/Tokyo",
"alternatives": ["tokyo japan"]
},
"sydney": {
"airports": ["SYD"],
"country": "AU",
"timezone": "Australia/Sydney",
"alternatives": ["sydney australia"]
}
}
def _build_price_thresholds(self) -> Dict[str, Dict[str, float]]:
"""Build price thresholds for different travel scenarios."""
return {
"domestic_flights": {
"budget": {"min": 100, "max": 800},
"premium": {"min": 800, "max": 2000},
"luxury": {"min": 2000, "max": 10000}
},
"international_flights": {
"budget": {"min": 300, "max": 1200},
"premium": {"min": 1200, "max": 5000},
"luxury": {"min": 5000, "max": 25000}
},
"hotels": {
"budget": {"min": 50, "max": 150},
"mid_range": {"min": 150, "max": 400},
"luxury": {"min": 400, "max": 2000}
}
}
def validate_travel_request(self, request_data: Dict[str, Any]) -> ValidationResult:
"""Validate a complete travel request."""
issues = []
corrected_data = request_data.copy()
warnings = []
suggestions = []
# Validate dates
date_issues = self._validate_dates(request_data)
issues.extend(date_issues)
# Validate locations
location_issues = self._validate_locations(request_data)
issues.extend(location_issues)
# Validate budget
budget_issues = self._validate_budget(request_data)
issues.extend(budget_issues)
# Validate passengers
passenger_issues = self._validate_passengers(request_data)
issues.extend(passenger_issues)
# Validate contact information
contact_issues = self._validate_contact_info(request_data)
issues.extend(contact_issues)
# Apply corrections
for issue in issues:
if issue.suggested_action == ValidationAction.CORRECT and issue.suggested_value is not None:
corrected_data[issue.field_name] = issue.suggested_value
# Calculate overall confidence
confidence_score = self._calculate_confidence_score(issues, len(request_data))
# Determine if valid
critical_issues = [i for i in issues if i.severity == ValidationSeverity.CRITICAL]
error_issues = [i for i in issues if i.severity == ValidationSeverity.ERROR]
is_valid = len(critical_issues) == 0 and len(error_issues) <= 1 # Allow one error
# Generate warnings and suggestions
for issue in issues:
if issue.severity == ValidationSeverity.WARNING:
warnings.append(f"{issue.field_name}: {issue.message}")
if issue.suggested_action == ValidationAction.SUGGEST:
suggestions.append(f"Consider {issue.message}")
return ValidationResult(
is_valid=is_valid,
issues=issues,
corrected_data=corrected_data,
confidence_score=confidence_score,
warnings=warnings,
suggestions=suggestions,
metadata={
"total_fields": len(request_data),
"issues_found": len(issues),
"critical_issues": len(critical_issues),
"error_issues": len(error_issues),
"warning_issues": len([i for i in issues if i.severity == ValidationSeverity.WARNING])
}
)
def _validate_dates(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate date-related fields."""
issues = []
rules = self._rules["dates"]
# Validate departure date
if "departure_date" in data:
departure_issues = self._validate_single_date(
"departure_date", data["departure_date"], rules
)
issues.extend(departure_issues)
# Validate return date
if "return_date" in data:
return_issues = self._validate_single_date(
"return_date", data["return_date"], rules
)
issues.extend(return_issues)
# Validate date consistency
if "departure_date" in data and "return_date" in data:
consistency_issues = self._validate_date_consistency(
data["departure_date"], data["return_date"]
)
issues.extend(consistency_issues)
return issues
def _validate_single_date(self, field_name: str, value: Any, rules: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate a single date field."""
issues = []
if not value:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="missing_date",
severity=ValidationSeverity.ERROR,
message="Date is required",
suggested_action=ValidationAction.REJECT
))
return issues
# Parse the date
parsed_date = self._parse_date(value)
if not parsed_date:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="invalid_date_format",
severity=ValidationSeverity.ERROR,
message=f"Invalid date format: {value}",
suggested_action=ValidationAction.SUGGEST
))
return issues
today = date.today()
days_from_today = (parsed_date - today).days
# Check if date is in the past
if days_from_today < rules["min_future_days"]:
if days_from_today < 0:
severity = ValidationSeverity.CRITICAL
message = f"Date is in the past: {value}"
suggested_value = today.strftime("%Y-%m-%d")
else:
severity = ValidationSeverity.WARNING
message = f"Date is very soon: {value}"
suggested_value = (today + timedelta(days=rules["min_advance_booking"])).strftime("%Y-%m-%d")
issues.append(ValidationIssue(
field_name=field_name,
issue_type="date_too_early",
severity=severity,
message=message,
suggested_value=suggested_value,
suggested_action=ValidationAction.CORRECT,
confidence=0.9
))
# Check if date is too far in the future
if days_from_today > rules["max_future_days"]:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="date_too_far",
severity=ValidationSeverity.WARNING,
message=f"Date is very far in the future: {value}",
suggested_value=(today + timedelta(days=rules["max_advance_booking"])).strftime("%Y-%m-%d"),
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
return issues
def _validate_date_consistency(self, departure_date: Any, return_date: Any) -> List[ValidationIssue]:
"""Validate consistency between departure and return dates."""
issues = []
departure = self._parse_date(departure_date)
return_date_parsed = self._parse_date(return_date)
if not departure or not return_date_parsed:
return issues
# Check if return is before departure
if return_date_parsed <= departure:
issues.append(ValidationIssue(
field_name="return_date",
issue_type="return_before_departure",
severity=ValidationSeverity.CRITICAL,
message="Return date must be after departure date",
suggested_value=(departure + timedelta(days=7)).strftime("%Y-%m-%d"),
suggested_action=ValidationAction.CORRECT,
confidence=0.95
))
# Check for unusually short or long trips
trip_duration = (return_date_parsed - departure).days
rules = self._rules["dates"]
if trip_duration < rules["min_stay_days"]:
issues.append(ValidationIssue(
field_name="trip_duration",
issue_type="trip_too_short",
severity=ValidationSeverity.WARNING,
message=f"Trip duration is very short: {trip_duration} days",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
if trip_duration > rules["max_stay_days"]:
issues.append(ValidationIssue(
field_name="trip_duration",
issue_type="trip_too_long",
severity=ValidationSeverity.WARNING,
message=f"Trip duration is very long: {trip_duration} days",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
return issues
def _validate_locations(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate location-related fields."""
issues = []
rules = self._rules["locations"]
# Validate origin
if "origin" in data:
origin_issues = self._validate_single_location("origin", data["origin"], rules)
issues.extend(origin_issues)
# Validate destination
if "destination" in data:
dest_issues = self._validate_single_location("destination", data["destination"], rules)
issues.extend(dest_issues)
# Validate location consistency
if "origin" in data and "destination" in data:
consistency_issues = self._validate_location_consistency(data["origin"], data["destination"])
issues.extend(consistency_issues)
return issues
def _validate_single_location(self, field_name: str, value: Any, rules: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate a single location field."""
issues = []
if not value:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="missing_location",
severity=ValidationSeverity.ERROR,
message="Location is required",
suggested_action=ValidationAction.REJECT
))
return issues
location_str = str(value).strip()
# Check length
if len(location_str) < rules["min_name_length"]:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="location_too_short",
severity=ValidationSeverity.ERROR,
message=f"Location name too short: {location_str}",
suggested_action=ValidationAction.REJECT
))
if len(location_str) > rules["max_name_length"]:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="location_too_long",
severity=ValidationSeverity.WARNING,
message=f"Location name very long: {location_str}",
suggested_action=ValidationAction.SUGGEST
))
# Check if location is recognized
recognized_location = self._find_location(location_str)
if not recognized_location:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="unrecognized_location",
severity=ValidationSeverity.WARNING,
message=f"Location not recognized: {location_str}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.6
))
else:
# Suggest standard name if different
if location_str.lower() != recognized_location["name"].lower():
issues.append(ValidationIssue(
field_name=field_name,
issue_type="location_suggestion",
severity=ValidationSeverity.INFO,
message=f"Did you mean: {recognized_location['name']}?",
suggested_value=recognized_location["name"],
suggested_action=ValidationAction.SUGGEST,
confidence=0.9
))
return issues
def _validate_location_consistency(self, origin: Any, destination: Any) -> List[ValidationIssue]:
"""Validate consistency between origin and destination."""
issues = []
origin_loc = self._find_location(str(origin))
dest_loc = self._find_location(str(destination))
if origin_loc and dest_loc:
# Check if origin and destination are the same
if origin_loc["name"].lower() == dest_loc["name"].lower():
issues.append(ValidationIssue(
field_name="destination",
issue_type="same_origin_destination",
severity=ValidationSeverity.CRITICAL,
message="Origin and destination cannot be the same",
suggested_action=ValidationAction.REJECT
))
# Check for unusual travel patterns
if origin_loc.get("country") == dest_loc.get("country"):
if origin_loc["name"] in ["New York", "Los Angeles"] and dest_loc["name"] in ["New York", "Los Angeles"]:
issues.append(ValidationIssue(
field_name="travel_pattern",
issue_type="domestic_flight_suggestion",
severity=ValidationSeverity.INFO,
message="Consider domestic flight options",
suggested_action=ValidationAction.ACCEPT
))
return issues
def _validate_budget(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate budget-related fields."""
issues = []
rules = self._rules["budgets"]
# Validate flight budget
if "flight_budget" in data:
flight_issues = self._validate_price_range(
"flight_budget", data["flight_budget"],
rules["min_flight_price"], rules["max_flight_price"]
)
issues.extend(flight_issues)
# Validate hotel budget
if "hotel_budget" in data:
hotel_issues = self._validate_price_range(
"hotel_budget", data["hotel_budget"],
rules["min_hotel_price"], rules["max_hotel_price"]
)
issues.extend(hotel_issues)
# Validate total budget
if "total_budget" in data:
total_issues = self._validate_price_range(
"total_budget", data["total_budget"],
rules["min_total_budget"], rules["max_total_budget"]
)
issues.extend(total_issues)
# Validate budget consistency
if "flight_budget" in data and "hotel_budget" in data and "total_budget" in data:
consistency_issues = self._validate_budget_consistency(data)
issues.extend(consistency_issues)
return issues
def _validate_price_range(self, field_name: str, value: Any, min_price: float, max_price: float) -> List[ValidationIssue]:
"""Validate a price field against reasonable ranges."""
issues = []
if not value:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="missing_price",
severity=ValidationSeverity.WARNING,
message="Price information is missing",
suggested_action=ValidationAction.SUGGEST
))
return issues
# Parse price
price = self._parse_price(value)
if price is None:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="invalid_price_format",
severity=ValidationSeverity.ERROR,
message=f"Invalid price format: {value}",
suggested_action=ValidationAction.SUGGEST
))
return issues
# Check price range
if price < min_price:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="price_too_low",
severity=ValidationSeverity.WARNING,
message=f"Price seems unusually low: ${price}",
suggested_value=min_price,
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
if price > max_price:
issues.append(ValidationIssue(
field_name=field_name,
issue_type="price_too_high",
severity=ValidationSeverity.WARNING,
message=f"Price seems unusually high: ${price}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
return issues
def _validate_budget_consistency(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate consistency between different budget components."""
issues = []
flight_budget = self._parse_price(data.get("flight_budget"))
hotel_budget = self._parse_price(data.get("hotel_budget"))
total_budget = self._parse_price(data.get("total_budget"))
if flight_budget and hotel_budget and total_budget:
# Check if components add up to total
estimated_total = flight_budget + hotel_budget
if estimated_total > total_budget * 1.5: # Components are much higher than total
issues.append(ValidationIssue(
field_name="budget_consistency",
issue_type="budget_components_high",
severity=ValidationSeverity.WARNING,
message=f"Flight + hotel budget (${estimated_total:.0f}) is much higher than total budget (${total_budget:.0f})",
suggested_action=ValidationAction.SUGGEST,
confidence=0.9
))
if estimated_total < total_budget * 0.3: # Components are much lower than total
issues.append(ValidationIssue(
field_name="budget_consistency",
issue_type="budget_components_low",
severity=ValidationSeverity.INFO,
message=f"Consider adding budget for activities, food, and transportation",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
return issues
def _validate_passengers(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate passenger information."""
issues = []
rules = self._rules["passengers"]
# Validate adults
if "adults" in data:
adults = self._parse_int(data["adults"])
if adults is not None:
if adults < rules["min_adults"]:
issues.append(ValidationIssue(
field_name="adults",
issue_type="too_few_adults",
severity=ValidationSeverity.ERROR,
message="At least one adult is required",
suggested_value=rules["min_adults"],
suggested_action=ValidationAction.CORRECT,
confidence=0.95
))
elif adults > rules["max_adults"]:
issues.append(ValidationIssue(
field_name="adults",
issue_type="too_many_adults",
severity=ValidationSeverity.WARNING,
message=f"Large group booking: {adults} adults",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
# Validate children
if "children" in data:
children = self._parse_int(data["children"])
if children is not None and children > rules["max_children"]:
issues.append(ValidationIssue(
field_name="children",
issue_type="too_many_children",
severity=ValidationSeverity.WARNING,
message=f"Large number of children: {children}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
# Validate infants
if "infants" in data:
infants = self._parse_int(data["infants"])
if infants is not None and infants > rules["max_infants"]:
issues.append(ValidationIssue(
field_name="infants",
issue_type="too_many_infants",
severity=ValidationSeverity.WARNING,
message=f"Large number of infants: {infants}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
# Validate total passengers
total_passengers = 0
for field in ["adults", "children", "infants"]:
if field in data:
count = self._parse_int(data[field])
if count is not None:
total_passengers += count
if total_passengers > rules["max_total_passengers"]:
issues.append(ValidationIssue(
field_name="total_passengers",
issue_type="too_many_passengers",
severity=ValidationSeverity.ERROR,
message=f"Total passengers ({total_passengers}) exceeds maximum allowed",
suggested_action=ValidationAction.REJECT,
confidence=0.95
))
return issues
def _validate_contact_info(self, data: Dict[str, Any]) -> List[ValidationIssue]:
"""Validate contact information."""
issues = []
# Validate email
if "email" in data and data["email"]:
if not self._patterns["email"].match(str(data["email"])):
issues.append(ValidationIssue(
field_name="email",
issue_type="invalid_email_format",
severity=ValidationSeverity.ERROR,
message=f"Invalid email format: {data['email']}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.9
))
# Validate phone
if "phone" in data and data["phone"]:
if not self._patterns["phone"].match(str(data["phone"])):
issues.append(ValidationIssue(
field_name="phone",
issue_type="invalid_phone_format",
severity=ValidationSeverity.WARNING,
message=f"Invalid phone format: {data['phone']}",
suggested_action=ValidationAction.SUGGEST,
confidence=0.8
))
return issues
def _calculate_confidence_score(self, issues: List[ValidationIssue], total_fields: int) -> float:
"""Calculate overall confidence score for the validated data."""
if total_fields == 0:
return 0.0
# Base confidence
base_confidence = 1.0
# Deduct for issues
for issue in issues:
if issue.severity == ValidationSeverity.CRITICAL:
base_confidence -= 0.3
elif issue.severity == ValidationSeverity.ERROR:
base_confidence -= 0.2
elif issue.severity == ValidationSeverity.WARNING:
base_confidence -= 0.1
elif issue.severity == ValidationSeverity.INFO:
base_confidence -= 0.05
# Ensure confidence is between 0 and 1
return max(0.0, min(1.0, base_confidence))
# Helper methods
def _parse_date(self, value: Any) -> Optional[date]:
"""Parse various date formats."""
if not value:
return None
# Try to parse as date object first
if isinstance(value, date):
return value
if isinstance(value, datetime):
return value.date()
# Try string parsing
date_str = str(value).strip()
formats = [
"%Y-%m-%d",
"%m/%d/%Y",
"%d/%m/%Y",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ"
]
for fmt in formats:
try:
parsed = datetime.strptime(date_str, fmt)
return parsed.date()
except ValueError:
continue
return None
def _parse_price(self, value: Any) -> Optional[float]:
"""Parse price from various formats."""
if not value:
return None
try:
# Handle numeric types
if isinstance(value, (int, float)):
return float(value)
# Handle string values
price_str = str(value).strip()
# Remove currency symbols and commas
cleaned = re.sub(r'[^\d.,]', '', price_str)
cleaned = cleaned.replace(',', '')
return float(cleaned)
except (ValueError, TypeError):
return None
def _parse_int(self, value: Any) -> Optional[int]:
"""Parse integer from various formats."""
if not value:
return None
try:
return int(float(str(value)))
except (ValueError, TypeError):
return None
def _find_location(self, location_str: str) -> Optional[Dict[str, Any]]:
"""Find location in the database."""
location_str = location_str.lower().strip()
# Direct match
if location_str in self._locations:
return {"name": self._locations[location_str]["airports"][0], **self._locations[location_str]}
# Check alternatives
for key, data in self._locations.items():
if location_str in data.get("alternatives", []):
return {"name": key.title(), **data}
# Fuzzy matching
for key in self._locations.keys():
if location_str in key or key in location_str:
return {"name": key.title(), **self._locations[key]}
return None
# Global input validator instance
_global_input_validator: Optional[InputValidator] = None
def get_global_input_validator() -> InputValidator:
"""Get the global input validator instance."""
global _global_input_validator
if _global_input_validator is None:
_global_input_validator = InputValidator()
return _global_input_validator
def validate_travel_input(data: Dict[str, Any]) -> ValidationResult:
"""Convenience function to validate travel input data."""
validator = get_global_input_validator()
return validator.validate_travel_request(data)