# hotel-mvp / app.py
# Panagiotis Spanakis
# Fix hallucinations
# 6b287bc
import datetime as dt
import os
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Union
import gradio as gr
# Optional LLM / LangChain imports (lazy usage)
# Each import is attempted once at module load; a failure only flips the
# matching availability flag to False so the rest of the app still starts.
try:
    from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # type: ignore
    _TRANSFORMERS_AVAILABLE = True
except Exception:
    _TRANSFORMERS_AVAILABLE = False
# The LangChain wrapper is optional too; load_llm() consults this flag.
try:
    from langchain.llms import HuggingFacePipeline # type: ignore
    _LANGCHAIN_AVAILABLE = True
except Exception:
    _LANGCHAIN_AVAILABLE = False
# -----------------------------
# Simple in-memory hotel sim DB
# -----------------------------
def _daterange(start: dt.date, end: dt.date):
cur = start
while cur < end:
yield cur
cur = cur + dt.timedelta(days=1)
@dataclass
class Booking:
    """A reservation of one room for a date range, as created by HotelSim.book_room."""
    room_number: int  # room assigned to this booking
    guest_name: str  # name the booking was made under
    check_in: dt.date  # arrival date
    check_out: dt.date  # departure date; the availability check treats it as exclusive
    room_type: str  # room category name requested for the booking
@dataclass
class HotelSim:
    """In-memory hotel simulation: room inventory, bookings, occupants, cleaning.

    Nothing is persisted; all state lives on the instance. Date ranges are
    half-open: a stay occupies ``check_in`` up to, but not including,
    ``check_out``.
    """

    # Room category name -> number of rooms of that category.
    room_types: Dict[str, int] = field(default_factory=lambda: {
        "Standard": 10,
        "Deluxe": 5,
        "Suite": 2,
    })
    # Room number -> room category; populated by __post_init__.
    rooms: Dict[int, str] = field(default_factory=dict)
    bookings: List[Booking] = field(default_factory=list)
    # One (room_number, guest_name, timestamp) entry per cleaning request.
    cleaning_requests: List[Tuple[int, str, dt.datetime]] = field(default_factory=list)
    # Room number -> (last_name, check_in, check_out) of the guest currently staying.
    occupants: Dict[int, Tuple[str, dt.date, dt.date]] = field(default_factory=dict)

    def __post_init__(self):
        """Number the rooms per category and seed two demo occupants.

        The built-in categories keep their fixed ranges (Standard from 101,
        Deluxe from 201, Suite from 301). Any other category present in
        ``room_types`` gets the next free hundred-block (401, 501, ...), and a
        built-in category missing from ``room_types`` simply gets no rooms
        instead of raising ``KeyError``.
        """
        first_room = {"Standard": 101, "Deluxe": 201, "Suite": 301}
        next_block = 401
        for rtype, count in self.room_types.items():
            start = first_room.get(rtype)
            if start is None:
                start = next_block
                next_block += 100
            for offset in range(count):
                self.rooms[start + offset] = rtype
        # Pre-populate with a couple of "current" occupants for login demo
        today = dt.date.today()
        self.occupants[104] = ("Papadopoulos", today - dt.timedelta(days=1), today + dt.timedelta(days=3))
        self.occupants[202] = ("Smith", today, today + dt.timedelta(days=2))

    # ----- Availability & Booking -----
    def _is_room_available(self, room: int, start: dt.date, end: dt.date) -> bool:
        """Return True when ``room`` is free for the half-open range [start, end).

        An empty or inverted range is never available. The range must not
        overlap any booking of the room nor the stay of its current occupant.
        """
        if start >= end:
            return False
        for b in self.bookings:
            if b.room_number == room and start < b.check_out and end > b.check_in:
                return False
        if room in self.occupants:
            _, occ_in, occ_out = self.occupants[room]
            if start < occ_out and end > occ_in:
                return False
        return True

    def find_available_room(self, room_type: str, start: dt.date, end: dt.date) -> Optional[int]:
        """Return the first room of ``room_type`` free for [start, end), or None."""
        for room, rtype in self.rooms.items():
            if rtype == room_type and self._is_room_available(room, start, end):
                return room
        return None

    def book_room(self, guest_name: str, room_type: str, start: dt.date, end: dt.date) -> Optional[Booking]:
        """Reserve the first free room of ``room_type`` and record the booking.

        Returns:
            The new ``Booking``, or None when no matching room is available.
        """
        room = self.find_available_room(room_type, start, end)
        if room is None:
            return None
        booking = Booking(room_number=room, guest_name=guest_name, check_in=start, check_out=end, room_type=room_type)
        self.bookings.append(booking)
        return booking

    # ----- Cleaning -----
    def request_cleaning(self, room_number: int, guest_name: str) -> Tuple[int, dt.datetime]:
        """Record a cleaning request and return ``(request_number, timestamp)``.

        The request number is the length of the request list after appending.
        """
        ts = dt.datetime.now()
        self.cleaning_requests.append((room_number, guest_name, ts))
        return len(self.cleaning_requests), ts

    # ----- Login -----
    def login(self, room_number: int, last_name: str) -> Tuple[bool, Optional[Tuple[str, dt.date, dt.date]]]:
        """Check ``last_name`` against the current occupant of ``room_number``.

        The comparison ignores case and surrounding whitespace. A missing
        (``None``) last name is treated as a failed login rather than an error.

        Returns:
            ``(True, occupant_tuple)`` on a match, otherwise ``(False, None)``.
        """
        occ = self.occupants.get(room_number)
        if not occ:
            return False, None
        if occ[0].strip().lower() == (last_name or "").strip().lower():
            return True, occ
        return False, None
