Spaces:
Sleeping
Sleeping
| import chromadb | |
| import streamlit as st | |
| import fitz | |
| import os | |
| from chromadb.utils import embedding_functions | |
| from text_processing import lines_chunking, paragraphs_chunking | |
def get_chroma_client():
    """Build a fresh ephemeral (in-memory) ChromaDB client.

    Nothing is persisted to disk: every stored vector disappears when the
    user's browser session ends, which is exactly what we want for
    session-scoped RAG.
    """
    ephemeral_client = chromadb.EphemeralClient()
    return ephemeral_client
#@st.cache_resource
def initialize_chroma_client():
    """Return the ChromaDB client for this Streamlit session.

    Thin wrapper around get_chroma_client(); the commented-out
    @st.cache_resource decorator would pin one client per session
    if re-enabled.
    """
    client = get_chroma_client()
    return client
#@st.cache_resource
def initialize_chromadb(embedding_model):
    """Set up the ChromaDB client together with its embedding function.

    Parameters
    ----------
    embedding_model : str
        Name of the Sentence Transformer model used to embed documents.

    Returns
    -------
    tuple
        (client, embedding_func) — the session's ChromaDB client and a
        SentenceTransformer-backed embedding function.
    """
    # Embedding function built from the requested Sentence Transformer model.
    embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=embedding_model,
    )
    # Reuse the session-level client.
    client = initialize_chroma_client()
    return client, embedding_func
def initialize_collection(client, embedding_func, collection_name):
    """Fetch (or create on first use) a named ChromaDB collection.

    The collection is configured with cosine distance for its HNSW index
    and embeds documents via the supplied embedding function.
    """
    return client.get_or_create_collection(
        name=collection_name,
        embedding_function=embedding_func,
        metadata={"hnsw:space": "cosine"},
    )
def update_collection(collection, files_to_add_to_collection):
    """
    Add newly uploaded files to the ChromaDB collection.

    Parameters
    ----------
    collection :
        ChromaDB collection that receives one document per text chunk.
    files_to_add_to_collection : iterable of str
        File names to process; each must correspond to an uploaded file
        object in ``st.session_state['uploaded_files_raw']``.

    Returns
    -------
    The same collection, updated in place.

    Side effects: appends successfully processed names to
    ``st.session_state.collections_files_name``, removes failed names from
    ``st.session_state.uploaded_files_name``, and emits Streamlit
    success/warning/error messages.
    """
    for file_to_add in files_to_add_to_collection:
        # Look up the uploaded file object matching this name.
        current_file = next(
            (file for file in st.session_state.get('uploaded_files_raw', [])
             if file.name == file_to_add), None)
        if current_file is None:
            st.error(f"File '{file_to_add}' not found in uploaded files.")
            continue
        # Read file content
        try:
            if current_file.type == "text/plain":  # Handling TXT files
                file_text = current_file.getvalue().decode("utf-8")
            elif current_file.type == "application/pdf":  # Handling PDFs
                with fitz.open(stream=current_file.getvalue(), filetype="pdf") as pdf_document:
                    file_text = "\n".join([page.get_text("text") for page in pdf_document])
            else:
                st.warning(f"Unsupported file type: {current_file.name} type:{current_file.type}")
                continue
            # Tokenize text into chunks
            max_words = 200
            chunks = lines_chunking(file_text, max_words=max_words)
            if not chunks:  # Skip if no chunks generated
                st.warning(f"No content extracted from {current_file.name}")
                continue
            # Store chunks in the collection.
            # FIX: derive the id stem with os.path.splitext instead of
            # filename[:-4], which only worked for 3-character extensions.
            filename = current_file.name
            stem = os.path.splitext(filename)[0]
            collection.add(
                documents=chunks,
                ids=[f"id{stem}.{j}" for j in range(len(chunks))],
                metadatas=[{"source": filename, "part": n} for n in range(len(chunks))],
            )
            st.session_state.collections_files_name.append(filename)
            # FIX: report the actual file name (message previously contained
            # a literal "(unknown)" placeholder).
            st.success(f"Added {len(chunks)} chunks from {filename}")
        except Exception as e:
            st.error(f"Error processing {current_file.name}: {str(e)}")
            # FIX: `filename` is unbound if the failure occurred before it
            # was assigned (e.g. while decoding/parsing); use `file_to_add`
            # and guard the remove so a missing entry cannot raise ValueError.
            if file_to_add in st.session_state.uploaded_files_name:
                st.session_state.uploaded_files_name.remove(file_to_add)
    return collection