# -*- coding: utf-8 -*-
"""app.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1iPAjeI3M04kA13lYenlROS96tUeCYakB
"""

import os, re, json, math, random, pickle, joblib
import numpy as np
import pandas as pd
import torch
from datetime import datetime
from zoneinfo import ZoneInfo
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from sentence_transformers import SentenceTransformer, util
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    pipeline,
)
from huggingface_hub import snapshot_download, hf_hub_download

"""Paths"""

try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # __file__ is undefined inside notebooks / interactive shells.
    BASE_DIR = os.getcwd()

# HuggingFace Model Repos
INTENT_REPO = "Youmnaaaa/intent-arabert-ff"
ENTITY_REPO = "Youmnaaaa/entity-hybrid-ff"
SEMANTIC_REPO = "Youmnaaaa/semantic-search-ff"

# Places spreadsheet bundled alongside this app (inside the Space).
PLACES_FILE = os.path.join(BASE_DIR, "beni_suef_100_places_v5ff.xlsx")

# Globals populated once at startup by `lifespan` (None until models load).
intent_tokenizer = intent_model = label_encoder = id2intent = None
ner_pipeline = label2id = id2label = None
semantic_model = corpus_df = corpus_embeddings = places_df = None

# In-memory per-user conversation state, keyed by session_id.
SESSIONS: dict = {}


def clean_text(text):
    """Aggressively normalize Arabic text for fuzzy matching.

    Unifies alef/ya/ta-marbuta/hamza variants, strips tatweel and all
    punctuation, and collapses whitespace. Used for corpus/filter matching.
    """
    text = str(text).strip().lower()
    text = re.sub(r"ـ+", "", text)
    for old, new in [("[إأآا]", "ا"), ("ى", "ي"), ("ة", "ه"), ("ؤ", "و"), ("ئ", "ي")]:
        text = re.sub(old, new, text)
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()


def norm(text):
    """Normalize Arabic text but keep punctuation (needed for time parsing).

    Also canonicalizes AM/PM words to "ص"/"م" and range words/dashes to "-".
    NOTE(review): the ("إلى", "-") and ("الى", "-") replacements can never
    fire — the letter normalization above already mapped إ→ا and ى→ي before
    this loop runs. TODO confirm whether "الي" should be handled instead.
    """
    text = str(text).strip().lower()
    text = re.sub(r"ـ+", "", text)
    for old, new in [("[إأآا]", "ا"), ("ى", "ي"), ("ة", "ه"), ("ؤ", "و"), ("ئ", "ي")]:
        text = re.sub(old, new, text)
    for old, new in [("صباحًا", "ص"), ("صباحا", "ص"), ("مساءً", "م"), ("مساءا", "م"),
                     ("ليلًا", "م"), ("ليلا", "م"), ("إلى", "-"), ("الى", "-"), ("حتى", "-"),
                     ("–", "-"), ("—", "-")]:
        text = text.replace(old, new)
    return re.sub(r"\s+", " ", text).strip()


# INTENT MAPS
SEARCH_INTENTS = {"nearest_restaurant", "nearest_pharmacy", "nearest_cafe",
                  "nearest_supermarket", "housing_search", "recommend_place",
                  "open_now", "place_details"}
STATIC_INTENTS = {"greeting", "thanks", "goodbye", "confirm", "deny"}

INTENT_TO_CATEGORY = {
    "nearest_restaurant": "restaurant", "nearest_pharmacy": "pharmacy",
    "nearest_cafe": "cafe", "nearest_supermarket": "supermarket",
    "housing_search": "housing",
}

INTENT_TEMPLATE_MAP = {
    "nearest_restaurant": "find_restaurant", "nearest_pharmacy": "find_pharmacy",
    "nearest_cafe": "find_cafe", "nearest_supermarket": "find_supermarket",
    "housing_search": "find_housing", "recommend_place": "find_restaurant",
    "open_now": "find_restaurant", "place_details": "find_restaurant",
    "greeting": "greeting", "thanks": "thanks", "goodbye": "goodbye",
    "confirm": "clarification", "deny": "clarification", "fallback": "fallback",
}

# Maps NER entity-group names (lowercased) to canonical slot names.
ENTITY_FIELD_MAP = {
    "location": "location", "place_type": "category", "cuisine_or_item": "sub_category",
    "food_type": "sub_category", "price": "price", "price_range": "price",
    "category": "category", "sub_category": "sub_category", "facility_type": "category",
    "housing_type": "category", "status": "status", "time": "time",
}

# Hard keyword overrides that bypass the intent classifier entirely.
KEYWORD_OVERRIDE = {
    "goodbye": ["مع السلامة", "مع السلامه", "باي", "وداعا", "bye", "goodbye", "تصبح على خير",
                "في امان الله", "الله يسلمك", "سلامتك"],
    "greeting": ["السلام عليكم", "وعليكم السلام", "اهلا", "أهلا", "هلا", "هلو", "مرحبا", "مرحباً",
                 "صباح الخير", "مساء الخير", "هاي", "hi", "hello", "صباح", "مساء"],
    "thanks": ["شكرا", "شكراً", "تسلم", "يسلمو", "ممنون", "مشكور", "thanks", "thank", "الف شكر"],
}

