Spaces:
Sleeping
Sleeping
| import os | |
| import dateparser | |
| from datetime import datetime, timedelta | |
| from amadeus import Client, ResponseError | |
| from langchain.tools import Tool | |
| import http.client | |
| import json | |
| import urllib.parse | |
| import traceback | |
# --- API credentials ---------------------------------------------------------
# All secrets are read from the process environment; a missing variable
# yields None.
AMADEUS_API_KEY = os.environ.get("AMADEUS_API_KEY")
AMADEUS_API_SECRET = os.environ.get("AMADEUS_API_SECRET")
RAPIDAPI_KEY = os.environ.get("RAPIDAPI_KEY")

# RapidAPI host used for the Booking.com hotel endpoints.
RAPIDAPI_HOST = "booking-com15.p.rapidapi.com"

# Shared Amadeus SDK client used by the lookup and flight-search functions.
amadeus = Client(
    client_id=AMADEUS_API_KEY,
    client_secret=AMADEUS_API_SECRET,
)
| # Utility Functions | |
def get_airport_code(city_name: str):
    """Return the IATA code of the first Amadeus location match for a city.

    The Amadeus locations reference data is queried with ``city_name`` as the
    keyword and sub-types ``AIRPORT,CITY``.

    Returns:
        The ``iataCode`` of the first result, or ``None`` when the result list
        is empty or the API call raises ``ResponseError``.
    """
    try:
        matches = amadeus.reference_data.locations.get(
            keyword=city_name,
            subType="AIRPORT,CITY",
        ).data
    except ResponseError:
        return None

    if not matches:
        return None
    return matches[0]["iataCode"]
def get_airline_name(airline_code: str):
    """Resolve an airline code to a human-readable airline name.

    Looks the code up in the Amadeus airline reference data and returns the
    first record's ``businessName``. If that field is absent or empty the
    record's ``commonName`` is tried next.

    Returns:
        The airline name, or ``airline_code`` itself when the lookup returns
        nothing usable or the API call raises ``ResponseError``.
    """
    try:
        airlines = amadeus.reference_data.airlines.get(
            airlineCodes=airline_code
        ).data
    except ResponseError:
        return airline_code

    if not airlines:
        return airline_code

    record = airlines[0]
    # Indexing record["businessName"] directly raised KeyError (uncaught by
    # the ResponseError handler) for records without that field.
    return record.get("businessName") or record.get("commonName") or airline_code
def format_duration(duration: str) -> str:
    """Render an ISO 8601 duration such as ``PT2H30M`` as ``2h 30m``.

    Day, hour, minute and second components are recognised, so ``P1DT2H``
    becomes ``1d 2h`` and ``PT45S`` becomes ``45s``. Strings that do not have
    that shape fall back to simple token replacement (strip ``PT``, lowercase
    ``H``/``M``), which keeps the output for unexpected input unchanged.
    """
    import re  # local import: this is the only function that needs it

    match = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
        duration,
    )
    if match and any(match.groups()):
        # Groups are captured in d/h/m/s order; skip absent components.
        return " ".join(
            f"{value}{unit}"
            for value, unit in zip(match.groups(), "dhms")
            if value is not None
        )
    return duration.replace("PT", "").replace("H", "h ").replace("M", "m").strip()
def generate_booking_link(from_iata: str, to_iata: str, departure_date: str):
    """Build the Google Flights URL for the given route and departure date.

    The route is encoded in the URL fragment as ``<from>.<to>.<date>`` and is
    followed by a fixed option string (``c:USD;e:1;s:0;sd:1;t:f``).
    """
    base_url = "https://www.google.com/flights?hl=en#flt="
    route = "{}.{}.{}".format(from_iata, to_iata, departure_date)
    options = "c:USD;e:1;s:0;sd:1;t:f"
    return base_url + route + ";" + options
# 🏨 Hotel Search Tool (Fixed)
def _parse_hotel_query(query):
    """Split a hotel query into ``(city, check-in phrase, check-out phrase)``.

    Expected shape: ``... in <city> [from <date> [to <date>]]``. Any part that
    is absent or empty comes back as ``None``.
    """
    words = query.lower().split()
    city = checkin_phrase = checkout_phrase = None

    if "in" in words:
        city_start = words.index("in") + 1
        # Only a "from"/"for" that follows "in" can terminate the city name.
        city_end = next(
            (
                i
                for i in range(city_start, len(words))
                if words[i] in ("from", "for")
            ),
            len(words),
        )
        city = " ".join(words[city_start:city_end]).title() or None

    if "from" in words:
        date_start = words.index("from") + 1
        # Search for "to" only after "from": an earlier "to" (e.g. "close to
        # the beach") must not be mistaken for the date separator.
        try:
            to_index = words.index("to", date_start)
        except ValueError:
            to_index = len(words)
        checkin_phrase = " ".join(words[date_start:to_index]) or None
        checkout_phrase = " ".join(words[to_index + 1:]) or None

    return city, checkin_phrase, checkout_phrase


