# math-chatbot-v2 / app.py
# Author: pranshu dhiman
# Commit: "Deploy MathSutra Space" (7fab45b)
# Repository-viewer chrome captured with the file: Raw | History | Blame | Contribute | Delete | 12.3 kB
from __future__ import annotations
import hashlib
from pathlib import Path
import re
import streamlit as st
from src.edurag_math_bot.config import get_settings
from src.edurag_math_bot.image_to_text import extract_text_from_image_bytes
from src.edurag_math_bot.pdf_processing import extract_chunks_from_uploaded_file
from src.edurag_math_bot.rag_chain import (
MathRAGAssistant,
SupplementalChunk,
build_supplemental_chunks,
run_ingestion,
)
from src.edurag_math_bot.speech_to_text import transcribe_audio_bytes
# Configure the browser tab title, icon and page layout for the app.
st.set_page_config(
    page_title="MathSutra 12",
    # The icon literal had been corrupted into mojibake; it is written as a
    # named escape so the source stays pure ASCII.
    page_icon="\N{BLUE BOOK}",
    layout="wide",
)
def ensure_state() -> None:
    """Seed ``st.session_state`` with every key this app reads.

    Keys that already exist are left untouched; only missing ones receive
    their initial value.
    """
    # Built on each call so the list defaults are always fresh objects.
    initial_values = {
        "messages": [],
        "uploaded_context": [],
        "uploaded_file_summaries": [],
        "upload_signature": None,
    }
    for key, initial in initial_values.items():
        if key not in st.session_state:
            st.session_state[key] = initial
def inject_styles() -> None:
    """Inject the app's custom CSS into the page via ``st.markdown``.

    The stylesheet targets headings, chat-message text/lists/code and the
    ``.math-label`` class. ``unsafe_allow_html`` is required so the
    ``<style>`` tag is emitted as HTML rather than escaped.
    """
    stylesheet = """
<style>
h1, h2, h3 {
color: #0f172a;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
[data-testid="stChatMessage"] p,
[data-testid="stChatMessage"] li {
line-height: 1.7;
font-size: 1rem;
}
[data-testid="stChatMessage"] h2 {
margin-top: 1.25rem;
margin-bottom: 0.5rem;
font-size: 1.55rem;
}
[data-testid="stChatMessage"] h3 {
margin-top: 1rem;
margin-bottom: 0.35rem;
font-size: 1.2rem;
}
[data-testid="stChatMessage"] ul,
[data-testid="stChatMessage"] ol {
margin-bottom: 0.75rem;
}
[data-testid="stChatMessage"] code {
white-space: pre-wrap;
}
.math-label {
font-weight: 700;
margin-top: 0.35rem;
margin-bottom: 0.15rem;
}
</style>
"""
    st.markdown(stylesheet, unsafe_allow_html=True)
# Matches a display-math span delimited by ``$$ ... $$``. The body is captured
# non-greedily, and DOTALL lets it extend across line breaks.
DISPLAY_MATH_RE = re.compile(r"\$\$(.+?)\$\$", re.DOTALL)
def normalize_math_markdown(content: str) -> str:
    r"""Rewrite LaTeX bracket delimiters as dollar-sign delimiters.

    ``\[`` and ``\]`` become ``$$``; ``\(`` and ``\)`` become ``$``.
    All other text is returned unchanged.
    """
    delimiter_swaps = (
        ("\\[", "$$"),
        ("\\]", "$$"),
        ("\\(", "$"),
        ("\\)", "$"),
    )
    normalized = content
    for latex_delimiter, dollar_delimiter in delimiter_swaps:
        normalized = normalized.replace(latex_delimiter, dollar_delimiter)
    return normalized
def render_assistant_content(content: str) -> None:
    """Render an assistant reply, routing ``$$...$$`` spans to ``st.latex``.

    The text is first normalised with ``normalize_math_markdown`` and then
    split on ``DISPLAY_MATH_RE``. Whitespace-only pieces are skipped.
    """
    segments = DISPLAY_MATH_RE.split(normalize_math_markdown(content))
    for position, segment in enumerate(segments):
        stripped = segment.strip()
        if not stripped:
            continue
        # Splitting on a pattern with one capture group alternates
        # text, math, text, ... so odd positions hold the captured math.
        if position % 2 == 1:
            st.latex(stripped)
        else:
            st.markdown(segment)
