Spaces:
Sleeping
Sleeping
| from fastapi.responses import JSONResponse | |
| import json | |
| import math | |
| import traceback | |
| from datetime import datetime, timezone | |
| import re | |
| from src.utils.logger import logger | |
| from pydantic import BaseModel, Field | |
| from typing import List, Union | |
| import httpx | |
| from fastapi import HTTPException | |
| from bson import ObjectId | |
def deserialize_objectid(id_str: str) -> ObjectId:
    """Convert a string to an ObjectId, raising ValueError if it is not valid."""
    try:
        object_id = ObjectId(id_str)
    except Exception as exc:
        # Re-raise under a single exception type, keeping the original as cause.
        raise ValueError(f"Invalid ObjectId string: {id_str}") from exc
    return object_id
def handle_validator_raise(func):
    """Decorate ``func`` so exceptions it raises are logged instead of propagated.

    On success the wrapped function's return value is passed through
    unchanged. On failure a summary of the exception (type, message and the
    traceback frame where it surfaced) is sent to ``logger.error`` and the
    wrapper returns ``None``; the exception is not re-raised.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapping callable, carrying ``func``'s name and docstring.
    """
    import functools  # local import: keeps the block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            frames = traceback.extract_tb(e.__traceback__)
            # Frame 0 is the call made from this wrapper; when a deeper frame
            # exists, frame 1 is the first one inside ``func`` itself.
            tb = frames[1] if len(frames) > 1 else frames[0]
            error_info = {
                "error": type(e).__name__,
                "message": str(e),
                "step": tb.name,
                "line": tb.line,
                "filename": tb.filename,
                "lineno": tb.lineno,
            }
            logger.error(f"Exception: {error_info}")

    return wrapper
def process_controller_output(ouput: JSONResponse):
    """Return the decoded JSON body of a 200/201 response, else the string "Error"."""
    # Guard clause: anything other than 200 or 201 is reported as a failure.
    if ouput.status_code not in (200, 201):
        return "Error"
    raw_body = ouput.body.decode("utf-8")
    return json.loads(raw_body)
def format_weather_data(weather_data):
    """Render a weather payload as a human-readable, multi-line summary.

    Args:
        weather_data: Mapping with ``lat``, ``lon`` and a ``current`` mapping
            holding ``weather`` (a sequence whose first item has ``icon`` and
            ``description``), ``wind_speed``, ``wind_deg``, ``humidity``,
            ``temp``, ``feels_like``, ``clouds`` and, optionally, ``rain``.

    Returns:
        The formatted summary. If the payload is missing a field or has an
        unexpected shape, a string starting with
        ``"Error formatting weather data: "`` is returned instead of raising.
    """
    try:
        current_weather = weather_data["current"]
        lat = weather_data["lat"]
        lon = weather_data["lon"]
        location = f"Latitude: {lat}, Longitude: {lon}"
        icon_url = f"http://openweathermap.org/img/wn/{current_weather['weather'][0]['icon']}@2x.png"

        lines = [
            f"In {location}, the current weather is as follows:\n",
            f" <img src='{icon_url}' width='100' height='100'/>\n",
            f" Detailed status: {current_weather['weather'][0]['description']}\n",
            f"Wind speed: {current_weather['wind_speed']} m/s, direction: {current_weather['wind_deg']}°\n",
            f"Humidity: {current_weather['humidity']}%\n",
            "Temperature:\n",
            # 273.15 is subtracted before printing as °C (assumes Kelvin input).
            f" - Current: {current_weather['temp'] - 273.15:.2f}°C\n",
            f" - Feels like: {current_weather['feels_like'] - 273.15:.2f}°C\n",
        ]

        # With no "rain" entry, report 0 mm (the same default used when the
        # "1h" key is absent) rather than leaking a literal "{}" placeholder.
        if "rain" in current_weather:
            rain_mm = current_weather["rain"].get("1h", 0)
        else:
            rain_mm = 0
        lines.append(f"Rain: {rain_mm} mm\n")

        lines.append(f"Cloud cover: {current_weather['clouds']}%\n")
        return "".join(lines)
    except Exception as e:
        # Deliberate best-effort: callers receive a descriptive string.
        return f"Error formatting weather data: {e}"