def _rapidapi_get(conn, path, headers):
    """Issue a GET on ``conn`` and return ``(status, body decoded as UTF-8)``."""
    conn.request("GET", path, headers=headers)
    response = conn.getresponse()
    return response.status, response.read().decode("utf-8")


def _format_hotel_entry(hotel):
    """Render one hotel record of the search response as a Markdown snippet."""
    property_data = hotel.get("property") or {}
    breakdown = property_data.get("priceBreakdown") or {}
    gross = breakdown.get("grossPrice") or {}

    name = property_data.get("name", "Unknown Hotel")
    rating = property_data.get("reviewScore", "Not rated")

    price = gross.get("value", "N/A")
    if isinstance(price, (int, float)):
        # Avoid printing long float tails such as 123.45678901.
        price = f"{price:.2f}"
    # Prefer the currency attached to the gross price; keep the previous
    # lookup location and the USD default as fallbacks.
    currency = gross.get("currency") or breakdown.get("currency", "USD")

    hotel_id = hotel.get("hotel_id", "")
    # NOTE(review): this link hard-codes the "/hotel/fr/" path and uses the
    # numeric hotel id as the page name; confirm that it resolves for hotels
    # outside France, or switch to a URL returned by the API.
    if hotel_id:
        booking_link = f"https://www.booking.com/hotel/fr/{hotel_id}.html"
    else:
        booking_link = "https://www.booking.com"

    return (
        f"\n- **Hotel:** {name}\n"
        f"- **Rating:** {rating} ⭐\n"
        f"- **Price:** {price} {currency}\n"
        f"- [Book Now]({booking_link})\n"
        "---\n"
    )


