Spaces:
Sleeping
Sleeping
| """End-to-end multilingual chatbot pipeline (with generative RAG). | |
| Architecture: | |
| user_text | |
| -> Language Detector (AR / CS / EN / FR) [CPU, DistilBERT] | |
| -> Intent Classifier (booking / complaint / ...) [CPU, DistilBERT] | |
| -> NER (PER / LOC / ORG / DATE) [CPU, DistilBERT] | |
| -> Branch on intent: | |
| greeting / farewell -> canned reply (deterministic, no LLM call) | |
| inquiry / booking / complaint -> RAG retrieve top-3 (same-lang preferred) | |
| + Qwen2.5-0.5B-Instruct generates reply | |
| other -> Qwen2.5-0.5B-Instruct answers from general knowledge (no retrieval) | |
| Why this layout: the GTX 1650 has 4 GB VRAM. Putting the small DistilBERT | |
| classifiers on CPU is essentially free (each <100 ms per turn) and frees the | |
| GPU for the generator, which is the bottleneck. | |
| Models loaded: | |
| models/lang_detector/ (DistilBERT, 4 classes) [CPU] | |
| models/intent_classifier/ (DistilBERT, 6 classes) [CPU] | |
| models/ner_model/ (DistilBERT TokenClassification) [CPU] | |
| models/rag/faiss.index (FAISS IndexFlatIP, 36 x 384) [CPU] | |
| paraphrase-multilingual-MiniLM-L12-v2 [CPU] | |
| Qwen/Qwen2.5-0.5B-Instruct (causal LM, fp16 on GPU) [GPU] | |
| Usage: | |
| python src/chatbot.py "How do I book a flight?" | |
| python src/chatbot.py "كيف أحجز فندقاً في باريس يوم 15 يونيو؟" | |
| python src/chatbot.py --interactive | |
| python src/chatbot.py --json "Hello there!" | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from dataclasses import dataclass, field, asdict | |
| from pathlib import Path | |
| from threading import Thread | |
| from typing import Any, Iterator | |
| import faiss | |
| import numpy as np | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoModelForSequenceClassification, | |
| AutoModelForTokenClassification, | |
| AutoTokenizer, | |
| TextIteratorStreamer, | |
| pipeline, | |
| ) | |
# This file lives at <repo>/src/chatbot.py, so the repository root is two
# directory levels above the resolved file path.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
| def _model_id(local_path: Path, hub_id: str) -> str: | |
| """Return local path if the model is trained locally, else the HF Hub repo id. | |
| Lets the same code run unchanged in dev (local weights) and in deployed | |
| environments like HF Spaces (weights pulled from the Hub). | |
| """ | |
| return str(local_path) if local_path.exists() and any(local_path.iterdir()) else hub_id | |
# Classifier weights: prefer a locally trained copy under models/, otherwise
# pull the published checkpoint from the Hugging Face Hub.
LANG_DIR = _model_id(
    local_path=PROJECT_ROOT / "models" / "lang_detector",
    hub_id="momenalhamza/multilingual-chatbot-lang-detector",
)
INTENT_DIR = _model_id(
    local_path=PROJECT_ROOT / "models" / "intent_classifier",
    hub_id="momenalhamza/multilingual-chatbot-intent",
)
NER_DIR = _model_id(
    local_path=PROJECT_ROOT / "models" / "ner_model",
    hub_id="momenalhamza/multilingual-chatbot-ner",
)
# FAISS index + metadata.json; small (~80 KB), always shipped with the repo.
RAG_DIR = PROJECT_ROOT / "models" / "rag"

# Models always fetched by name (Hub id or local cache).
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
GEN_MODEL = "Qwen/Qwen2.5-0.5B-Instruct"

# Language code -> human-readable name, interpolated into the generator's
# system prompt ("Reply ONLY in <name>").
LANG_NAME: dict[str, str] = {
    "AR": "Arabic",
    "EN": "English",
    "FR": "French",
    "CS": "Code-Switching (Arabic-English mix)",
}

# Fixed replies keyed by intent, then language code. The code-switching ("CS")
# entries reuse the English text.
CANNED: dict[str, dict[str, str]] = {
    "greeting": {
        "AR": "مرحباً! كيف يمكنني مساعدتك اليوم؟",
        "EN": "Hello! How can I help you today?",
        "FR": "Bonjour ! Comment puis-je vous aider aujourd'hui ?",
        "CS": "Hello! How can I help you today?",
    },
    "farewell": {
        "AR": "إلى اللقاء! يومٌ سعيد.",
        "EN": "Goodbye! Have a great day.",
        "FR": "Au revoir ! Bonne journée.",
        "CS": "Goodbye! Have a great day.",
    },
    "fallback": {
        "AR": "عذراً، لم أفهم طلبك. هل يمكنك إعادة صياغته؟",
        "EN": "Sorry, I didn't understand. Could you rephrase?",
        "FR": "Désolé, je n'ai pas compris. Pourriez-vous reformuler ?",
        "CS": "Sorry, I didn't understand. Could you rephrase?",
    },
}
@dataclass
class Turn:
    """One chatbot exchange. Serializable to JSON for the UI/logs.

    Must be a dataclass: callers construct it with keyword arguments,
    ``retrieved`` relies on ``field(default_factory=list)``, and the CLI
    serializes it with ``dataclasses.asdict``.
    """

    text: str                 # the user message (whitespace-stripped)
    language: str             # detected language label, e.g. "EN"
    intent: str               # predicted intent label
    intent_confidence: float  # classifier score for ``intent``
    entities: list[dict]      # NER spans: text / type / start / end / score
    reply: str                # text shown to the user
    # Knowledge-base rows used as generation context, each with a "score".
    retrieved: list[dict] = field(default_factory=list)
    # True when the reply came from the LLM rather than a canned string.
    generator_used: bool = False
class Chatbot:
    """Holds all models. Thread-unsafe; create one per process.

    Per turn: language detection -> intent classification -> NER, then either
    a canned reply (greeting / farewell) or a Qwen generation, grounded on
    FAISS-retrieved knowledge-base answers for inquiry / booking / complaint.
    """

    def __init__(self, gen_max_new_tokens: int = 200) -> None:
        """Load every model.

        Args:
            gen_max_new_tokens: upper bound on tokens generated per reply.
        """
        gpu_available = torch.cuda.is_available()
        gen_device = "cuda" if gpu_available else "cpu"
        # fp16 only on the GPU; the CPU path keeps fp32.
        gen_dtype = torch.float16 if gpu_available else torch.float32
        precision = "fp16" if gpu_available else "fp32"
        print(f"[chatbot] GPU available: {gpu_available} -> generator on {gen_device}, "
              "classifiers on CPU")

        # Classifiers on CPU (free up VRAM for Qwen).
        print("[chatbot] loading language detector (CPU) ...")
        self.lang_pipe = self._load_text_classifier(LANG_DIR)
        print("[chatbot] loading intent classifier (CPU) ...")
        self.intent_pipe = self._load_text_classifier(INTENT_DIR)
        print("[chatbot] loading NER model (CPU) ...")
        self.ner_pipe = pipeline(
            "token-classification",
            model=AutoModelForTokenClassification.from_pretrained(str(NER_DIR)),
            tokenizer=AutoTokenizer.from_pretrained(str(NER_DIR)),
            device=-1, aggregation_strategy="simple",
        )

        # RAG (sentence embedder + FAISS) on CPU; embedding 1 query is sub-100ms there.
        print("[chatbot] loading RAG embedder (CPU) ...")
        self.embed_model = SentenceTransformer(EMBED_MODEL, device="cpu")
        self.faiss_index = faiss.read_index(str(RAG_DIR / "faiss.index"))
        # The KB is multilingual (rows are tagged AR / EN / FR / CS), and JSON is
        # UTF-8 by spec; don't rely on the platform default encoding.
        self.metadata: list[dict] = json.loads(
            (RAG_DIR / "metadata.json").read_text(encoding="utf-8")
        )

        # Generator on GPU (fp16) if available, else CPU (fp32).
        print(f"[chatbot] loading generator {GEN_MODEL} on {gen_device} ({precision}) ...")
        self.gen_tokenizer = AutoTokenizer.from_pretrained(GEN_MODEL)
        self.gen_model = AutoModelForCausalLM.from_pretrained(
            GEN_MODEL,
            torch_dtype=gen_dtype,
            device_map=gen_device,
        )
        self.gen_model.eval()
        self.gen_max_new_tokens = gen_max_new_tokens
        if self.gen_tokenizer.pad_token_id is None:
            self.gen_tokenizer.pad_token_id = self.gen_tokenizer.eos_token_id
        print(f"[chatbot] ready (KB rows: {len(self.metadata)})")

    @staticmethod
    def _load_text_classifier(model_dir: str) -> Any:
        """Build a CPU text-classification pipeline that scores every label."""
        return pipeline(
            "text-classification",
            model=AutoModelForSequenceClassification.from_pretrained(str(model_dir)),
            tokenizer=AutoTokenizer.from_pretrained(str(model_dir)),
            device=-1, top_k=None,
        )

    @staticmethod
    def _top_label(output: Any) -> tuple[str, float]:
        """Return (label, score) of the best class from a text-classification output.

        With ``top_k=None`` given at construction, a single-string call is
        expected to return ``[[{label, score}, ...]]``. Depending on the
        transformers version, the flat ``[{label, score}, ...]`` form can
        appear instead, so both shapes are accepted.
        """
        scores = output[0] if output and isinstance(output[0], list) else output
        best = max(scores, key=lambda s: s["score"])
        return best["label"], float(best["score"])

    # ---- individual stages -------------------------------------------------
    def detect_language(self, text: str) -> tuple[str, float]:
        """Return the (language label, confidence) for ``text``."""
        return self._top_label(self.lang_pipe(text))

    def classify_intent(self, text: str) -> tuple[str, float]:
        """Return the (intent label, confidence) for ``text``."""
        return self._top_label(self.intent_pipe(text))

    def extract_entities(self, text: str) -> list[dict]:
        """Run NER and return plain-dict spans (JSON-serializable types)."""
        return [
            {
                "text": ent["word"],
                "type": ent["entity_group"],
                "start": int(ent["start"]),
                "end": int(ent["end"]),
                "score": float(ent["score"]),
            }
            for ent in self.ner_pipe(text)
        ]

    def retrieve(self, query: str, prefer_lang: str | None = None,
                 top_k: int = 3) -> list[dict]:
        """Return up to ``top_k`` knowledge-base rows most similar to ``query``.

        Rows are copies of ``self.metadata`` entries with an added ``"score"``
        (inner product of normalized embeddings). When ``prefer_lang`` is set,
        rows in that language are moved ahead of the others. FAISS order is
        kept within each group.
        """
        query_vec = self.embed_model.encode(
            [query], normalize_embeddings=True, convert_to_numpy=True
        ).astype("float32")
        # Over-fetch so the language re-ordering still has candidates to promote.
        scores, ids = self.faiss_index.search(query_vec, max(top_k * 2, 6))
        hits = []
        for score, idx in zip(scores[0], ids[0]):
            if idx < 0:  # FAISS uses id -1 for unfilled result slots
                continue
            row = dict(self.metadata[int(idx)])  # copy: don't mutate the KB
            row["score"] = float(score)
            hits.append(row)
        if prefer_lang:
            # Stable sort: False (same language) sorts before True.
            hits.sort(key=lambda h: h["language"] != prefer_lang)
        return hits[:top_k]

    # ---- generation --------------------------------------------------------
    def _build_messages(self, user_text: str, lang: str,
                        retrieved: list[dict]) -> list[dict]:
        """Build the chat-template messages for Qwen."""
        lang_name = LANG_NAME.get(lang, lang)
        system = (
            "You are a helpful, concise customer-service assistant for a travel and "
            "booking company.\n\n"
            "REPLY RULES (follow strictly):\n"
            f"- Reply ONLY in {lang_name}. Do not mix languages.\n"
            "- Keep replies to 1–3 sentences.\n"
            "- Write the reply directly. Never start with labels like "
            "'A:', 'Answer:', 'Reply:', or any prefix.\n"
            "- If reference information is provided, use it to inform your "
            "answer but rephrase naturally; do not copy verbatim.\n"
            "- If the references do not cover the question, answer briefly from "
            "general knowledge."
        )
        if retrieved:
            ref_lines = ["Reference information from our knowledge base:"]
            ref_lines.extend(f"- {r['answer']}" for r in retrieved)
            user = "\n".join(ref_lines) + f"\n\nThe customer says: {user_text}"
        else:
            user = user_text
        return [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ]

    @staticmethod
    def _strip_label_prefix(text: str) -> str:
        """Remove stray 'A:'/'Answer:'/etc. prefixes the model might emit.

        Leading whitespace is always removed; at most one prefix (the first
        match in the list below) is stripped.
        """
        text = text.lstrip()
        for prefix in ("A:", "a:", "Answer:", "Reply:", "Response:",
                       "الإجابة:", "الجواب:", "Réponse:"):
            if text.startswith(prefix):
                return text[len(prefix):].lstrip()
        return text

    def generate_reply(self, user_text: str, lang: str,
                       retrieved: list[dict]) -> str:
        """Synchronous generation (used by CLI / non-streaming callers)."""
        return "".join(self._stream_generate(user_text, lang, retrieved))

    def _stream_generate(self, user_text: str, lang: str,
                         retrieved: list[dict]) -> Iterator[str]:
        """Yield generated text in chunks as it's produced.

        ``generate()`` runs on a worker thread feeding a TextIteratorStreamer,
        so callers can drive a UI loop while tokens arrive. If ``generate()``
        raises, the exception is re-raised here after the stream closes,
        instead of the consumer waiting out the streamer timeout.
        """
        messages = self._build_messages(user_text, lang, retrieved)
        prompt = self.gen_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True,
        )
        inputs = self.gen_tokenizer(prompt, return_tensors="pt").to(self.gen_model.device)
        streamer = TextIteratorStreamer(
            self.gen_tokenizer, skip_prompt=True, skip_special_tokens=True,
            timeout=60.0,
        )
        gen_kwargs = dict(
            **inputs,
            max_new_tokens=self.gen_max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            repetition_penalty=1.1,
            pad_token_id=self.gen_tokenizer.pad_token_id,
            streamer=streamer,
        )
        errors: list[Exception] = []

        def _run() -> None:
            # A failed generate() never signals end-of-stream itself. Push the
            # stop signal so the loop below ends, and keep the error for re-raise.
            try:
                self.gen_model.generate(**gen_kwargs)
            except Exception as exc:
                errors.append(exc)
                streamer.end()

        thread = Thread(target=_run)
        thread.start()
        try:
            yield from streamer
        finally:
            thread.join()
        if errors:
            raise errors[0]

    # ---- end-to-end --------------------------------------------------------
    def respond(self, text: str) -> Turn:
        """Blocking response — returns final Turn after full generation."""
        last: Turn | None = None
        for turn in self.respond_stream(text):
            last = turn
        if last is None:  # respond_stream always yields; guard without `assert`
            raise RuntimeError("respond_stream produced no Turn")
        return last

    def respond_stream(self, text: str) -> Iterator[Turn]:
        """Streaming response — yields a Turn each time the reply grows.

        For greeting/farewell, yields exactly one Turn with the canned reply.
        For LLM-generated replies, yields once per token chunk so the UI can
        show progressive text. If generation yields nothing usable (empty, or
        only a label such as "Answer:"), a final Turn with the canned fallback
        reply is yielded.
        """
        text = text.strip()
        if not text:
            yield Turn(text="", language="EN", intent="other",
                       intent_confidence=0.0, entities=[],
                       reply=CANNED["fallback"]["EN"])
            return

        language, _ = self.detect_language(text)
        intent, intent_conf = self.classify_intent(text)
        entities = self.extract_entities(text)

        def make_turn(reply: str, retrieved: list[dict], generator_used: bool) -> Turn:
            return Turn(
                text=text, language=language, intent=intent,
                intent_confidence=intent_conf, entities=entities,
                reply=reply, retrieved=retrieved, generator_used=generator_used,
            )

        if intent in ("greeting", "farewell"):
            yield make_turn(CANNED[intent].get(language, CANNED[intent]["EN"]),
                            [], False)
            return

        retrieved: list[dict] = []
        if intent in ("inquiry", "booking", "complaint"):
            retrieved = self.retrieve(text, prefer_lang=language, top_k=3)
        # else: "other" — no retrieval, model uses general knowledge.

        running = ""
        reply = ""
        for chunk in self._stream_generate(text, language, retrieved):
            running += chunk
            reply = self._strip_label_prefix(running)
            yield make_turn(reply, retrieved, True)

        # Edge case: nothing usable generated (empty output or a bare label).
        if not reply.strip():
            yield make_turn(
                CANNED["fallback"].get(language, CANNED["fallback"]["EN"]),
                retrieved, True,
            )
| def _print_turn(turn: Turn) -> None: | |
| print(f" language : {turn.language}") | |
| print(f" intent : {turn.intent} ({turn.intent_confidence:.4f})") | |
| if turn.entities: | |
| print(" entities :") | |
| for e in turn.entities: | |
| print(f" [{e['type']}] {e['text']!r} (score={e['score']:.3f})") | |
| else: | |
| print(" entities : (none)") | |
| if turn.retrieved: | |
| print(f" retrieved (top {len(turn.retrieved)}):") | |
| for r in turn.retrieved: | |
| print(f" {r['score']:.3f} [{r['language']}/{r['topic']}] {r['question']}") | |
| print(f" reply : {turn.reply}") | |
| print(f" source : {'LLM (Qwen2.5-0.5B)' if turn.generator_used else 'canned'}") | |
def main() -> int:
    """Command-line entry point.

    Answers the positional message (if given) and/or runs a REPL with
    ``--interactive``; ``--json`` prints one JSON object per turn instead of
    the pretty view. Returns the process exit code (argparse exits with 2 on
    usage errors).
    """
    # __doc__ is None when docstrings are stripped (python -OO); don't crash then.
    description = (__doc__ or "").split("\n")[0]
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("text", nargs="?", default=None,
                        help="Single user message. If omitted, use --interactive.")
    parser.add_argument("--interactive", action="store_true",
                        help="REPL mode: read messages from stdin until Ctrl-D.")
    parser.add_argument("--json", action="store_true",
                        help="Print one JSON object per turn instead of pretty text.")
    args = parser.parse_args()
    if not args.text and not args.interactive:
        parser.error("provide a message or use --interactive")

    # Model loading is slow, so it happens only after arguments validate.
    bot = Chatbot()

    def handle(msg: str) -> None:
        turn = bot.respond(msg)
        if args.json:
            # ensure_ascii=False keeps Arabic/French text readable in the output.
            print(json.dumps(asdict(turn), ensure_ascii=False))
        else:
            print(f"\n> {msg}")
            _print_turn(turn)

    if args.text:
        handle(args.text)
    if args.interactive:
        print("\n[interactive mode — Ctrl-D / Ctrl-C to exit]")
        try:
            while True:
                msg = input("> ").strip()
                if not msg:
                    continue
                handle(msg)
        except (EOFError, KeyboardInterrupt):
            print("\nbye.")
    return 0
if __name__ == "__main__":
    # Raising SystemExit(code) is exactly what sys.exit(code) does.
    raise SystemExit(main())