| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| import os, json, re, shutil |
| from datetime import datetime |
| from collections import Counter |
| import pandas as pd |
| import gradio as gr |
| from deep_translator import GoogleTranslator |
|
|
| from langchain_core.documents import Document |
| from langchain_community.document_loaders import ( |
| WebBaseLoader, PyPDFLoader, Docx2txtLoader |
| ) |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_community.vectorstores import Chroma |
| from langchain.chains import RetrievalQA |
| from langchain.prompts import PromptTemplate |
| from langchain_community.llms import HuggingFacePipeline |
| from transformers import pipeline |
|
|
| |
| |
| |
|
|
# Official SASTRA pages scraped at startup to seed the knowledge base.
SASTRA_URLS = [
    "https://www.sastra.edu/about-us.html",
    "https://www.sastra.edu/academics/schools.html#school-of-computing",
    "https://www.sastra.edu/admissions/ug-pg.html",
    "https://www.sastra.edu/admissions/eligibility-criteria.html",
    "https://www.sastra.edu/admissions/fee-structure.html",
    "https://www.sastra.edu/admissions/hostel-fees.html",
    "https://www.sastra.edu/infrastructure/physical-facilities.html",
    "https://www.sastra.edu/about-us/mission-vision.html",
]

# Excel sheet of curated keyword -> response pairs (columns: Keywords, Response).
KEYWORD_EXCEL = "training_data.xlsx"
# On-disk Chroma persistence directory for the vector store.
VECTOR_DB_PATH = "sastra_local_db"
# JSON file every Q&A interaction is appended to (see log_query).
LOG_FILE = "query_logs.json"

# Folder for user-uploaded documents; created eagerly at import time.
UPLOAD_DIR = "data"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Multilingual sentence embedder + small local seq2seq LLM (both from HF hub).
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
LLM_MODEL = "google/flan-t5-base"
# SECURITY NOTE(review): admin password hard-coded in source — should come
# from an environment variable or secrets store before deployment.
ADMIN_PASSWORD = "sastra_admin_2024"
|
|
| |
# Shared QA stack. Populated by initialize_model() and rebuilt on each retrain.
vectordb = None            # Chroma vector store
retriever = None           # retriever view over vectordb (k=4)
qa_chain = None            # RetrievalQA chain used by ask_sastra
keyword_responses = []     # list of (keyword, response) tuples from the Excel sheet
|
|
| |
| |
| |
|
|
def load_keyword_responses(path):
    """Load curated (keyword, response) pairs from an Excel sheet.

    The sheet is expected to have 'Keywords' and 'Response' columns; a
    Keywords cell may hold several comma-separated keywords, each of which
    is lower-cased and paired with the row's response.

    Returns an empty list when the file is absent, so module import /
    initialize_model() does not crash on a missing training sheet.
    """
    if not os.path.exists(path):
        print(f"Keyword file not found: {path} — continuing without keyword responses")
        return []
    df = pd.read_excel(path)
    pairs = []
    for _, row in df.iterrows():
        if pd.notna(row.get("Keywords")) and pd.notna(row.get("Response")):
            response = str(row["Response"])
            for kw in str(row["Keywords"]).split(","):
                pairs.append((kw.strip().lower(), response))
    return pairs
|
|
| |
| |
| |
|
|
def load_local_documents():
    """Load every supported local document (.pdf, .docx, .xlsx) as LangChain Documents.

    Scans BOTH the upload folder and the bootstrapped knowledge base: the
    original version read only UPLOAD_DIR ("data"), while
    bootstrap_initial_files() copies seed files into BASE_KNOWLEDGE_DIR, so
    the initial index silently missed all seed documents. Each .xlsx row is
    flattened into one "col: value | col: value" text document. Unreadable
    files are reported and skipped (best-effort loading).
    """
    docs = []
    for folder in (UPLOAD_DIR, BASE_KNOWLEDGE_DIR):
        if not os.path.isdir(folder):
            continue
        for file in os.listdir(folder):
            path = os.path.join(folder, file)
            try:
                name = file.lower()
                if name.endswith(".pdf"):
                    docs.extend(PyPDFLoader(path).load())
                elif name.endswith(".docx"):
                    docs.extend(Docx2txtLoader(path).load())
                elif name.endswith(".xlsx"):
                    df = pd.read_excel(path)
                    for _, row in df.iterrows():
                        text = " | ".join(
                            f"{c}: {row[c]}" for c in df.columns if pd.notna(row[c])
                        )
                        docs.append(Document(page_content=text, metadata={"source": file}))
            except Exception as e:
                print(f"❌ Error loading {file}: {e}")
    return docs
