# Streamlit front end for the "MathSutra 12" chatbot.
#
# The page offers a chat box that accepts typed text, a voice recording, or a
# question image, plus a sidebar for rebuilding the knowledge base and for
# uploading reference files that are kept only in the current session state.

from __future__ import annotations

import hashlib
import re
from pathlib import Path

import streamlit as st

from src.edurag_math_bot.config import get_settings
from src.edurag_math_bot.image_to_text import extract_text_from_image_bytes
from src.edurag_math_bot.pdf_processing import extract_chunks_from_uploaded_file
from src.edurag_math_bot.rag_chain import (
    MathRAGAssistant,
    SupplementalChunk,
    build_supplemental_chunks,
    run_ingestion,
)
from src.edurag_math_bot.speech_to_text import transcribe_audio_bytes

st.set_page_config(
    page_title="MathSutra 12",
    page_icon="📘",
    layout="wide",
)


def ensure_state() -> None:
    """Create the session-state keys this page relies on, if they are missing."""
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "uploaded_context" not in st.session_state:
        st.session_state.uploaded_context: list[SupplementalChunk] = []  # type: ignore
    if "uploaded_file_summaries" not in st.session_state:
        st.session_state.uploaded_file_summaries = []
    if "upload_signature" not in st.session_state:
        st.session_state.upload_signature = None


def inject_styles() -> None:
    """Emit the page's custom HTML/CSS through ``st.markdown``."""
    # NOTE(review): the markup passed here is whitespace-only in this copy of
    # the file; the stylesheet may have been stripped — confirm against the
    # original source.
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )


# Matches ``$$ ... $$`` display-math spans; DOTALL lets a span cross newlines.
DISPLAY_MATH_RE = re.compile(r"\$\$(.+?)\$\$", re.DOTALL)


def normalize_math_markdown(content: str) -> str:
    """Rewrite ``\\[ \\]`` and ``\\( \\)`` delimiters as ``$$`` and ``$``."""
    return (
        content.replace("\\[", "$$")
        .replace("\\]", "$$")
        .replace("\\(", "$")
        .replace("\\)", "$")
    )


def render_assistant_content(content: str) -> None:
    """Render an assistant reply, sending display math to ``st.latex``.

    The text is normalized with :func:`normalize_math_markdown` and split on
    ``DISPLAY_MATH_RE``. Because the pattern has exactly one capture group,
    the split result alternates: even indices are ordinary markdown, odd
    indices are the captured math bodies.
    """
    normalized = normalize_math_markdown(content)
    parts = DISPLAY_MATH_RE.split(normalized)
    for index, part in enumerate(parts):
        if not part.strip():
            continue
        if index % 2 == 0:
            st.markdown(part)
        else:
            st.latex(part.strip())


def clear_uploaded_context() -> None:
    """Reset every session-state key that tracks uploaded reference files."""
    st.session_state.uploaded_context = []
    st.session_state.uploaded_file_summaries = []
    st.session_state.upload_signature = None


def file_signature(file_bytes: bytes) -> str:
    """Return the SHA-1 hex digest of *file_bytes*.

    The digest is only compared against earlier digests to detect a changed
    upload (see :func:`build_upload_signature`).
    """
    return hashlib.sha1(file_bytes).hexdigest()


def file_suffix(file_name: str | None, fallback: str) -> str:
    """Return the lower-cased extension of *file_name*, or *fallback*.

    *fallback* is used when *file_name* is empty/``None`` or has no extension.
    """
    if not file_name:
        return fallback
    suffix = Path(file_name).suffix.lower()
    return suffix or fallback


def build_upload_signature(uploaded_files: list[object]) -> tuple[tuple[str, int, str], ...]:
    """Return a ``(name, size, sha1)`` tuple per uploaded file, in order.

    Each element of *uploaded_files* must expose ``.name`` and ``.getvalue()``.
    """
    signature = []
    for uploaded_file in uploaded_files:
        file_bytes = uploaded_file.getvalue()
        signature.append(
            (
                uploaded_file.name,
                len(file_bytes),
                file_signature(file_bytes),
            )
        )
    return tuple(signature)


