| import gradio as gr |
| import os |
| import glob |
| from sentence_transformers import SentenceTransformer |
| import chromadb |
| from chromadb.config import Settings |
|
|
| |
# Glob pattern locating the plain-text corpus files to index.
CORPUS_PATH = "corpus/*.txt"
# On-disk directory for the persistent ChromaDB store.
CHROMA_PATH = "chroma_db"
# Character length of each text chunk fed to the embedding model.
CHUNK_SIZE = 512
# Default number of search results to return.
TOP_N = 3
# Sentence-embedding model, loaded once at import time (downloads weights on first run).
model = SentenceTransformer('all-MiniLM-L6-v2')
|
|
def chunk_text(text, chunk_size=CHUNK_SIZE):
    """Split *text* into consecutive chunks of at most *chunk_size* characters.

    Args:
        text: String to split.
        chunk_size: Maximum length of each chunk (positive integer).

    Returns:
        List of chunk strings in original order; the final chunk may be
        shorter than *chunk_size*. Empty input yields an empty list.
    """
    # range() already stops at the end of the text, so the original loop's
    # explicit break was dead logic; a comprehension expresses this directly.
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
|
|
def create_vector_db():
    """Build (or rebuild) the persistent ChromaDB collection from the corpus.

    Reads every file matching CORPUS_PATH, splits each into chunks, embeds
    the chunks with the module-level sentence-transformer model, and stores
    everything in a fresh "documents" collection under CHROMA_PATH.

    Returns:
        A human-readable status string describing success or failure.
    """
    text_files = glob.glob(CORPUS_PATH)
    if not text_files:
        return "No text files found in corpus directory!"

    client = chromadb.PersistentClient(path=CHROMA_PATH)

    # Drop any previous collection so a rebuild starts from a clean slate.
    try:
        client.delete_collection("documents")
    except Exception:
        # Collection did not exist yet; nothing to delete.
        pass

    collection = client.create_collection("documents")

    all_chunks = []
    all_metadatas = []
    all_ids = []

    for file_path in text_files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
            continue

        filename = os.path.basename(file_path)
        for i, chunk in enumerate(chunk_text(text)):
            all_chunks.append(chunk)
            all_metadatas.append({
                "source": filename,
                "chunk_id": i,
                "file_path": file_path
            })
            # BUG FIX: IDs must be unique across the whole collection. The
            # original used a constant prefix, so chunk 0 of every file
            # collided with chunk 0 of every other file; embed the source
            # filename in the ID instead.
            all_ids.append(f"{filename}_chunk_{i}")

    if not all_chunks:
        return "No text could be read from the files!"

    print("Creating embeddings...")
    embeddings = model.encode(all_chunks).tolist()

    # Insert in batches to keep each add() call a reasonable size.
    batch_size = 100
    for i in range(0, len(all_chunks), batch_size):
        end_idx = min(i + batch_size, len(all_chunks))
        collection.add(
            embeddings=embeddings[i:end_idx],
            documents=all_chunks[i:end_idx],
            metadatas=all_metadatas[i:end_idx],
            ids=all_ids[i:end_idx]
        )

    return f"ChromaDB created with {len(all_chunks)} chunks from {len(text_files)} files!"
|
|
def search_similar_chunks(query, top_n=TOP_N):
    """Return the *top_n* corpus chunks most similar to *query*.

    Returns:
        A list of result dicts (rank, similarity, source, chunk_id, content)
        on success, or an error string when the database is missing or the
        query fails.
    """
    if not os.path.exists(CHROMA_PATH):
        return "Vector database not found! Please create it first."

    try:
        client = chromadb.PersistentClient(path=CHROMA_PATH)
        collection = client.get_collection("documents")

        # Embed the query with the same model used at indexing time.
        results = collection.query(
            query_embeddings=model.encode([query]).tolist(),
            n_results=top_n,
            include=["documents", "metadatas", "distances"]
        )

        if not results['documents']:
            return []

        hits = zip(
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0],
        )
        # NOTE(review): similarity = 1 - distance assumes a cosine-style
        # distance metric — confirm against the collection's configuration.
        return [
            {
                'rank': rank,
                'similarity': f"{1 - dist:.4f}",
                'source': meta['source'],
                'chunk_id': meta['chunk_id'],
                'content': doc,
            }
            for rank, (doc, meta, dist) in enumerate(hits, start=1)
        ]

    except Exception as e:
        return f"Error searching database: {str(e)}"
|
|
def format_results(results):
    """Render search results as a markdown-style display string.

    Error strings from the search layer pass through unchanged; an empty
    result list produces a "No results found." message.
    """
    if isinstance(results, str):
        return results
    if not results:
        return "No results found."

    parts = []
    for item in results:
        parts.append(f"**Rank {item['rank']}** (Similarity: {item['similarity']})\n")
        parts.append(f"**Source:** {item['source']} (Chunk {item['chunk_id']})\n")
        parts.append(f"**Content:** {item['content']}\n")
        parts.append("---\n\n")
    # Single join instead of repeated string concatenation.
    return "".join(parts)
|
|
def process_query(query, top_n):
    """Run a semantic search for *query* and return display-ready text."""
    return format_results(search_similar_chunks(query, top_n))
|
|
| |
def initialize_app():
    """Return the startup status message shown in the database status box."""
    if os.path.exists(CHROMA_PATH):
        return "Database ready! You can start searching."
    return "Database not found. Click 'Create Vector Database' to build it."
|
|
| |
# --- Gradio UI -----------------------------------------------------------
with gr.Blocks(title="Text Corpus Semantic Search with ChromaDB") as demo:
    gr.Markdown("# 🔍 Text Corpus Semantic Search")
    gr.Markdown("Using ChromaDB vector database for efficient semantic search")

    with gr.Row():
        # Left column: one-off database construction plus a status readout.
        with gr.Column():
            gr.Markdown("### Database Setup")
            build_button = gr.Button("Create Vector Database")
            status_box = gr.Textbox(
                label="Database Status",
                value=initialize_app(),
                interactive=False,
            )

        # Right column: query entry and result-count control.
        with gr.Column():
            gr.Markdown("### Search")
            question_box = gr.Textbox(
                label="Enter your query",
                placeholder="What would you like to search for?",
                lines=2,
            )
            result_count = gr.Slider(
                minimum=1,
                maximum=10,
                value=TOP_N,
                step=1,
                label="Number of results to show",
            )
            run_search = gr.Button("Search", variant="primary")

    output_box = gr.Textbox(
        label="Search Results",
        lines=15,
        max_lines=20,
        interactive=False,
    )

    # Event wiring: the build button rebuilds the DB; both the search button
    # and pressing Enter in the query box run a search.
    build_button.click(fn=create_vector_db, outputs=status_box)
    run_search.click(
        fn=process_query,
        inputs=[question_box, result_count],
        outputs=output_box,
    )
    question_box.submit(
        fn=process_query,
        inputs=[question_box, result_count],
        outputs=output_box,
    )

if __name__ == "__main__":
    demo.launch()