CATEGORY_KEYWORDS = {
    "restaurant": ["مطعم", "اكل", "وجبات", "مشويات", "كباب", "شاورما", "كريب", "برجر", "سمك", "فرايد"],
    "pharmacy": ["صيدليه", "صيدلية", "دوا", "ادويه", "دواء"],
    "cafe": ["كافيه", "كوفي", "قهوه", "قهوة", "كافيتيريا"],
    "supermarket": ["سوبرماركت", "ماركت", "بقاله", "هايبر"],
    "housing": ["شقه", "شقة", "ايجار", "إيجار", "فندق", "هوستل", "سكن"],
}

# Arabic category word -> canonical English category key.
_CAT_MAP = {
    "مطعم": "restaurant", "مطاعم": "restaurant", "طعام": "restaurant", "اكل": "restaurant",
    "صيدليه": "pharmacy", "صيدلية": "pharmacy", "صيدله": "pharmacy", "دواء": "pharmacy", "دوا": "pharmacy",
    "كافيه": "cafe", "كافية": "cafe", "كوفي": "cafe", "قهوه": "cafe", "قهوة": "cafe", "كافيتيريا": "cafe",
    "سوبرماركت": "supermarket", "ماركت": "supermarket", "بقاله": "supermarket", "بقالة": "supermarket", "هايبر": "supermarket",
    "شقه": "housing", "شقة": "housing", "ايجار": "housing", "إيجار": "housing",
    "فندق": "housing", "سكن": "housing", "هوستل": "housing",
}


def normalize_category(cat):
    """Map a (possibly Arabic) category string to a canonical English key.

    Falls back to substring matching against _CAT_MAP, and returns the input
    unchanged when nothing matches (or when it is falsy).
    """
    if not cat:
        return cat
    cat_s = str(cat).strip()
    if cat_s in ("restaurant", "pharmacy", "cafe", "supermarket", "housing"):
        return cat_s
    if cat_s in _CAT_MAP:
        return _CAT_MAP[cat_s]
    for ar, en in _CAT_MAP.items():
        if ar in cat_s or cat_s in ar:
            return en
    return cat_s


# Follow-up question asked when a search intent yields no results.
CLARIFICATION_Q = {
    "nearest_restaurant": "أي نوع أكل؟ مشويات، شاورما، كريب، برجر؟",
    "nearest_pharmacy": "في أي منطقة بتدور على صيدلية؟",
    "nearest_cafe": "في أي منطقة بتدور على كافيه؟",
    "nearest_supermarket": "في أي منطقة بتدور على ماركت؟",
    "housing_search": "بتدور على إيه — شقة إيجار، فندق؟ وفين؟",
}

OUT_OF_SCOPE_KW = ["الجو", "طقس", "درجه", "كوره", "كرة", "أهلي", "زمالك", "مباريات",
                   "سياسه", "سياسة", "أخبار", "رصيد", "بنك", "تحويل", "امتحان", "مدرسه",
                   "جامعه", "وظيفه", "برمجه", "كود", "python", "java", "رياضيات", "ترجمه", "translate"]

# Cue words: "show me another one" / "tell me more about it" / anaphora.
NEXT_WORDS = ["تاني", "غيره", "غيرها", "بديل", "حاجة تانية", "مش عاجبني", "فيه تاني", "عايز غيره"]
DETAIL_WORDS = ["بيفتح", "بتفتح", "مواعيده", "مواعيدها", "امتى", "امتي", "عنوانه", "عنوانها",
                "تليفونه", "تليفونها", "رقمه", "رقمها", "تقييمه", "تقييمها", "سعره", "سعرها"]
REF_WORDS = ["هو", "هي", "ده", "دي", "المكان ده"]

# Known location names used to detect a bare-location continuation message.
_LOC_CUES = ["الحي", "بني سويف", "الاباصيري", "الكورنيش", "مقبل", "الزراعيين",
             "صلاح سالم", "شرق النيل", "سيتي سنتر", "عرابي", "الروضه"]


# HELPER FUNCTIONS
def apply_keyword_override(text):
    """Return a hard-coded intent if the text contains an override keyword.

    Multi-word keywords match as substrings; single words must match a whole
    token. Longer keywords are tried first so specific phrases win.
    """
    t = norm(text)
    tw = set(t.split())
    for intent, kws in KEYWORD_OVERRIDE.items():
        for k in sorted(kws, key=len, reverse=True):
            kn = norm(k)
            if (" " in kn and kn in t) or (kn in tw):
                return intent
    return None