|
|
| |
| |
| |
|
|
def initialize_model():
    """(Re)build the full QA stack from scratch.

    Steps: scrape the official SASTRA pages, load local documents and the
    curated keyword sheet, chunk + embed everything into a persistent Chroma
    store, then wire a flan-t5 pipeline into a RetrievalQA chain.

    Mutates the module globals vectordb, retriever, qa_chain and
    keyword_responses so the Gradio handlers pick up the fresh objects.
    Called once at import time and again by retrain_model().
    """
    global vectordb, retriever, qa_chain, keyword_responses

    docs = []

    # 1) Official web pages — best-effort: report and skip pages that fail
    #    instead of the original bare `except: pass` that hid every error.
    for url in SASTRA_URLS:
        try:
            docs.extend(WebBaseLoader(url).load())
        except Exception as e:
            print(f"Could not load {url}: {e}")

    # 2) Locally stored documents (PDF / DOCX / XLSX).
    docs.extend(load_local_documents())

    # 3) Curated keyword/response pairs, also indexed as tiny documents so
    #    the retriever can surface them. Guarded: a missing/corrupt sheet
    #    must not abort the whole rebuild.
    try:
        keyword_responses = load_keyword_responses(KEYWORD_EXCEL)
    except Exception as e:
        print(f"Could not load keyword sheet {KEYWORD_EXCEL}: {e}")
        keyword_responses = []
    for kw, resp in keyword_responses:
        docs.append(Document(page_content=f"{kw}: {resp}", metadata={"source": "keywords"}))

    # Chunk, embed and index.
    splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=50)
    chunks = splitter.split_documents(docs)

    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = Chroma.from_documents(chunks, embeddings, persist_directory=VECTOR_DB_PATH)
    retriever = vectordb.as_retriever(search_kwargs={"k": 4})

    # Small local seq2seq model; low temperature keeps answers near-deterministic.
    generator = pipeline(
        "text2text-generation",
        model=LLM_MODEL,
        tokenizer=LLM_MODEL,
        max_new_tokens=200,
        temperature=0.1,
        top_p=0.85,
        do_sample=True,
        repetition_penalty=1.2,
    )
    llm = HuggingFacePipeline(pipeline=generator)

    # The INSUFFICIENT_DATA sentinel is detected downstream by clean_llm_output.
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""
You are AskSASTRA, the official SASTRA University admissions assistant.
Answer ONLY from the context. Be clear and factual.
If not found, say INSUFFICIENT_DATA.

Context:
{context}

Question:
{question}

