# GP.chatbot / app.py — uploaded by Youmnaaaa (commit b4f8279, verified)
# -*- coding: utf-8 -*-
"""app.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1iPAjeI3M04kA13lYenlROS96tUeCYakB
"""
import os, re, json, math, random, pickle, joblib
import numpy as np
import pandas as pd
import torch
from datetime import datetime
from zoneinfo import ZoneInfo
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from sentence_transformers import SentenceTransformer, util
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForTokenClassification,
pipeline,
)
from huggingface_hub import snapshot_download
"""Paths"""
try:
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
BASE_DIR = os.getcwd()
# HuggingFace Model Repos
INTENT_REPO = "Youmnaaaa/intent-arabert-ff"
ENTITY_REPO = "Youmnaaaa/entity-hybrid-ff"
SEMANTIC_REPO = "Youmnaaaa/semantic-search-ff"
# ملف الأماكن جوا الـ Space
PLACES_FILE = os.path.join(BASE_DIR, "beni_suef_100_places_v5ff.xlsx")
intent_tokenizer = intent_model = label_encoder = id2intent = None
ner_pipeline = label2id = id2label = None
semantic_model = corpus_df = corpus_embeddings = places_df = None
SESSIONS: dict = {}
def clean_text(text):
    """Aggressively normalize Arabic text for matching: unify letter
    variants, drop tatweel and all punctuation, collapse whitespace."""
    s = str(text).strip().lower()
    s = re.sub(r"ـ+", "", s)  # remove tatweel (kashida) elongation
    letter_variants = (("[إأآا]", "ا"), ("ى", "ي"), ("ة", "ه"), ("ؤ", "و"), ("ئ", "ي"))
    for pattern, repl in letter_variants:
        s = re.sub(pattern, repl, s)
    s = re.sub(r"[^\w\s]", " ", s)  # punctuation -> space
    s = re.sub(r"\s+", " ", s)
    return s.strip()
def norm(text):
    """Light normalization for intent/keyword/time matching: unify Arabic
    letter variants and rewrite AM/PM words and range words to short forms
    ("ص"/"م" and "-") so parse_time/check_open_now can parse them."""
    text = str(text).strip().lower()
    text = re.sub(r"ـ+", "", text)  # remove tatweel elongation
    for old, new in [("[إأآا]","ا"),("ى","ي"),("ة","ه"),("ؤ","و"),("ئ","ي")]:
        text = re.sub(old, new, text)
    for old, new in [("صباحًا","ص"),("صباحا","ص"),("مساءً","م"),("مساءا","م"),
                     ("ليلًا","م"),("ليلا","م"),("–","-"),("—","-")]:
        text = text.replace(old, new)
    # BUGFIX: the range words "إلى/الى/حتى" could never match verbatim here
    # because the letter normalization above already rewrote ى -> ي, leaving
    # those replacements dead. Match the normalized forms instead, word-bounded
    # so substrings inside other words (e.g. "غالي") are untouched.
    text = re.sub(r"\b(الي|حتي)\b", "-", text)
    return re.sub(r"\s+", " ", text).strip()
# INTENT MAPS
# Intents that trigger a place search vs. intents answered with canned text.
SEARCH_INTENTS = {"nearest_restaurant","nearest_pharmacy","nearest_cafe",
                  "nearest_supermarket","housing_search","recommend_place",
                  "open_now","place_details"}