# Single module-level simulator instance shared by the callbacks in this file.
HOTEL = HotelSim()
# -----------------------------
# Hotel static knowledge & offers / upsells
# -----------------------------
# Static facts about the hotel and its surroundings. compose_system_prompt()
# renders these values into the LLM system prompt.
HOTEL_INFO = {
    "name": "Bluebird Hotel",
    "location": "Volos, Greece — Pagasetic Gulf waterfront, gateway to Mount Pelion",
    "star_rating": 5,
    "amenities": [
        "Rooftop infinity pool",
        "24/7 concierge",
        "High-speed Wi-Fi",
        "EV charging bays",
        "Ocean-view fitness center",
        "Business lounge & meeting pods",
    ],
    "dining": [
        "Azure Restaurant (Mediterranean tasting menu)",
        "Harbor Café (casual, specialty coffee)",
        "SkyBar (sunset cocktails)",
    ],
    "spa": {
        "name": "Aeris Spa",
        "signature_treatments": [
            "Thalasso Revive (80m)",
            "Deep Sea Stone Massage (60m)",
            "Luminescence Facial (50m)",
        ],
        "open": "07:00 - 22:00",
    },
    "check_in_time": "15:00",
    "check_out_time": "11:00",
    "late_checkout_policy": "Subject to availability; fees may apply after 13:00.",
    # Curated local guide for Volos & nearby Pelion
    "local_guide": {
        "dining_tsipouradika": [
            "MeZen (modern tsipouro meze, city center)",
            "Ouzeri Ta Kymata (classic seafood meze on the promenade)",
            "Iolkos Tsipouradiko (casual, fresh small plates, near port)",
        ],
        "beaches": [
            "Anavros Beach (city beach, easy walk from center)",
            "Kala Nera (Pelion west coast ~25–35 min drive)",
            "Mylopotamos (Pelion east coast ~1h 15m; iconic rock arch)",
            "Papa Nero & Agios Ioannis (east Pelion ~1h 10m; organized)"
        ],
        "pelion_villages": [
            "Makrinitsa (balconies of Pelion, views over Volos ~25 min)",
            "Portaria (traditional square & tavernas ~20 min)",
            "Tsagarada (stone bridges & chestnut forests ~1h 10m)",
            "Milies (terminus of Pelion steam train ~50 min)"
        ],
        "activities": [
            "Pelion Train (Ano Lechonia → Milies; weekends seasonal)",
            "Waterfront promenade sunset stroll (Argonauts Avenue)",
            "Sea kayaking / sailing in Pagasetic Gulf (calm waters)",
            "Hiking Centaurs' Path (Portaria ↔ Makrinitsa)"
        ],
        "museums_culture": [
            "Athanasakeion Archaeological Museum of Volos",
            "Rooftile & Brickworks Museum N. & S. Tsalapatas",
            "Volos Municipal Gallery — Giorgio de Chirico Art Center",
        ],
        "day_trips": [
            "Ferry to Skiathos (summer schedule from Volos port)",
            "Meteora monasteries (long day ~2h each way by car)",
        ],
        "cafes_bars": [
            "Isalos (seafront coffee & cocktails)",
            "Canteen on the waterfront (casual bites)",
            "Grooove (drinks, music near center)",
        ],
    },
}
# Default upsell offers; load_offers() falls back to these when the offers
# store file is missing or cannot be read.
OFFERS_SEED = [
    {"id": 1, "title": "Spa Thermal Circuit Upgrade", "desc": "Gain 90-minute access to hydrotherapy pools + aromatherapy sauna.", "price_text": "€45 per guest"},
    {"id": 2, "title": "Sunset Tasting Menu", "desc": "7-course chef's selection at Azure with panoramic sea view table priority.", "price_text": "€85 per person"},
    {"id": 3, "title": "Premium Late Checkout", "desc": "Extend your stay until 17:00 and enjoy lounge refreshments.", "price_text": "€60 flat"},
    {"id": 4, "title": "Wellness Package", "desc": "Morning yoga session + detox juice flight + mini massage add-on.", "price_text": "€55 per guest"},
]
# JSON file holding the offers and their active flags (relative path).
OFFERS_FILE = "offers_store.json"
# Append-only file receiving one JSON record per line from log_event().
ANALYTICS_LOG = "analytics.log"
def _extract_price(price_text: str):
import re
m = re.search(r"([€$£])?(\d+(?:\.\d+)?)", price_text)
if not m:
return None, None
currency = m.group(1) or "€"
value = float(m.group(2))
return currency, value
def load_offers():
    """Load the offers list from ``OFFERS_FILE``, seeding the store if needed.

    Every stored offer is normalised on load: a missing ``active`` flag
    defaults to True, a legacy ``price`` field is copied into ``price_text``,
    and ``currency`` / ``price_value`` are re-derived from ``price_text``.

    When the file does not exist, cannot be read, or does not contain a JSON
    list of objects, the reason is printed (instead of being swallowed
    silently) and the store is rebuilt from ``OFFERS_SEED`` and saved.

    Returns:
        A list of offer dicts.
    """
    if os.path.exists(OFFERS_FILE):
        try:
            with open(OFFERS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, list) or not all(isinstance(o, dict) for o in data):
                raise ValueError("offers store must be a JSON list of objects")
            for o in data:
                o.setdefault("active", True)
                if "price_text" not in o and "price" in o:
                    o["price_text"] = str(o["price"])
                # str() guards against a hand-edited numeric price_text.
                cur, val = _extract_price(str(o.get("price_text", "")))
                o["currency"], o["price_value"] = cur, val
            return data
        except Exception as e:
            # Still best-effort, but make the reset visible: falling back to
            # the seed list re-activates every offer.
            print("[OFFERS] load failed; falling back to seed offers", e)
    seeded = []
    for raw in OFFERS_SEED:
        cur, val = _extract_price(raw.get("price_text", ""))
        seeded.append({
            "id": raw["id"],
            "title": raw["title"],
            "desc": raw["desc"],
            "price_text": raw["price_text"],
            "currency": cur,
            "price_value": val,
            "active": True,
        })
    save_offers(seeded)
    return seeded
