Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import hashlib | |
| from pathlib import Path | |
| import re | |
| import streamlit as st | |
| from src.edurag_math_bot.config import get_settings | |
| from src.edurag_math_bot.image_to_text import extract_text_from_image_bytes | |
| from src.edurag_math_bot.pdf_processing import extract_chunks_from_uploaded_file | |
| from src.edurag_math_bot.rag_chain import ( | |
| MathRAGAssistant, | |
| SupplementalChunk, | |
| build_supplemental_chunks, | |
| run_ingestion, | |
| ) | |
| from src.edurag_math_bot.speech_to_text import transcribe_audio_bytes | |
# Page-level configuration: browser-tab title and icon, full-width layout.
st.set_page_config(page_title="MathSutra 12", page_icon="π", layout="wide")
def ensure_state() -> None:
    """Seed ``st.session_state`` with every key the app reads, if absent.

    Keys that already exist are left untouched.
    """
    # Factories rather than shared instances, so each missing key gets its
    # own fresh value.
    initial_values = {
        "messages": list,
        "uploaded_context": list,  # filled with SupplementalChunk items
        "uploaded_file_summaries": list,
        "upload_signature": lambda: None,
    }
    for key, make_default in initial_values.items():
        if key not in st.session_state:
            st.session_state[key] = make_default()
def inject_styles() -> None:
    """Inject the app's custom CSS (headings, chat typography, math labels)."""
    stylesheet = """
        <style>
        h1, h2, h3 {
            color: #0f172a;
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
        }
        [data-testid="stChatMessage"] p,
        [data-testid="stChatMessage"] li {
            line-height: 1.7;
            font-size: 1rem;
        }
        [data-testid="stChatMessage"] h2 {
            margin-top: 1.25rem;
            margin-bottom: 0.5rem;
            font-size: 1.55rem;
        }
        [data-testid="stChatMessage"] h3 {
            margin-top: 1rem;
            margin-bottom: 0.35rem;
            font-size: 1.2rem;
        }
        [data-testid="stChatMessage"] ul,
        [data-testid="stChatMessage"] ol {
            margin-bottom: 0.75rem;
        }
        [data-testid="stChatMessage"] code {
            white-space: pre-wrap;
        }
        .math-label {
            font-weight: 700;
            margin-top: 0.35rem;
            margin-bottom: 0.15rem;
        }
        </style>
        """
    # Raw HTML is required here because the payload is a <style> element.
    st.markdown(stylesheet, unsafe_allow_html=True)
# Matches a display-math span written as ``$$ ... $$``. The single capture
# group holds the text between the delimiters; DOTALL lets a span run across
# several lines, and the lazy quantifier stops at the nearest closing ``$$``.
DISPLAY_MATH_RE = re.compile(r"\$\$(.+?)\$\$", flags=re.DOTALL)
def normalize_math_markdown(content: str) -> str:
    r"""Rewrite LaTeX bracket delimiters as dollar delimiters.

    ``\[`` and ``\]`` become ``$$``; ``\(`` and ``\)`` become ``$``.
    All other text is returned unchanged.
    """
    delimiter_map = (
        ("\\[", "$$"),
        ("\\]", "$$"),
        ("\\(", "$"),
        ("\\)", "$"),
    )
    converted = content
    for latex_delimiter, dollar_delimiter in delimiter_map:
        converted = converted.replace(latex_delimiter, dollar_delimiter)
    return converted
def render_assistant_content(content: str) -> None:
    """Render an assistant reply, routing display math to ``st.latex``.

    The text is first normalized to dollar delimiters and then split on
    ``$$...$$`` spans. Because the pattern has one capture group, the split
    result alternates prose (even positions) and math bodies (odd positions).
    """
    segments = DISPLAY_MATH_RE.split(normalize_math_markdown(content))
    for position, segment in enumerate(segments):
        stripped = segment.strip()
        if not stripped:
            # Whitespace-only pieces (e.g. between adjacent math blocks).
            continue
        is_math = position % 2 == 1
        if is_math:
            st.latex(stripped)
        else:
            st.markdown(segment)