def get_template_key(intent, category=None):
    """Pick the response-template key, preferring the category over the intent."""
    if category:
        k = {"restaurant": "find_restaurant", "pharmacy": "find_pharmacy",
             "cafe": "find_cafe", "supermarket": "find_supermarket",
             "housing": "find_housing"}.get(category)
        if k:
            return k
    return INTENT_TEMPLATE_MAP.get(intent, "fallback")


def infer_category(query):
    """Guess a category from keywords in the raw query, or None."""
    q = norm(query)
    for cat, words in CATEGORY_KEYWORDS.items():
        if any(norm(w) in q for w in words):
            return cat
    return None


def is_out_of_scope(text):
    """True when the message mentions a topic the bot does not handle."""
    t = norm(text)
    return any(norm(k) in t for k in OUT_OF_SCOPE_KW)


def detect_ref_type(text):
    """Classify a message as 'next', 'detail', 'reference', or 'new'."""
    t = norm(text)
    tw = set(t.split())
    if any(norm(w) in t for w in NEXT_WORDS):
        return "next"
    if any(norm(w) in t for w in DETAIL_WORDS):
        return "detail"
    for w in REF_WORDS:
        wn = norm(w)
        if (" " in wn and wn in t) or (wn in tw):
            return "reference"
    return "new"


def _loc_continuation(text):
    """Heuristic: short message that only names a location (continues last search)."""
    t = norm(text)
    words = t.split()
    if len(words) <= 4 and any(norm(c) in t for c in _LOC_CUES):
        return True
    return bool(words and words[0] == "في")


def normalize_rating(r):
    """Coerce a rating to a 0–5 scale (values above 5 assumed to be out of 10)."""
    try:
        r = float(r)
        return round(r / 2, 1) if r > 5 else round(r, 1) if r > 0 else 0.0
    except (TypeError, ValueError):
        return 0.0


# TIME UTILS
def get_cairo_now():
    """Current wall-clock time in Africa/Cairo."""
    return datetime.now(ZoneInfo("Africa/Cairo"))


def parse_time(token):
    """Parse an Arabic time token like '9', '9:30', '9م' into 'HH:MM' (24h) or None."""
    token = norm(token).replace(" ", "")
    m = re.match(r"^(\d{1,2})(?::(\d{1,2}))?(ص|م|ظهر)?$", token)
    if not m:
        return None
    h = int(m.group(1))
    mn = int(m.group(2)) if m.group(2) else 0
    suf = m.group(3)
    if not (0 <= mn <= 59):
        return None
    if suf == "ص":  # AM
        if h == 12:
            h = 0
        elif not (1 <= h <= 11):
            return None
    elif suf in ("م", "ظهر"):  # PM / noon
        if h != 12 and 1 <= h <= 11:
            h += 12
    else:  # no suffix: treat as 24h clock
        if h == 24:
            h = 0
        elif not (0 <= h <= 23):
            return None
    return f"{h:02d}:{mn:02d}"


def check_open_now(opening_hours_str):
    """Return 1 if open now, 0 if closed, None if hours are unknown/unparseable.

    Handles 24/7 markers and overnight ranges (close time before open time).
    """
    if not opening_hours_str or str(opening_hours_str).strip() in ("", "nan", "none"):
        return None
    text = norm(str(opening_hours_str))
    if any(k in text for k in ["24", "always", "طول اليوم", "24/7"]):
        return 1
    sep = re.search(r"(.+?)\s*-\s*(.+)", text)
    if not sep:
        return None
    t1 = parse_time(sep.group(1).strip())
    t2 = parse_time(sep.group(2).strip())
    if not t1 or not t2:
        return None
    # Single clock read to avoid an hour/minute tear at a minute boundary.
    now = get_cairo_now()
    now_t = f"{now.hour:02d}:{now.minute:02d}"
    if t1 <= t2:
        return 1 if t1 <= now_t <= t2 else 0
    # Overnight range, e.g. 18:00 - 02:00.
    return 1 if (now_t >= t1 or now_t <= t2) else 0


# SEARCH + FILTER + RANK
def semantic_candidates(query, top_k=20):
    """Return the top_k corpus rows most similar to the query (cosine similarity)."""
    q_emb = semantic_model.encode(clean_text(query), convert_to_tensor=True)
    scores = util.cos_sim(q_emb, corpus_embeddings)[0]
    top_k = min(top_k, len(corpus_df))
    top_r = torch.topk(scores, k=top_k)
    res = corpus_df.iloc[top_r.indices.cpu().numpy()].copy()
    res["semantic_score"] = top_r.values.cpu().numpy()
    keep = [c for c in ["place_id", "doc_id", "name", "category", "sub_category", "location",
                        "address", "price_range", "opening_hours", "description", "semantic_score"]
            if c in res.columns]
    return res[keep].reset_index(drop=True)


def merge_places(df):
    """Left-join extra place metadata (coords, rating, cleaned fields) onto df."""
    extra = [c for c in ["lat", "lon", "rating", "phone", "social_media", "status",
                         "category_clean", "sub_category_clean", "location_clean",
                         "address_clean", "price_range_clean", "search_text_clean"]
             if c in places_df.columns]
    slim = places_df[["place_id"] + extra].copy()
    return df.merge(slim, on="place_id", how="left")