def search_hotels(query: str):
    """Search for hotels in a city with start and end dates using Booking.com API.

    The query is expected to look like
    ``hotels in <city> from <date> to <date>``; when the ``to <date>`` part is
    missing, check-out defaults to one day after check-in. Dates are parsed
    with ``dateparser`` preferring future dates.

    The function first resolves the city to a city-level destination id via
    the RapidAPI ``searchDestination`` endpoint, then requests hotels and
    formats the first three results as Markdown.

    Returns:
        A Markdown string with up to three hotels, or a message starting with
        "❌" describing what went wrong. Exceptions are caught, logged to
        stdout with a traceback, and reported as such a message.
    """
    conn = None
    hotel_raw = None
    try:
        print(f"DEBUG: Query received: {query}")
        city, checkin_phrase, checkout_phrase = _parse_hotel_query(query)
        print(f"DEBUG: Parsed city: {city}")
        print(f"DEBUG: Check-in phrase: {checkin_phrase}, Check-out phrase: {checkout_phrase if checkout_phrase else 'Not provided'}")

        if not city:
            return "❌ Could not detect a valid city. Use 'hotels in <city> from <date> to <date>'."

        # Parse dates
        future_only = {"PREFER_DATES_FROM": "future"}
        checkin_date = (
            dateparser.parse(checkin_phrase, settings=future_only)
            if checkin_phrase
            else None
        )
        if not checkin_date:
            return "❌ Could not parse check-in date. Use formats like 'February 25 2025'."
        print(f"DEBUG: Parsed check-in date: {checkin_date}")

        if checkout_phrase:
            checkout_date = dateparser.parse(checkout_phrase, settings=future_only)
        else:
            checkout_date = checkin_date + timedelta(days=1)
        if not checkout_date:
            return "❌ Could not parse check-out date. Use formats like 'February 28 2025'."
        print(f"DEBUG: Parsed check-out date: {checkout_date}")

        checkin_date_str = checkin_date.strftime("%Y-%m-%d")
        checkout_date_str = checkout_date.strftime("%Y-%m-%d")
        # Compare calendar dates only: the time-of-day (and any timezone) that
        # the parser attaches is irrelevant for a hotel stay.
        if checkin_date.date() >= checkout_date.date():
            return f"❌ Invalid dates: Check-in ({checkin_date_str}) must be before check-out ({checkout_date_str})."
        print(f"DEBUG: Formatted dates: {checkin_date_str} to {checkout_date_str}")

        # Validate API key (never log any part of it)
        if not RAPIDAPI_KEY:
            return "❌ API key is missing. Please contact Travelo LLC support."

        headers = {
            "x-rapidapi-key": RAPIDAPI_KEY,
            "x-rapidapi-host": RAPIDAPI_HOST,
        }
        # A timeout keeps the tool from hanging forever on a stalled request;
        # the connection is closed in the ``finally`` clause below.
        conn = http.client.HTTPSConnection(RAPIDAPI_HOST, timeout=30)

        # Get destination ID
        dest_url = f"/api/v1/hotels/searchDestination?query={urllib.parse.quote(city)}&locale=en-us"
        print(f"DEBUG: Requesting destination ID with URL: {dest_url}")
        status, dest_raw = _rapidapi_get(conn, dest_url, headers)
        print(f"DEBUG: Destination response - Status: {status}, Raw: {dest_raw}")
        if status != 200:
            return f"❌ Failed to fetch destination ID for {city}. HTTP Status: {status}. Response: {dest_raw}"

        dest_data = json.loads(dest_raw)
        candidates = dest_data.get("data") or []
        if not any(item.get("dest_id") for item in candidates):
            return f"❌ No valid destination ID found for {city}. Response: {json.dumps(dest_data)}"
        dest_id = next(
            (
                item["dest_id"]
                for item in candidates
                if item.get("search_type") == "city" and item.get("dest_id")
            ),
            None,
        )
        if not dest_id:
            return f"❌ No city-level destination ID found for {city}. Response: {json.dumps(dest_data)}"
        print(f"DEBUG: Destination ID: {dest_id}")

        # Search hotels
        params = {
            "dest_id": dest_id,
            "search_type": "CITY",
            "adults": "1",
            # NOTE(review): presumably this asks for two children (ages 0 and
            # 17) in addition to the single adult - confirm this is intended.
            "children_age": "0,17",
            "room_qty": "1",
            "page_number": "1",
            "units": "metric",
            "temperature_unit": "c",
            "languagecode": "en-us",
            "currency_code": "USD",
            "arrival_date": checkin_date_str,
            "departure_date": checkout_date_str,
        }
        hotel_url = "/api/v1/hotels/searchHotels?" + urllib.parse.urlencode(params, safe=":/%")
        print(f"DEBUG: Requesting hotels with URL: {hotel_url}")
        status, hotel_raw = _rapidapi_get(conn, hotel_url, headers)
        print(f"DEBUG: Hotel response - Status: {status}, Raw: {hotel_raw}")
        if status != 200:
            return f"❌ Failed to fetch hotels. HTTP Status: {status}. Response: {hotel_raw}"

        data = json.loads(hotel_raw)
        payload = data.get("data")
        hotels = payload.get("hotels") if isinstance(payload, dict) else None
        if not hotels:
            error_msg = data.get("message", "No hotels available")
            return f"❌ No hotels found in {city} for {checkin_date_str} to {checkout_date_str}. The API reported: {json.dumps(error_msg)}. Try adjusting your dates or destination."

        header = f"### Hotels in {city} ({checkin_date_str} to {checkout_date_str})\n"
        return header + "".join(_format_hotel_entry(hotel) for hotel in hotels[:3])
    except Exception as e:
        error_details = traceback.format_exc()
        print(f"DEBUG: Exception occurred: {error_details}")
        return f"❌ Error searching hotels: {str(e)}. Raw response (if any): {hotel_raw if hotel_raw is not None else 'N/A'}"
    finally:
        # Single cleanup point for every return path and for exceptions.
        if conn is not None:
            conn.close()
# ✈️ Flight Search Tool (Unchanged)
def _parse_flight_query(query):
    """Split a flight query into ``(origin city, destination city, date phrase)``.

    Expected shape: ``... from <city> to <city> [in|on <date>]``. A date that
    starts with ``next``, ``tomorrow`` or ``today`` is also recognised without
    a preceding ``in``/``on``. Missing parts come back as ``None``.
    """
    words = query.lower().split()
    if "from" not in words:
        return None, None, None

    origin_start = words.index("from") + 1
    # Look for "to" only after "from" so an earlier "to" is not picked up.
    try:
        to_index = words.index("to", origin_start)
    except ValueError:
        return None, None, None

    dest_start = to_index + 1
    dest_end = len(words)
    date_phrase = None
    for i in range(dest_start, len(words)):
        word = words[i]
        if word in ("in", "on"):
            # Separator word: the date is whatever follows it.
            dest_end = i
            date_phrase = " ".join(words[i + 1:]) or None
            break
        if word in ("next", "tomorrow", "today"):
            # These words are themselves part of the date phrase.
            dest_end = i
            date_phrase = " ".join(words[i:])
            break

    from_city = " ".join(words[origin_start:to_index]).title()
    to_city = " ".join(words[dest_start:dest_end]).title()
    return from_city or None, to_city or None, date_phrase