def clear_uploaded_context() -> None:
    """Reset the session's uploaded-file state to its empty defaults."""
    state = st.session_state
    state.upload_signature = None
    state.uploaded_file_summaries = []
    state.uploaded_context = []
def file_signature(file_bytes: bytes) -> str:
    """Return the hexadecimal SHA-1 digest of ``file_bytes``."""
    hasher = hashlib.sha1()
    hasher.update(file_bytes)
    return hasher.hexdigest()
def file_suffix(file_name: str | None, fallback: str) -> str:
    """Return the lower-cased extension of ``file_name``, or ``fallback``.

    ``fallback`` is used when ``file_name`` is empty/``None`` or when the
    name carries no extension.
    """
    extension = Path(file_name).suffix.lower() if file_name else ""
    return extension if extension else fallback
def build_upload_signature(uploaded_files: list[object]) -> tuple[tuple[str, int, str], ...]:
    """Build a comparable fingerprint for a set of uploaded files.

    Each file contributes a ``(name, size_in_bytes, content_hash)`` triple,
    in upload order. Each file is expected to expose ``getvalue()`` and
    ``name``.
    """

    def describe(upload: object) -> tuple[str, int, str]:
        # Read the payload once and reuse it for both size and hash.
        payload = upload.getvalue()
        return (upload.name, len(payload), file_signature(payload))

    return tuple(describe(upload) for upload in uploaded_files)
def process_uploaded_files(
    uploaded_files: list[object],
    assistant: MathRAGAssistant,
) -> str | None:
    """Chunk and embed newly uploaded files into the session state.

    Returns a status message when the files were (re)processed, or ``None``
    when there was nothing to do: either no files were supplied, or the
    upload set matches the signature stored from the previous run.

    Raises:
        ValueError: if any file yields no chunks.
    """
    state = st.session_state

    if not uploaded_files:
        # Uploader is empty: drop previously stored context, if any.
        if state.upload_signature is not None:
            clear_uploaded_context()
        return None

    current_signature = build_upload_signature(uploaded_files)
    if current_signature == state.upload_signature:
        return None

    settings = get_settings()
    collected_chunks = []
    file_summaries = []
    for upload in uploaded_files:
        chunks = extract_chunks_from_uploaded_file(
            file_name=upload.name,
            file_bytes=upload.getvalue(),
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
        )
        if not chunks:
            raise ValueError(f"No readable text was found in {upload.name}.")
        collected_chunks += chunks
        file_summaries.append({"name": upload.name, "chunks": len(chunks)})

    # Session state is only written after every file has been processed, so
    # an exception above leaves the stored context untouched.
    state.uploaded_context = build_supplemental_chunks(
        collected_chunks,
        ollama=assistant.ollama,
        model=settings.embed_model,
    )
    state.uploaded_file_summaries = file_summaries
    state.upload_signature = current_signature
    return (
        f"Loaded {len(uploaded_files)} file(s) into this chat session "
        f"across {len(st.session_state.uploaded_context)} searchable chunks."
    )
def format_source(source: dict[str, object]) -> str:
    """Format one source-metadata dict as a single Markdown bullet.

    A positive ``chapter_number`` produces a
    ``- Chapter N: name | Topic: ... | Page: ...`` line; anything else is
    reported as an uploaded file using ``source_file``.

    Missing, ``None`` or non-numeric values fall back to the defaults
    (chapter ``-1``, page ``1``, ``"Not identified"``, ``"Unknown source"``)
    instead of raising, so one malformed entry cannot break the source list.
    """

    def as_int(key: str, default: int) -> int:
        # int() raises TypeError for None and ValueError for text like "n/a".
        try:
            return int(source.get(key, default))  # type: ignore[call-overload]
        except (TypeError, ValueError, OverflowError):
            return default

    def as_text(key: str, default: str) -> str:
        value = source.get(key)
        # Treat an explicit None like a missing key rather than printing "None".
        return default if value is None else str(value)

    chapter_number = as_int("chapter_number", -1)
    chapter_name = as_text("chapter_name", "Not identified")
    topic = as_text("topic", "Not identified")
    page_number = as_int("page_number", 1)
    source_file = as_text("source_file", "Unknown source")

    if chapter_number > 0:
        return (
            f"- Chapter {chapter_number}: {chapter_name} | "
            f"Topic: {topic} | Page: {page_number}"
        )
    return f"- Uploaded file: {source_file} | Topic: {topic} | Page: {page_number}"