def apply_filters(df, query, category=None, sub_category=None, location=None,
                  price_range=None, open_now_only=False, min_rating=None):
    """Filter candidates by slot values and compute rating/open-now scores."""
    f = df.copy()
    if category:
        f = f[f["category_clean"].astype(str).str.contains(re.escape(clean_text(category)), na=False)]
    if sub_category:
        f = f[f["sub_category_clean"].astype(str).str.contains(re.escape(clean_text(sub_category)), na=False)]
    if location:
        f = f[f["location_clean"].astype(str).str.contains(re.escape(clean_text(location)), na=False)]
    if price_range:
        f = f[f["price_range_clean"].astype(str).str.contains(re.escape(clean_text(price_range)), na=False)]
    f["open_now"] = f["opening_hours"].apply(check_open_now)
    # Index-aligned zero fallback (an empty Series would misalign and has a
    # deprecated implicit dtype).
    f["rating_num"] = pd.to_numeric(f.get("rating", pd.Series(0.0, index=f.index)),
                                    errors="coerce").fillna(0)
    f["rating_norm"] = f["rating_num"].apply(normalize_rating)
    f["rating_score"] = f["rating_norm"] / 5.0
    # Unknown opening hours get a neutral 0.5 rather than a penalty.
    f["open_score"] = f["open_now"].apply(lambda x: 1.0 if x == 1 else (0.5 if x is None else 0.0))
    if open_now_only:
        f = f[f["open_now"] == 1]
    if min_rating:
        f = f[f["rating_norm"] >= min_rating]
    return f


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    R = 6371
    p = math.pi / 180
    a = (math.sin((lat2 - lat1) * p / 2) ** 2 +
         math.cos(lat1 * p) * math.cos(lat2 * p) * math.sin((lon2 - lon1) * p / 2) ** 2)
    return 2 * R * math.asin(math.sqrt(a))


def rank(df, query, user_lat=None, user_lon=None):
    """Score candidates (semantic/rating/open/distance/name-match) and sort."""
    df = df.copy()
    # `is not None` so legitimate 0.0 coordinates are not treated as missing.
    if user_lat is not None and user_lon is not None and "lat" in df.columns:
        def dist(row):
            try:
                return haversine(float(user_lat), float(user_lon),
                                 float(row["lat"]), float(row["lon"]))
            except (TypeError, ValueError):
                return 999  # sentinel: unknown coordinates
        df["distance_km"] = df.apply(dist, axis=1)
        mx = df["distance_km"].replace(999, np.nan).max()
        # NaN is truthy, so `max() or 1` would propagate NaN into every score
        # when no row has valid coordinates — guard explicitly.
        if pd.isna(mx) or mx == 0:
            mx = 1
        df["distance_score"] = 1 - (df["distance_km"] / (mx + 1))
    else:
        df["distance_km"] = 999
        df["distance_score"] = 0.0
    q_clean = clean_text(query)
    df["name_match_score"] = df["name"].apply(
        lambda n: 1.0 if clean_text(str(n)) in q_clean or q_clean in clean_text(str(n)) else 0.0)
    w = dict(semantic=0.40, rating=0.25, open=0.15, distance=0.10, name=0.10)
    df["final_score"] = (
        w["semantic"] * df.get("semantic_score", pd.Series(0, index=df.index)).fillna(0) +
        w["rating"] * df.get("rating_score", pd.Series(0, index=df.index)).fillna(0) +
        w["open"] * df.get("open_score", pd.Series(0, index=df.index)).fillna(0) +
        w["distance"] * df["distance_score"] +
        w["name"] * df["name_match_score"]
    )
    return df.sort_values("final_score", ascending=False).reset_index(drop=True)


def search_places(query, top_k_final=5, category=None, sub_category=None, location=None,
                  price_range=None, open_now_only=False, min_rating=None,
                  user_lat=None, user_lon=None):
    """Full search pipeline: semantic candidates -> merge -> filter (with
    progressive relaxation) -> rank. Returns up to top_k_final rows."""
    cands = semantic_candidates(query, top_k=20)
    merged = merge_places(cands)
    # Relax constraints step by step until something survives the filters.
    for attempt in [
        dict(category=category, sub_category=sub_category, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=price_range, open_now_only=open_now_only, min_rating=min_rating),
        dict(category=category, sub_category=None, location=location,
             price_range=None, open_now_only=False, min_rating=min_rating),
        dict(category=category, sub_category=None, location=None,
             price_range=None, open_now_only=False, min_rating=None),
    ]:
        filtered = apply_filters(merged, query, **attempt)
        if not filtered.empty:
            break
    if filtered.empty:
        return pd.DataFrame()
    ranked = rank(filtered, query, user_lat, user_lon)
    keep = [c for c in ["place_id", "name", "category", "sub_category", "location", "address",
                        "price_range", "rating", "rating_norm", "opening_hours", "description",
                        "phone", "lat", "lon", "semantic_score", "final_score", "open_now"]
            if c in ranked.columns]
    return ranked[keep].head(top_k_final).reset_index(drop=True)