def clear_uploaded_context() -> None:
    """Reset the session's upload-related state to its empty values."""
    state = st.session_state
    state["upload_signature"] = None
    state["uploaded_file_summaries"] = []
    state["uploaded_context"] = []
def file_signature(file_bytes: bytes) -> str:
    """Return the hexadecimal SHA-1 digest of *file_bytes*."""
    hasher = hashlib.sha1()
    hasher.update(file_bytes)
    return hasher.hexdigest()
def file_suffix(file_name: str | None, fallback: str) -> str:
    """Return the lower-cased extension of *file_name*, or *fallback*.

    *fallback* is used when the name is missing/empty or has no suffix
    according to ``pathlib.Path.suffix``.
    """
    extension = Path(file_name).suffix.lower() if file_name else ""
    return extension if extension else fallback
def build_upload_signature(uploaded_files: list[object]) -> tuple[tuple[str, int, str], ...]:
    """Build a comparable fingerprint for a batch of uploaded files.

    Each file contributes a ``(name, byte length, content hash)`` triple,
    in upload order. Every item must expose ``.name`` and ``.getvalue()``.
    """

    def fingerprint(upload: object) -> tuple[str, int, str]:
        # Read the payload once and reuse it for both length and hash.
        payload = upload.getvalue()
        return (upload.name, len(payload), file_signature(payload))

    return tuple(fingerprint(upload) for upload in uploaded_files)
def process_uploaded_files(
    uploaded_files: list[object],
    assistant: MathRAGAssistant,
) -> str | None:
    """Chunk and embed the uploaded files into the session's extra context.

    Args:
        uploaded_files: Upload objects exposing ``.name`` and ``.getvalue()``.
        assistant: Assistant whose ``ollama`` client is passed to
            ``build_supplemental_chunks``.

    Returns:
        A status message when the files were (re)processed, or ``None`` when
        there was nothing to do (no files, or the same batch as last time).

    Raises:
        ValueError: If any file yields no chunks.
    """
    state = st.session_state

    if not uploaded_files:
        # Nothing uploaded now: drop context left over from an earlier batch.
        if state.upload_signature is not None:
            clear_uploaded_context()
        return None

    batch_signature = build_upload_signature(uploaded_files)
    if batch_signature == state.upload_signature:
        # Same batch as the one already processed; skip the expensive work.
        return None

    settings = get_settings()
    collected_chunks = []
    per_file_summaries = []
    for upload in uploaded_files:
        payload = upload.getvalue()
        chunks = extract_chunks_from_uploaded_file(
            file_name=upload.name,
            file_bytes=payload,
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
        )
        if not chunks:
            raise ValueError(f"No readable text was found in {upload.name}.")
        collected_chunks.extend(chunks)
        per_file_summaries.append({"name": upload.name, "chunks": len(chunks)})

    # Session state is written only after every file was processed.
    state.uploaded_context = build_supplemental_chunks(
        collected_chunks,
        ollama=assistant.ollama,
        model=settings.embed_model,
    )
    state.uploaded_file_summaries = per_file_summaries
    state.upload_signature = batch_signature

    file_count = len(uploaded_files)
    chunk_count = len(state.uploaded_context)
    return (
        f"Loaded {file_count} file(s) into this chat session "
        f"across {chunk_count} searchable chunks."
    )
def _int_or_default(value: object, default: int) -> int:
    """Return *value* converted to ``int``, or *default* if it cannot be."""
    try:
        return int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, NaN and infinities all land here.
        return default


def _text_or_default(value: object, default: str) -> str:
    """Return ``str(value)``, or *default* when *value* is ``None``."""
    return default if value is None else str(value)


