Spaces:
Sleeping
Sleeping
| import datetime as dt | |
| import os | |
| import json | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import gradio as gr | |
| # Optional LLM / LangChain imports (lazy usage) | |
| try: | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline # type: ignore | |
| _TRANSFORMERS_AVAILABLE = True | |
| except Exception: | |
| _TRANSFORMERS_AVAILABLE = False | |
| try: | |
| from langchain.llms import HuggingFacePipeline # type: ignore | |
| _LANGCHAIN_AVAILABLE = True | |
| except Exception: | |
| _LANGCHAIN_AVAILABLE = False | |
| # ----------------------------- | |
| # Simple in-memory hotel sim DB | |
| # ----------------------------- | |
def _daterange(start: dt.date, end: dt.date):
    """Yield consecutive days from ``start`` (inclusive) to ``end`` (exclusive).

    Nothing is yielded when ``start`` is not strictly before ``end``.
    """
    one_day = dt.timedelta(days=1)
    day = start
    while day < end:
        yield day
        day = day + one_day
@dataclass
class Booking:
    """A reservation of one room for one guest.

    Declared as a dataclass so instances can be built with keyword
    arguments (``Booking(room_number=..., guest_name=..., ...)``); without
    the decorator the annotations below would not generate an ``__init__``.
    """

    # Number of the reserved room.
    room_number: int
    # Name the reservation was made under.
    guest_name: str
    # First day of the stay.
    check_in: dt.date
    # Day of departure.
    check_out: dt.date
    # Room category name (e.g. "Standard", "Deluxe", "Suite").
    room_type: str
@dataclass
class HotelSim:
    """In-memory hotel simulation: room inventory, bookings, cleaning, login.

    Must be a dataclass: the ``field(default_factory=...)`` declarations only
    become real per-instance containers, and ``__post_init__`` only runs, when
    the ``@dataclass`` decorator generates ``__init__``.
    """

    # Number of rooms available per room type.
    room_types: Dict[str, int] = field(default_factory=lambda: {
        "Standard": 10,
        "Deluxe": 5,
        "Suite": 2,
    })
    # Room number -> room type; filled in by __post_init__.
    rooms: Dict[int, str] = field(default_factory=dict)
    # Every booking accepted by book_room(), in creation order.
    bookings: List["Booking"] = field(default_factory=list)
    # (room number, guest name, request timestamp) per cleaning request.
    cleaning_requests: List[Tuple[int, str, dt.datetime]] = field(default_factory=list)
    # Room number -> (last name, check-in, check-out) of the current occupant.
    occupants: Dict[int, Tuple[str, dt.date, dt.date]] = field(default_factory=dict)

    def __post_init__(self):
        """Number the rooms per type and seed two demo occupants."""
        # Create room numbering by type (simple, predictable ranges)
        # Standard: 101-110, Deluxe: 201-205, Suite: 301-302
        for room_type, first_number in (("Standard", 101), ("Deluxe", 201), ("Suite", 301)):
            for offset in range(self.room_types[room_type]):
                self.rooms[first_number + offset] = room_type
        # Pre-populate with a couple of “current” occupants for login demo
        today = dt.date.today()
        self.occupants[104] = ("Papadopoulos", today - dt.timedelta(days=1), today + dt.timedelta(days=3))
        self.occupants[202] = ("Smith", today, today + dt.timedelta(days=2))

    # ----- Availability & Booking -----
    def _is_room_available(self, room: int, start: dt.date, end: dt.date) -> bool:
        """Return True when ``room`` is free for the half-open stay [start, end).

        An empty or inverted range (``start >= end``) is never available.
        A stay conflicts with a booking or current occupant when the two
        date ranges overlap; back-to-back stays (one ends the day the other
        starts) do not conflict.
        """
        if start >= end:
            return False
        for b in self.bookings:
            if b.room_number == room and start < b.check_out and end > b.check_in:
                return False
        if room in self.occupants:
            _, occ_in, occ_out = self.occupants[room]
            if start < occ_out and end > occ_in:
                return False
        return True

    def find_available_room(self, room_type: str, start: dt.date, end: dt.date) -> Optional[int]:
        """Return the first room of ``room_type`` free for [start, end), else None."""
        for room, rtype in self.rooms.items():
            if rtype == room_type and self._is_room_available(room, start, end):
                return room
        return None

    def book_room(self, guest_name: str, room_type: str, start: dt.date, end: dt.date) -> Optional["Booking"]:
        """Book the first free room of ``room_type`` and record the booking.

        Returns the new ``Booking``, or None when no matching room is free.
        """
        room = self.find_available_room(room_type, start, end)
        if room is None:
            return None
        booking = Booking(room_number=room, guest_name=guest_name, check_in=start, check_out=end, room_type=room_type)
        self.bookings.append(booking)
        return booking

    # ----- Cleaning -----
    def request_cleaning(self, room_number: int, guest_name: str) -> Tuple[int, dt.datetime]:
        """Record a cleaning request stamped with the current local time.

        Returns ``(request number, timestamp)`` where the request number is
        the count of requests recorded so far (1-based).
        """
        ts = dt.datetime.now()
        self.cleaning_requests.append((room_number, guest_name, ts))
        return len(self.cleaning_requests), ts

    # ----- Login -----
    def login(self, room_number: int, last_name: str) -> Tuple[bool, Optional[Tuple[str, dt.date, dt.date]]]:
        """Check ``last_name`` against the occupant of ``room_number``.

        The comparison ignores case and surrounding whitespace. Returns
        ``(True, occupant_tuple)`` on a match and ``(False, None)`` otherwise.
        """
        occ = self.occupants.get(room_number)
        if not occ:
            return False, None
        if occ[0].strip().lower() == last_name.strip().lower():
            return True, occ
        return False, None