Answer:
"""
    )

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=False,
    )
|
|
# Persistent knowledge base: retains seed files plus every admin upload
# across retrains. Created eagerly so copies below cannot fail on a
# missing directory.
BASE_KNOWLEDGE_DIR = "knowledge_base"
os.makedirs(BASE_KNOWLEDGE_DIR, exist_ok=True)
import shutil  # NOTE: redundant — shutil is already imported at the top of the file
|
|
def bootstrap_initial_files(source_dir="initial_data", dest_dir=None):
    """Copy seed documents from *source_dir* into the knowledge base.

    Backward-compatible generalization of the original no-arg version:
    dest_dir defaults to BASE_KNOWLEDGE_DIR. Existing destination files are
    never overwritten, non-file entries are skipped (shutil.copy fails on
    directories), and a missing source dir is tolerated so a fresh
    deployment without an 'initial_data' folder does not crash at import.
    """
    if dest_dir is None:
        dest_dir = BASE_KNOWLEDGE_DIR

    if not os.path.isdir(source_dir):
        print(f"No seed directory '{source_dir}' — skipping bootstrap")
        return

    os.makedirs(dest_dir, exist_ok=True)
    for file in os.listdir(source_dir):
        src = os.path.join(source_dir, file)
        dst = os.path.join(dest_dir, file)
        # Copy regular files only, and never clobber an existing copy.
        if os.path.isfile(src) and not os.path.exists(dst):
            shutil.copy(src, dst)

    print("✅ Initial knowledge base prepared")
|
|
# Import-time startup: seed the knowledge base, then build the QA stack.
# NOTE(review): initialize_model() fetches web pages and Hugging Face models
# here, so importing this module is slow and network-dependent.
bootstrap_initial_files()
initialize_model()
|
|
# NOTE: this REDEFINES load_local_documents (an earlier version above reads
# UPLOAD_DIR). Because this def runs after the initialize_model() call above,
# the first build uses the earlier version; every later rebuild (retraining)
# uses this one, which reads BASE_KNOWLEDGE_DIR instead.
def load_local_documents():
    """Load every supported file in BASE_KNOWLEDGE_DIR as LangChain Documents.

    Supports .pdf (PyPDFLoader), .docx (Docx2txtLoader) and .xlsx, where each
    spreadsheet row is flattened into one "col: value | col: value" text
    document. Files that fail to load are reported and skipped.
    """
    docs = []

    for file in os.listdir(BASE_KNOWLEDGE_DIR):
        path = os.path.join(BASE_KNOWLEDGE_DIR, file)

        try:
            if file.lower().endswith(".pdf"):
                docs.extend(PyPDFLoader(path).load())

            elif file.lower().endswith(".docx"):
                docs.extend(Docx2txtLoader(path).load())

            elif file.lower().endswith(".xlsx"):
                df = pd.read_excel(path)
                for _, row in df.iterrows():
                    # Skip NaN cells so empty columns don't pollute the text.
                    text = " | ".join(
                        f"{col}: {row[col]}"
                        for col in df.columns
                        if pd.notna(row[col])
                    )
                    docs.append(Document(page_content=text, metadata={"source": file}))

        except Exception as e:
            print(f"β Error loading {file}: {e}")

    return docs
|
|
|
|
|
|
| |
| |
| |
|
|
def log_query(query, answer, language="en", response_type="success"):
    """Append one interaction record to the JSON log file (LOG_FILE).

    response_type labels how the answer was produced (as used by ask_sastra:
    keyword_match / rag_success / insufficient_data). Files are opened with
    context managers (the original leaked file handles via bare open()),
    and a corrupt or unreadable log restarts the list instead of crashing
    every subsequent query.
    """
    entry = {
        "query": query,
        "answer": answer,
        "language": language,
        "response_type": response_type,
        "timestamp": datetime.now().isoformat(),
    }
    logs = []
    if os.path.exists(LOG_FILE):
        try:
            with open(LOG_FILE, "r", encoding="utf-8") as f:
                logs = json.load(f)
        except (json.JSONDecodeError, OSError):
            logs = []  # corrupt/unreadable log: start fresh rather than fail
    logs.append(entry)
    with open(LOG_FILE, "w", encoding="utf-8") as f:
        json.dump(logs, f, indent=2, ensure_ascii=False)
|
|
def get_analytics():
    """Aggregate the query log into dashboard-ready stats.

    Returns a dict with the total query count, the 10 most common questions,
    language and response-type distributions, and the 20 most recent queries
    (newest first). A missing, corrupt or unreadable log yields the empty
    baseline instead of crashing the analytics tab.
    """
    empty = {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
        "recent_queries": [],
    }
    if not os.path.exists(LOG_FILE):
        return empty
    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            logs = json.load(f)
    except (json.JSONDecodeError, OSError):
        return empty
    return {
        "total_queries": len(logs),
        "top_questions": Counter(l["query"] for l in logs).most_common(10),
        "language_distribution": dict(Counter(l["language"] for l in logs)),
        "response_types": dict(Counter(l["response_type"] for l in logs)),
        "recent_queries": logs[-20:][::-1],
    }
|
|
def display_analytics():
    """Render the analytics summary as a Markdown string for the Gradio tab.

    Builds the document with a list + join instead of repeated string
    concatenation, and repairs the 'Response Types' heading whose mojibake
    checkmark was split across two source lines (a syntax-breaking artifact).
    Emoji in headings are reconstructed — confirm against the original UI.
    """
    a = get_analytics()
    lines = [
        "## 📊 Analytics Dashboard",
        "",
        f"Total Queries: {a['total_queries']}",
        "",
        "### 🔥 Top Questions",
    ]
    lines += [f"- {q} ({c})" for q, c in a["top_questions"]]
    lines += ["", "### 🌐 Language Distribution"]
    lines += [f"- {k}: {v}" for k, v in a["language_distribution"].items()]
    lines += ["", "### ✅ Response Types"]
    lines += [f"- {k}: {v}" for k, v in a["response_types"].items()]
    lines += ["", "### 🕒 Recent Queries"]
    lines += [f"- {r['query']} ({r['language']})" for r in a["recent_queries"][:10]]
    return "\n".join(lines) + "\n"
|
|
def download_logs():
    """Return the query-log path for Gradio's file output, or None when no log exists yet."""
    if os.path.exists(LOG_FILE):
        return LOG_FILE
    return None
|
|
| |
| |
| |
|
|
def clean_llm_output(text):
    """Normalize raw LLM text for display.

    Strips a leading 'Answer:'/'Response:' label (case-insensitive), removes
    the INSUFFICIENT_DATA sentinel when it leads the text, collapses runs of
    whitespace to single spaces, and caps the result at 600 characters.
    """
    cleaned = re.sub(r'^(Answer:|Response:)', '', text, flags=re.I).strip()
    if cleaned.lower().startswith("insufficient_data"):
        cleaned = cleaned.replace("INSUFFICIENT_DATA", "").strip()
    return re.sub(r'\s+', ' ', cleaned)[:600]