def render_sidebar(assistant: MathRAGAssistant) -> None:
    """Draw the sidebar: model info, ingestion button, file uploads and tips.

    ``assistant`` is forwarded to ``process_uploaded_files`` when the set of
    uploaded files has changed since the last processed signature.
    """
    settings = get_settings()
    sidebar = st.sidebar

    # --- Header and active configuration -------------------------------
    sidebar.title("MathSutra 12")
    sidebar.caption("Class 12 Maths RAG Chatbot")
    sidebar.write(f"LLM model: `{settings.llm_model}`")
    sidebar.write(f"Embedding model: `{settings.embed_model}`")
    sidebar.write(f"Collection: `{settings.collection_name}`")

    # --- Knowledge-base rebuild ----------------------------------------
    rebuild_requested = sidebar.button(
        "Build / Refresh Knowledge Base",
        use_container_width=True,
    )
    if rebuild_requested:
        with sidebar.status("Running ingestion...", expanded=True):
            try:
                summary = run_ingestion(settings=settings, reset=True)
                sidebar.success(
                    f"Ingested {summary['chunks_created']} chunks from {summary['pdf_count']} PDFs."
                )
            except Exception as exc:
                # Surface ingestion failures in the sidebar instead of crashing.
                sidebar.error(str(exc))

    # --- Per-session reference files -----------------------------------
    sidebar.markdown("### Add Reference Files")
    uploaded_files = sidebar.file_uploader(
        "Upload PDFs or notes for this chat",
        type=["pdf", "txt", "md"],
        accept_multiple_files=True,
        help="These files become extra study context for the current session.",
    )
    if not uploaded_files:
        clear_uploaded_context()
    else:
        signature = build_upload_signature(uploaded_files)
        if signature != st.session_state.upload_signature:
            with sidebar.status("Processing uploaded files...", expanded=False):
                try:
                    message = process_uploaded_files(uploaded_files, assistant)
                    if message:
                        sidebar.success(message)
                except Exception as exc:
                    # Do not keep half-processed context around after a failure.
                    clear_uploaded_context()
                    sidebar.error(str(exc))
        elif st.session_state.uploaded_file_summaries:
            sidebar.success(
                f"{len(st.session_state.uploaded_file_summaries)} uploaded file(s) ready for chat."
            )

    loaded_summaries = st.session_state.uploaded_file_summaries
    if loaded_summaries:
        sidebar.markdown("### Files In This Chat")
        for summary in loaded_summaries:
            sidebar.write(f"- {summary['name']} ({summary['chunks']} chunks)")

    # --- Usage tips ----------------------------------------------------
    sidebar.markdown("### Demo tips")
    tips = [
        "- Ask full questions.",
        "- Ask for step-by-step solutions.",
        "- Use the mic icon in the chat input to ask by voice.",
        "- Use the paperclip icon to upload a question image.",
        "- Upload a PDF, TXT, or MD file to add temporary context.",
        "- Answers are formatted in clean structured sections.",
        "- Ask which chapter and topic the question belongs to.",
    ]
    sidebar.markdown("\n".join(tips))
def render_messages() -> None:
    """Replay the chat history stored in ``st.session_state.messages``.

    Assistant messages go through ``render_assistant_content`` (so display
    math is typeset); user messages are rendered as plain Markdown. When a
    message carries a non-empty ``sources`` list, it is shown in a collapsed
    expander below the message body.
    """
    # The avatar strings had been garbled by a wrong text decoding; these are
    # the emoji they decode back to. A non-emoji string is not a valid avatar.
    user_avatar = "🧑‍🎓"
    assistant_avatar = "🤖"

    for message in st.session_state.messages:
        role = message["role"]
        avatar = user_avatar if role == "user" else assistant_avatar
        with st.chat_message(role, avatar=avatar):
            if role == "assistant":
                render_assistant_content(message["content"])
            else:
                st.markdown(message["content"])
            sources = message.get("sources")
            if sources:
                with st.expander("π View Sources Used"):
                    for source in sources:
                        st.markdown(format_source(source))