STATIC_INTENTS = {"greeting","thanks","goodbye","confirm","deny"}
# Default place category implied by each search intent.
INTENT_TO_CATEGORY = {
    "nearest_restaurant":"restaurant","nearest_pharmacy":"pharmacy",
    "nearest_cafe":"cafe","nearest_supermarket":"supermarket",
    "housing_search":"housing",
}
# Intent -> key into RESPONSE_TEMPLATES (see get_template_key).
INTENT_TEMPLATE_MAP = {
    "nearest_restaurant":"find_restaurant","nearest_pharmacy":"find_pharmacy",
    "nearest_cafe":"find_cafe","nearest_supermarket":"find_supermarket",
    "housing_search":"find_housing","recommend_place":"find_restaurant",
    "open_now":"find_restaurant","place_details":"find_restaurant",
    "greeting":"greeting","thanks":"thanks","goodbye":"goodbye",
    "confirm":"clarification","deny":"clarification","fallback":"fallback",
}
# Raw NER entity-group name -> search-filter field name.
ENTITY_FIELD_MAP = {
    "location":"location","place_type":"category","cuisine_or_item":"sub_category",
    "food_type":"sub_category","price":"price","price_range":"price",
    "category":"category","sub_category":"sub_category","facility_type":"category",
    "housing_type":"category","status":"status","time":"time",
}
# Keyword triggers that short-circuit the ML classifier (apply_keyword_override).
KEYWORD_OVERRIDE = {
    "goodbye": ["مع السلامة","مع السلامه","باي","وداعا","bye","goodbye","تصبح على خير",
                "في امان الله","الله يسلمك","سلامتك"],
    "greeting":["السلام عليكم","وعليكم السلام","اهلا","أهلا","هلا","هلو","مرحبا","مرحباً",
                "صباح الخير","مساء الخير","هاي","hi","hello","صباح","مساء"],
    "thanks":  ["شكرا","شكراً","تسلم","يسلمو","ممنون","مشكور","thanks","thank","الف شكر"],
}
# Keyword hits used to infer a place category straight from the query text.
CATEGORY_KEYWORDS = {
    "restaurant":["مطعم","اكل","وجبات","مشويات","كباب","شاورما","كريب","برجر","سمك","فرايد"],
    "pharmacy":  ["صيدليه","صيدلية","دوا","ادويه","دواء"],
    "cafe":      ["كافيه","كوفي","قهوه","قهوة","كافيتيريا"],
    "supermarket":["سوبرماركت","ماركت","بقاله","هايبر"],
    "housing":   ["شقه","شقة","ايجار","إيجار","فندق","هوستل","سكن"],
}
# Arabic/synonym spelling -> canonical English category slug.
_CAT_MAP = {
    "مطعم":"restaurant","مطاعم":"restaurant","طعام":"restaurant","اكل":"restaurant",
    "صيدليه":"pharmacy","صيدلية":"pharmacy","صيدله":"pharmacy","دواء":"pharmacy","دوا":"pharmacy",
    "كافيه":"cafe","كافية":"cafe","كوفي":"cafe","قهوه":"cafe","قهوة":"cafe","كافيتيريا":"cafe",
    "سوبرماركت":"supermarket","ماركت":"supermarket","بقاله":"supermarket","بقالة":"supermarket","هايبر":"supermarket",
    "شقه":"housing","شقة":"housing","ايجار":"housing","إيجار":"housing",
    "فندق":"housing","سكن":"housing","هوستل":"housing",
}
def normalize_category(cat):
    """Map a free-form category string to a canonical slug; falsy values and
    unknown strings pass through unchanged."""
    if not cat:
        return cat
    value = str(cat).strip()
    if value in ("restaurant", "pharmacy", "cafe", "supermarket", "housing"):
        return value
    direct = _CAT_MAP.get(value)
    if direct:
        return direct
    # Fuzzy fallback: substring containment in either direction.
    for arabic, english in _CAT_MAP.items():
        if arabic in value or value in arabic:
            return english
    return value
# Follow-up questions asked when a search for this intent returns nothing.
CLARIFICATION_Q = {
    "nearest_restaurant":"أي نوع أكل؟ مشويات، شاورما، كريب، برجر؟",
    "nearest_pharmacy":"في أي منطقة بتدور على صيدلية؟",
    "nearest_cafe":"في أي منطقة بتدور على كافيه؟",
    "nearest_supermarket":"في أي منطقة بتدور على ماركت؟",
    "housing_search":"بتدور على إيه — شقة إيجار، فندق؟ وفين؟",
}
# Topics the bot explicitly refuses (weather, sports, news, banking, ...).
OUT_OF_SCOPE_KW = ["الجو","طقس","درجه","كوره","كرة","أهلي","زمالك","مباريات",
                   "سياسه","سياسة","أخبار","رصيد","بنك","تحويل","امتحان","مدرسه",
                   "جامعه","وظيفه","برمجه","كود","python","java","رياضيات","ترجمه","translate"]
# Words meaning "show me another result".
NEXT_WORDS = ["تاني","غيره","غيرها","بديل","حاجة تانية","مش عاجبني","فيه تاني","عايز غيره"]
# Words signalling a detail question about the previously shown place.
DETAIL_WORDS = ["بيفتح","بتفتح","مواعيده","مواعيدها","امتى","امتي","عنوانه","عنوانها",
                "تليفونه","تليفونها","رقمه","رقمها","تقييمه","تقييمها","سعره","سعرها"]
# Pronouns referring back to the last-mentioned place.
REF_WORDS = ["هو","هي","ده","دي","المكان ده"]
# Neighborhood/location cues used to detect a bare-location follow-up message.
_LOC_CUES = ["الحي","بني سويف","الاباصيري","الكورنيش","مقبل","الزراعيين",
             "صلاح سالم","شرق النيل","سيتي سنتر","عرابي","الروضه"]
# HELPER FUNCTIONS
def apply_keyword_override(text):
    """Return a hard-coded intent when the text contains one of its trigger
    keywords; None when nothing matches. Multi-word triggers match as
    substrings, single words as whole tokens."""
    normed = norm(text)
    tokens = set(normed.split())
    for intent, keywords in KEYWORD_OVERRIDE.items():
        # Longest keywords first so more specific phrases win.
        for kw in sorted(keywords, key=len, reverse=True):
            kw_n = norm(kw)
            if " " in kw_n:
                if kw_n in normed:
                    return intent
            elif kw_n in tokens:
                return intent
    return None
def get_template_key(intent, category=None):
    """Choose a RESPONSE_TEMPLATES key; an explicit category wins over the
    intent mapping, and unknown intents fall back to 'fallback'."""
    category_templates = {
        "restaurant": "find_restaurant",
        "pharmacy": "find_pharmacy",
        "cafe": "find_cafe",
        "supermarket": "find_supermarket",
        "housing": "find_housing",
    }
    if category and category in category_templates:
        return category_templates[category]
    return INTENT_TEMPLATE_MAP.get(intent, "fallback")