def save_offers(offers):
    """Persist ``offers`` to ``OFFERS_FILE`` as pretty-printed UTF-8 JSON.

    The data is serialised in memory first and written to a sibling temp file
    that is then moved over the store with ``os.replace``. A serialisation or
    write failure therefore leaves the previous store intact instead of a
    truncated file. Failures are printed, never raised (best-effort save).
    """
    tmp_path = OFFERS_FILE + ".tmp"
    try:
        payload = json.dumps(offers, ensure_ascii=False, indent=2)
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, OFFERS_FILE)
    except Exception as e:
        print("[OFFERS] save failed", e)
def get_active_offers():
    """Return the offers from the store whose ``active`` flag is truthy."""
    active = []
    for offer in load_offers():
        if offer.get("active"):
            active.append(offer)
    return active
def set_offer_inactive(offer_id: int):
    """Deactivate the first active offer with the given id and save the store.

    The store is left untouched when no active offer has that id.
    """
    offers = load_offers()
    target = next(
        (o for o in offers if o["id"] == offer_id and o.get("active")),
        None,
    )
    if target is None:
        return
    target["active"] = False
    save_offers(offers)
def log_event(event: str, **data):
    """Append one analytics record to ``ANALYTICS_LOG`` as a JSON line.

    Args:
        event: Event name stored under the ``event`` key.
        **data: Extra fields merged into the record.

    The timestamp is UTC in naive ISO format (no offset suffix), matching the
    records written so far. It is obtained from an aware ``now()`` because
    ``datetime.utcnow()`` is deprecated. Failures are printed, never raised.
    """
    ts = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat()
    rec = {"ts": ts, "event": event, **data}
    try:
        with open(ANALYTICS_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(rec) + "\n")
    except Exception as e:
        print("[ANALYTICS] log failed", e)
def format_offer_card(offer: Optional[dict]) -> str:
    """Render an offer as a Markdown card; a falsy offer yields a placeholder line."""
    if offer:
        pieces = (
            f"### {offer['title']}\n",
            f"{offer['desc']}\n\n",
            f"**Price:** {offer['price_text']}\n",
            f"(Offer ID: {offer['id']})",
        )
        return "".join(pieces)
    return "**No current offers.**"
def compute_total(accepted: List[dict]):
    """Sum the ``price_value`` of the accepted offers.

    Offers without a ``price_value`` are skipped. The currency comes from the
    first priced offer ("€" when that offer has none).

    Returns:
        ``(currency, total)``; ``currency`` is None when nothing was priced.
    """
    running = 0.0
    symbol = None
    for offer in accepted:
        amount = offer.get("price_value")
        if amount is None:
            continue
        running += amount
        symbol = symbol or offer.get("currency") or "€"
    return symbol, running
def accepted_summary_md(accepted: List[dict]):
    """Render the accepted offers and their total as Markdown.

    Args:
        accepted: Offer dicts carrying at least ``title`` and ``price_text``.

    Returns:
        A bullet list followed by a total line, or a "None" placeholder when
        nothing has been accepted.
    """
    if not accepted:
        return "**Accepted Offers:** None"
    lines = ["**Accepted Offers:**"]
    lines.extend(f"- {o['title']} ({o['price_text']})" for o in accepted)
    cur, total = compute_total(accepted)
    # compute_total can hand back no currency (e.g. no accepted offer has a
    # numeric price); fall back to "€" so the total never reads "None0.00".
    symbol = cur or "€"
    lines.append(f"**Total:** {symbol}{total:.2f}")
    return "\n".join(lines)
def init_offers_state(session):
    """Reset the offers panel: first active offer, accepted summary, index 0.

    Makes sure ``session["accepted_offers"]`` holds a list and returns
    ``(offer_card_md, accepted_summary_md, 0, session)``.
    """
    accepted = session.get("accepted_offers") or []
    session["accepted_offers"] = accepted
    available = get_active_offers()
    card = format_offer_card(available[0] if available else None)
    return card, accepted_summary_md(accepted), 0, session
def context_summary_md(session: dict) -> str:
    """Describe the context hints stored in the session as one Markdown line.

    A fixed placeholder sentence is returned when no hint is set.
    """
    ctx = session or {}
    bits = []
    if ctx.get("time_of_day"):
        bits.append(f"Time: {ctx['time_of_day']}")
    if ctx.get("weather"):
        bits.append(f"Weather: {ctx['weather']}")
    radius = ctx.get("near_km")
    if isinstance(radius, (int, float)) and radius > 0:
        bits.append(f"Near me: ~{int(radius)} km")
    if bits:
        return "**Context:** " + ", ".join(bits)
    return "_No extra context set. Seasonal notes apply automatically._"
def set_context_action(time_of_day, weather, near_km, session):
    """Store the chosen context hints in the session and return the new summary.

    Empty selections remove the matching key. ``near_km`` is kept only when it
    converts to a float that is zero or greater.

    Returns:
        ``(session, summary_markdown)``.
    """
    state = session or {}
    for key, value in (("time_of_day", time_of_day), ("weather", weather)):
        if value:
            state[key] = str(value)
        else:
            state.pop(key, None)
    radius = None
    if near_km is not None:
        try:
            radius = float(near_km)
        except Exception:
            radius = None
    if radius is None or not radius >= 0:
        state.pop("near_km", None)
    else:
        state["near_km"] = radius
    return state, context_summary_md(state)
def clear_context_action(session):
    """Drop every context hint from the session and reset the context controls.

    Returns:
        ``(session, summary_markdown, None, None, 0)``; the last three values
        are the reset values for the two dropdowns and the radius slider.
    """
    state = session or {}
    for key in ("time_of_day", "weather", "near_km"):
        state.pop(key, None)
    return state, context_summary_md(state), None, None, 0
