Spaces: Build error
import streamlit as st
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
import faiss
import fitz  # PyMuPDF
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Set page config first
st.set_page_config(page_title="Smart Book Analyst", layout="wide")

# Configuration
MODEL_NAME = "ibm-granite/granite-3.1-1b-a400m-instruct"
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CHUNK_SIZE = 1024
CHUNK_OVERLAP = 100
MAX_SUMMARY_CHUNKS = 5
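
# Note on chunking: CHUNK_SIZE and CHUNK_OVERLAP are measured in characters
# (length_function=len below), not tokens. As a rough sketch, a 100,000-character
# book split into 1024-character chunks with 100 characters of overlap advances
# by about 1024 - 100 = 924 characters per chunk, i.e. roughly 108 chunks;
# larger chunks mean fewer, longer prompts, smaller chunks give finer-grained retrieval.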

@st.cache_resource
def load_models():
    try:
        # Load tokenizer; left padding helps decoder-only generation
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            padding_side="left"  # Crucial for generation quality
        )
        tokenizer.pad_token = tokenizer.eos_token

        # AutoModelForCausalLM resolves the architecture declared in the
        # checkpoint's config; the Granite 3.1 instruct checkpoint is not a
        # GPT-NeoX model, so loading it through a hard-coded GPTNeoXForCausalLM
        # class fails.
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            device_map="auto",
            torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        ).eval()

        # Configure embedder
        embedder = SentenceTransformer(EMBED_MODEL, device=DEVICE)
        embedder.max_seq_length = 512
        return tokenizer, model, embedder
    except Exception as e:
        st.error(f"Model loading failed: {str(e)}")
        st.stop()

tokenizer, model, embedder = load_models()

def process_text(text):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len
    )
    return splitter.split_text(text)
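
# RecursiveCharacterTextSplitter tries to break on paragraph boundaries first
# ("\n\n"), then single newlines, then spaces, only falling back to hard
# character cuts when no separator fits, so chunks usually end at natural
# boundaries rather than mid-word.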

def extract_pdf_text(uploaded_file):
    try:
        doc = fitz.open(stream=uploaded_file.read(), filetype="pdf")
        return "\n".join(page.get_text() for page in doc)
    except Exception as e:
        st.error(f"PDF extraction error: {str(e)}")
        return ""

def generate_summary(text):
    chunks = process_text(text)[:MAX_SUMMARY_CHUNKS]
    if not chunks:
        return "No meaningful content found."
    progress_bar = st.progress(0)
    summaries = []
    for i, chunk in enumerate(chunks):
        progress_bar.progress((i + 1) / len(chunks),
                              text=f"Processing section {i + 1}/{len(chunks)}...")
        prompt = f"""<|user|>
Summarize the key points from this text section in 3 bullet points:
{chunk[:1500]}
<|assistant|>
"""
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            max_length=1024,
            truncation=True
        ).to(DEVICE)
        outputs = model.generate(
            **inputs,
            max_new_tokens=200,
            temperature=0.3,
            top_p=0.9,
            repetition_penalty=1.1,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id  # Critical fix
        )
        decoded = tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        ).split("<|assistant|>")[-1].strip()
        summaries.append(decoded)
    combined = "\n\n".join(summaries)
    final_prompt = f"""<|user|>
Combine these bullet points into a coherent 3-paragraph summary:
{combined}
<|assistant|>
Here is the comprehensive summary:"""
    inputs = tokenizer(final_prompt, return_tensors="pt").to(DEVICE)
    outputs = model.generate(
        **inputs,
        max_new_tokens=400,
        temperature=0.5,
        top_p=0.9,
        repetition_penalty=1.1,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id
    )
    return tokenizer.decode(
        outputs[0],
        skip_special_tokens=True
    ).split("Here is the comprehensive summary:")[-1].strip()

def build_faiss_index(texts):
    embeddings = embedder.encode(
        texts,
        show_progress_bar=False,
        batch_size=16,
        convert_to_tensor=True
    ).cpu().numpy()
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatIP(dimension)
    faiss.normalize_L2(embeddings)
    index.add(embeddings)
    return index
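
# IndexFlatIP scores by inner product; normalizing both the stored embeddings
# (above) and the query embedding (in the UI below) makes that inner product
# equal to cosine similarity, so search scores fall in [-1, 1] with higher
# meaning closer.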

def generate_answer(query, context):
    prompt = f"""<|user|>
Based on this context:
{context[:2000]}
Answer this question concisely: {query}
<|assistant|>
"""
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        max_length=1024,
        truncation=True
    ).to(DEVICE)
    outputs = model.generate(
        **inputs,
        max_new_tokens=300,
        temperature=0.4,
        top_p=0.95,
        repetition_penalty=1.15,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
        no_repeat_ngram_size=3  # Prevent repetition
    )
    return tokenizer.decode(
        outputs[0],
        skip_special_tokens=True
    ).split("<|assistant|>")[-1].strip()

# Streamlit UI
st.title("AI-Powered Book Analysis System")
uploaded_file = st.file_uploader("Upload book (PDF or TXT)", type=["pdf", "txt"])

if uploaded_file:
    with st.spinner("Analyzing book content..."):
        try:
            if uploaded_file.type == "application/pdf":
                text = extract_pdf_text(uploaded_file)
            else:
                text = uploaded_file.read().decode()
            if not text.strip():
                st.error("Uploaded file is empty")
                st.stop()
            chunks = process_text(text)
            st.session_state.docs = chunks
            st.session_state.index = build_faiss_index(chunks)
            with st.expander("Book Summary", expanded=True):
                summary = generate_summary(text)
                st.write(summary)
        except Exception as e:
            st.error(f"Processing failed: {str(e)}")

if 'index' in st.session_state and st.session_state.index:
    query = st.text_input("Ask about the book:")
    if query:
        with st.spinner("Searching for answers..."):
            try:
                query_embed = embedder.encode([query])
                faiss.normalize_L2(query_embed)
                distances, indices = st.session_state.index.search(query_embed, k=3)
                context = "\n".join([st.session_state.docs[i] for i in indices[0]])
                answer = generate_answer(query, context)
                st.subheader("Answer")
                st.markdown(f"```\n{answer}\n```")
                st.caption(f"Confidence score: {distances[0][0]:.2f}")
            except Exception as e:
                st.error(f"Query failed: {str(e)}")