|
|
def match_keyword(query):
    """Return the canned response for the first keyword found inside *query*
    (case-insensitive substring match), or None when nothing matches."""
    lowered = query.lower()
    return next((resp for kw, resp in keyword_responses if kw in lowered), None)
|
|
| |
| |
| |
|
|
def ask_sastra(query, lang="en"):
    """Answer an admissions question, returning a plain-text reply.

    Pipeline: (1) translate non-English queries to English so keyword
    matching and retrieval work, (2) check the curated keyword responses,
    (3) run the RAG chain, (4) fall back to a contact-the-office message
    when neither produces a confident answer. Every path is logged via
    log_query. NOTE(review): the reply is returned in English even for
    non-English queries — translating the answer back may be desirable.
    """
    original = query.strip()

    if not original:
        return "Please ask a valid question."

    # Translate to English; best-effort — on failure keep the raw text.
    # (Original used a bare `except:` which also swallowed KeyboardInterrupt.)
    if lang != "en":
        try:
            query = GoogleTranslator(source=lang, target="en").translate(query)
        except Exception as e:
            print("Translation error:", e)
            query = original

    # 1) Curated keyword responses take priority over the LLM.
    kw = match_keyword(query)
    if kw:
        log_query(original, kw, lang, "keyword_match")
        return kw

    # 2) RAG over the vector store.
    try:
        result = qa_chain.invoke({"query": query})
        raw = result.get("result", "").strip()
        ans = clean_llm_output(raw)
    except Exception as e:
        print("RAG error:", e)
        ans = ""

    # Require a minimally substantive answer (>= 5 words) to trust the model.
    if ans and len(ans.split()) >= 5:
        log_query(original, ans, lang, "rag_success")
        return ans

    # 3) Honest fallback instead of hallucinating.
    fallback = (
        "I couldn't find confident information related to this question. "
        "Please contact the SASTRA Admissions Office at "
        "admissions@sastra.edu or visit www.sastra.edu."
    )
    log_query(original, fallback, lang, "insufficient_data")
    return fallback
|
|
|
|
| |
| |
| |
def retrain_model(file, password):
    """Admin endpoint: add an uploaded file to the knowledge base and rebuild
    the whole QA stack.

    Returns a human-readable status string (Gradio displays it directly).
    Status markers restored from mojibake ("β" / a checkmark split across two
    source lines) to proper ❌ / ✅ characters.
    """
    if password != ADMIN_PASSWORD:
        return "❌ Invalid password. Access denied."

    if file is None:
        return "❌ Please upload a file."

    try:
        # Gradio may hand us a plain path string or a tempfile-like object.
        if isinstance(file, str):
            src_path = file
        elif hasattr(file, "name"):
            src_path = file.name
        else:
            return "❌ Unsupported upload format."

        dest_path = os.path.join(
            BASE_KNOWLEDGE_DIR,
            os.path.basename(src_path)
        )
        shutil.copy(src_path, dest_path)

        # Rebuild embeddings + vector store over old and new documents.
        initialize_model()

        return "✅ Model retrained using existing + newly uploaded files."

    except Exception as e:
        return f"❌ Retraining failed: {str(e)}"
|
|
|
|
|
|
| |
| |
| |
|
|
# Supported chat languages -> ISO codes passed to GoogleTranslator.
langs = {"English":"en","Tamil":"ta","Telugu":"te","Kannada":"kn","Hindi":"hi"}

# Public chat tab: free-text question + language dropdown.
# (Tab titles contain mojibake from a bad encoding pass; left as-is here
# since they are runtime strings.)
chatbot_interface = gr.Interface(
    fn=lambda q,l: ask_sastra(q,langs[l]),
    inputs=[gr.Textbox(),gr.Dropdown(list(langs.keys()))],
    outputs=gr.Textbox(),
    title="π AskSASTRA"
)

# Password-gated admin tab: upload a file and trigger a full retrain.
admin_interface = gr.Interface(
    fn=retrain_model,
    inputs=[gr.File(),gr.Textbox(type="password")],
    outputs=gr.Textbox(),
    title="π Admin Panel"
)

# Read-only Markdown dashboard built from the query log.
analytics_interface = gr.Interface(
    fn=display_analytics,
    inputs=[],
    outputs=gr.Markdown(),
    title="π Analytics"
)

# Raw query-log download (returns None until the first query is logged).
logs_interface = gr.Interface(
    fn=download_logs,
    inputs=[],
    outputs=gr.File(),
    title="π₯ Download Logs"
)

# Assemble the four tabs and start the app (blocking call).
gr.TabbedInterface(
    [chatbot_interface,admin_interface,analytics_interface,logs_interface],
    ["π¬ Chatbot","π Admin","π Analytics","π₯ Logs"]
).launch()