def format_source(source: dict[str, object]) -> str:
    """Format one retrieved-source record as a markdown bullet line.

    Records with a positive ``chapter_number`` are shown as textbook chapters;
    all others are shown as uploaded files using ``source_file``.

    Missing keys, ``None`` values and non-numeric chapter/page values fall
    back to the defaults instead of raising or printing ``None``.

    Args:
        source: Metadata mapping; the keys read are ``chapter_number``,
            ``chapter_name``, ``topic``, ``page_number`` and ``source_file``.

    Returns:
        A single line beginning with ``"- "``.
    """
    chapter_number = _int_or_default(source.get("chapter_number"), -1)
    chapter_name = _text_or_default(source.get("chapter_name"), "Not identified")
    topic = _text_or_default(source.get("topic"), "Not identified")
    page_number = _int_or_default(source.get("page_number"), 1)
    source_file = _text_or_default(source.get("source_file"), "Unknown source")

    if chapter_number > 0:
        return (
            f"- Chapter {chapter_number}: {chapter_name} | "
            f"Topic: {topic} | Page: {page_number}"
        )
    return f"- Uploaded file: {source_file} | Topic: {topic} | Page: {page_number}"
def render_sidebar(assistant: MathRAGAssistant) -> None:
    """Draw the sidebar: model info, ingestion button, uploader and tips.

    Args:
        assistant: Passed through to ``process_uploaded_files`` when a new
            batch of reference files needs processing.
    """
    settings = get_settings()
    sidebar = st.sidebar
    state = st.session_state

    # --- Header and active configuration ---
    sidebar.title("MathSutra 12")
    sidebar.caption("Class 12 Maths RAG Chatbot")
    sidebar.write(f"LLM model: `{settings.llm_model}`")
    sidebar.write(f"Embedding model: `{settings.embed_model}`")
    sidebar.write(f"Collection: `{settings.collection_name}`")

    # --- Knowledge-base (re)build ---
    rebuild_requested = sidebar.button(
        "Build / Refresh Knowledge Base",
        use_container_width=True,
    )
    if rebuild_requested:
        with sidebar.status("Running ingestion...", expanded=True):
            try:
                report = run_ingestion(settings=settings, reset=True)
                sidebar.success(
                    f"Ingested {report['chunks_created']} chunks from {report['pdf_count']} PDFs."
                )
            except Exception as exc:
                # Surface any ingestion failure in the sidebar instead of crashing.
                sidebar.error(str(exc))

    # --- Per-session reference uploads ---
    sidebar.markdown("### Add Reference Files")
    uploaded_files = sidebar.file_uploader(
        "Upload PDFs or notes for this chat",
        type=["pdf", "txt", "md"],
        accept_multiple_files=True,
        help="These files become extra study context for the current session.",
    )
    if not uploaded_files:
        clear_uploaded_context()
    elif build_upload_signature(uploaded_files) != state.upload_signature:
        # The batch differs from the one already processed.
        with sidebar.status("Processing uploaded files...", expanded=False):
            try:
                outcome = process_uploaded_files(uploaded_files, assistant)
                if outcome:
                    sidebar.success(outcome)
            except Exception as exc:
                clear_uploaded_context()
                sidebar.error(str(exc))
    elif state.uploaded_file_summaries:
        ready_count = len(state.uploaded_file_summaries)
        sidebar.success(f"{ready_count} uploaded file(s) ready for chat.")

    if state.uploaded_file_summaries:
        sidebar.markdown("### Files In This Chat")
        for entry in state.uploaded_file_summaries:
            sidebar.write(f"- {entry['name']} ({entry['chunks']} chunks)")

    # --- Usage tips ---
    sidebar.markdown("### Demo tips")
    tips = (
        "- Ask full questions.",
        "- Ask for step-by-step solutions.",
        "- Use the mic icon in the chat input to ask by voice.",
        "- Use the paperclip icon to upload a question image.",
        "- Upload a PDF, TXT, or MD file to add temporary context.",
        "- Answers are formatted in clean structured sections.",
        "- Ask which chapter and topic the question belongs to.",
    )
    sidebar.markdown("\n".join(tips))
def render_messages() -> None:
    """Replay the chat history stored in ``st.session_state.messages``.

    Assistant messages go through ``render_assistant_content`` so display
    math is typeset; other messages are rendered as plain markdown. When a
    message carries a non-empty ``sources`` list, it is shown in an expander
    inside the same chat bubble.
    """
    # The emoji literals had been corrupted into mojibake, which is not a
    # valid avatar value. Named escapes keep the source pure ASCII.
    user_avatar = "\N{ADULT}\N{ZERO WIDTH JOINER}\N{GRADUATION CAP}"
    assistant_avatar = "\N{ROBOT FACE}"

    for message in st.session_state.messages:
        role = message["role"]
        avatar = user_avatar if role == "user" else assistant_avatar
        with st.chat_message(role, avatar=avatar):
            if role == "assistant":
                render_assistant_content(message["content"])
            else:
                st.markdown(message["content"])

            sources = message.get("sources")
            if sources:
                with st.expander("\N{BOOKS} View Sources Used"):
                    for source in sources:
                        st.markdown(format_source(source))