def next_offer_action(index, session):
    """Step forward to the next active offer, wrapping around at the end.

    Logs an ``offer_view`` event and returns ``(offer_card_md, new_index)``.
    """
    active = get_active_offers()
    if not active:
        return "**No current offers.**", 0
    position = ((0 if index is None else index) + 1) % len(active)
    shown = active[position]
    log_event("offer_view", offer_id=shown["id"])
    return format_offer_card(shown), position
def prev_offer_action(index, session):
    """Step back to the previous active offer, wrapping around at the start.

    Logs an ``offer_view`` event and returns ``(offer_card_md, new_index)``.
    """
    active = get_active_offers()
    if not active:
        return "**No current offers.**", 0
    position = ((0 if index is None else index) - 1) % len(active)
    shown = active[position]
    log_event("offer_view", offer_id=shown["id"])
    return format_offer_card(shown), position
def accept_offer_action(index, chat_history, session):
    """Accept the offer at ``index`` and move on to the next remaining one.

    The offer is marked inactive in the store, appended to
    ``session["accepted_offers"]``, logged, and confirmed in the chat. An
    index that is missing or past the end falls back to the first offer.

    Returns:
        ``(chat, offer_card_md, accepted_summary_md, new_index)``.
    """
    chat = chat_history or []
    available = get_active_offers()
    if not available:
        chat.append(system_reply("No offers to accept."))
        return chat, "**No current offers.**", accepted_summary_md(session.get("accepted_offers", [])), 0
    position = 0 if (index is None or index >= len(available)) else index
    chosen = available[position]
    set_offer_inactive(chosen["id"])
    accepted = session.setdefault("accepted_offers", [])
    accepted.append(chosen)
    log_event("offer_accept", offer_id=chosen["id"], title=chosen["title"])
    chat.append(system_reply(f"Accepted offer: {chosen['title']} ({chosen['price_text']})."))
    still_active = get_active_offers()
    if still_active:
        next_position = position % len(still_active)
        return chat, format_offer_card(still_active[next_position]), accepted_summary_md(accepted), next_position
    return chat, "**No current offers.**", accepted_summary_md(accepted), 0
# Model identifier for the assistant; override with the HOTEL_LLM_MODEL env var.
LLM_MODEL_ID = os.getenv("HOTEL_LLM_MODEL", "TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# Filled in by load_llm() on the first successful load and reused afterwards.
_LLM_PIPE = None
_LLM = None
def load_llm():
    """Return the shared text-generation object, loading it on first use.

    Returns None when transformers is not importable or loading fails (the
    failure is printed). When LangChain is importable the pipeline is wrapped
    in ``HuggingFacePipeline``; otherwise the raw pipeline is returned.
    Generation settings are read from ``LLM_*`` environment variables.
    """
    global _LLM_PIPE, _LLM
    if _LLM:
        return _LLM
    if not _TRANSFORMERS_AVAILABLE:
        return None
    try:
        tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_ID, trust_remote_code=True)
        placement = "auto" if os.getenv("LLM_DEVICE_AUTO", "1") == "1" else None
        model = AutoModelForCausalLM.from_pretrained(
            LLM_MODEL_ID,
            device_map=placement,
            torch_dtype="auto",
        )
        generation_kwargs = dict(
            max_new_tokens=int(os.getenv("LLM_MAX_NEW_TOKENS", 256)),
            temperature=float(os.getenv("LLM_TEMP", 0.3)),
            top_p=float(os.getenv("LLM_TOP_P", 0.9)),
            repetition_penalty=float(os.getenv("LLM_REP_PENALTY", 1.1)),
            no_repeat_ngram_size=int(os.getenv("LLM_NO_REPEAT_NGRAM", 3)),
            do_sample=True,
        )
        _LLM_PIPE = pipeline(
            task="text-generation",
            model=model,
            tokenizer=tokenizer,
            **generation_kwargs,
        )
        _LLM = HuggingFacePipeline(pipeline=_LLM_PIPE) if _LANGCHAIN_AVAILABLE else _LLM_PIPE
        return _LLM
    except Exception as e:
        print("[LLM] Failed to load model", LLM_MODEL_ID, e)
        return None
def compose_system_prompt(dynamic_context: str = ""):
    """Assemble the concierge system prompt from hotel facts, offers and guide.

    Args:
        dynamic_context: Optional extra notes appended under a
            "Dynamic Context:" heading before the instructions.

    Returns:
        The full prompt text, ending with the answering instructions.
    """
    def bullets(items) -> str:
        return "\n".join(f"- {item}" for item in items)

    amenities = bullets(HOTEL_INFO["amenities"])
    treatments = bullets(HOTEL_INFO["spa"]["signature_treatments"])
    try:
        offers_list = get_active_offers()
    except Exception:
        offers_list = []
    offers = "\n".join(f"* {o['title']}: {o['desc']} ({o['price_text']})" for o in offers_list if o.get("active"))
    # Local guide sections (Volos & Pelion), emitted in this fixed order and
    # only for categories that actually have entries.
    lg = HOTEL_INFO.get("local_guide", {})
    headings = (
        ("dining_tsipouradika", "Dining (tsipouradika):\n"),
        ("beaches", "Beaches:\n"),
        ("pelion_villages", "Pelion villages:\n"),
        ("activities", "Activities:\n"),
        ("museums_culture", "Museums & culture:\n"),
        ("day_trips", "Day trips & ferries:\n"),
        ("cafes_bars", "Cafés & bars:\n"),
    )
    sections: List[str] = [
        heading + bullets(lg[key]) for key, heading in headings if lg.get(key)
    ]
    local_guide = "\n\n".join(sections)
    base = f"""You are the concierge AI for {HOTEL_INFO['name']}. Provide concise, friendly answers.
Hotel Facts:
Location: {HOTEL_INFO['location']}
Star Rating: {HOTEL_INFO['star_rating']}
Check-in: {HOTEL_INFO['check_in_time']} | Check-out: {HOTEL_INFO['check_out_time']}
Amenities:\n{amenities}\nSpa: {HOTEL_INFO['spa']['name']} (Open {HOTEL_INFO['spa']['open']})\nSignature Treatments:\n{treatments}\nDining Venues: {', '.join(HOTEL_INFO['dining'])}\nLate Checkout: {HOTEL_INFO['late_checkout_policy']}\nCurrent Upsell Offers:\n{offers}
Local Guide — Volos & Pelion:\n{local_guide}
"""
    if dynamic_context:
        base += f"Dynamic Context:\n{dynamic_context}\n"
    base += (
        "Instructions:\n"
        "- Answer the user's question directly using the facts above.\n"
        "- Do NOT include any role prefixes like 'User:' or 'Assistant:'.\n"
        "- Do NOT create additional 'User:' turns in your output.\n"
        "- Do NOT echo 'Question:' or 'Answer:' in the output.\n"
        "- Prefer nearby Volos & Pelion suggestions; for food, prioritize tsipouradika seafood meze. Include short distance/time if helpful.\n"
        "- Keep responses under 120 words.\n"
        "Reply with only the answer text."
    )
    return base