def infer_category(query):
    """Guess a place category from keyword hits in the query; None if no hit."""
    q = norm(query)
    for category, keywords in CATEGORY_KEYWORDS.items():
        if any(norm(kw) in q for kw in keywords):
            return category
    return None
def is_out_of_scope(text):
    """True when the message mentions an off-topic subject the bot refuses."""
    normalized = norm(text)
    return any(norm(keyword) in normalized for keyword in OUT_OF_SCOPE_KW)
def detect_ref_type(text):
    """Classify a follow-up: 'next' (another result), 'detail' (question about
    the last place), 'reference' (pronoun back-reference) or 'new' query."""
    t = norm(text)
    tokens = set(t.split())
    if any(norm(w) in t for w in NEXT_WORDS):
        return "next"
    if any(norm(w) in t for w in DETAIL_WORDS):
        return "detail"
    for ref in REF_WORDS:
        ref_n = norm(ref)
        # Phrases match as substrings, single pronouns as whole tokens.
        if (ref_n in t) if " " in ref_n else (ref_n in tokens):
            return "reference"
    return "new"
def _loc_continuation(text):
    """Heuristic: does this short message look like just a location
    (continuing the previous search) rather than a new request?"""
    t = norm(text)
    tokens = t.split()
    if len(tokens) <= 4 and any(norm(cue) in t for cue in _LOC_CUES):
        return True
    # A message starting with "في" ("in ...") is treated as a location follow-up.
    return bool(tokens) and tokens[0] == "في"
def normalize_rating(r):
    """Coerce a raw rating to a 0-5 scale rounded to one decimal.

    Ratings above 5 are assumed to be on a 0-10 scale and are halved.
    Non-numeric or non-positive input yields 0.0.
    """
    try:
        value = float(r)
    except (TypeError, ValueError):  # was a bare except: swallow only conversion errors
        return 0.0
    if value > 5:
        return round(value / 2, 1)
    return round(value, 1) if value > 0 else 0.0
# TIME UTILS
def get_cairo_now():
    """Return the current timezone-aware datetime in Africa/Cairo."""
    return datetime.now(ZoneInfo("Africa/Cairo"))
def parse_time(token):
    """Parse an Arabic-style clock token (e.g. "9", "9:30", "9ص", "3م")
    into 24-hour "HH:MM" form; return None when unparseable."""
    compact = norm(token).replace(" ", "")
    m = re.match(r"^(\d{1,2})(?::(\d{1,2}))?(ص|م|ظهر)?$", compact)
    if m is None:
        return None
    hour = int(m.group(1))
    minute = int(m.group(2)) if m.group(2) else 0
    suffix = m.group(3)
    if minute > 59:
        return None
    if suffix == "ص":  # AM: 12ص is midnight, otherwise 1-11 only
        if hour == 12:
            hour = 0
        elif not 1 <= hour <= 11:
            return None
    elif suffix in ("م", "ظهر"):  # PM/noon: shift 1-11 to 13-23, 12 stays
        if hour != 12 and 1 <= hour <= 11:
            hour += 12
    else:  # no meridiem: 24-hour clock, "24" treated as midnight
        if hour == 24:
            hour = 0
        elif hour > 23:
            return None
    return f"{hour:02d}:{minute:02d}"
def check_open_now(opening_hours_str):
    """Return 1 if the place is open right now (Cairo time), 0 if closed,
    or None when opening hours are missing or unparseable."""
    if not opening_hours_str or str(opening_hours_str).strip() in ("","nan","none"): return None
    text = norm(str(opening_hours_str))
    # Always-open markers. NOTE(review): a plain "24" substring also matches
    # times like "12:24" — presumably acceptable for this dataset; verify.
    if any(k in text for k in ["24","always","طول اليوم","24/7"]): return 1
    sep = re.search(r"(.+?)\s*-\s*(.+)", text)
    if not sep: return None
    t1 = parse_time(sep.group(1).strip()); t2 = parse_time(sep.group(2).strip())
    if not t1 or not t2: return None
    # BUGFIX: read the clock once — the original called get_cairo_now() twice,
    # which could straddle a minute/midnight boundary between the two reads.
    now = get_cairo_now()
    now_t = f"{now.hour:02d}:{now.minute:02d}"
    if t1 <= t2:  # same-day interval
        return 1 if t1 <= now_t <= t2 else 0
    # Overnight interval (e.g. 20:00 - 02:00).
    return 1 if (now_t >= t1 or now_t <= t2) else 0
# SEARCH + FILTER + RANK
def semantic_candidates(query, top_k=20):
    """Embed the query and return the top-k corpus rows by cosine similarity,
    with a 'semantic_score' column attached."""
    query_emb = semantic_model.encode(clean_text(query), convert_to_tensor=True)
    sims = util.cos_sim(query_emb, corpus_embeddings)[0]
    k = min(top_k, len(corpus_df))
    top = torch.topk(sims, k=k)
    hits = corpus_df.iloc[top.indices.cpu().numpy()].copy()
    hits["semantic_score"] = top.values.cpu().numpy()
    wanted = ["place_id","doc_id","name","category","sub_category","location",
              "address","price_range","opening_hours","description","semantic_score"]
    cols = [c for c in wanted if c in hits.columns]
    return hits[cols].reset_index(drop=True)