def haversine(lon1, lat1, lon2, lat2):
    """Return the haversine (great-circle) distance between two points.

    All four coordinates are in degrees, longitude before latitude for each
    point. The result is scaled by a sphere radius of 6371.0.
    """
    radius = 6371.0
    lon1, lat1, lon2, lat2 = (
        math.radians(angle) for angle in (lon1, lat1, lon2, lat2)
    )
    delta_lon = lon2 - lon1
    delta_lat = lat2 - lat1
    sin_half_lat = math.sin(delta_lat / 2)
    sin_half_lon = math.sin(delta_lon / 2)
    a = sin_half_lat ** 2 + math.cos(lat1) * math.cos(lat2) * sin_half_lon ** 2
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return radius * central_angle
def format_geoapify_response(
    response, current_long, current_lat, include_latnlong=False
):
    """Flatten an iterable of place features into a list of summary dicts.

    Each feature is read through ``feature["geometry"]["coordinates"]``
    (index 0 used as longitude, index 1 as latitude) and
    ``feature["properties"]``. Every result carries ``name``, ``address``,
    ``distance_km`` (distance from the current position, rounded to two
    decimals with a " km" suffix) and ``map_url``. ``contact``, ``website``
    and ``accommodation`` are copied only when present; ``lat``/``lon`` are
    added only when ``include_latnlong`` is true.
    """
    results = []
    for feature in response:
        coordinates = feature["geometry"]["coordinates"]
        place_lat = coordinates[1]
        place_lon = coordinates[0]
        distance = haversine(current_long, current_lat, place_lon, place_lat)

        # Insertion order of keys mirrors the output order callers see.
        item = {"lat": place_lat, "lon": place_lon} if include_latnlong else {}
        properties = feature["properties"]
        item["name"] = properties["address_line1"]
        item["address"] = properties["formatted"]
        item["distance_km"] = str(round(distance, 2)) + " km"
        for optional_key in ("contact", "website", "accommodation"):
            if optional_key in properties:
                item[optional_key] = properties[optional_key]
        item["map_url"] = get_google_map_url(place_lat, place_lon)
        results.append(item)
    return results
def format_accommodation_markdown(data):
    """Render accommodation entries as Markdown sections.

    Each entry must provide ``Accommodation Name``, ``Address`` and
    ``distance_km``. ``contact`` (phone/email), ``website`` and
    ``accommodation`` (stars/rooms) are rendered only when present. Every
    entry is followed by a horizontal rule. An empty input yields "".
    """
    parts = []
    for entry in data:
        parts.append(f"### {entry['Accommodation Name']}\n")
        parts.append(f"- **Address:** {entry['Address']}\n")
        parts.append(f"- **Distance from center:** {entry['distance_km']}\n")

        contact = entry.get("contact")
        if contact:
            parts.append("- **Contact:**\n")
            if "phone" in contact:
                parts.append(f" - Phone: {contact['phone']}\n")
            if "email" in contact:
                parts.append(f" - Email: {contact['email']}\n")

        if "website" in entry:
            parts.append(f"- **Website:** [{entry['website']}]({entry['website']})\n")

        details = entry.get("accommodation")
        if details:
            parts.append("- **Accommodation Info:**\n")
            if "stars" in details:
                parts.append(f" - Stars: {details['stars']}\n")
            if "rooms" in details:
                parts.append(f" - Rooms: {details['rooms']}\n")

        parts.append("\n---\n\n")
    return "".join(parts)
def convert_string_date_to_iso(input_str: str) -> datetime:
    """Parse an ISO-8601-style timestamp string into a ``datetime``.

    Despite the name, the result is a ``datetime`` object, not a string.
    Surrounding whitespace is ignored. Two layouts are accepted:

    * ``YYYY-MM-DDThh:mm:ss+hh:mm`` -> timezone-aware datetime with that offset
    * ``YYYY-MM-DDThh:mm:ss``       -> naive datetime (no timezone attached)

    Args:
        input_str: The timestamp text to parse.

    Returns:
        The parsed datetime.

    Raises:
        ValueError: If the input is empty, does not match either layout, or
            cannot be processed at all (e.g. it is not a string). The
            underlying exception is attached as ``__cause__``.
    """
    if not input_str:
        raise ValueError("Input date string cannot be empty")
    try:
        cleaned = input_str.strip()
        try:
            # Preferred layout: explicit UTC offset.
            return datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
        except ValueError:
            # No offset given: the result stays naive; no timezone is assumed.
            return datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S")
    except ValueError as e:
        raise ValueError(
            f"Invalid date format. Expected format: YYYY-MM-DDThh:mm:ss+hh:mm or YYYY-MM-DDThh:mm:ss, got: {input_str}"
        ) from e
    except Exception as e:
        raise ValueError(f"Error converting date string: {str(e)}") from e
def datetime_to_iso_string(dt: datetime) -> str:
    """Format a datetime as ``YYYY-MM-DDTHH:MM:SS``.

    Args:
        dt (datetime): The datetime object to format.

    Returns:
        str: The formatted text, to whole-second precision and without any
        timezone suffix.
    """
    return dt.strftime("%Y-%m-%dT%H:%M:%S")