def _sanitize_llm_output(text: str) -> str:
"""Remove role markers and self-chat from model outputs; cap length."""
if not text:
return text
t = text.strip()
# Trim common assistant prefixes
for p in ("Assistant:", "ASSISTANT:", "assistant:", "AI:", "Bot:"):
if t.startswith(p):
t = t[len(p):].lstrip()
# Remove leading Answer: prefix if present
if t.startswith("Answer:"):
t = t[len("Answer:"):].lstrip()
# Cut at first sign of a new role line
for marker in ("\nUser:", "\nUSER:", "\nHuman:", "\nHUMAN:", "\nAssistant:", "\nASSISTANT:", "\nSystem:"):
idx = t.find(marker)
if idx != -1:
t = t[:idx].rstrip()
break
# If the model starts another pseudo Q/A, cut at repeated Question/Answer markers
for marker in ("\nQuestion:", "\nQUESTION:", "\nAnswer:", "\nANSWER:"):
idx = t.find(marker)
if idx != -1:
t = t[:idx].rstrip()
break
# Normalize spaces
t = " ".join(t.split())
# Soft cap ~120 words
words = t.split()
if len(words) > 120:
t = " ".join(words[:120]).rstrip() + "…"
return t
def _build_dynamic_context(session: dict) -> str:
"""Lightweight contextual notes; placeholder for time/weather/nearby in future UI.
Uses session hints if present; otherwise empty string.
"""
parts: List[str] = []
# Time of day
tod = (session or {}).get("time_of_day")
if tod:
parts.append(f"Time of day: {tod}")
# Weather
wx = (session or {}).get("weather")
if wx:
parts.append(f"Weather: {wx}")
# Near-me radius (km)
near = (session or {}).get("near_km")
if isinstance(near, (int, float)) and near > 0:
parts.append(f"Focus on places within ~{near} km when relevant.")
# Seasonal hint
month = dt.datetime.now().month
season = (
"summer (beaches, ferries to Sporades)" if month in (6, 7, 8) else
"shoulder season (milder weather; Pelion villages, hikes)" if month in (5, 9, 10) else
"cool season (mountain villages, museums, hearty food)"
)
parts.append(f"Seasonal note: {season}.")
return "\n".join(parts)
def ai_general_response(user_message: str, session: dict) -> Optional[str]:
    """Answer a free-form guest question with the optional LLM.

    Builds one instruction-style prompt (system prompt followed by
    "Question: ... Answer:") and returns the sanitized completion.

    Args:
        user_message: The guest's message; it is stripped before use.
        session: Session dict handed to ``_build_dynamic_context``.

    Returns:
        The sanitized model output, a fixed "unavailable" notice when
        ``load_llm()`` returns nothing, or an "(AI error: ...)" string when
        generation raises.
    """
    llm = load_llm()
    if not llm:
        return "(AI assistant unavailable right now — please try again later or ask at the front desk.)"
    dynamic_ctx = _build_dynamic_context(session)
    system_prompt = compose_system_prompt(dynamic_ctx)
    # Use an instruction-style prompt to avoid multi-turn role simulation
    final_prompt = system_prompt + "\n\nQuestion: " + user_message.strip() + "\nAnswer:"
    try:
        # NOTE(review): `_pipeline` is used as the marker for a wrapper object;
        # whether the LangChain wrapper exposes that attribute is not visible
        # in this file — confirm which branch each object type takes.
        if hasattr(llm, "__call__") and not hasattr(llm, "_pipeline"):
            # Likely a raw HF pipeline; try passing conservative decoding args to reduce hallucinations
            try:
                out = llm(final_prompt, max_new_tokens=200, temperature=0.2, top_p=0.9)
            except Exception:
                # The callable rejected the extra keyword arguments (or failed
                # for another reason); retry with the prompt only.
                out = llm(final_prompt)
            if isinstance(out, list):
                raw = out[0].get("generated_text", "")
                # Drop the echoed prompt when the generated text starts with it.
                text = raw[len(final_prompt):] if raw.startswith(final_prompt) else raw
            else:
                text = str(out)
        else:
            # LangChain or other wrapper; fall back to simple call
            text = llm(final_prompt) # type: ignore
        return _sanitize_llm_output(text)
    except Exception as e:
        return f"(AI error: {e})"