# RESPONSE TEMPLATES + FORMATTERS
RESPONSE_TEMPLATES = {
    "find_restaurant": [
        "🍽️ لقيتلك {name} في {location}. {price_info}{rating_info}{hours_info}",
        "أنصحك بـ {name} — هتلاقيه في {location}. {price_info}{rating_info}{hours_info}",
        "في {location} فيه {name}. {description_short}{price_info}{hours_info}",
    ],
    "find_pharmacy": [
        "💊 {name} في {location}.{hours_info}{rating_info}",
        "أقرب صيدلية ليك: {name} — {address_info}{hours_info}",
    ],
    "find_cafe": [
        "☕ {name} في {location}. {price_info}{rating_info}{hours_info}",
        "جرب {name} — في {location}. {description_short}{hours_info}",
    ],
    "find_supermarket": [
        "🛒 {name} في {location}.{hours_info}{rating_info}",
        "أقرب ماركت: {name} — {address_info}{hours_info}",
    ],
    "find_housing": [
        "🏠 {name} في {location}. {price_info}{description_short}",
        "فيه {name} في {location}. {price_info}{rating_info}",
    ],
    "greeting": ["أهلاً! 😊 أنا بساعدك تلاقي أي مكان في بني سويف. عايز إيه؟",
                 "وعليكم السلام! قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "هلا بيك! محتاج إيه في بني سويف؟ 😊"],
    "thanks": ["العفو! 😊 في حاجة تانية أساعدك فيها؟", "أي خدمة! 😊", "بكل سرور! 😊"],
    "goodbye": ["مع السلامة! 👋", "سلامتك! أي وقت محتاج مساعدة أنا هنا.", "باي! ربنا يوفقك 😊"],
    "clarification": ["😊 قصدك إيه بالظبط؟", "ممكن توضح أكتر؟", "تمام! بتدور على إيه بالظبط؟"],
    "no_result": ["😔 مش لاقي حاجة مناسبة. جرب تغير المنطقة أو تسأل بطريقة تانية.",
                  "معلش، مفيش نتايج. ممكن تحدد المنطقة أو النوع أكتر؟"],
    "fallback": ["آسف، مش فاهم قصدك. 😊 قولي محتاج إيه — مطعم، صيدلية، كافيه؟",
                 "ممكن تسألني عن أي مكان في بني سويف وأنا هساعدك! 😊"],
}


def fmt_price(x):
    """Render a price level as an Arabic phrase (empty string when unknown)."""
    p = str(x).strip().lower()
    if not p or p in ("", "nan", "none"):
        return ""
    m = {"cheap": "الأسعار رخيصة", "رخيص": "الأسعار رخيصة", "اقتصادي": "الأسعار اقتصادية",
         "medium": "الأسعار متوسطة", "متوسط": "الأسعار متوسطة",
         "expensive": "الأسعار غالية", "غالي": "الأسعار غالية"}
    for k, v in m.items():
        if k in p:
            return v + ". "
    return f"السعر: {x}. "


def fmt_rating(x):
    """Render a rating with star emoji, or empty string when missing/zero."""
    try:
        r = normalize_rating(float(x))
        stars = min(round(r), 5)
        return f"تقييمه {r} {'⭐' * stars}. " if r > 0 else ""
    except (TypeError, ValueError):
        return ""


def fmt_hours(x):
    """Render opening hours, or empty string when unknown."""
    h = str(x).strip()
    if not h or h in ("", "nan", "none"):
        return ""
    if any(k in h.lower() for k in ["24", "always", "طول اليوم"]):
        return "مفتوح 24 ساعة. "
    return f"بيفتح: {h}. "


def fmt_addr(address, location):
    """Render the address, falling back to the location district."""
    a = str(address).strip()
    l = str(location).strip()
    if a and a not in ("", "nan", "none"):
        return f"عنوانه: {a}. "
    if l and l not in ("", "nan", "none"):
        return f"في {l}. "
    return ""


def fmt_desc(x, max_words=12):
    """Render a description truncated to max_words words."""
    d = str(x).strip()
    if not d or d in ("", "nan", "none"):
        return ""
    words = d.split()
    return (" ".join(words[:max_words]) + "...") if len(words) > max_words else d + " "


def build_response(place, intent, category=None):
    """Build the user-facing reply for one place dict, plus open/closed status."""
    if not place:
        return random.choice(RESPONSE_TEMPLATES["no_result"])
    tk = get_template_key(intent, category)
    reply = random.choice(RESPONSE_TEMPLATES[tk]).format(
        name=str(place.get("name", "")).strip(),
        location=str(place.get("location", "")).strip() or "بني سويف",
        price_info=fmt_price(place.get("price_range", "")),
        rating_info=fmt_rating(place.get("rating_norm", place.get("rating", 0))),
        hours_info=fmt_hours(place.get("opening_hours", "")),
        address_info=fmt_addr(place.get("address", ""), place.get("location", "")),
        description_short=fmt_desc(place.get("description", "")),
    )
    on = place.get("open_now")
    if on == 1:
        reply += "\n🟢 مفتوح دلوقتي."
    elif on == 0:
        reply += "\n🔴 مغلق دلوقتي."
    return reply


def handle_detail(text, place):
    """Answer a follow-up question (hours/address/price/rating/phone) about
    the last place discussed; generic summary when no keyword matches."""
    if not place:
        return "مش فاكر إحنا اتكلمنا عن مكان. ممكن تسألني من الأول؟"
    t = norm(text)
    name = str(place.get("name", "")).strip()
    if any(w in t for w in ["امتي", "امتى", "مواعيد", "يفتح", "تفتح", "يقفل"]):
        st = "🟢 مفتوح" if place.get("open_now") == 1 else "🔴 مغلق"
        return f"⏰ {name} — {fmt_hours(place.get('opening_hours', ''))}\n{st} دلوقتي."
    if any(w in t for w in ["عنوان", "فين", "وصول", "اوصل"]):
        # Fixed: was "\\n", which printed a literal backslash-n in the reply.
        return f"📍 {name} في {place.get('location', '')}.\nالعنوان: {place.get('address', '')}"
    if any(w in t for w in ["سعر", "بكام", "تكلف", "غالي", "رخيص"]):
        return f"💰 {name} — {fmt_price(place.get('price_range', ''))}"
    if any(w in t for w in ["تقييم", "نجوم"]):
        return f"⭐ {name} — {fmt_rating(place.get('rating_norm', place.get('rating', 0)))}"
    if any(w in t for w in ["رقم", "تليفون"]):
        phone = str(place.get("phone", "")).strip()
        return f"📞 {name} — {phone}" if phone else f"معنديش رقم {name}."
    return f"📋 {name}:\n{fmt_desc(place.get('description', ''), 20)}\n{fmt_hours(place.get('opening_hours', ''))}{fmt_rating(place.get('rating_norm', 0))}"


# PREDICT FUNCTIONS
def predict_intent(text, threshold=0.5):
    """Classify intent: keyword override first, then the fine-tuned model.

    Returns {"intent": str, "confidence": float}; falls back to "fallback"
    when confidence is below threshold.
    """
    override = apply_keyword_override(text)
    if override:
        return {"intent": override, "confidence": 1.0}
    inputs = intent_tokenizer(text, return_tensors="pt", truncation=True,
                              padding=True, max_length=128)
    with torch.no_grad():
        outputs = intent_model(**inputs)
    probs = torch.softmax(outputs.logits, dim=1)
    pid = torch.argmax(probs, dim=1).item()
    conf = probs[0][pid].item()
    return {"intent": id2intent[pid] if conf >= threshold else "fallback",
            "confidence": round(conf, 4)}


def extract_entities(text, min_score=0.40):
    """Run NER and map entity groups to canonical slots; keeps the longest
    value per slot and drops short/low-confidence spans."""
    raw = ner_pipeline([text])[0]
    entities = {}
    for item in raw:
        rtype = item["entity_group"].lower().strip()
        val = re.sub(r"##", "", item["word"].strip()).strip()
        val = re.sub(r"\s+", " ", val).strip()
        score = float(item["score"])
        if len(val) < 2 or score < min_score:
            continue
        mapped = ENTITY_FIELD_MAP.get(rtype, rtype)
        val_c = clean_text(val)
        if mapped not in entities or len(val_c) > len(clean_text(entities[mapped])):
            entities[mapped] = val_c
    return entities


# SESSION
class Session:
    """Per-user conversation state: history, last intent/entities/place,
    the current result list with a 'next result' pointer, and sticky slots."""

    def __init__(self, sid="default"):
        self.sid = sid
        self.history = []
        self.last_intent = None
        self.last_entities = {}
        self.last_place = None
        self.last_results = []
        self.result_pointer = 0
        self.context_slots = {}
        self.turns = 0

    def add(self, user, bot, intent, entities, place, results):
        """Record one turn and update the sticky conversation context."""
        self.history.append({"turn": self.turns, "user": user, "bot": bot,
                             "intent": intent, "entities": entities})
        if intent and intent not in ("fallback", "no_result", "out_of_scope"):
            self.last_intent = intent
        if intent in SEARCH_INTENTS:
            self.last_entities = entities
        if place is not None:
            self.last_place = place
        if results:
            self.last_results = results
            self.result_pointer = 0
        self._slots(entities)
        self.turns += 1

    def _slots(self, ents):
        """Persist non-empty slot values across turns."""
        for s in ["location", "category", "sub_category", "price"]:
            v = ents.get(s)
            if v and str(v).strip():
                self.context_slots[s] = str(v).strip()

    def merge(self, new_ents):
        """Overlay this turn's entities onto remembered slots; returns the merge."""
        merged = dict(self.context_slots)
        for k, v in new_ents.items():
            if v and str(v).strip():
                merged[k] = str(v).strip()
        self._slots(new_ents)
        return merged


# MAIN CHAT
def chat(text: str, session: Session, user_lat=None, user_lon=None):
    """Main dialogue turn: scope check, anaphora handling ('next'/'detail'),
    intent prediction, entity extraction, search, and reply building.

    Returns a dict with reply, intent, confidence, entities, best_place and
    all_results; also appends the turn to the session.
    """
    result = dict(reply="", intent="", confidence=0.0, entities={},
                  best_place=None, all_results=[])
    if not text or not text.strip():
        result.update(reply="الرجاء إدخال سؤال 😊", intent="fallback")
        session.add("", result["reply"], "fallback", {}, None, [])
        return result
    if is_out_of_scope(text):
        reply = "أنا متخصص في إيجاد الأماكن في بني سويف فقط. 😊\nممكن أساعدك تلاقي مطعم، صيدلية، كافيه، ماركت، أو سكن."
        result.update(reply=reply, intent="out_of_scope")
        session.add(text, reply, "out_of_scope", {}, None, [])
        return result
    ref = detect_ref_type(text)
    # Follow-up question about the last place discussed.
    if ref == "detail" and session.last_place:
        reply = handle_detail(text, session.last_place)
        result.update(reply=reply, intent=session.last_intent or "detail",
                      best_place=session.last_place)
        session.add(text, reply, result["intent"], {}, session.last_place, [])
        return result
    # "Show me another one" — advance through the cached result list.
    if ref == "next" and session.last_results:
        ptr = session.result_pointer + 1
        if ptr < len(session.last_results):
            session.result_pointer = ptr
            nxt = session.last_results[ptr]
            session.last_place = nxt
            reply = build_response(nxt, session.last_intent, category=nxt.get("category"))
            result.update(reply=reply, intent=session.last_intent, best_place=nxt)
        else:
            result.update(reply="😔 مفيش نتايج تانية. عايز أدور من الأول؟", intent="no_result")
        session.add(text, result["reply"], result["intent"], {}, result["best_place"], [])
        return result
    ir = predict_intent(text)
    intent = ir["intent"]
    conf = ir["confidence"]
    result["intent"] = intent
    result["confidence"] = conf
    if intent in STATIC_INTENTS:
        result["reply"] = random.choice(RESPONSE_TEMPLATES[get_template_key(intent)])
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    if intent == "fallback":
        # A short location-only message continues the previous search intent.
        if session.last_intent in SEARCH_INTENTS and _loc_continuation(text):
            intent = session.last_intent
            result["intent"] = intent
        else:
            result["reply"] = random.choice(RESPONSE_TEMPLATES["fallback"])
            session.add(text, result["reply"], "fallback", {}, None, [])
            return result
    if intent not in SEARCH_INTENTS:
        result["reply"] = random.choice(
            RESPONSE_TEMPLATES.get(get_template_key(intent), RESPONSE_TEMPLATES["fallback"]))
        session.add(text, result["reply"], intent, {}, None, [])
        return result
    ents = extract_entities(text)
    result["entities"] = ents
    merged = session.merge(ents)
    category = normalize_category(merged.get("category")
                                  or INTENT_TO_CATEGORY.get(intent)
                                  or infer_category(text))
    sub_cat = merged.get("sub_category")
    location = merged.get("location")
    price_range = merged.get("price")
    open_only = ("open_now" in intent or "place_details" in intent)
    df = search_places(text, top_k_final=5, category=category, sub_category=sub_cat,
                       location=location, price_range=price_range,
                       open_now_only=open_only, user_lat=user_lat, user_lon=user_lon)
    if df.empty:
        cl = CLARIFICATION_Q.get(intent, "")
        reply = random.choice(RESPONSE_TEMPLATES["no_result"]) + (f"\n\n💬 {cl}" if cl else "")
        result.update(reply=reply, intent="no_result")
        session.add(text, reply, "no_result", ents, None, [])
        return result
    all_res = df.to_dict(orient="records")
    best = all_res[0]
    reply = build_response(best, intent, category=category)
    if len(all_res) > 1:
        reply += f"\n\n💬 فيه {len(all_res)} نتيجة — قولي 'تاني' لو عايز غيره."
    result.update(reply=reply, best_place=best, all_results=all_res)
    session.add(text, reply, intent, ents, best, all_res)
    return result


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Download and load all models plus the places spreadsheet at startup."""
    global intent_tokenizer, intent_model, label_encoder, id2intent
    global ner_pipeline, label2id, id2label
    global semantic_model, corpus_df, corpus_embeddings, places_df
    print("⏳ Downloading models from HuggingFace …")
    # Fetch model snapshots from the HuggingFace Model Hub.
    intent_local = snapshot_download(INTENT_REPO)
    entity_local = snapshot_download(ENTITY_REPO)
    semantic_local = snapshot_download(SEMANTIC_REPO)
    print("⏳ Loading Intent model …")
    intent_tokenizer = AutoTokenizer.from_pretrained(intent_local)
    intent_model = AutoModelForSequenceClassification.from_pretrained(intent_local)
    label_encoder = joblib.load(os.path.join(intent_local, "label_encoder.pkl"))
    id2intent = {i: lbl for i, lbl in enumerate(label_encoder.classes_)}
    intent_model.eval()
    print("⏳ Loading Entity model …")
    with open(os.path.join(entity_local, "label2id.json"), encoding="utf-8") as f:
        label2id = json.load(f)
    with open(os.path.join(entity_local, "id2label.json"), encoding="utf-8") as f:
        id2label = json.load(f)
    etok = AutoTokenizer.from_pretrained(entity_local)
    emod = AutoModelForTokenClassification.from_pretrained(entity_local)
    ner_pipeline = pipeline("token-classification", model=emod, tokenizer=etok,
                            aggregation_strategy="first")
    print("⏳ Loading Semantic model …")
    semantic_model = SentenceTransformer(SEMANTIC_REPO)
    pkl_path = hf_hub_download(repo_id=SEMANTIC_REPO, filename="semantic_data.pkl")
    # NOTE(review): pickle.load on downloaded content — acceptable only
    # because the repo is owned by us; do not point this at untrusted repos.
    with open(pkl_path, "rb") as f:
        sd = pickle.load(f)
    corpus_df = sd["corpus_df"]
    corpus_embeddings = sd["corpus_embeddings"]
    places_df = pd.read_excel(PLACES_FILE)
    # Guarantee every expected column exists before deriving the *_clean ones.
    for col in ["place_id", "name", "category", "sub_category", "location", "address",
                "price_range", "rating", "opening_hours", "description", "lat", "lon"]:
        if col not in places_df.columns:
            places_df[col] = ""
    places_df = places_df.fillna("")
    places_df["category_clean"] = places_df["category"].apply(clean_text)
    places_df["sub_category_clean"] = places_df["sub_category"].apply(clean_text)
    places_df["location_clean"] = places_df["location"].apply(clean_text)
    places_df["address_clean"] = places_df["address"].apply(clean_text)
    places_df["price_range_clean"] = places_df["price_range"].apply(clean_text)
    places_df["description_clean"] = places_df["description"].apply(clean_text)
    places_df["search_text_clean"] = (
        places_df["name"].astype(str) + " " + places_df["category"].astype(str) + " " +
        places_df["sub_category"].astype(str) + " " + places_df["location"].astype(str) + " " +
        places_df["description"].astype(str)
    ).apply(clean_text)
    print("✅ All models loaded!")
    yield
    print("Shutting down.")


# FASTAPI
app = FastAPI(title="Beni Suef Chatbot API", version="1.0.0", lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"],
                   allow_methods=["*"], allow_headers=["*"])


class ChatRequest(BaseModel):
    """Incoming chat payload; coordinates are optional."""
    message: str
    session_id: str = "default"
    user_lat: Optional[float] = None
    user_lon: Optional[float] = None


class ChatResponse(BaseModel):
    """Outgoing chat payload."""
    reply: str
    intent: str
    confidence: float
    entities: dict
    session_id: str
    best_place: Optional[dict] = None


@app.get("/")
def root():
    """Liveness message."""
    return {"status": "ok", "message": "Beni Suef Chatbot is running 🚀"}


@app.get("/health")
def health():
    """Readiness: explicit None checks instead of object truthiness."""
    return {"status": "healthy",
            "models_loaded": all(x is not None for x in
                                 [intent_model, ner_pipeline, semantic_model, places_df])}


@app.post("/chat", response_model=ChatResponse)
def chat_endpoint(req: ChatRequest):
    """Run one chat turn; sanitizes numpy scalars/NaN in best_place for JSON."""
    if req.session_id not in SESSIONS:
        SESSIONS[req.session_id] = Session(req.session_id)
    session = SESSIONS[req.session_id]
    try:
        result = chat(req.message, session, req.user_lat, req.user_lon)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    best = result.get("best_place")
    if best:
        best = {k: (float(v) if isinstance(v, (np.floating, np.integer))
                    else (None if (isinstance(v, float) and np.isnan(v)) else v))
                for k, v in best.items()
                if k in ["place_id", "name", "category", "sub_category", "location", "address",
                         "price_range", "rating", "opening_hours", "description", "phone",
                         "lat", "lon", "open_now", "final_score"]}
    return ChatResponse(reply=result["reply"], intent=result["intent"],
                        confidence=result["confidence"], entities=result["entities"],
                        session_id=req.session_id, best_place=best)


@app.delete("/session/{session_id}")
def reset_session(session_id: str):
    """Drop a session's conversation state (no error if it does not exist)."""
    SESSIONS.pop(session_id, None)
    return {"status": "reset", "session_id": session_id}