# college / app.py
# battulabhaskar543
# updated code files for deployment
# c92680a
import streamlit as st
import os
import streamlit as st
from typing import Dict, Any
from config.config import Config
from ingestion.document_loader import DocumentLoader
from ingestion.text_cleaner import TextCleaner
from ingestion.text_splitter import TextSplitter
from embeddings.embedder import Embedder
from embeddings.vector_store import VectorStore
from retriever.retriever import Retriever
from llm.answer_generator import AnswerGenerator
from validation.safety_checker import SafetyChecker
from validation.grounding_validator import GroundingValidator
from ui.sidebar import Sidebar
from ui.chat_ui import ChatUI
from utils.logger import logger
class CampusGuideApp:
    """Streamlit application that wires the CampusGuide RAG pipeline together.

    The constructor instantiates the UI helpers and every pipeline component
    (loader, cleaner, splitter, embedder, vector store, retriever, answer
    generator, safety checker, grounding validator) and then calls
    ``_initialize_system`` to load or build the knowledge base.
    """

    def __init__(self):
        """Create all UI/RAG components and initialise the knowledge base."""
        self.config = Config()
        self.sidebar = Sidebar()
        self.chat_ui = ChatUI()

        # RAG components
        self.document_loader = DocumentLoader()
        self.text_cleaner = TextCleaner()
        self.text_splitter = TextSplitter()
        self.embedder = Embedder()
        self.vector_store = VectorStore()
        self.retriever = Retriever()
        self.answer_generator = AnswerGenerator()
        self.safety_checker = SafetyChecker()
        self.grounding_validator = GroundingValidator()

        self._initialize_system()

    # ---------------- INITIALIZATION ---------------- #
    def _initialize_system(self):
        """Load the persisted vector store, or rebuild it from raw documents.

        Writes ``st.session_state["system_ready"]`` (whether a usable
        knowledge base exists) and ``st.session_state["retrieval_stats"]``
        (whatever ``self.retriever.get_retrieval_stats()`` returns).
        """
        logger.info("Initializing CampusGuide system...")

        if self.vector_store.load():
            ready = True
            logger.info("Vector store loaded")
        elif self._has_supported_raw_files():
            logger.info(
                "Documents found in raw directory. Auto-ingesting documents..."
            )
            # Use the ingestion outcome rather than assuming success: files
            # with a supported extension may still yield no documents/chunks.
            ready = bool(self.ingest_documents())
        else:
            logger.warning("No supported documents found for ingestion")
            ready = False

        st.session_state["system_ready"] = ready
        st.session_state["retrieval_stats"] = self.retriever.get_retrieval_stats()

    def _has_supported_raw_files(self) -> bool:
        """Return True if the raw data directory holds a supported file.

        A file counts as supported when its lower-cased name ends with one of
        ``self.config.SUPPORTED_EXTENSIONS``.
        """
        raw_dir = self.config.DATA_RAW_PATH
        # isdir (not exists) so a stray file at that path cannot break listdir.
        if not os.path.isdir(raw_dir):
            return False
        extensions = tuple(self.config.SUPPORTED_EXTENSIONS)
        return any(
            name.lower().endswith(extensions) for name in os.listdir(raw_dir)
        )

    # ---------------- INGESTION ---------------- #
    @staticmethod
    def _unique_destination(raw_dir: str, filename: str) -> str:
        """Return a non-existing path inside ``raw_dir`` for an uploaded file.

        Only the final path component of ``filename`` is used, so a
        client-supplied name such as ``../../x.txt`` cannot escape
        ``raw_dir``. If the target already exists, ``_<n>.txt`` is appended
        to the base name with the first free counter ``n``.
        """
        safe_name = os.path.basename(filename.replace("\\", "/"))
        base_name = os.path.splitext(safe_name)[0]
        candidate = os.path.join(raw_dir, safe_name)
        counter = 0
        while os.path.exists(candidate):
            counter += 1
            candidate = os.path.join(raw_dir, f"{base_name}_{counter}.txt")
        return candidate

    def handle_file_uploads(self, uploaded_files):
        """Save uploaded ``.txt`` files and rebuild the knowledge base.

        Args:
            uploaded_files: Iterable of upload objects exposing ``name`` and
                ``getbuffer()``.

        Returns:
            True when at least one file was saved and the subsequent
            ingestion produced a knowledge base; False otherwise.
        """
        if not uploaded_files:
            return False

        raw_dir = self.config.DATA_RAW_PATH
        os.makedirs(raw_dir, exist_ok=True)

        saved_files = []
        for uploaded_file in uploaded_files:
            # The name comes from the client; keep only its last component.
            safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))

            # Validate file type
            if not safe_name.lower().endswith(".txt"):
                continue

            # Create unique filename to avoid overwrites
            file_path = self._unique_destination(raw_dir, safe_name)

            # Save file
            try:
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getbuffer())
            except Exception as e:
                logger.error(f"Failed to save file {uploaded_file.name}: {e}")
                continue

            saved_files.append(file_path)
            logger.info(f"Saved uploaded file: {file_path}")

        if not saved_files:
            return False

        # Trigger full ingestion to rebuild vector store with all files
        logger.info(
            f"Uploaded {len(saved_files)} files, rebuilding knowledge base..."
        )
        return bool(self.ingest_documents())

    def ingest_documents(self):
        """Rebuild the vector store from every document in the raw directory.

        The existing store is cleared first. Documents are then loaded,
        cleaned, split into chunks, embedded, added to the store and saved.

        Returns:
            True when chunks were embedded and saved (``system_ready`` is set
            to True); False when no documents or no chunks were found
            (``system_ready`` is set to False, because the store has already
            been cleared at that point).
        """
        raw_dir = self.config.DATA_RAW_PATH

        # Ensure processed directory exists
        processed_dir = self.config.DATA_PROCESSED_PATH
        os.makedirs(processed_dir, exist_ok=True)

        logger.info("Starting document ingestion...")

        # Clear existing vector store to rebuild fresh
        self.vector_store.clear()
        logger.info("Cleared existing vector store for fresh rebuild")

        documents = self.document_loader.load_multiple_documents(raw_dir)
        if not documents:
            logger.warning("No documents found")
            # The store is empty now, so a stale "ready" flag would be wrong.
            st.session_state["system_ready"] = False
            return False

        all_chunks = []
        for doc in documents:
            doc["text"] = self.text_cleaner.clean_text(doc["text"])
            all_chunks.extend(self.text_splitter.split_document(doc))

        if not all_chunks:
            # Nothing to fit the embedder on; do not attempt an empty fit.
            logger.warning("No documents found")
            st.session_state["system_ready"] = False
            return False

        # Fit the embedder on all chunk texts first
        chunk_texts = [chunk["text"] for chunk in all_chunks]
        self.embedder.fit_on_texts(chunk_texts)

        # Then embed the chunks
        embedded_chunks = self.embedder.embed_chunks(all_chunks)
        self.vector_store.add_chunks(embedded_chunks)
        self.vector_store.save()

        st.session_state["system_ready"] = True
        logger.info(
            f"Ingested {len(documents)} documents with {len(all_chunks)} chunks"
        )
        return True

    # ---------------- QUERY PIPELINE ---------------- #
    def process_query(self, query: str, role: str) -> Dict[str, Any]:
        """Answer ``query`` for ``role`` or return the refusal response.

        The pipeline is: retrieve -> retrieval safety check -> generate
        answer -> refusal detection -> grounding validation. Any failed stage
        yields ``_refusal_response()``.

        Returns:
            A dict with ``"answer"`` and ``"sources"``; refusals additionally
            carry ``"source_note"``.
        """
        logger.info(f"Processing query: {query}")

        retrieval_result = self.retriever.retrieve(query, role)
        safety = self.safety_checker.check_retrieval_safety(retrieval_result)

        # ❌ Retrieval confidence failure
        if not safety["safe"]:
            return self._refusal_response()

        chunks = retrieval_result["chunks"]
        answer = self.answer_generator.generate_answer(query, chunks)

        # ❌ LLM refusal
        if self._is_refusal(answer):
            return self._refusal_response()

        # ❌ Grounding failure
        grounding = self.grounding_validator.validate_answer_grounding(
            answer, chunks, query
        )
        if not grounding["valid"]:
            return self._refusal_response()

        # ✅ Valid answer
        return {
            "answer": answer,
            "sources": self._prepare_sources(chunks),
        }

    # ---------------- HELPERS ---------------- #
    def _is_refusal(self, answer: str) -> bool:
        """Return True when ``answer`` is missing or contains a refusal marker.

        A ``None``, non-string or blank answer is treated as a refusal, since
        there is nothing to show the user. Matching is case-insensitive.
        """
        if not isinstance(answer, str) or not answer.strip():
            return True

        refusal_markers = (
            "not available in the provided documents",
            "do not explicitly",
            "cannot provide an answer",
        )
        lowered = answer.lower()  # lower-case once, not once per marker
        return any(marker in lowered for marker in refusal_markers)

    def _refusal_response(self) -> Dict[str, Any]:
        """Return the generic refusal payload with no sources.

        The wording is query-independent so it stays accurate whatever the
        user asked.
        """
        return {
            "answer": (
                "The requested information is not available in the provided "
                "documents. To avoid assumptions, the system cannot provide an "
                "answer based on the available documents."
            ),
            "sources": [],
            "source_note": (
                "No sections in the provided documents explicitly mention this information."
            ),
        }

    def _prepare_sources(self, chunks: list) -> list:
        """Collapse retrieved chunks into a de-duplicated list of sources.

        Chunks are considered duplicates when they share the same filename
        and the same set of pages, regardless of page order. Each source has
        ``"filename"`` (``"Unknown"`` if absent), ``"pages"`` (sorted) and
        ``"excerpt"`` (first 300 characters of the chunk text). Missing or
        ``None`` metadata, pages and text are tolerated.
        """
        sources, seen = [], set()
        for chunk in chunks:
            meta = chunk.get("metadata") or {}
            pages = sorted(chunk.get("pages") or [])

            # Sort before keying so [2, 1] and [1, 2] count as one source.
            key = (meta.get("filename"), tuple(pages))
            if key in seen:
                continue
            seen.add(key)

            sources.append(
                {
                    "filename": meta.get("filename", "Unknown"),
                    "pages": pages,
                    "excerpt": (chunk.get("text") or "")[:300],
                }
            )
        return sources

    # ---------------- UI ---------------- #
    def run(self):
        """Render the page: process pending uploads, ensure readiness, show chat."""
        st.set_page_config(
            page_title="CampusGuide - ICFAI University Assistant",
            page_icon="🎓",
            layout="wide",
        )

        # Handle file uploads first
        pending_uploads = st.session_state.get("uploaded_files")
        if pending_uploads:
            with st.spinner("Processing uploaded files..."):
                success = self.handle_file_uploads(pending_uploads)

            # Clear after processing, on failure too: otherwise the same batch
            # is picked up and re-processed on every subsequent run.
            st.session_state["uploaded_files"] = []

            if success:
                st.success("✅ Files uploaded and processed successfully!")
                st.rerun()  # Refresh to update stats
            else:
                st.error("❌ Failed to process uploaded files.")

        # Auto-ingest documents if not ready (for initial load or restart)
        if not st.session_state.get("system_ready"):
            with st.spinner("Loading documents..."):
                self.ingest_documents()

        if not st.session_state.get("system_ready"):
            st.error(
                "❌ Failed to load documents. Please add .txt files to data/raw/ and restart the application."
            )
            return

        role = self.sidebar.render()
        self.chat_ui.set_query_callback(self.process_query)
        self.chat_ui.render_chat_interface(role)
def main():
    """Build the CampusGuide application and hand control to its UI."""
    app = CampusGuideApp()
    app.run()


# Only launch the app when this file is executed as the entry script.
if __name__ == "__main__":
    main()