def parse_itinerary(text):
    """Split free-form itinerary text into per-day activity lists.

    Days are delimited by ``DD/DD/DDDD``-shaped dates. Within each day's text,
    activities are lines containing ``(H:MM - H:MM): description``. Anything
    after the first ``Additional information:`` marker is returned separately.

    Returns:
        ``{"itinerary": [{"date", "activities": [{"start_time", "end_time",
        "description"}, ...]}, ...], "additional_info": str}``
    """
    # Trailing free-text section; empty string when the marker is absent.
    info_match = re.search(r"Additional information:(.*)", text, re.DOTALL)
    additional_info = info_match.group(1).strip() if info_match else ""

    # The capturing group keeps the dates in the result, so the list reads
    # [preamble, date, body, date, body, ...].
    segments = re.split(r"(\d{2}/\d{2}/\d{4})", text)

    itinerary = []
    for raw_date, raw_body in zip(segments[1::2], segments[2::2]):
        slots = re.findall(
            r"\((\d{1,2}:\d{2}) - (\d{1,2}:\d{2})\):\s*(.+)", raw_body.strip()
        )
        day_activities = [
            {"start_time": begins, "end_time": ends, "description": what}
            for begins, ends, what in slots
        ]
        itinerary.append({"date": raw_date.strip(), "activities": day_activities})

    return {"itinerary": itinerary, "additional_info": additional_info}
class Destination(BaseModel):
    # Pydantic model describing a single destination; all four fields are
    # required (declared with ``...`` as the default).

    # Identifier; must be an integer greater than 0.
    id: int = Field(..., title="Destination Id", gt=0)
    # Destination name; at least one character.
    name: str = Field(..., title="Destination Name", min_length=1)
    # Location text; at least one character.
    location: str = Field(..., title="Location", min_length=1)
    # Description text; at least one character.
    description: str = Field(..., title="Description", min_length=1)
def format_include_destinations(include_destinations: List[Union[Destination, str]]):
    """Format the destinations to include as a numbered text listing.

    Args:
        include_destinations: Either ``Destination`` objects or plain strings.

    Returns:
        ``"No destinations required"`` for an empty or missing list. When
        every item is a ``Destination``, one numbered block per item with its
        name, location and description. Otherwise one numbered line per item
        using the item's string form.
    """
    if not include_destinations:
        return "No destinations required"

    if all(
        isinstance(destination, Destination) for destination in include_destinations
    ):
        blocks = [
            f"#Destination {position}: {destination.name}\n"
            f" Location: {destination.location}\n"
            f" Description: {destination.description}\n\n"
            for position, destination in enumerate(include_destinations, start=1)
        ]
    else:
        # Mixed or string-only input: fall back to each item's string form.
        blocks = [
            f"#Destination {position}: {destination}\n"
            for position, destination in enumerate(include_destinations, start=1)
        ]

    # The assembled text was previously built but never returned.
    return "".join(blocks)
async def call_external_api(
    method: str,
    url: str,
    headers: dict = None,
    params: dict = None,
    data: dict = None,
    json: dict = None,
    timeout: int = 10,
):
    """Send one HTTP request with ``httpx`` and return the decoded JSON body.

    Args:
        method: HTTP method name passed to ``client.request``.
        url: Target URL.
        headers: Optional request headers.
        params: Optional query parameters.
        data: Optional form/body data.
        json: Optional JSON payload (shadows the ``json`` module inside this
            function; the name is kept for caller compatibility).
        timeout: Timeout handed to ``httpx.AsyncClient``.

    Returns:
        The result of ``response.json()``.

    Raises:
        HTTPException: With the upstream status code when
            ``raise_for_status`` reports an error status, or with status 500
            when the request itself fails. The original httpx exception is
            attached as ``__cause__``.
    """
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                data=data,
                json=json,
            )
            response.raise_for_status()  # Raises HTTPStatusError on an error status
            return response.json()
        except httpx.HTTPStatusError as e:
            # Mirror the upstream status code back to our caller.
            raise HTTPException(
                status_code=e.response.status_code, detail=str(e)
            ) from e
        except httpx.RequestError as e:
            # Request-level failure: no upstream status is available.
            raise HTTPException(
                status_code=500, detail=f"Request failed: {str(e)}"
            ) from e
def serialize_datetime(obj):
    """Convert datetime and ObjectId values to strings; pass anything else through.

    A datetime becomes its ``isoformat()`` text, an ObjectId becomes
    ``str(obj)``, and every other value is returned unchanged.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    return str(obj) if isinstance(obj, ObjectId) else obj
def get_google_map_url(lat, long):
    """Build a Google Maps search URL for the given latitude/longitude pair."""
    coordinates = f"{lat},{long}"
    return f"https://www.google.com/maps/search/{coordinates}"