# NOTE(review): the three lines below were Hugging Face Spaces error-page
# residue captured with the paste ("Spaces:" / "Runtime error"); kept here
# as comments so they no longer break the Python file.
# Spaces:
# Runtime error
# Runtime error
| import os | |
| import io | |
| import re | |
| from typing import List, Tuple | |
| import streamlit as st | |
| from transformers import pipeline | |
| import PyPDF2 | |
| import nltk | |
def download_nltk():
    """Fetch the NLTK sentence-tokenizer data required by nltk.sent_tokenize.

    NLTK 3.8.2+ looks up the "punkt_tab" resource instead of "punkt", and a
    missing resource raises LookupError at tokenize time (a likely source of
    the "Runtime error" this app showed). Download both identifiers; on NLTK
    versions that don't know one of them, nltk.download(quiet=True) simply
    returns False without raising.
    """
    nltk.download("punkt", quiet=True)
    nltk.download("punkt_tab", quiet=True)


download_nltk()
# Sidebar: author card, contact links, and the mode selector.
with st.sidebar:
    st.image(
        "https://raw.githubusercontent.com/Runtimepirate/About_me/main/Profile_pic.jpg",
        width=200,
    )
    # All static sidebar copy, rendered in order; strings are verbatim.
    _sidebar_markdown = (
        "## **Mr. Aditya Katariya [[Resume](https://drive.google.com/file/d/1Vq9-H1dl5Kky2ugXPIbnPvJ72EEkTROY/view?usp=drive_link)]**",
        " *College - Noida Institute of Engineering and Technology, U.P*",
        "----",
        "## Contact Details:-",
        "📫 *[Prasaritation@gmail.com](mailto:Prasaritation@gmail.com)*",
        "💼 *[LinkedIn](https://www.linkedin.com/in/adityakatariya/)*",
        "💻 *[GitHub](https://github.com/Runtimepirate)*",
        "----",
        "**AI & ML Enthusiast**",
        "Passionate about solving real-world problems using data science and customer analytics. Always learning and building smart, scalable AI solutions.",
        "----",
    )
    for _chunk in _sidebar_markdown:
        st.markdown(_chunk)
    # NOTE(review): original indentation was lost in the paste — the mode
    # radio is assumed to live inside the sidebar; confirm against the app.
    mode = st.radio("Choose Mode:", ["Ask Anything", "Challenge Me"], key="mode")

# Main-page header and feature overview.
st.title("📚 Document‑Aware Assistant")
st.markdown(
    """
This assistant **reads your uploaded PDF or TXT document**, produces a *≤150‑word* summary, answers your questions with paragraph‑level justification, **generates logic‑based questions**, and evaluates your responses.
"""
)
@st.cache_resource
def load_models():
    """Load all required Hugging Face pipelines once and reuse.

    The docstring promise ("once and reuse") only holds with
    ``st.cache_resource``: Streamlit re-executes the whole script on every
    widget interaction, so without the decorator all three models would be
    reloaded from disk on each rerun.

    Returns:
        tuple: (summarizer, question-answering, question-generation) pipelines.
    """
    summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device_map="auto",
    )
    qa = pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        device_map="auto",
    )
    qg = pipeline(
        "text2text-generation",
        model="valhalla/t5-small-qg-hl",
        device_map="auto",
        max_length=64,
    )
    return summarizer, qa, qg


summarizer, qa_pipeline, qg_pipeline = load_models()
def extract_text_from_pdf(uploaded_file: io.BytesIO) -> str:
    """Concatenate the extractable text of every PDF page, one page per line.

    Pages where PyPDF2 yields no text (None or "") are skipped.
    """
    reader = PyPDF2.PdfReader(uploaded_file)
    page_texts = (page.extract_text() for page in reader.pages)
    return "".join(f"{body}\n" for body in page_texts if body)
def extract_text(uploaded_file) -> str:
    """Dispatch on file extension: PDF via PyPDF2, TXT via UTF-8 decode.

    Any other extension yields the empty string.
    """
    filename = uploaded_file.name.lower()
    if filename.endswith(".pdf"):
        return extract_text_from_pdf(uploaded_file)
    if filename.endswith(".txt"):
        return uploaded_file.read().decode("utf-8", errors="ignore")
    return ""
def chunk_text(text: str, max_tokens: int = 450) -> List[str]:
    """Split text into roughly max_tokens-sized chunks along sentence bounds.

    A sentence is never split; a chunk is flushed as soon as adding the next
    sentence would exceed the budget (a single oversized sentence therefore
    becomes its own chunk).
    """
    chunks: List[str] = []
    pending: List[str] = []
    used = 0
    for sentence in nltk.sent_tokenize(text):
        size = len(sentence.split())
        if pending and used + size > max_tokens:
            chunks.append(" ".join(pending))
            pending, used = [], 0
        pending.append(sentence)
        used += size
    if pending:
        chunks.append(" ".join(pending))
    return chunks