def process_uploaded_files(
    uploaded_files: list[object],
    assistant: MathRAGAssistant,
    *,
    signature: tuple[tuple[str, int, str], ...] | None = None,
) -> str | None:
    """Chunk and embed the uploaded files into session state.

    Args:
        uploaded_files: Objects exposing ``.name`` and ``.getvalue()``.
        assistant: Supplies the ``ollama`` client passed to
            ``build_supplemental_chunks``.
        signature: Optional precomputed result of
            :func:`build_upload_signature` for *uploaded_files*. When omitted
            it is computed here; passing it avoids hashing every file twice.

    Returns:
        A status message when files were (re)processed, or ``None`` when there
        is nothing to do (no files, or the same files as last time).

    Raises:
        ValueError: If any file yields no chunks.
    """
    if not uploaded_files:
        # The uploader was emptied: drop context left over from earlier files.
        if st.session_state.upload_signature is not None:
            clear_uploaded_context()
        return None

    if signature is None:
        signature = build_upload_signature(uploaded_files)
    if signature == st.session_state.upload_signature:
        return None

    settings = get_settings()
    all_chunks = []
    summaries = []

    for uploaded_file in uploaded_files:
        file_bytes = uploaded_file.getvalue()
        file_chunks = extract_chunks_from_uploaded_file(
            file_name=uploaded_file.name,
            file_bytes=file_bytes,
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
        )
        if not file_chunks:
            raise ValueError(f"No readable text was found in {uploaded_file.name}.")
        all_chunks.extend(file_chunks)
        summaries.append({"name": uploaded_file.name, "chunks": len(file_chunks)})

    # State is written only after every file was processed, so a failure
    # above leaves the previous upload state untouched.
    st.session_state.uploaded_context = build_supplemental_chunks(
        all_chunks,
        ollama=assistant.ollama,
        model=settings.embed_model,
    )
    st.session_state.uploaded_file_summaries = summaries
    st.session_state.upload_signature = signature
    return (
        f"Loaded {len(uploaded_files)} file(s) into this chat session "
        f"across {len(st.session_state.uploaded_context)} searchable chunks."
    )


def _as_int(value: object, default: int) -> int:
    """Coerce *value* to ``int``, returning *default* when that is impossible."""
    try:
        return int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError):
        return default


def _as_text(value: object, default: str) -> str:
    """Return ``str(value)``, or *default* when *value* is ``None``."""
    return default if value is None else str(value)


def format_source(source: dict[str, object]) -> str:
    """Format one source-metadata dict as a markdown bullet.

    Sources with a positive ``chapter_number`` are shown as chapter
    references; all others are shown as uploaded files. Missing, ``None`` or
    non-numeric fields fall back to defaults instead of raising, so one
    malformed record cannot break rendering of an answer.
    """
    chapter_number = _as_int(source.get("chapter_number"), -1)
    chapter_name = _as_text(source.get("chapter_name"), "Not identified")
    topic = _as_text(source.get("topic"), "Not identified")
    page_number = _as_int(source.get("page_number"), 1)
    source_file = _as_text(source.get("source_file"), "Unknown source")

    if chapter_number > 0:
        return (
            f"- Chapter {chapter_number}: {chapter_name} | "
            f"Topic: {topic} | Page: {page_number}"
        )
    return f"- Uploaded file: {source_file} | Topic: {topic} | Page: {page_number}"


def _render_sources(sources: list[dict[str, object]] | None) -> None:
    """Show the "View Sources Used" expander, skipping it when there are none."""
    if not sources:
        return
    with st.expander("📚 View Sources Used"):
        for source in sources:
            st.markdown(format_source(source))


def render_sidebar(assistant: MathRAGAssistant) -> None:
    """Draw the sidebar: model info, ingestion button, file uploader, tips."""
    settings = get_settings()
    st.sidebar.title("MathSutra 12")
    st.sidebar.caption("Class 12 Maths RAG Chatbot")
    st.sidebar.write(f"LLM model: `{settings.llm_model}`")
    st.sidebar.write(f"Embedding model: `{settings.embed_model}`")
    st.sidebar.write(f"Collection: `{settings.collection_name}`")

    if st.sidebar.button("Build / Refresh Knowledge Base", use_container_width=True):
        with st.sidebar.status("Running ingestion...", expanded=True):
            # UI boundary: any failure is reported in the sidebar.
            try:
                summary = run_ingestion(settings=settings, reset=True)
                st.sidebar.success(
                    f"Ingested {summary['chunks_created']} chunks from {summary['pdf_count']} PDFs."
                )
            except Exception as exc:
                st.sidebar.error(str(exc))

    st.sidebar.markdown("### Add Reference Files")
    uploaded_files = st.sidebar.file_uploader(
        "Upload PDFs or notes for this chat",
        type=["pdf", "txt", "md"],
        accept_multiple_files=True,
        help="These files become extra study context for the current session.",
    )

    if uploaded_files:
        signature = build_upload_signature(uploaded_files)
        if signature != st.session_state.upload_signature:
            with st.sidebar.status("Processing uploaded files...", expanded=False):
                try:
                    # Pass the signature along so files are not hashed again.
                    message = process_uploaded_files(
                        uploaded_files,
                        assistant,
                        signature=signature,
                    )
                    if message:
                        st.sidebar.success(message)
                except Exception as exc:
                    # Do not keep context from a previous, different upload set.
                    clear_uploaded_context()
                    st.sidebar.error(str(exc))
        elif st.session_state.uploaded_file_summaries:
            st.sidebar.success(
                f"{len(st.session_state.uploaded_file_summaries)} uploaded file(s) ready for chat."
            )
    else:
        clear_uploaded_context()

    if st.session_state.uploaded_file_summaries:
        st.sidebar.markdown("### Files In This Chat")
        for summary in st.session_state.uploaded_file_summaries:
            st.sidebar.write(f"- {summary['name']} ({summary['chunks']} chunks)")

    st.sidebar.markdown("### Demo tips")
    st.sidebar.markdown(
        "- Ask full questions.\n"
        "- Ask for step-by-step solutions.\n"
        "- Use the mic icon in the chat input to ask by voice.\n"
        "- Use the paperclip icon to upload a question image.\n"
        "- Upload a PDF, TXT, or MD file to add temporary context.\n"
        "- Answers are formatted in clean structured sections.\n"
        "- Ask which chapter and topic the question belongs to."
    )