def main() -> None:
    """Run one pass of the Streamlit chat page.

    Sets up session state, styles and the sidebar, replays the history, then
    handles at most one new chat submission. The question is assembled from
    typed text, an optional voice recording and an optional image, then sent
    to the assistant; both sides of the exchange are appended to
    ``st.session_state.messages``.
    """
    # The emoji literals had been corrupted into mojibake; named escapes keep
    # the source pure ASCII.
    user_avatar = "\N{ADULT}\N{ZERO WIDTH JOINER}\N{GRADUATION CAP}"
    assistant_avatar = "\N{ROBOT FACE}"
    sources_label = "\N{BOOKS} View Sources Used"

    ensure_state()
    inject_styles()
    settings = get_settings()
    assistant = MathRAGAssistant(settings=settings)
    render_sidebar(assistant)

    st.title("\N{SPARKLES} MathSutra 12")
    st.subheader("Your Class 12 Mathematics Chapter-Aware Chatbot")
    st.write(
        "Ask any Class 12 mathematics question by typing, voice, or a question image. "
        "You can also upload PDFs or notes to give the chatbot extra context for the current session."
    )
    render_messages()

    prompt = st.chat_input(
        "Ask a Class 12 maths question...",
        accept_file=True,
        file_type=["png", "jpg", "jpeg", "heic"],
        accept_audio=True,
    )
    if not prompt:
        return

    # The submission is either a plain string or an object carrying
    # text / files / audio attributes.
    if isinstance(prompt, str):
        question = prompt.strip()
        files = []
        audio = None
    else:
        question = (getattr(prompt, "text", None) or "").strip()
        files = getattr(prompt, "files", [])
        audio = getattr(prompt, "audio", None)

    if audio is not None:
        with st.spinner("Transcribing your recording..."):
            audio_bytes = audio.getvalue()
            audio_suffix = file_suffix(getattr(audio, "name", ".wav"), ".wav")
            try:
                transcript = transcribe_audio_bytes(audio_bytes, suffix=audio_suffix)
                # Append to typed text if there is any; otherwise the
                # transcript becomes the question.
                question = f"{question}\n\n[Voice transcript: {transcript}]" if question else transcript
            except Exception as exc:
                st.error(f"Voice transcription failed: {exc}")
                return

    if files:
        with st.spinner("Reading text from your question image..."):
            # Only the first attached file is read.
            image_file = files[0]
            image_bytes = image_file.getvalue()
            image_suffix = file_suffix(getattr(image_file, "name", ".jpg"), ".jpg")
            try:
                extracted_text = extract_text_from_image_bytes(image_bytes, suffix=image_suffix)
                question = f"{question}\n\n[Image text: {extracted_text}]" if question else extracted_text
            except Exception as exc:
                st.error(f"Image OCR failed: {exc}")
                return

    if not question:
        st.warning("No question text could be extracted. Please try again.")
        return

    st.session_state.messages.append({"role": "user", "content": question})
    with st.chat_message("user", avatar=user_avatar):
        st.markdown(question)

    with st.chat_message("assistant", avatar=assistant_avatar):
        with st.spinner("Thinking with retrieved chapter content..."):
            try:
                result = assistant.answer(
                    question,
                    extra_chunks=st.session_state.uploaded_context,
                )
                render_assistant_content(result["answer"])
                # Show the expander only when there are sources, matching
                # how the stored history is rendered on later reruns.
                if result["sources"]:
                    with st.expander(sources_label):
                        for source in result["sources"]:
                            st.markdown(format_source(source))
            except Exception as exc:
                # Store the failure as the assistant's reply so the history
                # stays paired with the user's question.
                result = {"answer": str(exc), "sources": []}
                st.error(result["answer"])

    st.session_state.messages.append(
        {
            "role": "assistant",
            "content": result["answer"],
            "sources": result["sources"],
        }
    )
# Build the page only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()