cicboy's picture
update app.py
d839c93
# app.py
import os
import re
import json
import time
import traceback
import urllib.parse
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
from dotenv import load_dotenv
from crewai import Agent, Task, Crew
from tools.google_maps_tool import GoogleMapsTool
from tools.route_planner_tool import RoutePlannerTool
from tools.weather_tool import WeatherTool
from tools.semantic_ranking_tool import SemanticRankingTool
from models.itinerary_model import ItineraryModel
from utils.date_utils import expand_dates
# ----------------------------
# Env
# ----------------------------
load_dotenv()
os.environ["LITELLM_PROVIDER"] = "openai"
os.environ["OPENAI_API_BASE"] = "https://api.openai.com/v1"
print("🔍 GOOGLE_MAPS_API_KEY found:", bool(os.getenv("GOOGLE_MAPS_API_KEY")))
print("🔍 OPENAI_API_KEY found:", bool(os.getenv("OPENAI_API_KEY")))
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("Missing OPENAI_API_KEY")
if not os.getenv("GOOGLE_MAPS_API_KEY"):
raise ValueError("Missing GOOGLE_MAPS_API_KEY")
# ----------------------------
# Tools (Python-owned)
# ----------------------------
maps_tool = GoogleMapsTool()
weather_tool = WeatherTool()
route_tool = RoutePlannerTool()
semantic_tool = SemanticRankingTool()
# ----------------------------
# Helpers (preferences)
# ----------------------------
def preference_terms(preferences: str) -> List[str]:
"""Free-text -> list of preference terms (no hardcoding like 'bars')."""
if not preferences:
return []
parts = re.split(r"[,\n;/|]+", preferences)
terms = [p.strip() for p in parts if p.strip()]
seen = set()
out = []
for t in terms:
k = t.lower()
if k not in seen:
seen.add(k)
out.append(t)
return out[:12]
def extract_activity_terms(preferences: str, max_terms: int = 8) -> List[str]:
"""
Extract “activity-like” terms from preferences.
We avoid duplicating obvious meal intent tokens, but do NOT hardcode venues like bars.
"""
if not preferences:
return []
raw: List[str] = []
for part in preferences.replace(";", ",").split(","):
term = part.strip()
if term:
raw.append(term)
# skip meal/food intent tokens (still not hardcoding venues)
skip_tokens = ("food", "restaurant", "cuisine", "breakfast", "lunch", "dinner", "eat", "eating",
"vegan", "vegetarian", "gluten-free", "gluten free")
out: List[str] = []
for t in raw:
tl = t.lower()
if any(tok in tl for tok in skip_tokens):
continue
if t not in out:
out.append(t)
if len(out) >= max_terms:
break
return out
def to_iso_date(d: Any) -> str:
if isinstance(d, str):
s = d.replace("/", "-")
return datetime.fromisoformat(s).date().isoformat()
if hasattr(d, "date"):
return d.date().isoformat()
raise ValueError(f"Unsupported date value: {d!r}")
# ----------------------------
# Helpers (data shaping)
# ----------------------------
MEAL_CATS = {"breakfast", "lunch", "dinner"}
def compact_places(places_by_cat: Dict[str, List[Dict[str, Any]]], per_cat: int = 6) -> Dict[str, List[Dict[str, Any]]]:
"""
Keep only fields we need + cap count per category to keep planner context compact.
IMPORTANT: meal buckets must stay labeled as breakfast/lunch/dinner (even if the raw item has category like "vegetarian").
"""
out: Dict[str, List[Dict[str, Any]]] = {}
for cat, items in (places_by_cat or {}).items():
if not isinstance(items, list):
continue
cat_norm = (cat or "").lower().strip()
# Sort roughly by "quality": rating desc, then user_ratings_total desc
def score(x: Dict[str, Any]) -> Tuple[float, int]:
r = x.get("rating") or 0.0
n = x.get("user_ratings_total") or 0
return (float(r), int(n))
items_sorted = sorted(items, key=score, reverse=True)[:per_cat]
cleaned: List[Dict[str, Any]] = []
for x in items_sorted:
addr = (x.get("address") or x.get("formatted_address") or x.get("vicinity") or "").strip()
if not addr:
continue
raw_item_cat = (x.get("category") or "").strip()
item_cat = cat_norm if cat_norm in MEAL_CATS else (raw_item_cat or cat).strip()
cleaned.append(
{
"name": (x.get("name") or "").strip(),
"address": addr,
"category": item_cat,
"rating": x.get("rating"),
"user_ratings_total": x.get("user_ratings_total"),
"place_id": x.get("place_id"),
"types": x.get("types") or [],
"price_level": x.get("price_level"),
"business_status": x.get("business_status"),
}
)
out[cat] = cleaned
return out
def flatten_candidates(compact: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
"""
Build candidates list for semantic ranking tool.
Tool expects: name, category, description (we provide lightweight deterministic description).
"""
cands: List[Dict[str, Any]] = []
for cat, items in compact.items():
for it in items:
nm = it.get("name")
addr = it.get("address")
if not nm or not addr:
continue
types = ", ".join(it.get("types") or [])
price = it.get("price_level")
rating = it.get("rating")
reviews = it.get("user_ratings_total")
desc = (
f"{cat}. rating={rating} reviews={reviews} "
f"price_level={price} types={types}. Address={addr}"
)
cands.append(
{
"name": nm,
"category": cat,
"description": desc,
"address": addr,
"rating": rating,
"place_id": it.get("place_id"),
"user_ratings_total": reviews,
"price_level": price,
"types": it.get("types") or [],
}
)
# de-dupe by address (preserve order)
# For meals: allow same address to exist in breakfast AND lunch AND dinner (key = category+address)
seen = set()
out = []
for x in cands:
addr = (x.get("address") or "").lower().strip()
cat = (x.get("category") or "").lower().strip()
key = f"{cat}::{addr}" if cat in MEAL_CATS else addr
if key in seen:
continue
seen.add(key)
out.append(x)
return out
def semantic_rank(preferences: str, candidates: List[Dict[str, Any]], top_k: int = 30) -> List[Dict[str, Any]]:
if not preferences or not candidates:
return candidates[:top_k]
ranked = semantic_tool.run(
user_preferences=preferences,
candidates=candidates,
top_k=top_k,
distance_weight=0.0,
)
return ranked[:top_k] if isinstance(ranked, list) else []
def split_ranked_by_meals(
ranked: List[Dict[str, Any]],
meal_cats: Tuple[str, str, str] = ("breakfast", "lunch", "dinner"),
k_meal: int = 4,
k_activity: int = 10,
) -> Dict[str, List[Dict[str, Any]]]:
meals = {m: [] for m in meal_cats}
activities: List[Dict[str, Any]] = []
for x in ranked:
cat = (x.get("category") or "").lower().strip()
if cat in meals and len(meals[cat]) < k_meal:
meals[cat].append(x)
elif cat not in meal_cats:
activities.append(x)
# diversify activities by category a bit
picked: List[Dict[str, Any]] = []
per_cat_cap = 2
per_cat_count: Dict[str, int] = {}
for x in activities:
cat = (x.get("category") or "").lower().strip() or "other"
if per_cat_count.get(cat, 0) >= per_cat_cap:
continue
picked.append(x)
per_cat_count[cat] = per_cat_count.get(cat, 0) + 1
if len(picked) >= k_activity:
break
return {
"breakfast": meals["breakfast"],
"lunch": meals["lunch"],
"dinner": meals["dinner"],
"activities": picked,
}
def build_allowed_locations(bundle: Dict[str, List[Dict[str, Any]]], max_destinations: int = 9) -> List[str]:
"""
RoutePlannerTool NxN caps to 10 stops (origin + 9 destinations).
So we choose a compact allowed set for the planner to pick from.
"""
selected: List[str] = []
def take(items: List[Dict[str, Any]], n: int):
nonlocal selected
for it in items[:n]:
addr = (it.get("address") or "").strip()
if addr and addr not in selected:
selected.append(addr)
# meals first (structure), then activities
take(bundle.get("breakfast", []), 2)
take(bundle.get("lunch", []), 2)
take(bundle.get("dinner", []), 2)
take(bundle.get("activities", []), 5)
return selected[:max_destinations]
def place_lookup(compact: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
"""Address -> place metadata."""
out: Dict[str, Dict[str, Any]] = {}
for cat, items in compact.items():
for it in items:
addr = (it.get("address") or "").strip()
if not addr:
continue
out[addr] = it
return out
def maps_search_url(address: str, place_id: Optional[str] = None) -> str:
q = urllib.parse.quote_plus(address)
if place_id:
# Google Maps "search" url supports query_place_id
pid = urllib.parse.quote_plus(place_id)
return f"https://www.google.com/maps/search/?api=1&query={q}&query_place_id={pid}"
return f"https://www.google.com/maps/search/?api=1&query={q}"
# ----------------------------
# Helper functions
# ----------------------------
def build_meal_hints(preferences: str) -> Dict[str, Optional[str]]:
"""
Turns user preferences into short text hints for Google Places queries:
query = f"{hint} {meal} in {location}"
Returns: {"breakfast": "...", "lunch": "...", "dinner": "..."} with None if no hint.
"""
if not preferences or not str(preferences).strip():
return {"breakfast": None, "lunch": None, "dinner": None}
# basic tokenization for comma-separated / free text
raw = re.split(r"[,\n;/|]+", preferences.lower())
tokens = {t.strip().replace("_", "-") for t in raw if t.strip()}
# canonical sets
DIETARY = {
"vegan": "vegan",
"vegetarian": "vegetarian",
"gluten-free": "gluten free",
"gluten free": "gluten free",
}
CUISINES = {
"italian": "italian",
"thai": "thai",
"japanese": "japanese",
"chinese": "chinese",
"french": "french",
"seafood": "seafood",
"local": "local",
}
QUALITY = {
"michelin-star": "michelin star",
"michelin star": "michelin star",
"michelin": "michelin star",
}
# extract (order matters; keep short)
dietary = []
for key in ("vegan", "vegetarian", "gluten-free", "gluten free"):
if key in tokens:
dietary.append(DIETARY[key])
cuisines = []
for key in ("local", "italian", "thai", "japanese", "chinese", "french", "seafood"):
if key in tokens:
cuisines.append(CUISINES[key])
michelin = None
for key in ("michelin-star", "michelin star", "michelin"):
if key in tokens:
michelin = QUALITY[key]
break
# simple conflict guard: if vegan/vegetarian, drop seafood keyword
if any(x in dietary for x in ("vegan", "vegetarian")):
cuisines = [c for c in cuisines if c != "seafood"]
# cap to keep queries tight
dietary = dietary[:2] # e.g. "vegan gluten free"
cuisines = cuisines[:2] # e.g. "local italian"
def hint_for(meal: str) -> Optional[str]:
parts = []
parts += dietary
parts += cuisines
# Michelin is most relevant for lunch/dinner
if michelin and meal in ("lunch", "dinner"):
parts.append(michelin)
# keep it compact
hint = " ".join(parts).strip()
return hint if hint else None
return {
"breakfast": hint_for("breakfast"),
"lunch": hint_for("lunch"),
"dinner": hint_for("dinner"),
}
def extract_vibes(preferences: str, max_terms: int = 3) -> List[str]:
if not preferences or not str(preferences).strip():
return []
raw = re.split(r"[,\n;/|]+", preferences.lower())
tokens = {t.strip() for t in raw if t.strip()}
VIBES = {
"romantic": "romantic",
"relaxed": "relaxed",
"avoid crowds": "quiet",
"avoid crowd": "quiet",
"no crowds": "quiet",
"quiet": "quiet",
"fine dining": "fine dining",
"finedining": "fine dining",
"casual dining": "casual",
"casual": "casual",
}
# simple phrase matching (handles "avoid crowds" and "fine dining" in free text)
text = preferences.lower()
def has(term: str) -> bool:
return (term in text) if (" " in term) else (term in tokens)
out: List[str] = []
for key, normalized in VIBES.items():
if (" " in key and key in text) or (key in tokens):
if normalized not in out:
out.append(normalized)
if len(out) >= max_terms:
break
return out
def choose_mode_for_leg(
allowed_modes: List[str],
matrix: Dict[str, Any],
i: int,
j: int,
) -> Optional[str]:
"""
Mode choice rule:
- if walking allowed AND walking distance <= 2.0km -> walking
- else if public_transport allowed -> public_transport
- else if cycling allowed -> cycling
- else -> driving
Fallback: first mode with non-null duration.
"""
def get_dist(mode: str) -> Optional[float]:
try:
return matrix[mode]["distance_km"][i][j]
except Exception:
return None
def get_dur(mode: str) -> Optional[float]:
try:
return matrix[mode]["duration_min"][i][j]
except Exception:
return None
if "walking" in allowed_modes:
d = get_dist("walking")
if d is not None and d <= 2.0:
return "walking"
if "public_transport" in allowed_modes and get_dur("public_transport") is not None:
return "public_transport"
if "cycling" in allowed_modes and get_dur("cycling") is not None:
return "cycling"
if "driving" in allowed_modes and get_dur("driving") is not None:
return "driving"
for m in ("walking", "public_transport", "cycling", "driving"):
if m in allowed_modes and get_dur(m) is not None:
return m
return None
def merge_unique_by_address(primary, secondary, limit):
def key(x):
return (x.get("place_id") or x.get("address") or "").lower().strip()
seen = set(key(p) for p in primary if key(p))
out = list(primary)
for x in secondary:
k = key(x)
if not k or k in seen:
continue
out.append(x)
seen.add(k)
if len(out) >= limit:
break
return out
def compute_leg_metrics_from_matrix(
routes: Dict[str, Any],
allowed_modes: List[str],
from_loc: str,
to_loc: str,
) -> Tuple[Optional[str], Optional[float], Optional[int]]:
"""
Returns: (mode, distance_km, duration_minutes_int) using NxN matrix.
"""
stops = routes.get("stops") or []
matrix = routes.get("matrix") or {}
if not stops or not matrix:
return (None, None, None)
idx = {stops[k]: k for k in range(len(stops))}
if from_loc not in idx or to_loc not in idx:
return (None, None, None)
i, j = idx[from_loc], idx[to_loc]
mode = choose_mode_for_leg(allowed_modes, matrix, i, j)
if not mode:
return (None, None, None)
try:
d = matrix[mode]["distance_km"][i][j]
t = matrix[mode]["duration_min"][i][j]
except Exception:
return (None, None, None)
if d is None or t is None:
return (mode, None, None)
return (mode, float(d), int(round(float(t))))
def compute_leg_metrics_pairwise(
route_tool: RoutePlannerTool,
allowed_modes: List[str],
from_loc: str,
to_loc: str,
) -> Tuple[Optional[str], Optional[float], Optional[int]]:
"""
Pairwise fallback: calls RoutePlannerTool for a single leg (from -> to).
Handles RoutePlannerTool returning either a list or a dict.
"""
try:
out = route_tool.run(
origin=from_loc,
destinations=[to_loc],
modes=allowed_modes,
max_results=1,
return_matrix=False,
)
except Exception:
return (None, None, None)
# ✅ Normalize output into `best` (a dict for the best leg/route)
if isinstance(out, dict):
# common patterns
if "routes" in out and isinstance(out["routes"], list) and out["routes"]:
best = out["routes"][0]
elif "results" in out and isinstance(out["results"], list) and out["results"]:
best = out["results"][0]
else:
best = out
elif isinstance(out, list) and out:
best = out[0]
else:
return (None, None, None)
# ✅ Extract fields (support a couple of common key names)
mode = best.get("mode") or best.get("travel_mode")
dist = best.get("distance_km") or best.get("distance") # if your tool uses "distance"
dur = best.get("duration_min") or best.get("duration_minutes") or best.get("duration")
# ✅ Coerce types safely
try:
dist_km = float(dist) if dist is not None else None
except Exception:
dist_km = None
try:
dur_min = int(round(float(dur))) if dur is not None else None
except Exception:
dur_min = None
return (mode, dist_km, dur_min)
def extract_unique_locations_in_order(itinerary: ItineraryModel) -> List[str]:
"""
Unique activity locations in the order they appear across days.
"""
seen = set()
out: List[str] = []
for day in itinerary.days:
for act in day.activities:
loc = (act.location or "").strip()
if loc and loc not in seen:
seen.add(loc)
out.append(loc)
return out
def postprocess_itinerary(
itinerary: ItineraryModel,
route_tool: RoutePlannerTool,
allowed_modes: List[str],
origin: str,
addr_to_meta: Dict[str, Dict[str, Any]],
prefer_single_matrix: bool = True,
nxn_destination_cap: int = 9, # origin + 9 = 10 stops
) -> ItineraryModel:
"""
- Fills act.map_url deterministically
- Computes travel_mode / distance_from_prev / duration_minutes
- Uses per-day NxN matrix when day unique locations <= cap
- Falls back to pairwise per-leg when day exceeds cap or matrix missing values
- Sets itinerary.total_distance_km
"""
total_km = 0.0
# Fill map_url first (deterministic)
for day in itinerary.days:
for act in day.activities:
meta = addr_to_meta.get(act.location)
act.map_url = maps_search_url(act.location, meta.get("place_id") if meta else None)
# Per day routing matrix
for day in itinerary.days:
# collect unique locations for this day (in order)
day_locs: List[str] = []
seen: set[str] = set()
for act in day.activities:
loc = (act.location or "").strip()
if loc and loc not in seen:
seen.add(loc)
day_locs.append(loc)
use_matrix = prefer_single_matrix and (len(day_locs) <= nxn_destination_cap)
routes: Dict[str, Any] = {}
if use_matrix:
routes = route_tool.run(
origin=origin,
destinations=day_locs,
modes=allowed_modes,
max_results=len(day_locs),
return_matrix=True,
)
prev_loc = origin
for act in day.activities:
if use_matrix:
mode, dist_km, dur_min = compute_leg_metrics_from_matrix(routes, allowed_modes, prev_loc, act.location)
# if matrix misses a leg (shouldn’t, but safe), fall back pairwise
if (mode is None) or (dist_km is None) or (dur_min is None):
mode2, dist2, dur2 = compute_leg_metrics_pairwise(route_tool, allowed_modes, prev_loc, act.location)
# only overwrite if pairwise gave something better
if mode2 is not None: mode = mode2
if dist_km is None: dist_km = dist2
if dur_min is None: dur_min = dur2
else:
mode, dist_km, dur_min = compute_leg_metrics_pairwise(route_tool, allowed_modes, prev_loc, act.location)
act.travel_mode = mode
act.distance_from_prev = dist_km
act.duration_minutes = dur_min
if dist_km is not None:
total_km += dist_km
prev_loc = act.location
itinerary.total_distance_km = round(total_km, 2)
return itinerary
# ----------------------------
# Agents (LLM-only)
# ----------------------------
planner_agent = Agent(
role="Senior Travel Agent",
goal="Create a trip plan strictly from CONTEXT_JSON. Never invent places.",
backstory="You are a senior travel agent who plans realistic schedules using only provided options.",
llm="gpt-5-mini",
temperature=0.25,
verbose=False,
tools=[], # IMPORTANT: no tools in planner (Python did all tools)
)
writer_agent = Agent(
role="Itinerary Writer Agent",
goal="Write Markdown from the provided itinerary JSON without inventing new places.",
backstory="You write clear, engaging itineraries that stick to the provided structured plan.",
llm="gpt-4o-mini",
temperature=0.3,
verbose=False,
)
# ----------------------------
# Core logic
# ----------------------------
def generate_itinerary(location, start_date, end_date, preferences, transport_modes):
t0 = time.time()
print("✅ Button clicked:", location, start_date, end_date, transport_modes, preferences)
try:
if not location or not str(location).strip():
return "### ❌ Error\nPlease enter a destination."
# Normalize dates
start_date_iso = to_iso_date(start_date)
end_date_iso = to_iso_date(end_date)
days_count, date_list = expand_dates(start_date_iso, end_date_iso)
if isinstance(transport_modes, str):
transport_modes = [transport_modes]
transport_modes = [m for m in (transport_modes or []) if m]
if not transport_modes:
transport_modes = ["walking"]
activity_terms = extract_activity_terms(preferences or "")
# --------------------
# 1) Python: Maps (once)
# --------------------
# cuisine_preferences is optional and your maps tool expects a dict keyed by meals,
# but we do not "guess cuisines" from preferences; we keep it None for neutrality.
cuisine_preferences = build_meal_hints(preferences or "")
places_raw = maps_tool.run(
location=location,
activities=activity_terms if (preferences and activity_terms) else None,
cuisine_preferences=cuisine_preferences,
max_results_per_query=20,
)
#Fallback: if any of the meals are missing, requery meals only
if not (places_raw.get("breakfast") and places_raw.get("lunch") and places_raw.get("dinner")):
fallback = maps_tool.run(
location=location,
activities=None,
cuisine_preferences={"breakfast": [], "lunch": [], "dinner": []},
max_results_per_query=30,
)
# merge in anything missing
for k in ("breakfast", "lunch", "dinner"):
if not places_raw.get(k):
places_raw[k] = fallback.get(k, [])
print({k: len(v) for k, v in places_raw.items()})
# Make places compact + deterministic
per_cat = min(20, max(6, days_count)) #up to 20 per category
k_activity = min(60, max(24, days_count * 8)) #rank more when trip is longer
k_meal = min(30, max(6, days_count * 3)) # need >= days_count if no reuse
places_compact = compact_places(places_raw, per_cat=per_cat)
print("DEBUG dinner compact count:", len(places_compact.get("dinner", [])))
print("DEBUG dinner compact categories:", [p.get("category") for p in places_compact.get("dinner", [])])
candidates = flatten_candidates(places_compact)
addr_to_meta = place_lookup(places_compact)
meal_cands = [c for c in candidates if c.get("category") in ("breakfast", "lunch", "dinner")]
act_cands = [c for c in candidates if c.get("category") not in ("breakfast", "lunch", "dinner")]
bundle = {"breakfast": [], "lunch": [], "dinner": [], "activities": []}
vibes = extract_vibes(preferences or "")
diet_terms = ("vegan", "vegetarian", "gluten-free", "gluten free")
pref_lower = (preferences or "").lower()
diet_requested = any(t in pref_lower for t in diet_terms)
for meal in ("breakfast", "lunch", "dinner"):
per_meal = [c for c in meal_cands if c.get("category") == meal]
vibe_str = ", ".join(vibes) if vibes else "any"
diet_line = "Prioritize vegan/vegetarian/gluten-free suitability. " if diet_requested else ""
meal_pref = (
f"{preferences}. For {meal}: vibe={vibe_str}. "
f"{diet_line}"
"Prioritize local food and strong reviews."
)
ranked = semantic_rank(meal_pref, per_meal, top_k=k_meal)
print(f"🍽️ {meal}: per_meal={len(per_meal)} ranked={len(ranked)} days={days_count}")
# If ranking is too strict or insufficient returns, fill with high-quality defaults
if len(ranked) < days_count:
fallback_sorted = sorted(
per_meal,
key=lambda x: ((x.get("rating") or 0), (x.get("user_ratings_total") or 0)),
reverse=True,
)
ranked = merge_unique_by_address(ranked, fallback_sorted, limit=k_meal)
bundle[meal] = ranked
# -------------- Balanced activities ---------------
prefs_l = (preferences or "").lower()
must_cats: List[str] = []
if "museum" in prefs_l:
must_cats.append("museums")
if "landmark" in prefs_l:
must_cats.append("landmarks")
if "park" in prefs_l:
must_cats.append("parks")
if "art" in prefs_l:
must_cats.append("art")
picked: List[Dict[str, Any]] = []
picked_addr: set[str] = set()
# enough unique options so the planner can actually pick them across days
per_must = min(20, max(6, days_count * 3)) # e.g. 3–8 per must category
for cat in must_cats:
cat_cands = [c for c in act_cands if (c.get("category") or "").lower().strip() == cat]
# rank within that category using a category-focused query
ranked_cat = semantic_rank(f"{cat}. {preferences or ''}", cat_cands, top_k=per_must)
for x in ranked_cat:
addr = (x.get("address") or "").lower().strip()
if not addr or addr in picked_addr:
continue
picked.append(x)
picked_addr.add(addr)
# fill remaining with global ranking, but diversify so one category doesn't take over
remaining_slots = max(0, k_activity - len(picked))
ranked_all = semantic_rank(
preferences or "",
[c for c in act_cands if (c.get("address") or "").lower().strip() not in picked_addr],
top_k=max(remaining_slots * 3, remaining_slots) # overfetch for diversity
)
per_cat_cap = 3
counts: Dict[str, int] = {}
fill: List[Dict[str, Any]] = []
for x in ranked_all:
cat = (x.get("category") or "other").lower().strip()
if counts.get(cat, 0) >= per_cat_cap:
continue
fill.append(x)
counts[cat] = counts.get(cat, 0) + 1
if len(fill) >= remaining_slots:
break
bundle["activities"] = picked + fill
# Ensure no meal items leak into activities
bundle["activities"] = [
p for p in (bundle.get("activities") or [])
if (p.get("category") or "").lower().strip() not in MEAL_CATS
]
# debug
print("✅ Activities bundle category counts:",
{k: sum(1 for a in bundle["activities"] if (a.get("category") or "").lower() == k)
for k in sorted({(a.get("category") or "").lower() for a in bundle["activities"]})})
# --------------------
# 2) Python: Weather (once)
# --------------------
print("➡️ Weather: calling weather_tool.run()")
weather_raw = weather_tool.run(
city=location,
start_date=start_date_iso,
end_date=end_date_iso,
)
print("✅ Weather: returned from weather_tool.run()")
# Convert weather list -> date map
weather_by_date: Dict[str, Dict[str, Any]] = {}
for row in weather_raw.get("forecasts", []) or []:
d = row.get("date")
if not d:
continue
weather_by_date[d] = {
"temp_max": row.get("temp_max"),
"temp_min": row.get("temp_min"),
"precipitation_mm": row.get("precipitation_mm"),
}
# --------------------
# 3) Python: Semantic pre-rank (once)
# --------------------
# ensure enough unique meal options for the number of days
for m in ("breakfast", "lunch", "dinner"):
if len(bundle.get(m, [])) < days_count:
return (
"### Not enough meal options found\n"
f"Need at least {days_count} unique {m} venues, but only found {len(bundle.get(m, []))}. "
"Try broadening preferences or increasing max_results_per_query."
)
print("✅ Bundle sizes:", {m: len(bundle.get(m, [])) for m in ("breakfast","lunch","dinner")},
"activities:", len(bundle.get("activities", [])))
# --------------------
# 5) LLM: Planner (no tools) — compact deterministic context
# --------------------
print("➡️ Planner: building context_json")
context_json = {
"location": location,
"start_date": start_date_iso,
"end_date": end_date_iso,
"trip_duration_days": days_count,
"preferences": preferences or "",
"transport_modes": transport_modes,
"places": {
"breakfast": bundle["breakfast"],
"lunch": bundle["lunch"],
"dinner": bundle["dinner"],
"activities": bundle["activities"],
},
"weather": weather_by_date,
"RULES": [
"Use ONLY the provided places lists. Never invent new places.",
"Each activity.location MUST be exactly one of the provided place addresses (verbatim).",
"You must produce exactly trip_duration_days day-plans, matching the date range.",
"Keep each day within 08:00–22:00.",
"Each day MUST include exactly 1 breakfast, 1 lunch, and 1 dinner activity (choose from the provided meal lists).",
"Each day should include 2–4 non-meal activities (in addition to breakfast/lunch/dinner), unless weather is severe or options are limited.",
"If preferences mention museums, landmarks, parks, or art — and matching categories exist in places.activities — include at least 1 activity per day from those matched categories before choosing other activities.",
"DO NOT reuse venues.",
"Do not schedule bars before 12:00.",
"Use weather to bias indoor vs outdoor choices (rain -> museums/indoor).",
"Do not worry about distance/time fields; Python will compute travel metrics after planning.",
"Still output travel_mode/distance_from_prev/duration_minutes fields (can be null).",
"Prefer <= 9 unique locations PER DAY to keep routing efficient."
],
}
print("➡️ Planner: creating planning_task")
planning_task = Task(
description=(
"Build an itinerary ONLY from the provided CONTEXT_JSON. Never invent places.\n\n"
f"CONTEXT_JSON={json.dumps(context_json, ensure_ascii=False)}\n\n"
"Output MUST match ItineraryModel exactly.\n"
),
expected_output="A JSON itinerary matching ItineraryModel exactly.",
agent=planner_agent,
output_pydantic=ItineraryModel,
)
planner_crew = Crew(
agents=[planner_agent],
tasks=[planning_task],
verbose=True,
)
print("➡️ Planner: kickoff starting")
_ = planner_crew.kickoff()
print("✅ Planner: kickoff finished")
# Extract the Pydantic object (CrewAI stores in task.output)
planned = getattr(planning_task, "output", None)
planned_obj = getattr(planned, "pydantic", None) or getattr(planned, "output_pydantic", None)
if planned_obj is None:
# fallback: try parse raw json
raw = getattr(planned, "raw", None) or str(planned)
planned_obj = ItineraryModel.model_validate_json(raw)
itinerary: ItineraryModel = planned_obj
used = set()
for day in itinerary.days:
for act in day.activities:
loc = (act.location or "").strip()
if not loc:
continue
if loc in used:
return "### Planner reused a venue\nTry increasing max_results_per_query or relax constraints."
used.add(loc)
# --------------------
# 6) Python: overwrite travel fields deterministically (THE FIX)
# --------------------
itinerary = postprocess_itinerary(
itinerary=itinerary,
route_tool=route_tool,
allowed_modes=transport_modes,
origin=location,
addr_to_meta=addr_to_meta,
prefer_single_matrix=True, # will use NxN when possible
nxn_destination_cap=9, # origin + 9 = 10 stops
)
itinerary_json = itinerary.model_dump()
# --------------------
# 7) LLM: Writer — Markdown from corrected JSON
# --------------------
writing_task = Task(
description=(
"Write Markdown from ITINERARY_JSON. Do NOT invent places.\n\n"
f"**Trip Dates:** {start_date_iso}{end_date_iso}\n\n"
f"ITINERARY_JSON={json.dumps(itinerary_json, ensure_ascii=False)}\n\n"
"Formatting rules:\n"
"- Use a header per day.\n"
"- Bold timestamps.\n"
"- Include rating inline when present (⭐ 4.7).\n"
"\n"
"- For each day, render activities strictly in the JSON order.\n"
"- IMPORTANT: Do NOT output travel_mode, distance_from_prev, or duration_minutes as separate labeled fields inside an activity block.\n"
"- Travel info must appear ONLY as the single italic line below.\n"
"\n"
"- For each activity (index i) in a day:\n"
" - If i > 0: print this line as its own paragraph (NOT a bullet, no leading '-' or '###') immediately BEFORE the activity block:\n"
" *Travel from previous: {duration_minutes} min, {distance_from_prev} km ({travel_mode})*\n"
" - If i == 0: do NOT print any travel line, even if travel fields are present.\n"
"\n"
"- The travel line uses the CURRENT activity’s fields (travel_mode/distance_from_prev/duration_minutes) because they represent travel FROM the previous activity TO this activity.\n"
"\n"
"- Example (follow exactly):\n"
" *Travel from previous: 13 min, 0.96 km (walking)*\n"
" **10:00 - 12:00** \n"
" **ArteBo - Contemporary Art gallery** (⭐ 4.9) \n"
" Location: [Via S. Petronio Vecchio, 8/A, 40125 Bologna BO, Italy](https://...)\n"
"\n"
"- Only describe places in the JSON.\n"
),
expected_output="Markdown itinerary.",
agent=writer_agent,
)
writer_crew = Crew(
agents=[writer_agent],
tasks=[writing_task],
verbose=True,
)
result = writer_crew.kickoff()
markdown_itinerary = (
result if isinstance(result, str)
else getattr(result, "raw", None) or str(result)
)
print(f"✅ Done in {time.time() - t0:.1f}s")
return markdown_itinerary
except Exception:
tb = traceback.format_exc()
print(tb)
return f"### ❌ Error\n```text\n{tb}\n```"
# ----------------------------
# UI
# ----------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("## 🧭 AI-Powered Travel Itinerary Planner")
gr.Markdown("Plan optimized, weather-aware trips based on your preferences")
with gr.Row():
location = gr.Textbox(label="Destination", placeholder="e.g. Venice, Italy")
transport_modes = gr.CheckboxGroup(
["walking", "public_transport", "driving", "cycling"],
label="Transport modes",
value=["walking"],
info="Select one or multiple modes for route optimization",
)
with gr.Row():
start_date = gr.DateTime(label="Start Date", include_time=False, type="datetime")
end_date = gr.DateTime(label="End Date", include_time=False, type="datetime")
preferences = gr.Textbox(label="Your Preferences", placeholder="art, local food, museums, relaxed pace, avoid queues")
generate_btn = gr.Button("📝 Generate Itinerary")
itinerary_markdown = gr.Markdown(label="🔖 Your Personalized Itinerary")
generate_btn.click(
fn=generate_itinerary,
inputs=[location, start_date, end_date, preferences, transport_modes],
outputs=[itinerary_markdown],
)
demo.queue(default_concurrency_limit=1, max_size=20)
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
max_threads=1,
ssr_mode=False,
)