# -----------------------------
# Helpers / parsing
# -----------------------------
def format_date(d: Optional[dt.date]) -> str:
    """Format a date as YYYY-MM-DD; anything that is not a date becomes an em dash."""
    if not isinstance(d, dt.date):
        return "—"
    return d.strftime("%Y-%m-%d")
def parse_date(d: Union[str, dt.date, dt.datetime, None]) -> Optional[dt.date]:
    """Convert a date-like value to a ``date``; return None when it cannot be parsed.

    Accepts ``datetime``/``date`` objects and strings. Strings are tried as
    ISO date-times (a "T" separator is allowed), then ISO dates, then a fixed
    list of formats in which day-first slashes win over month-first ones.
    """
    if isinstance(d, dt.datetime):
        return d.date()
    if isinstance(d, dt.date):
        return d
    if not isinstance(d, str):
        return None
    text = d.strip()
    if not text:
        return None
    iso_parsers = (
        lambda s: dt.datetime.fromisoformat(s.replace('T', ' ')).date(),
        dt.date.fromisoformat,
    )
    for parse in iso_parsers:
        try:
            return parse(text)
        except Exception:
            pass
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%d/%m/%Y",
        "%m/%d/%Y",
        "%Y/%m/%d",
    ):
        try:
            parsed = dt.datetime.strptime(text, fmt)
        except Exception:
            continue
        return parsed.date()
    return None
def ensure_valid_date_range(check_in, check_out) -> Tuple[Optional[dt.date], Optional[dt.date], Optional[str]]:
    """Parse both dates and verify that check-out falls after check-in.

    Returns:
        ``(check_in, check_out, None)`` on success, otherwise
        ``(None, None, error_message)``.
    """
    start, end = parse_date(check_in), parse_date(check_out)
    if not (start and end):
        return None, None, "Please select both check-in and check-out dates."
    if end <= start:
        return None, None, "Check-out must be after check-in."
    return start, end, None
def system_reply(text: str) -> Tuple[Optional[str], str]:
    """Build a chat-history pair carrying only an assistant message.

    The first slot of the pair is left as None; the return annotation now
    reflects that (it previously claimed both slots were strings).
    """
    return (None, text)
def user_reply(text: str) -> Tuple[str, Optional[str]]:
    """Build a chat-history pair carrying only a user message.

    The second slot of the pair is left as None; the return annotation now
    reflects that (it previously claimed both slots were strings).
    """
    return (text, None)
def login_action(room_number, last_name, chat_history, session):
    """Log a guest in by room number and last name.

    Args:
        room_number: Value of the room-number input; must convert to ``int``.
        last_name: Last name to check against the room's occupant.
        chat_history: Current chat history (may be None).
        session: Current session dict (may be None).

    Returns:
        ``(chat, session, status_text)``. On success the session gains the
        login fields while keeping every other key it already had (context
        hints, accepted offers, offer index). Replacing the dict wholesale
        used to drop those while the UI kept displaying them. On failure the
        session is returned unchanged.
    """
    chat = chat_history or []
    if not room_number or not last_name:
        chat.append(system_reply("Enter room number and last name to log in."))
        return chat, session, "Missing credentials"
    try:
        rn = int(room_number)
    except Exception:
        chat.append(system_reply("Room number must be numeric."))
        return chat, session, "Invalid room number"
    ok, occ = HOTEL.login(rn, last_name)
    if not ok:
        chat.append(system_reply("Login failed. Please check details or proceed to booking."))
        return chat, session, "Login failed"
    ln, ci, co = occ
    session = {
        **(session or {}),
        "logged_in": True,
        "room_number": rn,
        "guest_name": ln,
        "check_in": format_date(ci),
        "check_out": format_date(co),
    }
    chat.append(system_reply(f"Welcome {ln}. You're in room {rn} from {format_date(ci)} to {format_date(co)}."))
    return chat, session, f"Logged in as {ln} (Room {rn})"
def logout_action(chat_history, session):
    """Announce the logout in the chat and hand back a fresh logged-out session.

    Returns:
        ``(chat, {"logged_in": False}, "Logged out")``.
    """
    history = chat_history if chat_history else []
    history.append(system_reply("You've been logged out."))
    fresh_session = {"logged_in": False}
    return history, fresh_session, "Logged out"
def call_cleaning_action(chat_history, session):
    """File a cleaning request for the logged-in guest's room.

    Guests who are not logged in get a prompt to log in instead. Returns the
    updated chat history.
    """
    chat = chat_history or []
    if session and session.get("logged_in"):
        room = session.get("room_number")
        who = session.get("guest_name", "Guest")
        request_id, stamp = HOTEL.request_cleaning(room, who)
        chat.append(system_reply(f"Cleaning request #{request_id} scheduled for room {room} at {stamp.strftime('%H:%M')}."))
    else:
        chat.append(system_reply("Please log in to request cleaning for your room."))
    return chat
def book_action(guest_name, room_type, check_in, check_out, chat_history, session):
    """Book a room from the booking form and report the outcome in the chat.

    Args:
        guest_name: Name typed into the form; when blank (or only whitespace)
            the logged-in guest's name from the session is used.
        room_type: Selected room category; must be a key of ``HOTEL.room_types``.
        check_in: Raw check-in value from the date input.
        check_out: Raw check-out value from the date input.
        chat_history: Current chat history (may be None).
        session: Current session dict (may be None).

    Returns:
        The updated chat history.
    """
    chat = chat_history or []
    # Strip first so a whitespace-only name cannot produce a nameless booking.
    guest_name = (guest_name or "").strip()
    if not guest_name:
        if session and session.get("logged_in") and session.get("guest_name"):
            guest_name = session["guest_name"]
        else:
            chat.append(system_reply("Please provide a guest name (or log in first)."))
            return chat
    ci, co, err = ensure_valid_date_range(check_in, check_out)
    if err:
        chat.append(system_reply(f"{err} (got: check_in='{check_in}' check_out='{check_out}')"))
        return chat
    if room_type not in HOTEL.room_types:
        chat.append(system_reply("Please select a room type."))
        return chat
    # ensure_valid_date_range guarantees check-out is after check-in, so the
    # stay is always at least one night here.
    nights = (co - ci).days
    booking = HOTEL.book_room(guest_name, room_type, ci, co)
    if not booking:
        chat.append(system_reply(f"No {room_type} rooms available for {nights} night(s) from {format_date(ci)} to {format_date(co)}."))
        return chat
    # The two dates used to be printed back-to-back with nothing between them.
    chat.append(system_reply(f"✅ Booked {room_type} room {booking.room_number} for {booking.guest_name}: {format_date(ci)} → {format_date(co)} ({nights} night(s))."))
    return chat
