Spaces:
Running
Running
"""
=============================================================
BAL Chatbot — Step 2: Chat Engine (RAG + LLM)
Usage: python scripts/02_chatbot.py
=============================================================
This script:
1. Uses Groq as the only LLM provider
2. Loads the FAISS index and chunk metadata
3. Converts the user question into an embedding
4. Retrieves the most relevant chunks (retrieval — done ONCE per query)
5. Sends the augmented prompt to Groq
6. Displays the response in the terminal
=============================================================
Prerequisites:
- A valid Groq API key in the GROQ_API_KEY environment variable
- 01_build_vectorstore.py must have been run
=============================================================
"""
| import os | |
| import sys | |
| import json | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import numpy as np | |
| import faiss | |
| import requests | |
| from sentence_transformers import SentenceTransformer | |
| from dotenv import load_dotenv | |
# The project root is the directory two levels above this file; the .env file
# and the data/ paths in CONFIG are resolved relative to it.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Populate os.environ from the project's .env file.
load_dotenv(dotenv_path=PROJECT_ROOT / ".env")
def get_groq_api_key() -> str:
    """Return the Groq API key from the environment, or "" when it is unset.

    ``GROQ_API_KEY`` is consulted first and the ``GROQ_API_Key`` spelling
    second. Surrounding whitespace is stripped so that a value copied with a
    trailing newline or padding does not end up inside the ``Authorization``
    header; a whitespace-only value is treated as unset.
    """
    for name in ("GROQ_API_KEY", "GROQ_API_Key"):
        value = (os.getenv(name) or "").strip()
        if value:
            return value
    return ""