def get_best_answer(question: str, chunks: List[str]) -> Tuple[str, int, int, float, str]:
    """Run QA over every chunk and return the highest-scoring non-empty answer.

    Returns:
        (answer, start, end, score, context_chunk); ("", 0, 0, 0.0, "") when
        no chunk produced a usable answer. Chunks that make the pipeline
        raise are skipped (best-effort scan).
    """
    top = None
    for passage in chunks:
        try:
            result = qa_pipeline(question=question, context=passage)
        except Exception:
            continue  # deliberate best-effort: a bad chunk must not abort the scan
        if not result["answer"].strip():
            continue  # ignore empty answers regardless of score
        if top is None or result["score"] > top["score"]:
            top = {
                "answer": result["answer"],
                "score": result["score"],
                "start": result["start"],
                "end": result["end"],
                "context": passage,
            }
    if top is None:
        return "", 0, 0, 0.0, ""
    return top["answer"], top["start"], top["end"], top["score"], top["context"]
| def highlight_answer(context: str, start: int, end: int) -> str: | |
| """Return context with the answer wrapped in **bold** for display.""" | |
| return ( | |
| context[:start] | |
| + " **" | |
| + context[start:end] | |
| + "** " | |
| + context[end:] | |
| ) | |
def generate_logic_questions(text: str, num_q: int = 3) -> List[str]:
    """Generate num_q unique questions from the document via the QG pipeline.

    Each sentence is wrapped in <hl> markers (the t5-small-qg-hl input
    format) and fed to the generator; duplicates are dropped. If generation
    yields fewer than num_q questions, generic defaults are appended.

    Args:
        text: Full document text.
        num_q: Number of questions to return.

    Returns:
        Exactly num_q question strings.
    """
    sentences = nltk.sent_tokenize(text)
    questions: List[str] = []
    for sent in sentences:
        if len(questions) >= num_q:
            break
        hl_text = f"<hl> {sent} <hl> "
        try:
            q = qg_pipeline(hl_text, do_sample=False, max_length=64)[0]["generated_text"]
        except Exception:
            continue  # best-effort: skip sentences the model cannot handle
        q = q.strip().rstrip("?.!") + "?"
        if q not in questions:
            questions.append(q)
    default_q = [
        "What is the main topic of the document?",
        "Summarize the methodology described.",
        "What are the key findings or conclusions?",
    ]
    # BUGFIX: indexing default_q[len(questions)] raised IndexError whenever
    # num_q > len(default_q); cycle with modulo instead. Identical behavior
    # for the default num_q=3.
    while len(questions) < num_q:
        questions.append(default_q[len(questions) % len(default_q)])
    return questions
# Silence the Hugging Face tokenizers fork warning.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

uploaded = st.file_uploader("Upload PDF or TXT Document", type=["pdf", "txt"], key="uploader")

if uploaded:
    doc_text = extract_text(uploaded)
    st.session_state["doc_text"] = doc_text

    st.subheader("🔎 Auto Summary (≤ 150 words)")
    try:
        # BART's input limit is far below a full document; summarize a prefix.
        summary = summarizer(
            doc_text[:4096],
            max_length=150,
            min_length=30,
            do_sample=False,
        )[0]["summary_text"]
        st.write(summary)
    except Exception as e:
        st.error(f"Summarization failed: {e}")

    # BUGFIX: the old guard (`if "chunks" not in st.session_state`) cached
    # the first document's chunks for the whole session, so uploading a
    # different file kept answering from the stale document. Key the cache
    # on the uploaded file name and invalidate the generated questions too.
    if st.session_state.get("chunks_doc_name") != uploaded.name:
        st.session_state["chunks"] = chunk_text(doc_text)
        st.session_state["chunks_doc_name"] = uploaded.name
        st.session_state.pop("logic_questions", None)
        st.session_state.pop("user_answers", None)

    if mode == "Ask Anything":
        st.subheader("💬 Ask Anything")
        question = st.text_input("Ask a question about the document:", key="user_question")
        if st.button("Submit Question", key="submit_question") and question:
            with st.spinner("Finding answer..."):
                ans, start, end, score, context = get_best_answer(
                    question, st.session_state["chunks"]
                )
            if ans:
                st.markdown(f"**Answer:** {ans}")
                justification = highlight_answer(context, start, end)
                st.caption(f"Justification: …{justification[:300]}…")
                st.caption(
                    f"Confidence Score: {score:.3f} | Paragraph tokens: {len(context.split())}"
                )
            else:
                st.warning("Sorry, I couldn't find an answer in the document.")

    elif mode == "Challenge Me":
        st.subheader("🎯 Challenge Me")
        if "logic_questions" not in st.session_state:
            st.session_state["logic_questions"] = generate_logic_questions(doc_text)
            st.session_state["user_answers"] = ["" for _ in st.session_state["logic_questions"]]
        for idx, q in enumerate(st.session_state["logic_questions"]):
            st.text_input(f"Q{idx+1}: {q}", key=f"logic_q_{idx}")
        if st.button("Submit Answers", key="submit_logic"):
            st.markdown("----")
            for idx, q in enumerate(st.session_state["logic_questions"]):
                user_ans = st.session_state.get(f"logic_q_{idx}", "").strip()
                # "Expected" answer = the QA model's own best answer for
                # this question over the document chunks.
                correct, start, end, score, context = get_best_answer(
                    q, st.session_state["chunks"]
                )
                st.markdown(f"**Q{idx+1} Evaluation:**")
                st.write(f"*Your Answer*: {user_ans or '—'}")
                st.write(f"*Expected Answer*: {correct or 'Not found in document'}")
                if correct:
                    justification = highlight_answer(context, start, end)
                    st.caption(f"Justification: …{justification[:300]}…")
                    st.caption(f"Confidence Score: {score:.3f}")
                st.markdown("----")
else:
    st.info("Please upload a PDF or TXT document to begin.")