def handle_chat(message, chat_history, session, room_type_dd, ci, co):
    """Route one chat message to a deterministic action or to the LLM.

    Intents are matched by substring on the lower-cased message, in this
    order: offer navigation, offer acceptance, acceptance by offer title,
    cleaning, booking, login help, and finally the general AI answer.

    Args:
        message: Text typed by the user; blank messages leave the chat as is.
        chat_history: Current chat history (may be None).
        session: Session dict; the offer branches read and write
            ``offers_index`` on it.
        room_type_dd: Room type selected in the booking form, used when the
            message names no room type.
        ci: Check-in value from the booking form, used when the message
            contains no date.
        co: Check-out value from the booking form, used when the message
            contains fewer than two dates.

    Returns:
        The updated chat history.
    """
    chat = chat_history or []
    msg = (message or "").strip()
    if not msg:
        return chat
    chat.append(user_reply(msg))
    lower = msg.lower()
    # Offer navigation & acceptance: deterministic actions
    if any(k in lower for k in ["next offer", "next >>", "next ▶", "show next offer"]):
        card, idx = next_offer_action(session.get("offers_index", 0), session)
        session["offers_index"] = idx
        chat.append(system_reply("Showing next offer."))
        chat.append(system_reply(card))
        return chat
    if any(k in lower for k in ["prev offer", "previous offer", "◀ prev", "show previous offer"]):
        card, idx = prev_offer_action(session.get("offers_index", 0), session)
        session["offers_index"] = idx
        chat.append(system_reply("Showing previous offer."))
        chat.append(system_reply(card))
        return chat
    if any(k in lower for k in ["accept offer", "i'll take it", "i will take it", "buy offer", "add offer"]):
        # accept current index
        updated_chat, card, accepted_md_text, new_idx = accept_offer_action(session.get("offers_index", 0), chat, session)
        session["offers_index"] = new_idx
        chat = updated_chat
        chat.append(system_reply(card))
        chat.append(system_reply(accepted_md_text))
        return chat
    # Accept a specific offer by title keywords, e.g., "I want the Spa Thermal Circuit Upgrade"
    if any(k in lower for k in ["i want", "i'd like", "i will take", "i will get", "i want the", "i want to buy"]):
        offers = get_active_offers()
        if offers:
            # find best title match
            best_idx, best_score = None, 0
            for i, o in enumerate(offers):
                title = o.get("title", "").lower()
                # Score = number of title words occurring as substrings of the message.
                score = sum(1 for w in title.split() if w in lower)
                if score > best_score:
                    best_score, best_idx = score, i
            if best_idx is not None and best_score > 0:
                updated_chat, card, accepted_md_text, new_idx = accept_offer_action(best_idx, chat, session)
                session["offers_index"] = new_idx
                chat = updated_chat
                chat.append(system_reply(card))
                chat.append(system_reply(accepted_md_text))
                return chat
        # No offer matched: fall through to the remaining intents below.
    # Cleaning intents
    if any(k in lower for k in ["clean", "housekeeping", "maid"]):
        return call_cleaning_action(chat, session)
    # Booking intents
    if "book" in lower:
        guessed_type = None
        for t in HOTEL.room_types.keys():
            if t.lower() in lower:
                guessed_type = t
                break
        import re
        # Dates written as 20YY-MM-DD; the first is check-in, the second check-out.
        m = re.findall(r"(20\d{2}-\d{2}-\d{2})", lower)
        d1 = parse_date(m[0]) if len(m) > 0 else parse_date(ci)
        d2 = parse_date(m[1]) if len(m) > 1 else parse_date(co)
        if not guessed_type:
            guessed_type = room_type_dd if room_type_dd in HOTEL.room_types else None
        if not d1 or not d2 or d1 >= d2 or not guessed_type:
            chat.append(system_reply("To book, specify room type and a valid date range (e.g., 'Book Deluxe from 2025-09-12 to 2025-09-14')."))
            return chat
        # Anonymous chat bookings are recorded under the name "Guest".
        guest = session.get("guest_name", "Guest") if session and session.get("logged_in") else "Guest"
        booking = HOTEL.book_room(guest, guessed_type, d1, d2)
        if not booking:
            chat.append(system_reply(f"No {guessed_type} rooms available for those dates."))
        else:
            chat.append(system_reply(f"Booked {guessed_type} room {booking.room_number} from {format_date(d1)} to {format_date(d2)}."))
        return chat
    if any(k in lower for k in ["login", "log in"]):
        chat.append(system_reply("Use the Login panel: enter your room number and last name."))
        return chat
    ai_reply = ai_general_response(msg, session)
    if ai_reply:
        chat.append(system_reply(ai_reply))
        return chat
    # Reached only when the AI reply is empty.
    help_text = (
        "I can help you:\n"
        "- Log in to your room (use the panel).\n"
        "- Book a room (select dates and type, then Book).\n"
        "- Request cleaning (button or say 'clean my room')."
    )
    chat.append(system_reply(help_text))
    return chat