def main() -> None:
    """Run one pass of the Streamlit chat app.

    Sets up state, styles and the sidebar, replays the chat history, then
    handles at most one new prompt: typed text, a voice recording and/or a
    question image are merged into a single question, answered by the
    assistant, and both turns are appended to the session history.
    """
    # Emoji restored from their garbled form; a non-emoji string is not a
    # valid chat avatar.
    user_avatar = "🧑‍🎓"
    assistant_avatar = "🤖"

    ensure_state()
    inject_styles()
    settings = get_settings()
    assistant = MathRAGAssistant(settings=settings)
    render_sidebar(assistant)

    st.title("✨ MathSutra 12")
    st.subheader("Your Class 12 Mathematics Chapter-Aware Chatbot")
    st.write(
        "Ask any Class 12 mathematics question by typing, voice, or a question image. "
        "You can also upload PDFs or notes to give the chatbot extra context for the current session."
    )
    render_messages()

    prompt = st.chat_input(
        "Ask a Class 12 maths question...",
        accept_file=True,
        file_type=["png", "jpg", "jpeg", "heic"],
        accept_audio=True,
    )
    if not prompt:
        return

    # The prompt is either a plain string or an object exposing
    # text / files / audio attributes.
    if isinstance(prompt, str):
        question = prompt.strip()
        files = []
        audio = None
    else:
        question = (getattr(prompt, "text", None) or "").strip()
        files = getattr(prompt, "files", None) or []
        audio = getattr(prompt, "audio", None)

    if audio is not None:
        with st.spinner("Transcribing your recording..."):
            audio_bytes = audio.getvalue()
            audio_suffix = file_suffix(getattr(audio, "name", ".wav"), ".wav")
            try:
                transcript = transcribe_audio_bytes(audio_bytes, suffix=audio_suffix)
                question = f"{question}\n\n[Voice transcript: {transcript}]" if question else transcript
            except Exception as exc:
                st.error(f"Voice transcription failed: {exc}")
                return

    if files:
        with st.spinner("Reading text from your question image..."):
            # Only the first attached image is read.
            image_file = files[0]
            image_bytes = image_file.getvalue()
            image_suffix = file_suffix(getattr(image_file, "name", ".jpg"), ".jpg")
            try:
                extracted_text = extract_text_from_image_bytes(image_bytes, suffix=image_suffix)
                question = f"{question}\n\n[Image text: {extracted_text}]" if question else extracted_text
            except Exception as exc:
                st.error(f"Image OCR failed: {exc}")
                return

    if not question:
        st.warning("No question text could be extracted. Please try again.")
        return

    st.session_state.messages.append({"role": "user", "content": question})
    with st.chat_message("user", avatar=user_avatar):
        st.markdown(question)

    with st.chat_message("assistant", avatar=assistant_avatar):
        with st.spinner("Thinking with retrieved chapter content..."):
            try:
                result = assistant.answer(
                    question,
                    extra_chunks=st.session_state.uploaded_context,
                )
                render_assistant_content(result["answer"])
                # Match render_messages(): no expander when there are no
                # sources, so the live reply and its replay look the same.
                if result["sources"]:
                    with st.expander("π View Sources Used"):
                        for source in result["sources"]:
                            st.markdown(format_source(source))
            except Exception as exc:
                # Keep the failure in the transcript as the assistant's reply.
                result = {"answer": str(exc), "sources": []}
                st.error(result["answer"])

    st.session_state.messages.append(
        {
            "role": "assistant",
            "content": result["answer"],
            "sources": result["sources"],
        }
    )


if __name__ == "__main__":
    main()