def render_messages() -> None:
    """Replay the stored conversation from ``st.session_state.messages``."""
    for message in st.session_state.messages:
        avatar = "🧑‍🎓" if message["role"] == "user" else "🤖"
        with st.chat_message(message["role"], avatar=avatar):
            if message["role"] == "assistant":
                render_assistant_content(message["content"])
            else:
                st.markdown(message["content"])
            _render_sources(message.get("sources"))


def main() -> None:
    """Run one pass of the page: draw the UI and answer a submitted prompt."""
    ensure_state()
    inject_styles()
    settings = get_settings()
    assistant = MathRAGAssistant(settings=settings)

    render_sidebar(assistant)

    st.title("✨ MathSutra 12")
    st.subheader("Your Class 12 Mathematics Chapter-Aware Chatbot")
    st.write(
        "Ask any Class 12 mathematics question by typing, voice, or a question image. "
        "You can also upload PDFs or notes to give the chatbot extra context for the current session."
    )

    render_messages()

    prompt = st.chat_input(
        "Ask a Class 12 maths question...",
        accept_file=True,
        file_type=["png", "jpg", "jpeg", "heic"],
        accept_audio=True,
    )
    if not prompt:
        return

    # The submitted value is handled both as a plain string and as an object
    # carrying ``text`` / ``files`` / ``audio`` attributes.
    if isinstance(prompt, str):
        question = prompt.strip()
        files = []
        audio = None
    else:
        question = (getattr(prompt, "text", None) or "").strip()
        files = getattr(prompt, "files", [])
        audio = getattr(prompt, "audio", None)

    if audio is not None:
        with st.spinner("Transcribing your recording..."):
            audio_bytes = audio.getvalue()
            audio_suffix = file_suffix(getattr(audio, "name", ".wav"), ".wav")
            try:
                transcript = transcribe_audio_bytes(audio_bytes, suffix=audio_suffix)
                question = f"{question}\n\n[Voice transcript: {transcript}]" if question else transcript
            except Exception as exc:
                st.error(f"Voice transcription failed: {exc}")
                return

    if files:
        with st.spinner("Reading text from your question image..."):
            # Only the first attached file is read.
            image_file = files[0]
            image_bytes = image_file.getvalue()
            image_suffix = file_suffix(getattr(image_file, "name", ".jpg"), ".jpg")
            try:
                extracted_text = extract_text_from_image_bytes(image_bytes, suffix=image_suffix)
                question = f"{question}\n\n[Image text: {extracted_text}]" if question else extracted_text
            except Exception as exc:
                st.error(f"Image OCR failed: {exc}")
                return

    if not question:
        st.warning("No question text could be extracted. Please try again.")
        return

    st.session_state.messages.append({"role": "user", "content": question})
    with st.chat_message("user", avatar="🧑‍🎓"):
        st.markdown(question)

    with st.chat_message("assistant", avatar="🤖"):
        with st.spinner("Thinking with retrieved chapter content..."):
            try:
                result = assistant.answer(
                    question,
                    extra_chunks=st.session_state.uploaded_context,
                )
                render_assistant_content(result["answer"])
                # Same helper as the history view, so an empty source list
                # produces no expander in either place.
                _render_sources(result["sources"])
            except Exception as exc:
                result = {"answer": str(exc), "sources": []}
                st.error(result["answer"])

    st.session_state.messages.append(
        {
            "role": "assistant",
            "content": result["answer"],
            "sources": result["sources"],
        }
    )


if __name__ == "__main__":
    main()