# -----------------------------
# UI
# -----------------------------
# Three-column layout (login | chat + context | booking + offers) followed by
# the event wiring that connects each control to its callback.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as demo:
    gr.Markdown("""
# 🏨 Bluebird Hotel Assistant
Welcome! Log in if you're a current guest, or book a new stay. Ask for cleaning anytime.
""")
    # Session dict passed into and returned from the callbacks below.
    session = gr.State({"logged_in": False})
    with gr.Row():
        # Left: Login
        with gr.Column(scale=1):
            gr.Markdown("### Guest Login")
            room_in = gr.Number(label="Room Number", value=None)
            last_name_in = gr.Textbox(label="Last Name")
            with gr.Row():
                login_btn = gr.Button("Log In", variant="primary")
                logout_btn = gr.Button("Log Out")
            login_status = gr.Markdown("Not logged in.")
        # Middle: Chat
        with gr.Column(scale=2):
            gr.Markdown("### Chat")
            chat = gr.Chatbot(height=360, type="tuples")
            msg = gr.Textbox(placeholder="Type a message (e.g., 'Clean my room', 'Book Deluxe from 2025-10-12 to 2025-10-14')", label="Message", lines=2)
            with gr.Row():
                send = gr.Button("Send", variant="primary")
                clean_btn = gr.Button("Call Cleaning")
            with gr.Accordion("Context (time, weather, near me)", open=False):
                with gr.Row():
                    time_of_day_dd = gr.Dropdown(["morning", "afternoon", "evening", "night"], label="Time of day", value=None)
                    weather_dd = gr.Dropdown(["clear", "cloudy", "rain", "windy", "hot"], label="Weather", value=None)
                    near_slider = gr.Slider(0, 50, value=0, step=1, label="Near me radius (km)")
                with gr.Row():
                    apply_ctx_btn = gr.Button("Apply Context")
                    clear_ctx_btn = gr.Button("Clear Context")
                ctx_status = gr.Markdown("_No extra context set. Seasonal notes apply automatically._")
        # Right: Booking + Offers
        with gr.Column(scale=1):
            gr.Markdown("### Book a Room")
            guest_name = gr.Textbox(label="Guest Name", placeholder="Your name")
            room_type_dd = gr.Dropdown(list(HOTEL.room_types.keys()), label="Room Type")
            # Use whichever date component this Gradio install provides,
            # falling back to plain text boxes when none is available.
            _DateTime = getattr(gr, "DateTime", None)
            _DatePicker = getattr(gr, "DatePicker", None) or getattr(gr, "Datepicker", None)
            if _DateTime:
                check_in = _DateTime(label="Check-in", include_time=True, type="string")
                check_out = _DateTime(label="Check-out", include_time=True, type="string")
            elif _DatePicker:
                check_in = _DatePicker(label="Check-in")
                check_out = _DatePicker(label="Check-out")
            else:
                gr.Markdown("⚠️ No calendar component in this Gradio version. Upgrade with: `pip install -U gradio`.\nEnter dates as YYYY-MM-DD.")
                check_in = gr.Textbox(label="Check-in (YYYY-MM-DD)", placeholder="2025-10-12")
                check_out = gr.Textbox(label="Check-out (YYYY-MM-DD)", placeholder="2025-10-15")
            book_btn = gr.Button("Find & Book", variant="primary")
            gr.Markdown("### Offers & Upsells")
            offer_card = gr.Markdown("(Click Refresh Offers)")
            accepted_md = gr.Markdown("**Accepted Offers:** None")
            # Index of the offer currently shown in the card.
            offers_index = gr.State(0)
            with gr.Row():
                prev_offer_btn = gr.Button("◀ Prev")
                next_offer_btn = gr.Button("Next ▶")
            with gr.Row():
                accept_offer_btn = gr.Button("Accept Offer", variant="primary")
                refresh_offers_btn = gr.Button("Refresh Offers")
    # Wiring callbacks
    login_btn.click(
        login_action,
        inputs=[room_in, last_name_in, chat, session],
        outputs=[chat, session, login_status],
    )
    logout_btn.click(
        logout_action,
        inputs=[chat, session],
        outputs=[chat, session, login_status],
    )
    clean_btn.click(
        call_cleaning_action,
        inputs=[chat, session],
        outputs=[chat],
    )
    apply_ctx_btn.click(
        set_context_action,
        inputs=[time_of_day_dd, weather_dd, near_slider, session],
        outputs=[session, ctx_status],
    )
    clear_ctx_btn.click(
        clear_context_action,
        inputs=[session],
        outputs=[session, ctx_status, time_of_day_dd, weather_dd, near_slider],
    )
    book_btn.click(
        book_action,
        inputs=[guest_name, room_type_dd, check_in, check_out, chat, session],
        outputs=[chat],
    )
    # The Send button and pressing Enter in the message box share one handler.
    send.click(
        handle_chat,
        inputs=[msg, chat, session, room_type_dd, check_in, check_out],
        outputs=[chat],
    )
    msg.submit(
        handle_chat,
        inputs=[msg, chat, session, room_type_dd, check_in, check_out],
        outputs=[chat],
    )
    # Offers wiring
    refresh_offers_btn.click(
        init_offers_state,
        inputs=[session],
        outputs=[offer_card, accepted_md, offers_index, session],
    )
    next_offer_btn.click(
        next_offer_action,
        inputs=[offers_index, session],
        outputs=[offer_card, offers_index],
    )
    prev_offer_btn.click(
        prev_offer_action,
        inputs=[offers_index, session],
        outputs=[offer_card, offers_index],
    )
    accept_offer_btn.click(
        accept_offer_action,
        inputs=[offers_index, chat, session],
        outputs=[chat, offer_card, accepted_md, offers_index],
    )
# Module-level alias for the Blocks app.
app = demo
if __name__ == "__main__":
    demo.launch()