def _format_flight_offer(flight, airline_name, from_iata, to_iata, booking_link):
    """Render one Amadeus flight offer (first itinerary) as a Markdown snippet."""
    itinerary = flight["itineraries"][0]
    segments = itinerary["segments"]

    # Real flight numbers come from each segment's carrier code + number.
    numbers = [
        f"{seg.get('carrierCode', '')}{seg.get('number', '')}" for seg in segments
    ]
    flight_no = ", ".join(n for n in numbers if n) or "N/A"

    price = flight["price"]
    total = price["total"]
    currency = price.get("currency", "USD")
    # Only prefix "$" when the amount really is in US dollars.
    price_text = f"${total}" if currency == "USD" else f"{total} {currency}"

    return (
        f"\n- **Airline:** {airline_name}\n"
        f"- **Flight No:** {flight_no}\n"
        f"- **Departure:** {segments[0]['departure']['at']} ({from_iata})\n"
        f"- **Arrival:** {segments[-1]['arrival']['at']} ({to_iata})\n"
        f"- **Duration:** {format_duration(itinerary['duration'])}\n"
        f"- **Stops:** {len(segments) - 1}\n"
        f"- **Price:** {price_text}\n"
        f"- [Book Now]({booking_link})\n"
        "---\n"
    )


def search_flights(query: str):
    """Search one-adult flight offers from a natural-language query.

    The query is expected to look like ``from <city> to <city> in <date>``
    (``on <date>``, ``next ...``, ``tomorrow`` and ``today`` are accepted
    too). The query and date are validated locally before any API call is
    made; the cities are then resolved to IATA codes and up to three offers
    are requested from the Amadeus flight-offers search, priced in USD.

    Returns:
        A Markdown string describing the offers, or a message starting with
        "❌" describing what went wrong. API errors (``ResponseError``) and
        unexpected exceptions are reported as such a message.
    """
    try:
        from_city, to_city, date_phrase = _parse_flight_query(query)
        if not from_city or not to_city:
            return "❌ Could not detect valid departure and destination cities. Please use 'from <city> to <city>'."

        # Prefer future dates (as the hotel tool does) so that e.g. "May"
        # does not resolve to a month that has already passed.
        departure_date = (
            dateparser.parse(date_phrase, settings={"PREFER_DATES_FROM": "future"})
            if date_phrase
            else None
        )
        if not departure_date:
            return "❌ Could not understand the travel date. Use formats like 'next week' or 'on May 15'."
        departure_date_str = departure_date.strftime("%Y-%m-%d")

        from_iata = get_airport_code(from_city)
        to_iata = get_airport_code(to_city)
        if not from_iata or not to_iata:
            return f"❌ Could not find airport codes for {from_city} or {to_city}."

        response = amadeus.shopping.flight_offers_search.get(
            originLocationCode=from_iata,
            destinationLocationCode=to_iata,
            departureDate=departure_date_str,
            adults=1,
            currencyCode="USD",
            max=3,
        )
        flights = response.data
        if not flights:
            return f"❌ No flights found from {from_city} to {to_city} on {departure_date_str}."

        booking_link = generate_booking_link(from_iata, to_iata, departure_date_str)
        airline_names = {}  # one name lookup per distinct airline code
        sections = [
            f"### Flight Information from {from_city} ({from_iata}) to {to_city} ({to_iata})\n"
        ]
        for flight in flights:
            airline_code = flight["validatingAirlineCodes"][0]
            if airline_code not in airline_names:
                airline_names[airline_code] = get_airline_name(airline_code)
            sections.append(
                _format_flight_offer(
                    flight,
                    airline_names[airline_code],
                    from_iata,
                    to_iata,
                    booking_link,
                )
            )
        return "".join(sections)
    except ResponseError as error:
        return f"❌ Error: {str(error)}"
    except Exception as error:
        # Tool boundary: an unexpected payload shape must not crash the agent.
        print(f"DEBUG: Exception occurred: {traceback.format_exc()}")
        return f"❌ Error searching flights: {str(error)}"
# Register Tools
# Each row is (tool name, callable, description shown to the agent).
tools = [
    Tool(name=label, func=handler, description=blurb)
    for label, handler, blurb in (
        (
            "Flight Booking",
            search_flights,
            "Find flights using natural language. Example: 'Flight from Delhi to SFO in May'",
        ),
        (
            "Hotel Booking",
            search_hotels,
            "Find hotels in a city with start and end dates. Example: 'Hotels in Paris from February 25 2025 to February 28 2025'",
        ),
    )
]