def merge_places(df):
    """Left-join enrichment columns (coords, rating, phone, cleaned text, ...)
    from the master places table onto the candidate frame, keyed by place_id."""
    optional = ["lat","lon","rating","phone","social_media","status",
                "category_clean","sub_category_clean","location_clean",
                "address_clean","price_range_clean","search_text_clean"]
    available = [c for c in optional if c in places_df.columns]
    slim = places_df[["place_id"] + available].copy()
    return df.merge(slim, on="place_id", how="left")
def apply_filters(df, query, category=None, sub_category=None, location=None,
                  price_range=None, open_now_only=False, min_rating=None):
    """Filter candidates on the pre-cleaned text columns and attach the
    scoring columns (open_now / rating_score / open_score) used by rank()."""
    f = df.copy()
    if category:
        f = f[f["category_clean"].astype(str).str.contains(re.escape(clean_text(category)), na=False)]
    if sub_category:
        f = f[f["sub_category_clean"].astype(str).str.contains(re.escape(clean_text(sub_category)), na=False)]
    if location:
        f = f[f["location_clean"].astype(str).str.contains(re.escape(clean_text(location)), na=False)]
    if price_range:
        f = f[f["price_range_clean"].astype(str).str.contains(re.escape(clean_text(price_range)), na=False)]
    f["open_now"] = f["opening_hours"].apply(check_open_now)
    # BUGFIX: the fallback for a missing "rating" column was pd.Series() — an
    # empty, index-less series whose assignment back to f yields all-NaN
    # values (plus a dtype FutureWarning). Use zeros aligned to f's index.
    rating_src = f["rating"] if "rating" in f.columns else pd.Series(0, index=f.index)
    f["rating_num"] = pd.to_numeric(rating_src, errors="coerce").fillna(0)
    f["rating_norm"] = f["rating_num"].apply(normalize_rating)
    f["rating_score"] = f["rating_norm"] / 5.0
    # Unknown opening hours get a neutral half score.
    f["open_score"] = f["open_now"].apply(lambda x: 1.0 if x == 1 else (0.5 if x is None else 0.0))
    if open_now_only:
        f = f[f["open_now"] == 1]
    if min_rating:
        f = f[f["rating_norm"] >= min_rating]
    return f
def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    rad = math.pi / 180  # degrees -> radians
    earth_radius_km = 6371
    dlat_half = (lat2 - lat1) * rad / 2
    dlon_half = (lon2 - lon1) * rad / 2
    h = (math.sin(dlat_half) ** 2
         + math.cos(lat1 * rad) * math.cos(lat2 * rad) * math.sin(dlon_half) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(h))
def rank(df, query, user_lat=None, user_lon=None):
    """Score candidates and sort descending by a weighted blend of semantic
    similarity (.40), rating (.25), open-now (.15), distance (.10) and exact
    name match (.10)."""
    df = df.copy()
    if user_lat and user_lon and "lat" in df.columns:
        def dist(row):
            try:
                return haversine(float(user_lat), float(user_lon), float(row["lat"]), float(row["lon"]))
            except (TypeError, ValueError):  # was a bare except
                return 999  # sentinel: coordinates missing/unparseable
        df["distance_km"] = df.apply(dist, axis=1)
        mx = df["distance_km"].replace(999, np.nan).max()
        # BUGFIX: `max() or 1` did not guard against NaN (NaN is truthy), so
        # when no row had usable coordinates every distance_score became NaN.
        if pd.isna(mx) or mx == 0:
            mx = 1
        df["distance_score"] = 1 - (df["distance_km"] / (mx + 1))
    else:
        df["distance_km"] = 999
        df["distance_score"] = 0.0
    q_clean = clean_text(query)
    # 1.0 when the place name appears in the query (or vice versa).
    df["name_match_score"] = df["name"].apply(
        lambda n: 1.0 if clean_text(str(n)) in q_clean or q_clean in clean_text(str(n)) else 0.0)
    w = dict(semantic=0.40, rating=0.25, open=0.15, distance=0.10, name=0.10)
    df["final_score"] = (
        w["semantic"]*df.get("semantic_score", pd.Series(0, index=df.index)).fillna(0) +
        w["rating"]  *df.get("rating_score",  pd.Series(0, index=df.index)).fillna(0) +
        w["open"]    *df.get("open_score",    pd.Series(0, index=df.index)).fillna(0) +
        w["distance"]*df["distance_score"] + w["name"]*df["name_match_score"]
    )
    return df.sort_values("final_score", ascending=False).reset_index(drop=True)
def search_places(query, top_k_final=5, category=None, sub_category=None,
                  location=None, price_range=None, open_now_only=False,
                  min_rating=None, user_lat=None, user_lon=None):
    """Full retrieval pipeline: semantic recall -> enrichment -> progressively
    relaxed filtering -> ranking; returns at most top_k_final rows."""
    candidates = merge_places(semantic_candidates(query, top_k=20))
    # Relaxation ladder: drop sub_category first, then price/open-now,
    # then location/rating, until some rows survive.
    relaxation_steps = [
        dict(category=category, sub_category=sub_category, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=None, open_now_only=False, min_rating=min_rating),
        dict(category=category, sub_category=None, location=None,
             price_range=None, open_now_only=False, min_rating=None),
    ]
    filtered = pd.DataFrame()
    for step in relaxation_steps:
        filtered = apply_filters(candidates, query, **step)
        if not filtered.empty:
            break
    if filtered.empty:
        return pd.DataFrame()
    ranked = rank(filtered, query, user_lat, user_lon)
    wanted = ["place_id","name","category","sub_category","location","address",
              "price_range","rating","rating_norm","opening_hours","description",
              "phone","lat","lon","semantic_score","final_score","open_now"]
    cols = [c for c in wanted if c in ranked.columns]
    return ranked[cols].head(top_k_final).reset_index(drop=True)
