# Multilingual RAG knowledge base (Streamlit app, Hugging Face Spaces).
# Page 1 ingests PDFs/DOCX/web pages into an in-memory FAISS index;
# Page 2 answers questions against the indexed text with a QA pipeline.
import streamlit as st
import faiss
import numpy as np
from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer
from sentence_transformers import SentenceTransformer
from docx import Document
import PyPDF2  # Use PyPDF2 instead of PyMuPDF
import requests
from bs4 import BeautifulSoup
from langdetect import detect, LangDetectException
# Initialize models and pipeline.
qa_pipeline = pipeline("question-answering", model="distilbert-base-uncased")
embedding_model = SentenceTransformer('distiluse-base-multilingual-cased-v1')

# FAISS index setup (in-memory, lost on restart).
# NOTE(review): distiluse-base-multilingual-cased-v1 emits 512-d embeddings,
# matching `dimension` below — keep the two in sync if the model changes.
dimension = 512  # Size of the embeddings
index = faiss.IndexFlatL2(dimension)
# Parallel store: documents[i] is the raw text behind index vector i.
documents = []

# Translation model for on-the-fly query translation (many-to-many).
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M")
model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M")
def translate_text(text, src_lang, tgt_lang):
    """Translate ``text`` from ``src_lang`` to ``tgt_lang`` using M2M100.

    Args:
        text: Source text to translate.
        src_lang: Language code of the source text (e.g. ``"fr"``).
        tgt_lang: Language code to translate into (e.g. ``"en"``).

    Returns:
        The translated text as a single string.
    """
    # M2M100 takes the source language on the tokenizer; the target language
    # is selected by forcing its BOS token at generation time.
    tokenizer.src_lang = src_lang
    encoded = tokenizer(text, return_tensors="pt")
    generated_tokens = model.generate(
        **encoded, forced_bos_token_id=tokenizer.get_lang_id(tgt_lang)
    )
    return tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
# Sidebar for navigation between the two pages.
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Upload Knowledge", "Q&A"])
# Page 1: Knowledge Upload — ingest files and/or a scraped web page into the
# FAISS index, keeping the raw text in `documents` for later retrieval.
if page == "Upload Knowledge":
    st.title("Upload Knowledge Base")
    uploaded_files = st.file_uploader("Upload your files (DOCX, PDF)", type=["pdf", "docx"], accept_multiple_files=True)
    url = st.text_input("Or enter a website URL to scrape")
    if uploaded_files or url:
        st.write("Processing your data...")
        texts = []
        # Process uploaded files.
        for file in uploaded_files:
            try:
                if file.type == "application/pdf":
                    pdf_reader = PyPDF2.PdfReader(file)  # Use PyPDF2 for PDF reading
                    # BUG FIX: the loop variable used to be `page`, clobbering
                    # the sidebar navigation variable; also `extract_text()`
                    # may return None for image-only pages, which broke `+=`.
                    text = "".join(
                        pdf_page.extract_text() or "" for pdf_page in pdf_reader.pages
                    )
                elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                    doc = Document(file)
                    text = " ".join(para.text for para in doc.paragraphs)
                else:
                    st.error(f"Unsupported file type: {file.type}")
                    continue
                # Language detection (informational only; ingestion proceeds
                # in any detected language).
                try:
                    detected_lang = detect(text)
                    st.write(f"Detected language: {detected_lang}")
                except LangDetectException:
                    st.error("Could not detect the language of the text.")
                    continue
                # Generate the embedding and add it to the FAISS index; append
                # the raw text at the matching position in `documents`.
                embedding = embedding_model.encode([text])[0]
                index.add(np.array([embedding], dtype=np.float32))
                documents.append(text)
                texts.append(text)
            except Exception as e:
                st.error(f"Error processing file: {e}")
        # Process URL.
        if url:
            try:
                # Timeout keeps the app from hanging on an unresponsive host;
                # raise_for_status avoids silently indexing 404/500 error pages.
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                text = soup.get_text()
                try:
                    detected_lang = detect(text)
                    st.write(f"Detected language: {detected_lang}")
                except LangDetectException:
                    st.error("Could not detect the language of the webpage.")
                    url = None  # Set URL to None or skip to prevent further processing
                if url:  # Continue only if URL processing is valid
                    embedding = embedding_model.encode([text])[0]
                    index.add(np.array([embedding], dtype=np.float32))
                    documents.append(text)
                    texts.append(text)
            except Exception as e:
                st.error(f"Error processing URL: {e}")
        st.write("Data processed and added to knowledge base!")
        # Provide a summary of the uploaded content.
        for i, text in enumerate(texts):
            st.write(f"Summary of Document {i+1}:")
            st.write(text[:500] + "...")  # Display first 500 characters as a summary
| # Page 2: Q&A Interface | |
| elif page == "Q&A": | |
| st.title("Ask the Knowledge Base") | |
| user_query = st.text_input("Enter your query:") | |
| if user_query: | |
| try: | |
| detected_query_lang = detect(user_query) | |
| # Translate the query if it's in a different language than the knowledge base | |
| if detected_query_lang != "en": | |
| st.write(f"Translating query from {detected_query_lang} to English") | |
| user_query = translate_text(user_query, detected_query_lang, "en") | |
| query_embedding = embedding_model.encode([user_query]) | |
| D, I = index.search(np.array(query_embedding, dtype=np.float32), k=5) # Retrieve top 5 documents | |
| context = " ".join([documents[i] for i in I[0]]) | |
| # Pass translated query and context to the QA pipeline | |
| result = qa_pipeline(question=user_query, context=context) | |
| st.write(f"Answer: {result['answer']}") | |
| except LangDetectException: | |
| st.error("Could not detect the language of the query.") | |
| except Exception as e: | |
| st.error(f"Error during Q&A processing: {e}") | |