# Single module-level simulation instance; the login, cleaning, booking and
# chat callbacks below all read and mutate this one object.
HOTEL = HotelSim()
| # ----------------------------- | |
| # Hotel static knowledge & offers / upsells | |
| # ----------------------------- | |
# Static facts about the hotel plus a curated local guide. The values are
# interpolated into the LLM system prompt by compose_system_prompt().
HOTEL_INFO = {
    "name": "Bluebird Hotel",
    "location": "Volos, Greece — Pagasetic Gulf waterfront, gateway to Mount Pelion",
    "star_rating": 5,
    "amenities": [
        "Rooftop infinity pool",
        "24/7 concierge",
        "High-speed Wi-Fi",
        "EV charging bays",
        "Ocean-view fitness center",
        "Business lounge & meeting pods",
    ],
    "dining": [
        "Azure Restaurant (Mediterranean tasting menu)",
        "Harbor Café (casual, specialty coffee)",
        "SkyBar (sunset cocktails)",
    ],
    "spa": {
        "name": "Aeris Spa",
        "signature_treatments": [
            "Thalasso Revive (80m)",
            "Deep Sea Stone Massage (60m)",
            "Luminescence Facial (50m)",
        ],
        "open": "07:00 - 22:00",
    },
    "check_in_time": "15:00",
    "check_out_time": "11:00",
    "late_checkout_policy": "Subject to availability; fees may apply after 13:00.",
    # Curated local guide for Volos & nearby Pelion
    # Each key below is rendered as its own headed section of the prompt.
    "local_guide": {
        "dining_tsipouradika": [
            "MeZen (modern tsipouro meze, city center)",
            "Ouzeri Ta Kymata (classic seafood meze on the promenade)",
            "Iolkos Tsipouradiko (casual, fresh small plates, near port)",
        ],
        "beaches": [
            "Anavros Beach (city beach, easy walk from center)",
            "Kala Nera (Pelion west coast ~25–35 min drive)",
            "Mylopotamos (Pelion east coast ~1h 15m; iconic rock arch)",
            "Papa Nero & Agios Ioannis (east Pelion ~1h 10m; organized)"
        ],
        "pelion_villages": [
            "Makrinitsa (balconies of Pelion, views over Volos ~25 min)",
            "Portaria (traditional square & tavernas ~20 min)",
            "Tsagarada (stone bridges & chestnut forests ~1h 10m)",
            "Milies (terminus of Pelion steam train ~50 min)"
        ],
        "activities": [
            "Pelion Train (Ano Lechonia → Milies; weekends seasonal)",
            "Waterfront promenade sunset stroll (Argonauts Avenue)",
            "Sea kayaking / sailing in Pagasetic Gulf (calm waters)",
            "Hiking Centaurs' Path (Portaria ↔ Makrinitsa)"
        ],
        "museums_culture": [
            "Athanasakeion Archaeological Museum of Volos",
            "Rooftile & Brickworks Museum N. & S. Tsalapatas",
            "Volos Municipal Gallery — Giorgio de Chirico Art Center",
        ],
        "day_trips": [
            "Ferry to Skiathos (summer schedule from Volos port)",
            "Meteora monasteries (long day ~2h each way by car)",
        ],
        "cafes_bars": [
            "Isalos (seafront coffee & cocktails)",
            "Canteen on the waterfront (casual bites)",
            "Grooove (drinks, music near center)",
        ],
    },
}
# Default upsell offers. load_offers() falls back to these (and writes them to
# OFFERS_FILE) when the store file is missing or cannot be read.
# "price_text" is free-form display text; the numeric amount and currency are
# derived from it by _extract_price().
OFFERS_SEED = [
    {"id": 1, "title": "Spa Thermal Circuit Upgrade", "desc": "Gain 90-minute access to hydrotherapy pools + aromatherapy sauna.", "price_text": "€45 per guest"},
    {"id": 2, "title": "Sunset Tasting Menu", "desc": "7-course chef's selection at Azure with panoramic sea view table priority.", "price_text": "€85 per person"},
    {"id": 3, "title": "Premium Late Checkout", "desc": "Extend your stay until 17:00 and enjoy lounge refreshments.", "price_text": "€60 flat"},
    {"id": 4, "title": "Wellness Package", "desc": "Morning yoga session + detox juice flight + mini massage add-on.", "price_text": "€55 per guest"},
]
# JSON file holding the offer list; a relative path, so it resolves against
# the process working directory.
OFFERS_FILE = "offers_store.json"
# File that log_event() appends one JSON record per line to.
ANALYTICS_LOG = "analytics.log"
def _extract_price(price_text: str):
    """Extract ``(currency_symbol, amount)`` from free-form price text.

    The first number in the text is taken as the amount. A currency symbol
    (``€``, ``$`` or ``£``) is recognised directly before or directly after
    that number, with optional whitespace in between ("€45", "$ 45", "45 £").
    When a number is found without a symbol the currency defaults to ``€``.

    Returns ``(None, None)`` when the text contains no number or is not a
    string.
    """
    import re

    if not isinstance(price_text, str):
        return None, None
    match = re.search(r"(?:([€$£])\s*)?(\d+(?:\.\d+)?)(?:\s*([€$£]))?", price_text)
    if not match:
        return None, None
    # Prefer a leading symbol, then a trailing one, then the default.
    currency = match.group(1) or match.group(3) or "€"
    return currency, float(match.group(2))
def load_offers():
    """Load the offer list from ``OFFERS_FILE``, normalising each entry.

    Every loaded offer gets an ``active`` flag (default True), a
    ``price_text`` (copied from a legacy ``price`` key when missing) and the
    derived ``currency`` / ``price_value`` fields. Entries that are not JSON
    objects are skipped.

    When the file is missing, unreadable, not valid JSON, or not a JSON list,
    the reason is printed and the store is re-created from ``OFFERS_SEED``
    (which overwrites the file).
    """
    if os.path.exists(OFFERS_FILE):
        try:
            with open(OFFERS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, list):
                raise ValueError("offers store must contain a JSON list")
            offers = []
            for o in data:
                if not isinstance(o, dict):
                    continue
                o.setdefault("active", True)
                if "price_text" not in o and "price" in o:
                    o["price_text"] = str(o["price"])
                # Coerce to text so a null or numeric price_text cannot raise.
                price_text = o.get("price_text")
                cur, val = _extract_price("" if price_text is None else str(price_text))
                o["currency"], o["price_value"] = cur, val
                offers.append(o)
            return offers
        except (OSError, ValueError, TypeError) as e:
            # Best effort: report the problem, then fall back to the seed data.
            print("[OFFERS] load failed, reseeding", e)
    seeded = []
    for raw in OFFERS_SEED:
        cur, val = _extract_price(raw.get("price_text", ""))
        seeded.append({
            "id": raw["id"],
            "title": raw["title"],
            "desc": raw["desc"],
            "price_text": raw["price_text"],
            "currency": cur,
            "price_value": val,
            "active": True,
        })
    save_offers(seeded)
    return seeded