# RESPONSE TEMPLATES + FORMATTERS
# Reply templates per template key; one is chosen at random per response.
# Placeholders ({name}, {price_info}, ...) are filled by build_response().
RESPONSE_TEMPLATES = {
    "find_restaurant":[
        "🍽️ لقيتلك {name} في {location}. {price_info}{rating_info}{hours_info}",
        "أنصحك بـ {name} — هتلاقيه في {location}. {price_info}{rating_info}{hours_info}",
        "في {location} فيه {name}. {description_short}{price_info}{hours_info}",
    ],
    "find_pharmacy":[
        "💊 {name} في {location}.{hours_info}{rating_info}",
        "أقرب صيدلية ليك: {name} — {address_info}{hours_info}",
    ],
    "find_cafe":[
        "☕ {name} في {location}. {price_info}{rating_info}{hours_info}",
        "جرب {name} — في {location}. {description_short}{hours_info}",
    ],
    "find_supermarket":[
        "🛒 {name} في {location}.{hours_info}{rating_info}",
        "أقرب ماركت: {name} — {address_info}{hours_info}",
    ],
    "find_housing":[
        "🏠 {name} في {location}. {price_info}{description_short}",
        "فيه {name} في {location}. {price_info}{rating_info}",
    ],
    # Static (no-search) replies below take no placeholders.
    "greeting": ["أهلاً! 😊 أنا بساعدك تلاقي أي مكان في بني سويف. عايز إيه؟",
                 "وعليكم السلام! قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "هلا بيك! محتاج إيه في بني سويف؟ 😊"],
    "thanks": ["العفو! 😊 في حاجة تانية أساعدك فيها؟","أي خدمة! 😊","بكل سرور! 😊"],
    "goodbye": ["مع السلامة! 👋","سلامتك! أي وقت محتاج مساعدة أنا هنا.","باي! ربنا يوفقك 😊"],
    "clarification":["😊 قصدك إيه بالظبط؟","ممكن توضح أكتر؟","تمام! بتدور على إيه بالظبط؟"],
    "no_result": ["😔 مش لاقي حاجة مناسبة. جرب تغير المنطقة أو تسأل بطريقة تانية.",
                  "معلش، مفيش نتايج. ممكن تحدد المنطقة أو النوع أكتر؟"],
    "fallback": ["آسف، مش فاهم قصدك. 😊 قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "ممكن تسألني عن أي مكان في بني سويف وأنا هساعدك! 😊"],
}
def fmt_price(x):
    """Render a price-range value as a short Arabic sentence ('' when absent)."""
    p = str(x).strip().lower()
    if not p or p in ("", "nan", "none"):
        return ""
    labels = (("cheap", "الأسعار رخيصة"), ("رخيص", "الأسعار رخيصة"),
              ("اقتصادي", "الأسعار اقتصادية"), ("medium", "الأسعار متوسطة"),
              ("متوسط", "الأسعار متوسطة"), ("expensive", "الأسعار غالية"),
              ("غالي", "الأسعار غالية"))
    for needle, label in labels:
        if needle in p:
            return label + ". "
    return f"السعر: {x}. "
def fmt_rating(x):
    """Render a rating as 'تقييمه r ⭐…' with star icons; '' for zero or
    invalid ratings."""
    try:
        r = normalize_rating(float(x))
    except (TypeError, ValueError):  # was a bare except: only conversion errors
        return ""
    stars = min(round(r), 5)
    return f"تقييمه {r} {'⭐'*stars}. " if r > 0 else ""
def fmt_hours(x):
    """Render opening hours as a short Arabic sentence ('' when absent)."""
    hours = str(x).strip()
    if not hours or hours in ("", "nan", "none"):
        return ""
    always_open = any(k in hours.lower() for k in ["24", "always", "طول اليوم"])
    if always_open:
        return "مفتوح 24 ساعة. "
    return f"بيفتح: {hours}. "
def fmt_addr(address, location):
    """Render the street address, falling back to the neighborhood; '' when
    neither is available."""
    addr = str(address).strip()
    loc = str(location).strip()
    if addr and addr not in ("", "nan", "none"):
        return f"عنوانه: {addr}. "
    if loc and loc not in ("", "nan", "none"):
        return f"في {loc}. "
    return ""
def fmt_desc(x, max_words=12):
    """Shorten a description to max_words words (with ellipsis when truncated);
    '' for missing values, trailing space otherwise for template joining."""
    desc = str(x).strip()
    if not desc or desc in ("", "nan", "none"):
        return ""
    words = desc.split()
    if len(words) > max_words:
        return " ".join(words[:max_words]) + "..."
    return desc + " "
def build_response(place, intent, category=None):
    """Fill a random template for the intent/category with the place's fields,
    appending an open/closed status line when known."""
    if not place:
        return random.choice(RESPONSE_TEMPLATES["no_result"])
    template = random.choice(RESPONSE_TEMPLATES[get_template_key(intent, category)])
    reply = template.format(
        name=str(place.get("name", "")).strip(),
        location=str(place.get("location", "")).strip() or "بني سويف",
        price_info=fmt_price(place.get("price_range", "")),
        rating_info=fmt_rating(place.get("rating_norm", place.get("rating", 0))),
        hours_info=fmt_hours(place.get("opening_hours", "")),
        address_info=fmt_addr(place.get("address", ""), place.get("location", "")),
        description_short=fmt_desc(place.get("description", "")),
    )
    open_state = place.get("open_now")
    if open_state == 1:
        reply += "\n🟢 مفتوح دلوقتي."
    elif open_state == 0:
        reply += "\n🔴 مغلق دلوقتي."
    return reply
def handle_detail(text, place):
    """Answer a follow-up question (hours/address/price/rating/phone) about
    the place shown in the previous turn."""
    if not place:
        return "مش فاكر إحنا اتكلمنا عن مكان. ممكن تسألني من الأول؟"
    t = norm(text)
    name = str(place.get("name","")).strip()
    if any(w in t for w in ["امتي","امتى","مواعيد","يفتح","تفتح","يقفل"]):
        st = "🟢 مفتوح" if place.get("open_now") == 1 else "🔴 مغلق"
        return f"⏰ {name}{fmt_hours(place.get('opening_hours',''))}\n{st} دلوقتي."
    if any(w in t for w in ["عنوان","فين","وصول","اوصل"]):
        # BUGFIX: was "\\n" — the user saw a literal backslash-n, not a newline.
        return f"📍 {name} في {place.get('location','')}.\nالعنوان: {place.get('address','')}"
    if any(w in t for w in ["سعر","بكام","تكلف","غالي","رخيص"]):
        return f"💰 {name}{fmt_price(place.get('price_range',''))}"
    if any(w in t for w in ["تقييم","نجوم"]):
        return f"⭐ {name}{fmt_rating(place.get('rating_norm', place.get('rating',0)))}"
    if any(w in t for w in ["رقم","تليفون"]):
        phone = str(place.get("phone","")).strip()
        return f"📞 {name}{phone}" if phone else f"معنديش رقم {name}."
    # Generic fallback: short summary of the place.
    return f"📋 {name}:\n{fmt_desc(place.get('description',''), 20)}\n{fmt_hours(place.get('opening_hours',''))}{fmt_rating(place.get('rating_norm',0))}"
# PREDICT FUNCTIONS
def predict_intent(text, threshold=0.5):
    """Classify the message intent. Keyword overrides win outright; otherwise
    the transformer's argmax is used, falling back to 'fallback' when its
    confidence is below threshold."""
    override = apply_keyword_override(text)
    if override:
        return {"intent": override, "confidence": 1.0}
    encoded = intent_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        logits = intent_model(**encoded).logits
    probs = torch.softmax(logits, dim=1)
    pred_id = torch.argmax(probs, dim=1).item()
    conf = probs[0][pred_id].item()
    intent = id2intent[pred_id] if conf >= threshold else "fallback"
    return {"intent": intent, "confidence": round(conf, 4)}
def extract_entities(text, min_score=0.40):
    """Run the NER pipeline and map raw entity groups to search fields,
    keeping the longest cleaned value per field above min_score."""
    entities = {}
    for item in ner_pipeline([text])[0]:
        group = item["entity_group"].lower().strip()
        word = item["word"].strip().replace("##", "").strip()
        word = re.sub(r"\s+", " ", word).strip()
        if len(word) < 2 or float(item["score"]) < min_score:
            continue
        field = ENTITY_FIELD_MAP.get(group, group)
        cleaned = clean_text(word)
        # Prefer the longest candidate seen for each field.
        if field not in entities or len(cleaned) > len(clean_text(entities[field])):
            entities[field] = cleaned
    return entities
# SESSION
class Session:
    """Per-user conversation state: turn history, last search results, and
    sticky context slots (location/category/...) reused across turns."""

    def __init__(self, sid="default"):
        self.sid = sid
        self.history = []
        self.last_intent = None
        self.last_entities = {}
        self.last_place = None
        self.last_results = []
        self.result_pointer = 0
        self.context_slots = {}
        self.turns = 0

    def add(self, user, bot, intent, entities, place, results):
        """Record one exchange and update the carried-over context."""
        self.history.append({"turn": self.turns, "user": user, "bot": bot,
                             "intent": intent, "entities": entities})
        # Only meaningful intents become the "last" intent for follow-ups.
        if intent and intent not in ("fallback", "no_result", "out_of_scope"):
            self.last_intent = intent
        if intent in SEARCH_INTENTS:
            self.last_entities = entities
        if place is not None:
            self.last_place = place
        if results:
            self.last_results = results
            self.result_pointer = 0
        self._slots(entities)
        self.turns += 1

    def _slots(self, ents):
        """Persist non-empty values of the sticky slot keys."""
        for slot in ["location", "category", "sub_category", "price"]:
            value = ents.get(slot)
            if value and str(value).strip():
                self.context_slots[slot] = str(value).strip()

    def merge(self, new_ents):
        """Overlay fresh entities on the sticky slots (fresh values win) and
        remember them; returns the merged view."""
        merged = dict(self.context_slots)
        for key, value in new_ents.items():
            if value and str(value).strip():
                merged[key] = str(value).strip()
        self._slots(new_ents)
        return merged
# MAIN CHAT
def chat(text: str, session: Session, user_lat=None, user_lon=None):
    """Dialogue manager: route one user message through guards (empty text,
    out-of-scope), follow-up handling, intent prediction, entity extraction
    and place search, updating the session and returning the result dict."""
    result = dict(reply="", intent="", confidence=0.0, entities={}, best_place=None, all_results=[])
    # Guard: empty / whitespace-only message.
    if not text or not text.strip():
        result.update(reply="الرجاء إدخال سؤال 😊", intent="fallback")
        session.add("", result["reply"], "fallback", {}, None, [])
        return result
    # Guard: topics the bot explicitly refuses (weather, sports, news, ...).
    if is_out_of_scope(text):
        reply = "أنا متخصص في إيجاد الأماكن في بني سويف فقط. 😊\nممكن أساعدك تلاقي مطعم، صيدلية، كافيه، ماركت، أو سكن."
        result.update(reply=reply, intent="out_of_scope")
        session.add(text, reply, "out_of_scope", {}, None, [])
        return result
    # Follow-up handling about the previously shown place/result list.
    ref = detect_ref_type(text)
    if ref == "detail" and session.last_place:
        reply = handle_detail(text, session.last_place)
        result.update(reply=reply, intent=session.last_intent or "detail", best_place=session.last_place)
        session.add(text, reply, result["intent"], {}, session.last_place, [])
        return result
    if ref == "next" and session.last_results:
        ptr = session.result_pointer + 1
        if ptr < len(session.last_results):
            # Advance to the next cached result — no new search needed.
            session.result_pointer = ptr; nxt = session.last_results[ptr]; session.last_place = nxt
            reply = build_response(nxt, session.last_intent, category=nxt.get("category"))
            result.update(reply=reply, intent=session.last_intent, best_place=nxt)
        else:
            result.update(reply="😔 مفيش نتايج تانية. عايز أدور من الأول؟", intent="no_result")
        session.add(text, result["reply"], result["intent"], {}, result["best_place"], [])
        return result
    # Fresh query: classify intent.
    ir = predict_intent(text); intent = ir["intent"]; conf = ir["confidence"]
    result["intent"] = intent; result["confidence"] = conf
    # Canned replies (greeting/thanks/goodbye/confirm/deny).
    if intent in STATIC_INTENTS:
        result["reply"] = random.choice(RESPONSE_TEMPLATES[get_template_key(intent)])
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    if intent == "fallback":
        # A short location-only message continues the previous search intent.
        if session.last_intent in SEARCH_INTENTS and _loc_continuation(text):
            intent = session.last_intent; result["intent"] = intent
        else:
            result["reply"] = random.choice(RESPONSE_TEMPLATES["fallback"])
            session.add(text, result["reply"], "fallback", {}, None, [])
            return result
    if intent not in SEARCH_INTENTS:
        result["reply"] = random.choice(RESPONSE_TEMPLATES.get(get_template_key(intent), RESPONSE_TEMPLATES["fallback"]))
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    # Search path: extract entities, overlay them on sticky session slots.
    ents = extract_entities(text); result["entities"] = ents
    merged = session.merge(ents)
    # Category resolution order: extracted entity > intent default > keyword inference.
    category = normalize_category(merged.get("category") or INTENT_TO_CATEGORY.get(intent) or infer_category(text))
    sub_cat = merged.get("sub_category")
    location = merged.get("location")
    price_range = merged.get("price")
    open_only = ("open_now" in intent or "place_details" in intent)
    df = search_places(text, top_k_final=5, category=category, sub_category=sub_cat,
                       location=location, price_range=price_range, open_now_only=open_only,
                       user_lat=user_lat, user_lon=user_lon)
    if df.empty:
        # Nothing found even after relaxed filtering: ask a clarifying question.
        cl = CLARIFICATION_Q.get(intent, "")
        reply = random.choice(RESPONSE_TEMPLATES["no_result"]) + (f"\n\n💬 {cl}" if cl else "")
        result.update(reply=reply, intent="no_result")
        session.add(text, reply, "no_result", ents, None, [])
        return result
    all_res = df.to_dict(orient="records"); best = all_res[0]
    reply = build_response(best, intent, category=category)
    if len(all_res) > 1: reply += f"\n\n💬 فيه {len(all_res)} نتيجة — قولي 'تاني' لو عايز غيره."
    result.update(reply=reply, best_place=best, all_results=all_res)
    session.add(text, reply, intent, ents, best, all_res)
    return result
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: download and load all models plus the places
    spreadsheet once at startup, then yield for the app's lifetime."""
    global intent_tokenizer, intent_model, label_encoder, id2intent
    global ner_pipeline, label2id, id2label
    global semantic_model, corpus_df, corpus_embeddings, places_df
    print("⏳ Downloading models from HuggingFace …")
    # Download model snapshots from the HuggingFace Model Hub.
    intent_local = snapshot_download(INTENT_REPO)
    entity_local = snapshot_download(ENTITY_REPO)
    semantic_local = snapshot_download(SEMANTIC_REPO)
    print("⏳ Loading Intent model …")
    intent_tokenizer = AutoTokenizer.from_pretrained(intent_local)
    intent_model = AutoModelForSequenceClassification.from_pretrained(intent_local)
    label_encoder = joblib.load(os.path.join(intent_local, "label_encoder.pkl"))
    id2intent = {i: lbl for i, lbl in enumerate(label_encoder.classes_)}
    intent_model.eval()  # inference mode (disables dropout)
    print("⏳ Loading Entity model …")
    with open(os.path.join(entity_local, "label2id.json"), encoding="utf-8") as f: label2id = json.load(f)
    with open(os.path.join(entity_local, "id2label.json"), encoding="utf-8") as f: id2label = json.load(f)
    etok = AutoTokenizer.from_pretrained(entity_local)
    emod = AutoModelForTokenClassification.from_pretrained(entity_local)
    ner_pipeline = pipeline("token-classification", model=emod, tokenizer=etok, aggregation_strategy="first")
    print("⏳ Loading Semantic model …")
    semantic_model = SentenceTransformer("Youmnaaaa/semantic-search-ff")
    # Precomputed corpus dataframe + embeddings shipped alongside the model.
    # NOTE(review): pickle.load on downloaded data executes arbitrary code —
    # acceptable only because the repo is owned by the author.
    from huggingface_hub import hf_hub_download
    pkl_path = hf_hub_download(
        repo_id="Youmnaaaa/semantic-search-ff",
        filename="semantic_data.pkl"
    )
    with open(pkl_path, "rb") as f:
        sd = pickle.load(f)
    corpus_df = sd["corpus_df"]
    corpus_embeddings = sd["corpus_embeddings"]
    # Load the master places table and guarantee every expected column exists.
    places_df = pd.read_excel(PLACES_FILE)
    for col in ["place_id","name","category","sub_category","location","address",
                "price_range","rating","opening_hours","description","lat","lon"]:
        if col not in places_df.columns: places_df[col] = ""
    places_df = places_df.fillna("")
    # Pre-clean the text columns once so search-time filters can match cheaply.
    places_df["category_clean"] = places_df["category"].apply(clean_text)
    places_df["sub_category_clean"] = places_df["sub_category"].apply(clean_text)
    places_df["location_clean"] = places_df["location"].apply(clean_text)
    places_df["address_clean"] = places_df["address"].apply(clean_text)
    places_df["price_range_clean"] = places_df["price_range"].apply(clean_text)
    places_df["description_clean"] = places_df["description"].apply(clean_text)
    places_df["search_text_clean"] = (
        places_df["name"].astype(str)+" "+places_df["category"].astype(str)+" "+
        places_df["sub_category"].astype(str)+" "+places_df["location"].astype(str)+" "+
        places_df["description"].astype(str)
    ).apply(clean_text)
    print("✅ All models loaded!")
    yield
    print("Shutting down.")
# FASTAPI
app = FastAPI(title="Beni Suef Chatbot API", version="1.0.0", lifespan=lifespan)
# NOTE(review): wildcard CORS is wide open — fine for a public demo Space,
# but should be restricted for any production deployment.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class ChatRequest(BaseModel):
    """Incoming /chat payload."""
    message: str                       # user utterance (Arabic expected)
    session_id: str = "default"        # conversation key into SESSIONS
    user_lat: Optional[float] = None   # optional GPS for distance ranking
    user_lon: Optional[float] = None
class ChatResponse(BaseModel):
    """Outgoing /chat payload."""
    reply: str                         # rendered Arabic reply text
    intent: str                        # resolved intent label
    confidence: float                  # classifier confidence (1.0 for overrides)
    entities: dict                     # extracted entity fields
    session_id: str
    best_place: Optional[dict] = None  # JSON-safe top search hit, if any
@app.get("/")
def root():
return {"status": "ok", "message": "Beni Suef Chatbot is running 🚀"}
@app.get("/health")
def health():
return {"status": "healthy",
"models_loaded": all([intent_model, ner_pipeline, semantic_model, places_df is not None])}
@app.post("/chat", response_model=ChatResponse)
def chat_endpoint(req: ChatRequest):
if req.session_id not in SESSIONS:
SESSIONS[req.session_id] = Session(req.session_id)
session = SESSIONS[req.session_id]
try:
result = chat(req.message, session, req.user_lat, req.user_lon)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
best = result.get("best_place")
if best:
best = {k: (float(v) if isinstance(v, (np.floating, np.integer)) else
(None if (isinstance(v, float) and np.isnan(v)) else v))
for k, v in best.items()
if k in ["place_id","name","category","sub_category","location","address",
"price_range","rating","opening_hours","description","phone",
"lat","lon","open_now","final_score"]}
return ChatResponse(reply=result["reply"], intent=result["intent"],
confidence=result["confidence"], entities=result["entities"],
session_id=req.session_id, best_place=best)
@app.delete("/session/{session_id}")
def reset_session(session_id: str):
SESSIONS.pop(session_id, None)
return {"status": "reset", "session_id": session_id}