# ── Logging ──────────────────────────────────────────────────────────────────
# logging.FileHandler opens its file immediately and does not create missing
# parent directories, so make sure "logs/" exists before configuring it.
Path("logs").mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    # WARNING and above only: the log.debug(...) calls in this module are
    # dropped unless this level is lowered.
    level=logging.WARNING,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("logs/chatbot.log", encoding="utf-8")],
)
log = logging.getLogger(__name__)
# ── Configuration ────────────────────────────────────────────────────────────
# Single settings dictionary shared by the vector store, the Groq client and
# the conversation manager.
CONFIG = {
    # Locations of the FAISS index and of the chunk metadata it refers to
    "faiss_index_file": str(PROJECT_ROOT.joinpath("data", "bal_faiss.index")),
    "chunks_meta_file": str(PROJECT_ROOT.joinpath("data", "bal_chunks.json")),

    # Embedding model (MUST match 01_build_vectorstore.py)
    "embedding_model": "intfloat/multilingual-e5-small",

    # Number of chunks requested from the index per query (top-k)
    "retrieval_top_k": 5,

    # Minimum relevance score — chunks scoring below this are left out of the
    # context passed to the LLM
    "retrieval_score_threshold": 0.35,

    # ── Groq backend settings ────────────────────────────────────────────────
    "groq_url": "https://api.groq.com/openai/v1/chat/completions",
    "groq_model": "llama-3.3-70b-versatile",
    "groq_api_key": get_groq_api_key(),  # resolved once, at import time
    "groq_timeout": 120,  # seconds

    # ── LLM generation parameters ────────────────────────────────────────────
    "llm_temperature": 0.3,  # lower = more consistent
    "llm_max_tokens": 1024,
    "llm_top_p": 0.9,

    # Conversation history — how many previous turns are sent as context
    "max_history_turns": 6,
}
# ── System Prompt ────────────────────────────────────────────────────────────
# Turkish system message sent as the first message of every Groq request.
# It must be stored as real UTF-8 Turkish text: mis-decoded characters would be
# passed to the model verbatim.
SYSTEM_PROMPT = """Sen BAL Asistan'sın — Bornova Anadolu Lisesi'nin yapay zeka asistanı. BAL Yapay Zeka Topluluğu tarafından geliştirildin.
## GÖREV
Öğrencilere, velilere ve meraklılara BAL hakkında doğru, kısa ve samimi bilgi vermek.
## TON VE ÜSLUP
- Kısa ve net konuş. Dolgu cümlesi yok: "Umarım yardımcı olur", "sormaktan çekinmeyin", "tabii ki" gibi kalıpları kullanma.
- Samimi ve doğal ol — ne aşırı resmi ne aşırı neşeli.
- Gerekmedikçe liste yapma; soruyu doğrudan yanıtla.
- Selamlama, teşekkür, vedaya zaman harcama — direkt konuya gir.
- Telefon numarası, URL gibi somut verileri ASLA değiştirme veya uydurma.
Bağlamda yazan bilgiyi olduğu gibi kullan.
- Türkçe yaz, İngilizce kelime karıştırma.
## BİLGİ KAPSAMI
Yalnızca şu konularda bilgi ver:
- Okul tarihi, bölümler, eğitim yapısı
- LGS taban puanları ve yerleştirme
- Kampüs olanakları (laboratuvar, spor salonu, kütüphane, pansiyon vb.)
- Okul kültürü (BAL Ruhu, Ayran Günü, marş, müzik geleneği)
- Kulüpler ve topluluklar (tiyatro, fotoğraf, BAL Radyo, BALspor, Ultimate Frizbi vb.)
- Uluslararası programlar (PASCH, eTwinning, DSD, AP)
- BALEV bursları, BALMED, Bi'BALlı mentorlük
- Kayıt, nakil, devamsızlık, pansiyon
- Ulaşım ve iletişim bilgileri
## KAYNAK KULLANIMI
Verilen bağlam (RAG) birincil kaynağın. Bağlamda varsa oradan cevap ver. Bağlamda yoksa şunu söyle: "Bu konuda kesin bilgim yok, okul idaresiyle teyit etmeni öneririm." — Asla uydurma.
## SINIRLAR
- Okul dışı konular (politika, genel haberler, kişisel tavsiye vb.): "Bu konuda yardımcı olamam, BAL hakkında bir sorun var mı?" de ve geç.
- Bireysel öğrenci verisi (not, devamsızlık durumu, sınıf listesi): "Bu bilgilere erişimim yok, okul idaresiyle iletişime geç." de.
- "Seni kim yaptı / sen ne düşünüyorsun / sen kimsin": BAL Yapay Zeka Topluluğu tarafından geliştirildiğini söyle, fazla uzatma.
## ASLA YAZMA:
- "bağlamı kontrol etmem gerekiyor"
- "bağlamda bilgi var/yok"
- "bağlamı inceliyorum"
- "soruyu cevaplamak için"
- "umarım yardımcı olur"
- "sormaktan çekinmeyin"
- Direkt cevap ver. Bu kadar.
## ÖZEL DURUMLAR
- Hakaret veya uygunsuz dil: Tek cümleyle kibarca uyar ve konuya dön.
- Belirsiz soru: Ne sorduğunu tek cümleyle sor.
- Bilgi bağlamda işaretliyse güncel olmayabilir: "Kesin bilgi için okul idaresiyle teyit et" ekini koy — ama bunu her cevaba yapıştırma, sadece gerçekten gerektiğinde yaz.
## YARDIMCI LİNKLER (yalnızca sorulduğunda ya da doğrudan ilgiliyse ver)
- Okul sitesi: izmirbal.meb.k12.tr
- BALEV: balev.org.tr
- BALMED: balmed.org.tr
"""
# ══════════════════════════════════════════════════════════════════════════════
# 1. Vector Store
# ══════════════════════════════════════════════════════════════════════════════
class VectorStore:
    """Manages the FAISS vector index, its chunk metadata and the query encoder."""

    def __init__(self, index_path: str, chunks_path: str, model_name: str):
        """Load the FAISS index, the chunk metadata and the embedding model.

        Args:
            index_path: Path of the FAISS index file.
            chunks_path: Path of the JSON file holding the chunk metadata.
            model_name: SentenceTransformer model name used to embed queries.

        Raises:
            FileNotFoundError: If the index file or the chunk metadata file
                does not exist.
        """
        # Check both artifacts up front so the user gets the same actionable
        # hint whichever one is missing.
        if not Path(index_path).exists():
            raise FileNotFoundError(
                f"FAISS index not found: {index_path}\n"
                "Run '01_build_vectorstore.py' first."
            )
        if not Path(chunks_path).exists():
            raise FileNotFoundError(
                f"Chunk metadata not found: {chunks_path}\n"
                "Run '01_build_vectorstore.py' first."
            )

        self.index = faiss.read_index(index_path)

        with open(chunks_path, "r", encoding="utf-8") as f:
            self.chunks: List[Dict] = json.load(f)

        # retrieve() uses the ids returned by the index as positions in
        # self.chunks, so a size mismatch means some hits cannot be resolved.
        if len(self.chunks) != self.index.ntotal:
            log.warning(
                "Index holds %d vectors but %d chunks were loaded; "
                "rebuild the vector store if results look wrong.",
                self.index.ntotal,
                len(self.chunks),
            )

        self.model = SentenceTransformer(model_name)
        print(f" ✓ Vector store loaded ({self.index.ntotal} chunks)")

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to ``top_k`` chunks that best match ``query``.

        The query is embedded with the 'query: ' prefix (the convention used
        for E5 query embeddings). Each returned dict is a shallow copy of the
        stored chunk with a float ``relevance_score`` key added, in the order
        the index returned them.

        Args:
            query: The user's question.
            top_k: Maximum number of chunks to return; values <= 0 yield [].
        """
        if top_k <= 0 or not self.chunks:
            return []

        embedding = self.model.encode(
            [f"query: {query}"],
            normalize_embeddings=True,
            convert_to_numpy=True,
        ).astype("float32")
        scores, indices = self.index.search(embedding, top_k)

        results = []
        chunk_count = len(self.chunks)
        for score, raw_idx in zip(scores[0], indices[0]):
            idx = int(raw_idx)
            if idx < 0:  # FAISS pads missing results with -1
                continue
            if idx >= chunk_count:
                # The id has no metadata entry; skip it instead of raising.
                log.warning(
                    "Index returned id %d but only %d chunks are loaded; skipped.",
                    idx,
                    chunk_count,
                )
                continue
            chunk = self.chunks[idx].copy()  # never mutate the stored metadata
            chunk["relevance_score"] = float(score)
            results.append(chunk)
        return results
# ══════════════════════════════════════════════════════════════════════════════
# 2. Context Formatting
# ══════════════════════════════════════════════════════════════════════════════
def format_context(retrieved_chunks: List[Dict], score_threshold: float = 0.35) -> str:
    """Build the context string that is injected into the LLM prompt.

    Chunks whose ``relevance_score`` is below ``score_threshold`` are skipped
    to reduce noise; a chunk without a score counts as 0.

    Args:
        retrieved_chunks: Chunk dicts; the ``relevance_score``, ``breadcrumb``
            and ``text`` keys are read, each with a default when absent.
        score_threshold: Minimum score a chunk needs to be included.

    Returns:
        The kept chunks rendered as "[Kaynak: <breadcrumb>]\\n<text>" and
        joined by "---" separators, or a Turkish "nothing found" sentence when
        the input is empty or every chunk was filtered out.
    """
    if not retrieved_chunks:
        return "Bağlamda ilgili bilgi bulunamadı."

    context_parts = []
    for chunk in retrieved_chunks:
        score = chunk.get("relevance_score", 0)
        if score < score_threshold:
            log.debug("Low-score chunk skipped: score=%.3f", score)
            continue
        breadcrumb = chunk.get("breadcrumb", "")
        text = chunk.get("text", "")
        context_parts.append(f"[Kaynak: {breadcrumb}]\n{text}")

    if not context_parts:
        return "Bağlamda yeterince ilgili bilgi bulunamadı."
    return "\n\n---\n\n".join(context_parts)
def build_augmented_user_message(user_input: str, context: str) -> str:
    """Wrap the user's question with the retrieved RAG context.

    Returns a two-section message: a Turkish "relevant context" heading
    followed by ``context``, a "---" separator, then a "user question"
    heading followed by ``user_input``.
    """
    return (
        "## İlgili Bağlam (Okul Bilgi Kaynağı)\n\n"
        f"{context}\n\n"
        "---\n\n"
        f"## Kullanıcı Sorusu\n\n{user_input}"
    )
# ══════════════════════════════════════════════════════════════════════════════
# 3. Groq Backend
# ══════════════════════════════════════════════════════════════════════════════
def query_groq(messages: List[Dict], config: Dict) -> str:
    """Send a streaming chat request to Groq and print tokens as they arrive.

    Args:
        messages: Chat messages ("role"/"content" dicts) forwarded unchanged
            as the request's "messages" field.
        config: Settings dict; the "groq_*" and "llm_*" keys are read.

    Returns:
        The full response text. On failure no exception is propagated: a
        user-facing error message starting with "Groq API" is printed and
        returned instead.
    """
    headers = {
        "Authorization": f"Bearer {config['groq_api_key']}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": config["groq_model"],
        "messages": messages,
        "stream": True,
        "temperature": config["llm_temperature"],
        "max_tokens": config["llm_max_tokens"],
        "top_p": config["llm_top_p"],
    }
    data_prefix = "data: "
    tokens: List[str] = []

    try:
        with requests.post(
            config["groq_url"],
            headers=headers,
            json=payload,
            stream=True,
            timeout=config["groq_timeout"],
        ) as resp:
            resp.raise_for_status()
            print("\n\033[94mBAL Asistan:\033[0m ", end="", flush=True)
            for raw_line in resp.iter_lines():
                if not raw_line:
                    continue
                line = raw_line.decode("utf-8")
                if not line.startswith(data_prefix):
                    continue
                data_text = line[len(data_prefix):].strip()
                if data_text == "[DONE]":
                    break
                try:
                    data = json.loads(data_text)
                except json.JSONDecodeError:
                    continue  # ignore malformed events, keep streaming
                if not isinstance(data, dict):
                    continue
                # An event may carry an empty "choices" list or a null
                # "delta"/"content"; treat all of these as "no token".
                choices = data.get("choices") or [{}]
                delta = choices[0].get("delta") or {}
                token = delta.get("content") or ""
                if token:
                    print(token, end="", flush=True)
                    tokens.append(token)
            # Terminate the streamed line even if the server closed the
            # stream without sending [DONE].
            print()
        full_response = "".join(tokens)
    # NOTE: callers recognise failures by the "Groq API" prefix of these
    # messages, so keep the prefix when rewording them.
    except requests.exceptions.ConnectionError:
        full_response = "Groq API bağlantısı kurulamadı. Lütfen daha sonra tekrar deneyin."
        print(f"\n\033[91m{full_response}\033[0m")
    except requests.exceptions.Timeout:
        full_response = "Groq API zaman aşımına uğradı. Lütfen tekrar deneyin."
        print(f"\n\033[91m{full_response}\033[0m")
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code if e.response is not None else "?"
        full_response = f"Groq API hatası: HTTP {status_code}"
        print(f"\n\033[91m{full_response}\033[0m")
        log.exception("Groq API HTTP error")
    except Exception as e:
        # Last-resort boundary: report the problem instead of crashing the
        # chat loop.
        full_response = f"Groq API hatası: {e}"
        print(f"\n\033[91m{full_response}\033[0m")
        log.exception("Groq query error")
    return full_response
# ══════════════════════════════════════════════════════════════════════════════
# 4. Conversation Manager
# ══════════════════════════════════════════════════════════════════════════════
class Conversation:
    """
    Manages conversation history and the full RAG → LLM pipeline.

    Retrieval is done ONCE per user query; the retrieved chunks are used for
    the LLM prompt and kept in ``last_retrieved`` so they can be displayed
    afterwards.
    """

    def __init__(
        self,
        vector_store: "VectorStore",
        config: Dict,
    ):
        """Store the vector store and settings; start with an empty history.

        Args:
            vector_store: Object whose ``retrieve(query, top_k=...)`` returns
                the chunk dicts for a query.
            config: Settings dict; "retrieval_top_k",
                "retrieval_score_threshold" and "max_history_turns" are read
                here, and the dict is passed on to ``query_groq``.
        """
        self.vs = vector_store
        self.config = config
        # Plain message history (no RAG context injected — keeps it compact)
        self.history: List[Dict] = []
        # Chunks retrieved for the most recent question
        self.last_retrieved: List[Dict] = []

    def _history_limit(self) -> int:
        """Return the maximum number of stored messages (two per turn)."""
        return max(int(self.config["max_history_turns"]), 0) * 2

    def _recent_history(self) -> List[Dict]:
        """Return the trailing slice of the history that is sent to the LLM."""
        limit = self._history_limit()
        # history[-0:] would be the WHOLE list, so zero needs its own branch.
        return self.history[-limit:] if limit else []

    def ask(self, user_input: str) -> str:
        """
        Run the full pipeline for one conversational turn:
            1. Retrieve relevant chunks (ONCE)
            2. Build the context string
            3. Build the augmented user message (context + question)
            4. Send system prompt + recent history + augmented message to Groq
            5. Append the plain question/answer to the history

        Returns the assistant's response text, or "" when ``user_input`` is
        blank (nothing is retrieved or sent in that case).
        """
        user_input = user_input.strip()
        if not user_input:
            return ""

        # ── 1. Retrieve ──────────────────────────────────────────────────────
        t_ret = time.perf_counter()
        retrieved = self.vs.retrieve(user_input, top_k=self.config["retrieval_top_k"])
        self.last_retrieved = retrieved
        log.debug(
            "Retrieval: %d chunks in %.2fs",
            len(retrieved),
            time.perf_counter() - t_ret,
        )

        # ── 2-3. Build context and augmented message ─────────────────────────
        context = format_context(retrieved, self.config["retrieval_score_threshold"])
        augmented_message = build_augmented_user_message(user_input, context)

        # ── 4. Query Groq ────────────────────────────────────────────────────
        messages = (
            [{"role": "system", "content": SYSTEM_PROMPT}]
            + self._recent_history()
            + [{"role": "user", "content": augmented_message}]
        )
        t_llm = time.perf_counter()
        response = query_groq(messages, self.config)
        log.debug(
            "LLM response: %d chars in %.2fs",
            len(response),
            time.perf_counter() - t_llm,
        )

        # ── 5. Store plain texts in history (no embedded context) ────────────
        # NOTE(review): failures are detected by message prefix, so a genuine
        # answer starting with one of these prefixes is not stored either.
        if response and not response.startswith(("Bir hata", "Groq API")):
            self.history.append({"role": "user", "content": user_input})
            self.history.append({"role": "assistant", "content": response})
            # Drop messages that can never be sent again so the list does not
            # grow without bound over a long session.
            limit = self._history_limit()
            if limit:
                del self.history[:-limit]
            else:
                self.history.clear()
        return response

    def clear_history(self):
        """Clear the conversation history and the cached retrieved chunks."""
        self.history.clear()
        self.last_retrieved.clear()
        print("\n\033[93m[Conversation history cleared]\033[0m\n")
# ══════════════════════════════════════════════════════════════════════════════
# 5. Terminal UI
# ══════════════════════════════════════════════════════════════════════════════
def _build_chat_banner(inner_width: int = 62) -> str:
    """Render the start-up banner as a double-line box of uniform width.

    Building the rows programmatically keeps the right-hand border aligned
    regardless of the length of each row's text.
    """
    def row(text: str = "") -> str:
        return f"║{text.ljust(inner_width)}║"

    bar = "═" * inner_width
    lines = [
        f"╔{bar}╗",
        row("RAG-Powered Chatbot".center(inner_width)),
        f"╠{bar}╣",
        row("  Commands:"),
        row("    /temizle  — Clear conversation history"),
        row("    /kaynak   — Show sources from the last query"),
        row("    /çıkış    — Exit the program"),
        f"╚{bar}╝",
    ]
    # Leading and trailing newline, as with a triple-quoted literal whose
    # quotes sit on their own lines.
    return "\n" + "\n".join(lines) + "\n"


# Banner printed once when the chat loop starts.
CHAT_BANNER = _build_chat_banner()
def print_sources(retrieved: List[Dict]):
    """Print the score, breadcrumb and word count of each retrieved chunk.

    Args:
        retrieved: Chunk dicts; ``relevance_score``, ``breadcrumb`` and
            ``word_count`` are read, each with a default when absent. An
            empty list prints a placeholder line instead.
    """
    print(f"\n\033[93m── Sources Used {'─' * 36}\033[0m")
    if not retrieved:
        print(" (No question has been asked yet)")
    else:
        for i, chunk in enumerate(retrieved, 1):
            score = chunk.get("relevance_score", 0)
            breadcrumb = chunk.get("breadcrumb", "")
            words = chunk.get("word_count", 0)
            print(f" {i}. [{score:.3f}] {breadcrumb} ({words} words)")
    # Closing rule, same total width as the header line.
    print(f"\033[93m{'─' * 52}\033[0m\n")
def run_cli():
    """Run the interactive command-line chat loop.

    Exits with status 1 when no Groq API key is configured or when the vector
    store files are missing. Otherwise reads user input until an exit command,
    Ctrl-C or end-of-input, dispatching the /temizle and /kaynak commands and
    sending everything else to the conversation.
    """
    if not CONFIG["groq_api_key"]:
        print(
            "\n\033[91mGROQ_API_KEY is not set.\033[0m\n"
            "Set the API key in the terminal and run again:\n"
            " \033[1mexport GROQ_API_KEY='...'\033[0m\n"
        )
        sys.exit(1)

    # ── Load vector store ────────────────────────────────────────────────────
    print("\n\033[96mLoading vector database...\033[0m")
    try:
        vs = VectorStore(
            CONFIG["faiss_index_file"],
            CONFIG["chunks_meta_file"],
            CONFIG["embedding_model"],
        )
    except FileNotFoundError as e:
        print(f"\n\033[91m{e}\033[0m\n")
        sys.exit(1)

    # ── Start conversation ───────────────────────────────────────────────────
    conv = Conversation(vs, CONFIG)
    print(CHAT_BANNER)
    print(f"\033[92m✓ System ready! Active model: Groq / {CONFIG['groq_model']}\033[0m\n")

    exit_commands = {"/çıkış", "/cikis", "çıkış", "exit", "quit"}

    while True:
        try:
            user_input = input("\033[1mYou:\033[0m ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\n\nSee you later! 👋")
            break

        if not user_input:
            continue

        # ── Commands ─────────────────────────────────────────────────────────
        command = user_input.lower()
        if command in exit_commands:
            print("\nSee you later! 👋")
            break
        if command == "/temizle":
            conv.clear_history()
            continue
        if command == "/kaynak":
            print_sources(conv.last_retrieved)
            continue

        # ── Ask ──────────────────────────────────────────────────────────────
        # The response is streamed to the terminal by the conversation itself,
        # so the return value is not needed here.
        print()
        conv.ask(user_input)
        print()
# ══════════════════════════════════════════════════════════════════════════════
# Entry Point
# ══════════════════════════════════════════════════════════════════════════════
# Start the interactive chat loop only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    run_cli()