def save_offers(offers):
    """Write ``offers`` to ``OFFERS_FILE`` as indented UTF-8 JSON.

    The list is serialised before any file is opened and written to a
    temporary sibling file that then replaces the store, so a serialisation
    or write failure cannot leave a truncated store behind. Failures are
    printed, not raised (best effort).
    """
    try:
        payload = json.dumps(offers, ensure_ascii=False, indent=2)
        tmp_path = OFFERS_FILE + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, OFFERS_FILE)
    except Exception as e:
        print("[OFFERS] save failed", e)
def get_active_offers():
    """Return the stored offers whose ``active`` flag is truthy, in store order."""
    active = []
    for offer in load_offers():
        if offer.get("active"):
            active.append(offer)
    return active
def set_offer_inactive(offer_id: int):
    """Deactivate the first active offer with id ``offer_id`` and persist it.

    Nothing is written when no active offer has that id.
    """
    offers = load_offers()
    target = next(
        (o for o in offers if o["id"] == offer_id and o.get("active")),
        None,
    )
    if target is None:
        return
    target["active"] = False
    save_offers(offers)
def log_event(event: str, **data):
    """Append one JSON record for ``event`` to ``ANALYTICS_LOG``.

    The record holds a naive-UTC ISO timestamp under ``ts``, the event name
    under ``event`` and every extra keyword argument. Logging is best effort:
    serialisation and I/O failures are printed, never raised, so a logging
    problem cannot break the calling UI callback.
    """
    # Same naive-UTC timestamp format as before, without the deprecated utcnow().
    now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    rec = {"ts": now_utc.isoformat(), "event": event, **data}
    try:
        line = json.dumps(rec)
        with open(ANALYTICS_LOG, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception as e:
        print("[ANALYTICS] log failed", e)
def format_offer_card(offer: Optional[dict]) -> str:
    """Render one offer as a Markdown card, or a placeholder when there is none.

    The card shows the title as a heading, the description, the price text
    and the offer id.
    """
    if offer:
        card_lines = [
            f"### {offer['title']}",
            f"{offer['desc']}",
            "",
            f"**Price:** {offer['price_text']}",
            f"(Offer ID: {offer['id']})",
        ]
        return "\n".join(card_lines)
    return "**No current offers.**"
def compute_total(accepted: List[dict]):
    """Sum the ``price_value`` of the accepted offers.

    Offers without a ``price_value`` are ignored. The currency is taken from
    the first priced offer (``€`` when that offer has none) and is ``None``
    when no offer is priced. Returns ``(currency, total)``.
    """
    priced = [offer for offer in accepted if offer.get("price_value") is not None]
    currency = (priced[0].get("currency") or "€") if priced else None
    total = 0.0
    for offer in priced:
        total += offer["price_value"]
    return currency, total
def accepted_summary_md(accepted: List[dict]):
    """Render the accepted offers and their total as Markdown.

    Returns a one-line "None" summary for an empty list; otherwise one
    bullet per offer followed by a total line. When no accepted offer has a
    parseable price, the total is shown without a currency symbol rather
    than with the text "None".
    """
    if not accepted:
        return "**Accepted Offers:** None"
    lines = ["**Accepted Offers:**"]
    lines.extend(f"- {o['title']} ({o['price_text']})" for o in accepted)
    cur, total = compute_total(accepted)
    lines.append(f"**Total:** {cur or ''}{total:.2f}")
    return "\n".join(lines)
def init_offers_state(session):
    """Reset the offers panel to the first active offer.

    Ensures ``session["accepted_offers"]`` exists, then returns
    ``(offer card markdown, accepted summary markdown, 0, session)``.
    """
    if not session.get("accepted_offers"):
        session["accepted_offers"] = []
    active = get_active_offers()
    card = format_offer_card(active[0] if active else None)
    return card, accepted_summary_md(session["accepted_offers"]), 0, session
def context_summary_md(session: dict) -> str:
    """Summarise the session's time/weather/radius hints as one Markdown line.

    Only truthy ``time_of_day`` / ``weather`` values and a positive numeric
    ``near_km`` are shown; with none of them set a placeholder is returned.
    """
    ctx = session or {}
    labelled = (("Time", ctx.get("time_of_day")), ("Weather", ctx.get("weather")))
    parts = [f"{label}: {value}" for label, value in labelled if value]
    radius = ctx.get("near_km")
    if isinstance(radius, (int, float)) and radius > 0:
        parts.append(f"Near me: ~{int(radius)} km")
    if parts:
        return "**Context:** " + ", ".join(parts)
    return "_No extra context set. Seasonal notes apply automatically._"
def set_context_action(time_of_day, weather, near_km, session):
    """Store the chosen context hints in the session (clearing unset ones).

    A truthy ``time_of_day`` / ``weather`` is stored as a string, a falsy one
    removes the key. ``near_km`` is stored as a float when it converts to a
    number >= 0 and removed otherwise. Returns ``(session, summary markdown)``.
    """
    ctx = session or {}
    for key, choice in (("time_of_day", time_of_day), ("weather", weather)):
        if choice:
            ctx[key] = str(choice)
        else:
            ctx.pop(key, None)
    radius = None
    if near_km is not None:
        try:
            radius = float(near_km)
        except Exception:
            radius = None
    if radius is not None and radius >= 0:
        ctx["near_km"] = radius
    else:
        ctx.pop("near_km", None)
    return ctx, context_summary_md(ctx)
def clear_context_action(session):
    """Remove all context hints from the session.

    Returns ``(session, summary markdown, None, None, 0)``; the last three
    values reset the time-of-day dropdown, weather dropdown and radius slider.
    """
    ctx = session or {}
    for key in ("time_of_day", "weather", "near_km"):
        ctx.pop(key, None)
    return ctx, context_summary_md(ctx), None, None, 0
def next_offer_action(index, session):
    """Advance to the next active offer, wrapping around at the end.

    Logs an ``offer_view`` event for the offer shown and returns
    ``(offer card markdown, new index)``; ``(placeholder, 0)`` when there are
    no active offers. ``session`` is accepted but not used.
    """
    active = get_active_offers()
    if not active:
        return "**No current offers.**", 0
    current = 0 if index is None else index
    current = (current + 1) % len(active)
    shown = active[current]
    log_event("offer_view", offer_id=shown["id"])
    return format_offer_card(shown), current
def prev_offer_action(index, session):
    """Step back to the previous active offer, wrapping around at the start.

    Logs an ``offer_view`` event for the offer shown and returns
    ``(offer card markdown, new index)``; ``(placeholder, 0)`` when there are
    no active offers. ``session`` is accepted but not used.
    """
    active = get_active_offers()
    if not active:
        return "**No current offers.**", 0
    current = 0 if index is None else index
    current = (current - 1) % len(active)
    shown = active[current]
    log_event("offer_view", offer_id=shown["id"])
    return format_offer_card(shown), current
def accept_offer_action(index, chat_history, session):
    """Accept the active offer at ``index`` on behalf of the session.

    The offer is deactivated in the store, appended to
    ``session["accepted_offers"]``, logged as ``offer_accept`` and confirmed
    in the chat. A missing or too-large index falls back to the first offer.

    Returns ``(chat, next offer card, accepted summary, new index)``.
    """
    chat = chat_history or []
    no_offers_card = "**No current offers.**"
    available = get_active_offers()
    if not available:
        chat.append(system_reply("No offers to accept."))
        return chat, no_offers_card, accepted_summary_md(session.get("accepted_offers", [])), 0
    position = 0 if (index is None or index >= len(available)) else index
    chosen = available[position]
    set_offer_inactive(chosen["id"])
    session.setdefault("accepted_offers", []).append(chosen)
    log_event("offer_accept", offer_id=chosen["id"], title=chosen["title"])
    chat.append(system_reply(f"Accepted offer: {chosen['title']} ({chosen['price_text']})."))
    # Re-read the store: the accepted offer is gone, so the list has shifted.
    remaining = get_active_offers()
    if remaining:
        new_index = position % len(remaining)
        return chat, format_offer_card(remaining[new_index]), accepted_summary_md(session["accepted_offers"]), new_index
    return chat, no_offers_card, accepted_summary_md(session["accepted_offers"]), 0
# Model identifier passed to the transformers loaders in load_llm(); can be
# overridden with the HOTEL_LLM_MODEL environment variable.
LLM_MODEL_ID = os.getenv("HOTEL_LLM_MODEL", "TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# Caches populated by load_llm(): the transformers pipeline, and the object
# handed to callers (the pipeline itself or its LangChain wrapper).
_LLM_PIPE = None
_LLM = None
def load_llm():
    """Return the cached LLM, building it on first use.

    Returns None when transformers is not importable or when loading fails
    (the failure is printed). On success the text-generation pipeline is
    cached in ``_LLM_PIPE`` and the returned object — the LangChain
    ``HuggingFacePipeline`` wrapper when LangChain is available, otherwise
    the raw pipeline — in ``_LLM``. Generation settings come from the
    ``LLM_*`` environment variables.
    """
    global _LLM_PIPE, _LLM
    if _LLM:
        return _LLM
    if not _TRANSFORMERS_AVAILABLE:
        return None
    try:
        tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_ID, trust_remote_code=True)
        device_map = "auto" if os.getenv("LLM_DEVICE_AUTO", "1") == "1" else None
        model = AutoModelForCausalLM.from_pretrained(
            LLM_MODEL_ID,
            device_map=device_map,
            torch_dtype="auto",
        )
        generation_kwargs = {
            "max_new_tokens": int(os.getenv("LLM_MAX_NEW_TOKENS", 256)),
            "temperature": float(os.getenv("LLM_TEMP", 0.3)),
            "top_p": float(os.getenv("LLM_TOP_P", 0.9)),
            "repetition_penalty": float(os.getenv("LLM_REP_PENALTY", 1.1)),
            "no_repeat_ngram_size": int(os.getenv("LLM_NO_REPEAT_NGRAM", 3)),
            "do_sample": True,
        }
        _LLM_PIPE = pipeline(
            task="text-generation",
            model=model,
            tokenizer=tokenizer,
            **generation_kwargs,
        )
        _LLM = HuggingFacePipeline(pipeline=_LLM_PIPE) if _LANGCHAIN_AVAILABLE else _LLM_PIPE
        return _LLM
    except Exception as e:
        print("[LLM] Failed to load model", LLM_MODEL_ID, e)
        return None
def compose_system_prompt(dynamic_context: str = ""):
    """Build the concierge system prompt from hotel facts, offers and local guide.

    ``dynamic_context`` (if non-empty) is added as its own section before the
    fixed answering instructions. If the active offers cannot be read, the
    offers section is left empty.
    """
    amenities = "\n".join(f"- {a}" for a in HOTEL_INFO["amenities"])
    treatments = "\n".join(f"- {t}" for t in HOTEL_INFO["spa"]["signature_treatments"])
    try:
        offers_list = get_active_offers()
    except Exception:
        offers_list = []
    offers = "\n".join(
        f"* {o['title']}: {o['desc']} ({o['price_text']})"
        for o in offers_list
        if o.get("active")
    )
    # Local guide sections (Volos & Pelion), in the order they appear in the prompt
    guide = HOTEL_INFO.get("local_guide", {})
    section_headings = (
        ("dining_tsipouradika", "Dining (tsipouradika):"),
        ("beaches", "Beaches:"),
        ("pelion_villages", "Pelion villages:"),
        ("activities", "Activities:"),
        ("museums_culture", "Museums & culture:"),
        ("day_trips", "Day trips & ferries:"),
        ("cafes_bars", "Cafés & bars:"),
    )
    sections: List[str] = [
        heading + "\n" + "\n".join(f"- {x}" for x in guide.get(key, []))
        for key, heading in section_headings
        if guide.get(key)
    ]
    local_guide = "\n\n".join(sections)
    base = f"""You are the concierge AI for {HOTEL_INFO['name']}. Provide concise, friendly answers.
Hotel Facts:
Location: {HOTEL_INFO['location']}
Star Rating: {HOTEL_INFO['star_rating']}⭐
Check-in: {HOTEL_INFO['check_in_time']} | Check-out: {HOTEL_INFO['check_out_time']}
Amenities:\n{amenities}\nSpa: {HOTEL_INFO['spa']['name']} (Open {HOTEL_INFO['spa']['open']})\nSignature Treatments:\n{treatments}\nDining Venues: {', '.join(HOTEL_INFO['dining'])}\nLate Checkout: {HOTEL_INFO['late_checkout_policy']}\nCurrent Upsell Offers:\n{offers}
Local Guide — Volos & Pelion:\n{local_guide}
"""
    if dynamic_context:
        base += f"Dynamic Context:\n{dynamic_context}\n"
    instructions = (
        "Instructions:\n"
        "- Answer the user's question directly using the facts above.\n"
        "- Do NOT include any role prefixes like 'User:' or 'Assistant:'.\n"
        "- Do NOT create additional 'User:' turns in your output.\n"
        "- Do NOT echo 'Question:' or 'Answer:' in the output.\n"
        "- Prefer nearby Volos & Pelion suggestions; for food, prioritize tsipouradika seafood meze. Include short distance/time if helpful.\n"
        "- Keep responses under 120 words.\n"
        "Reply with only the answer text."
    )
    return base + instructions
def _sanitize_llm_output(text: str) -> str:
    """Remove role markers and self-chat from model outputs; cap length.

    Steps, in order:
    1. Strip leading role / ``Answer:`` prefixes, repeatedly, so stacked
       prefixes such as ``"AI: Assistant: ..."`` are fully removed.
    2. Cut the text at the earliest line that starts a new role turn or a
       new ``Question:`` / ``Answer:`` pair, wherever it occurs.
    3. Collapse all whitespace runs (including newlines) to single spaces.
    4. Truncate to 120 words, appending an ellipsis when truncated.

    Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not text:
        return text
    cleaned = text.strip()
    # Trim common assistant prefixes until none is left at the start.
    prefixes = ("Assistant:", "ASSISTANT:", "assistant:", "AI:", "Bot:", "Answer:")
    stripped = True
    while stripped:
        stripped = False
        for prefix in prefixes:
            if cleaned.startswith(prefix):
                cleaned = cleaned[len(prefix):].lstrip()
                stripped = True
    # Cut at the earliest sign of a new role line or a repeated Q/A block.
    markers = (
        "\nUser:", "\nUSER:", "\nHuman:", "\nHUMAN:",
        "\nAssistant:", "\nASSISTANT:", "\nSystem:",
        "\nQuestion:", "\nQUESTION:", "\nAnswer:", "\nANSWER:",
    )
    cut_points = [pos for pos in (cleaned.find(m) for m in markers) if pos != -1]
    if cut_points:
        cleaned = cleaned[:min(cut_points)].rstrip()
    # Normalize spaces
    cleaned = " ".join(cleaned.split())
    # Soft cap ~120 words
    words = cleaned.split()
    if len(words) > 120:
        cleaned = " ".join(words[:120]).rstrip() + "…"
    return cleaned
def _build_dynamic_context(session: dict) -> str:
    """Assemble newline-separated context notes for the system prompt.

    Includes the session's ``time_of_day`` and ``weather`` hints when set, a
    radius note when ``near_km`` is a positive number, and always a seasonal
    note chosen from the current month.
    """
    ctx = session or {}
    notes: List[str] = []
    # Time of day
    time_of_day = ctx.get("time_of_day")
    if time_of_day:
        notes.append(f"Time of day: {time_of_day}")
    # Weather
    weather = ctx.get("weather")
    if weather:
        notes.append(f"Weather: {weather}")
    # Near-me radius (km)
    radius = ctx.get("near_km")
    if isinstance(radius, (int, float)) and radius > 0:
        notes.append(f"Focus on places within ~{radius} km when relevant.")
    # Seasonal hint
    month = dt.datetime.now().month
    if month in (6, 7, 8):
        season = "summer (beaches, ferries to Sporades)"
    elif month in (5, 9, 10):
        season = "shoulder season (milder weather; Pelion villages, hikes)"
    else:
        season = "cool season (mountain villages, museums, hearty food)"
    notes.append(f"Seasonal note: {season}.")
    return "\n".join(notes)
def ai_general_response(user_message: str, session: dict) -> Optional[str]:
    """Answer a free-form guest question with the LLM.

    Returns a fixed "unavailable" message when no LLM can be loaded, the
    sanitised model output on success, or an "(AI error: ...)" string when
    generation raises.
    """
    llm = load_llm()
    if not llm:
        return "(AI assistant unavailable right now — please try again later or ask at the front desk.)"
    system_prompt = compose_system_prompt(_build_dynamic_context(session))
    # Use an instruction-style prompt to avoid multi-turn role simulation
    final_prompt = system_prompt + "\n\nQuestion: " + user_message.strip() + "\nAnswer:"
    try:
        looks_like_raw_pipeline = hasattr(llm, "__call__") and not hasattr(llm, "_pipeline")
        if not looks_like_raw_pipeline:
            # LangChain or other wrapper; fall back to simple call
            text = llm(final_prompt)  # type: ignore
        else:
            # Likely a raw HF pipeline; try passing conservative decoding args to reduce hallucinations
            try:
                out = llm(final_prompt, max_new_tokens=200, temperature=0.2, top_p=0.9)
            except Exception:
                out = llm(final_prompt)
            if not isinstance(out, list):
                text = str(out)
            else:
                raw = out[0].get("generated_text", "")
                # Drop the echoed prompt when the output begins with it.
                text = raw[len(final_prompt):] if raw.startswith(final_prompt) else raw
        return _sanitize_llm_output(text)
    except Exception as e:
        return f"(AI error: {e})"
| # ----------------------------- | |
| # Helpers / parsing | |
| # ----------------------------- | |
def format_date(d: Optional[dt.date]) -> str:
    """Format a date as ``YYYY-MM-DD``; return an em dash for anything else."""
    if not isinstance(d, dt.date):
        return "—"
    return d.strftime("%Y-%m-%d")
def parse_date(d: Union[str, dt.date, dt.datetime, None]) -> Optional[dt.date]:
    """Convert a date-like value to a ``date``, or None when it cannot be parsed.

    ``datetime`` values are reduced to their date, ``date`` values pass
    through. Strings are stripped and tried as ISO date/datetime first, then
    against a fixed list of formats in order — so ``DD/MM/YYYY`` wins over
    ``MM/DD/YYYY`` when both would fit. Any other type yields None.
    """
    if isinstance(d, dt.datetime):
        return d.date()
    if isinstance(d, dt.date):
        return d
    if not isinstance(d, str):
        return None
    text = d.strip()
    if not text:
        return None
    iso_parsers = (
        lambda s: dt.datetime.fromisoformat(s.replace('T', ' ')).date(),
        dt.date.fromisoformat,
    )
    for parse in iso_parsers:
        try:
            return parse(text)
        except Exception:
            pass
    fallback_formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%d/%m/%Y",
        "%m/%d/%Y",
        "%Y/%m/%d",
    )
    for fmt in fallback_formats:
        try:
            return dt.datetime.strptime(text, fmt).date()
        except Exception:
            continue
    return None
def ensure_valid_date_range(check_in, check_out) -> Tuple[Optional[dt.date], Optional[dt.date], Optional[str]]:
    """Parse and validate a check-in / check-out pair.

    Returns ``(check_in_date, check_out_date, None)`` when both values parse
    and check-out is strictly after check-in; otherwise
    ``(None, None, error message)``.
    """
    start, end = parse_date(check_in), parse_date(check_out)
    if not (start and end):
        return None, None, "Please select both check-in and check-out dates."
    if start < end:
        return start, end, None
    return None, None, "Check-out must be after check-in."
def system_reply(text: str) -> Tuple[Optional[str], Optional[str]]:
    """Build a chat entry for an assistant message: ``(None, text)``.

    The first slot (the user side of the pair) is left empty.
    """
    return (None, text)
def user_reply(text: str) -> Tuple[Optional[str], Optional[str]]:
    """Build a chat entry for a user message: ``(text, None)``.

    The second slot (the assistant side of the pair) is left empty.
    """
    return (text, None)
def login_action(room_number, last_name, chat_history, session):
    """Handle the Log In button.

    Validates the inputs, checks them against the hotel's occupants and
    returns ``(chat, session, status text)``. On success the session is
    replaced by a fresh dict holding the login details; on any failure the
    incoming session is returned unchanged.
    """
    chat = chat_history or []
    if not (room_number and last_name):
        chat.append(system_reply("Enter room number and last name to log in."))
        return chat, session, "Missing credentials"
    try:
        rn = int(room_number)
    except Exception:
        chat.append(system_reply("Room number must be numeric."))
        return chat, session, "Invalid room number"
    ok, occ = HOTEL.login(rn, last_name)
    if not ok:
        chat.append(system_reply("Login failed. Please check details or proceed to booking."))
        return chat, session, "Login failed"
    ln, ci, co = occ
    stay_start, stay_end = format_date(ci), format_date(co)
    # NOTE(review): this replaces the whole session, so keys set earlier
    # (context hints, accepted offers, offers index) are dropped on login.
    session = {"logged_in": True, "room_number": rn, "guest_name": ln, "check_in": stay_start, "check_out": stay_end}
    chat.append(system_reply(f"Welcome {ln}. You're in room {rn} from {stay_start} to {stay_end}."))
    return chat, session, f"Logged in as {ln} (Room {rn})"
def logout_action(chat_history, session):
    """Handle the Log Out button.

    Posts a confirmation to the chat and returns
    ``(chat, fresh logged-out session, status text)``; the incoming session
    is discarded.
    """
    chat = chat_history or []
    farewell = system_reply("You've been logged out.")
    chat.append(farewell)
    logged_out_session = {"logged_in": False}
    return chat, logged_out_session, "Logged out"
def call_cleaning_action(chat_history, session):
    """Request housekeeping for the logged-in guest's room.

    Requires a logged-in session; otherwise a prompt to log in is posted.
    Returns the updated chat.
    """
    chat = chat_history or []
    if not (session and session.get("logged_in")):
        chat.append(system_reply("Please log in to request cleaning for your room."))
        return chat
    room = session.get("room_number")
    guest = session.get("guest_name", "Guest")
    request_id, requested_at = HOTEL.request_cleaning(room, guest)
    chat.append(system_reply(f"Cleaning request #{request_id} scheduled for room {room} at {requested_at.strftime('%H:%M')}."))
    return chat
def book_action(guest_name, room_type, check_in, check_out, chat_history, session):
    """Handle the Find & Book button.

    Uses the typed guest name, falling back to the logged-in guest's name
    when the field is empty or whitespace-only. Validates the date range and
    room type, then books the first free room of that type. Every outcome is
    reported as a chat message; returns the updated chat.
    """
    chat = chat_history or []
    # Strip first so a whitespace-only name counts as missing.
    guest_name = (guest_name or "").strip()
    if not guest_name:
        if session and session.get("logged_in") and session.get("guest_name"):
            guest_name = str(session["guest_name"]).strip()
        else:
            chat.append(system_reply("Please provide a guest name (or log in first)."))
            return chat
    ci, co, err = ensure_valid_date_range(check_in, check_out)
    if err:
        # Echo the raw inputs to help diagnose widget date formats.
        chat.append(system_reply(f"{err} (got: check_in='{check_in}' check_out='{check_out}')"))
        return chat
    if room_type not in HOTEL.room_types:
        chat.append(system_reply("Please select a room type."))
        return chat
    # ensure_valid_date_range() only succeeds when check-out is after
    # check-in, so the stay is always at least one night here.
    nights = (co - ci).days
    booking = HOTEL.book_room(guest_name, room_type, ci, co)
    if not booking:
        chat.append(system_reply(f"No {room_type} rooms available for {nights} night(s) from {format_date(ci)} to {format_date(co)}."))
        return chat
    chat.append(system_reply(f"✅ Booked {room_type} room {booking.room_number} for {booking.guest_name}: {format_date(ci)} → {format_date(co)} ({nights} night(s))."))
    return chat
def handle_chat(message, chat_history, session, room_type_dd, ci, co):
    """Route one chat message to the matching action and return the chat.

    The lower-cased message is checked against keyword groups in a fixed
    order; the first group that applies handles the message:
    offer navigation, accept current offer, accept an offer named in the
    message, cleaning, booking, login hint, and finally the LLM.

    Args:
        message: Raw text from the message box; blank input is ignored.
        chat_history: Existing chat entries (a new list is used when falsy).
        session: Session dict; read for login state and ``offers_index``,
            and updated with the new ``offers_index``.
        room_type_dd: Room type selected in the booking panel, used when the
            message names no room type.
        ci: Check-in value from the booking panel, used when the message
            contains no date.
        co: Check-out value from the booking panel, used when the message
            contains fewer than two dates.

    Returns:
        The chat list with the user message and the reply entries appended.
    """
    chat = chat_history or []
    msg = (message or "").strip()
    if not msg:
        return chat
    chat.append(user_reply(msg))
    lower = msg.lower()
    # Offer navigation & acceptance: deterministic actions
    # NOTE(review): these branches call session.get() directly, so they
    # assume session is a dict (the booking branch below tolerates None).
    if any(k in lower for k in ["next offer", "next >>", "next ▶", "show next offer"]):
        card, idx = next_offer_action(session.get("offers_index", 0), session)
        session["offers_index"] = idx
        chat.append(system_reply("Showing next offer."))
        chat.append(system_reply(card))
        return chat
    if any(k in lower for k in ["prev offer", "previous offer", "◀ prev", "show previous offer"]):
        card, idx = prev_offer_action(session.get("offers_index", 0), session)
        session["offers_index"] = idx
        chat.append(system_reply("Showing previous offer."))
        chat.append(system_reply(card))
        return chat
    if any(k in lower for k in ["accept offer", "i'll take it", "i will take it", "buy offer", "add offer"]):
        # accept current index
        updated_chat, card, accepted_md_text, new_idx = accept_offer_action(session.get("offers_index", 0), chat, session)
        session["offers_index"] = new_idx
        chat = updated_chat
        chat.append(system_reply(card))
        chat.append(system_reply(accepted_md_text))
        return chat
    # Accept a specific offer by title keywords, e.g., "I want the Spa Thermal Circuit Upgrade"
    if any(k in lower for k in ["i want", "i'd like", "i will take", "i will get", "i want the", "i want to buy"]):
        offers = get_active_offers()
        if offers:
            # find best title match
            # Score = number of title words that occur (as substrings) in the
            # message; ties keep the earlier offer.
            best_idx, best_score = None, 0
            for i, o in enumerate(offers):
                title = o.get("title", "").lower()
                score = sum(1 for w in title.split() if w in lower)
                if score > best_score:
                    best_score, best_idx = score, i
            if best_idx is not None and best_score > 0:
                updated_chat, card, accepted_md_text, new_idx = accept_offer_action(best_idx, chat, session)
                session["offers_index"] = new_idx
                chat = updated_chat
                chat.append(system_reply(card))
                chat.append(system_reply(accepted_md_text))
                return chat
        # No offer title matched: fall through to the remaining intents.
    # Cleaning intents
    if any(k in lower for k in ["clean", "housekeeping", "maid"]):
        return call_cleaning_action(chat, session)
    # Booking intents
    if "book" in lower:
        # Room type named in the message wins over the dropdown selection.
        guessed_type = None
        for t in HOTEL.room_types.keys():
            if t.lower() in lower:
                guessed_type = t
                break
        import re
        # Dates are recognised only in YYYY-MM-DD form; the first is taken as
        # check-in, the second as check-out, with the panel values as fallback.
        m = re.findall(r"(20\d{2}-\d{2}-\d{2})", lower)
        d1 = parse_date(m[0]) if len(m) > 0 else parse_date(ci)
        d2 = parse_date(m[1]) if len(m) > 1 else parse_date(co)
        if not guessed_type:
            guessed_type = room_type_dd if room_type_dd in HOTEL.room_types else None
        if not d1 or not d2 or d1 >= d2 or not guessed_type:
            chat.append(system_reply("To book, specify room type and a valid date range (e.g., 'Book Deluxe from 2025-09-12 to 2025-09-14')."))
            return chat
        # Bookings made from chat use the logged-in guest's name, else "Guest".
        guest = session.get("guest_name", "Guest") if session and session.get("logged_in") else "Guest"
        booking = HOTEL.book_room(guest, guessed_type, d1, d2)
        if not booking:
            chat.append(system_reply(f"No {guessed_type} rooms available for those dates."))
        else:
            chat.append(system_reply(f"Booked {guessed_type} room {booking.room_number} from {format_date(d1)} to {format_date(d2)}."))
        return chat
    if any(k in lower for k in ["login", "log in"]):
        chat.append(system_reply("Use the Login panel: enter your room number and last name."))
        return chat
    # Anything else goes to the LLM; the help text below is only reached when
    # the LLM reply is empty.
    ai_reply = ai_general_response(msg, session)
    if ai_reply:
        chat.append(system_reply(ai_reply))
        return chat
    help_text = (
        "I can help you:\n"
        "- Log in to your room (use the panel).\n"
        "- Book a room (select dates and type, then Book).\n"
        "- Request cleaning (button or say 'clean my room')."
    )
    chat.append(system_reply(help_text))
    return chat
| # ----------------------------- | |
| # UI | |
| # ----------------------------- | |
# Build the three-column UI (login, chat + context, booking + offers) and wire
# each control to its callback defined above.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as demo:
    gr.Markdown("""
# 🏨 Bluebird Hotel Assistant
Welcome! Log in if you're a current guest, or book a new stay. Ask for cleaning anytime.
""")
    # Session dict passed into and returned from the callbacks below.
    session = gr.State({"logged_in": False})
    with gr.Row():
        # Left: Login
        with gr.Column(scale=1):
            gr.Markdown("### Guest Login")
            room_in = gr.Number(label="Room Number", value=None)
            last_name_in = gr.Textbox(label="Last Name")
            with gr.Row():
                login_btn = gr.Button("Log In", variant="primary")
                logout_btn = gr.Button("Log Out")
            login_status = gr.Markdown("Not logged in.")
        # Middle: Chat
        with gr.Column(scale=2):
            gr.Markdown("### Chat")
            chat = gr.Chatbot(height=360, type="tuples")
            msg = gr.Textbox(placeholder="Type a message (e.g., 'Clean my room', 'Book Deluxe from 2025-10-12 to 2025-10-14')", label="Message", lines=2)
            with gr.Row():
                send = gr.Button("Send", variant="primary")
                clean_btn = gr.Button("Call Cleaning")
            with gr.Accordion("Context (time, weather, near me)", open=False):
                with gr.Row():
                    time_of_day_dd = gr.Dropdown(["morning", "afternoon", "evening", "night"], label="Time of day", value=None)
                    weather_dd = gr.Dropdown(["clear", "cloudy", "rain", "windy", "hot"], label="Weather", value=None)
                    near_slider = gr.Slider(0, 50, value=0, step=1, label="Near me radius (km)")
                with gr.Row():
                    apply_ctx_btn = gr.Button("Apply Context")
                    clear_ctx_btn = gr.Button("Clear Context")
                ctx_status = gr.Markdown("_No extra context set. Seasonal notes apply automatically._")
        # Right: Booking + Offers
        with gr.Column(scale=1):
            gr.Markdown("### Book a Room")
            guest_name = gr.Textbox(label="Guest Name", placeholder="Your name")
            room_type_dd = gr.Dropdown(list(HOTEL.room_types.keys()), label="Room Type")
            # Probe the gradio module for a calendar component by name and
            # fall back to plain textboxes when none of the names exists.
            _DateTime = getattr(gr, "DateTime", None)
            _DatePicker = getattr(gr, "DatePicker", None) or getattr(gr, "Datepicker", None)
            if _DateTime:
                check_in = _DateTime(label="Check-in", include_time=True, type="string")
                check_out = _DateTime(label="Check-out", include_time=True, type="string")
            elif _DatePicker:
                check_in = _DatePicker(label="Check-in")
                check_out = _DatePicker(label="Check-out")
            else:
                gr.Markdown("⚠️ No calendar component in this Gradio version. Upgrade with: `pip install -U gradio`.\nEnter dates as YYYY-MM-DD.")
                check_in = gr.Textbox(label="Check-in (YYYY-MM-DD)", placeholder="2025-10-12")
                check_out = gr.Textbox(label="Check-out (YYYY-MM-DD)", placeholder="2025-10-15")
            book_btn = gr.Button("Find & Book", variant="primary")
            gr.Markdown("### Offers & Upsells")
            # The offer card stays on this placeholder until Refresh Offers
            # (or a Prev/Next/Accept button) is clicked.
            offer_card = gr.Markdown("(Click Refresh Offers)")
            accepted_md = gr.Markdown("**Accepted Offers:** None")
            # Index of the offer shown in the panel; separate from the
            # "offers_index" key that handle_chat keeps inside the session.
            offers_index = gr.State(0)
            with gr.Row():
                prev_offer_btn = gr.Button("◀ Prev")
                next_offer_btn = gr.Button("Next ▶")
            with gr.Row():
                accept_offer_btn = gr.Button("Accept Offer", variant="primary")
                refresh_offers_btn = gr.Button("Refresh Offers")
    # Wiring callbacks
    login_btn.click(
        login_action,
        inputs=[room_in, last_name_in, chat, session],
        outputs=[chat, session, login_status],
    )
    logout_btn.click(
        logout_action,
        inputs=[chat, session],
        outputs=[chat, session, login_status],
    )
    clean_btn.click(
        call_cleaning_action,
        inputs=[chat, session],
        outputs=[chat],
    )
    apply_ctx_btn.click(
        set_context_action,
        inputs=[time_of_day_dd, weather_dd, near_slider, session],
        outputs=[session, ctx_status],
    )
    # clear_context_action also returns reset values for the three context
    # controls, hence the extra outputs.
    clear_ctx_btn.click(
        clear_context_action,
        inputs=[session],
        outputs=[session, ctx_status, time_of_day_dd, weather_dd, near_slider],
    )
    book_btn.click(
        book_action,
        inputs=[guest_name, room_type_dd, check_in, check_out, chat, session],
        outputs=[chat],
    )
    # Send button and Enter in the message box share the same handler.
    # Only the chat is listed as an output, so the message box is not cleared.
    send.click(
        handle_chat,
        inputs=[msg, chat, session, room_type_dd, check_in, check_out],
        outputs=[chat],
    )
    msg.submit(
        handle_chat,
        inputs=[msg, chat, session, room_type_dd, check_in, check_out],
        outputs=[chat],
    )
    # Offers wiring
    refresh_offers_btn.click(
        init_offers_state,
        inputs=[session],
        outputs=[offer_card, accepted_md, offers_index, session],
    )
    next_offer_btn.click(
        next_offer_action,
        inputs=[offers_index, session],
        outputs=[offer_card, offers_index],
    )
    prev_offer_btn.click(
        prev_offer_action,
        inputs=[offers_index, session],
        outputs=[offer_card, offers_index],
    )
    accept_offer_btn.click(
        accept_offer_action,
        inputs=[offers_index, chat, session],
        outputs=[chat, offer_card, accepted_md, offers_index],
    )
# Second module-level name bound to the same Blocks object.
app = demo
if __name__ == "__main__":
    demo.launch()