import streamlit as st import logging import os from io import BytesIO import pdfplumber from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from sentence_transformers import SentenceTransformer from transformers import pipeline import re # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ----------- Load Models ----------- @st.cache_resource(ttl=1800) def load_embeddings_model(): try: return SentenceTransformer("all-MiniLM-L12-v2") except Exception as e: st.error(f"Embedding model error: {str(e)}") return None @st.cache_resource(ttl=1800) def load_qa_pipeline(): try: return pipeline("text2text-generation", model="google/flan-t5-small", max_length=300) except Exception as e: st.error(f"QA model error: {str(e)}") return None @st.cache_resource(ttl=1800) def load_summary_pipeline(): try: return pipeline("summarization", model="sshleifer/distilbart-cnn-6-6", max_length=150) except Exception as e: st.error(f"Summary model error: {str(e)}") return None # ----------- PDF Processing ----------- def process_pdf(uploaded_file): text = "" code_blocks = [] try: with pdfplumber.open(BytesIO(uploaded_file.read())) as pdf: for page in pdf.pages[:20]: extracted = page.extract_text(layout=False) if extracted: text += extracted + "\n" for char in page.chars: if 'fontname' in char and 'mono' in char['fontname'].lower(): code_blocks.append(char['text']) code_text_page = page.extract_text() or "" code_matches = re.finditer(r'(^\s{2,}.*?(?:\n\s{2,}.*?)*)', code_text_page, re.MULTILINE) for match in code_matches: code_blocks.append(match.group().strip()) tables = page.extract_tables() if tables: for table in tables: text += "\n".join([" | ".join(map(str, row)) for row in table if row]) + "\n" code_text = "\n".join(code_blocks).strip() text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=100, separators=["\n\n", "\n", ".", " "] ) text_chunks = text_splitter.split_text(text)[:50] code_chunks = text_splitter.split_text(code_text)[:25] if code_text else [] embeddings_model = load_embeddings_model() if not embeddings_model: return None, None, text, code_text text_vectors = [embeddings_model.encode(chunk) for chunk in text_chunks] code_vectors = [embeddings_model.encode(chunk) for chunk in code_chunks] text_vector_store = FAISS.from_embeddings(zip(text_chunks, text_vectors), embeddings_model.encode) if text_chunks else None code_vector_store = FAISS.from_embeddings(zip(code_chunks, code_vectors), embeddings_model.encode) if code_chunks else None return text_vector_store, code_vector_store, text, code_text except Exception as e: st.error(f"PDF error: {str(e)}") return None, None, "", "" # ----------- Preload Dataset ----------- def preload_dataset(): dataset_path = "data" combined_text = "" combined_code = "" text_vector_store = None code_vector_store = None if not os.path.exists(dataset_path): return text_vector_store, code_vector_store, combined_text, combined_code embeddings_model = load_embeddings_model() if not embeddings_model: return text_vector_store, code_vector_store, combined_text, combined_code all_text_chunks = [] all_text_vectors = [] all_code_chunks = [] all_code_vectors = [] for file_name in os.listdir(dataset_path): file_path = os.path.join(dataset_path, file_name) if file_name.lower().endswith(".pdf"): with open(file_path, "rb") as f: t_store, c_store, t_text, c_text = process_pdf(f) combined_text += t_text + "\n" combined_code += c_text + "\n" if t_store: for chunk in t_store.index_to_docstore().values(): all_text_chunks.append(chunk) all_text_vectors.append(embeddings_model.encode(chunk)) if c_store: for chunk in c_store.index_to_docstore().values(): all_code_chunks.append(chunk) all_code_vectors.append(embeddings_model.encode(chunk)) elif file_name.lower().endswith(".txt"): with open(file_path, "r", encoding="utf-8") as f: text_content = f.read() combined_text += text_content + "\n" chunks = text_content.split("\n\n") for chunk in chunks: all_text_chunks.append(chunk) all_text_vectors.append(embeddings_model.encode(chunk)) if all_text_chunks: text_vector_store = FAISS.from_embeddings(zip(all_text_chunks, all_text_vectors), embeddings_model.encode) if all_code_chunks: code_vector_store = FAISS.from_embeddings(zip(all_code_chunks, all_code_vectors), embeddings_model.encode) return text_vector_store, code_vector_store, combined_text, combined_code # ----------- Streamlit UI ----------- st.set_page_config(page_title="Smart PDF Q&A", page_icon="📄", layout="wide") # Fixed CSS for chat colors st.markdown(""" """, unsafe_allow_html=True) st.markdown('