Spaces:
Sleeping
Sleeping
File size: 4,738 Bytes
068aa4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import json
import chromadb
from sentence_transformers import SentenceTransformer
from pathlib import Path
class VectorDatabase:
    """Persistent ChromaDB collection paired with a sentence-transformer encoder.

    Documents are embedded with the configured SentenceTransformer model and
    kept in a collection named "documents" that is configured for cosine
    distance.
    """

    def __init__(self, db_path="data/vectordb", model_name="all-MiniLM-L6-v2"):
        """
        Initialize vector database

        Args:
            db_path: Path to store vector database
            model_name: Sentence transformer model to use
        """
        # Initialize ChromaDB
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_or_create_collection(
            name="documents",
            metadata={"hnsw:space": "cosine"},
        )
        # Load embedding model
        print(f"📦 Loading embedding model: {model_name}")
        self.model = SentenceTransformer(model_name)
        print("✅ Model loaded!")

    @staticmethod
    def _to_list(vector):
        """Return *vector* as a plain list, whether it is an array or a sequence."""
        return vector.tolist() if hasattr(vector, "tolist") else list(vector)

    def load_documents(self, json_path):
        """Load documents from JSON file.

        Args:
            json_path: Path of a UTF-8 encoded JSON file.

        Returns:
            The parsed JSON content. The other methods of this class expect a
            list of dicts carrying the keys 'doc_id', 'content', 'title',
            'word_count' and 'source_file'.
        """
        print(f"\n📄 Loading documents from {json_path}")
        with open(json_path, 'r', encoding='utf-8') as f:
            documents = json.load(f)
        print(f"✅ Loaded {len(documents)} documents")
        return documents

    def create_embeddings(self, documents):
        """Create embeddings for all documents.

        Args:
            documents: Sequence of dicts; each must have a 'content' entry.

        Returns:
            Whatever ``self.model.encode`` returns for the list of contents,
            one embedding per document.
        """
        print(f"\n🔄 Creating embeddings for {len(documents)} documents...")
        texts = [doc['content'] for doc in documents]
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"✅ Created {len(embeddings)} embeddings")
        return embeddings

    def store_documents(self, documents, embeddings):
        """Store documents and embeddings in ChromaDB.

        Records are written with ``upsert`` so that running the setup again
        against an existing persistent store refreshes records that share a
        'doc_id' instead of depending on how ``add`` treats duplicate ids.

        Args:
            documents: Sequence of dicts with 'doc_id', 'content', 'title',
                'word_count' and 'source_file' entries.
            embeddings: One embedding per document, in the same order.

        Raises:
            ValueError: If the number of embeddings differs from the number
                of documents.
        """
        print("\n💾 Storing documents in ChromaDB...")

        if len(documents) != len(embeddings):
            raise ValueError(
                f"Got {len(documents)} documents but {len(embeddings)} embeddings"
            )

        # Nothing to write: skip the collection call rather than handing it
        # empty lists.
        if len(documents) == 0:
            print("✅ Stored 0 documents in ChromaDB")
            return

        # Prepare data for ChromaDB
        ids = [doc['doc_id'] for doc in documents]
        texts = [doc['content'] for doc in documents]
        metadatas = [
            {
                'title': doc['title'],
                # word_count is stored as a string, as in the original schema
                'word_count': str(doc['word_count']),
                'source_file': doc['source_file'],
            }
            for doc in documents
        ]
        # Convert embeddings to list format
        embeddings_list = [self._to_list(emb) for emb in embeddings]

        self.collection.upsert(
            ids=ids,
            embeddings=embeddings_list,
            documents=texts,
            metadatas=metadatas,
        )
        print(f"✅ Stored {len(documents)} documents in ChromaDB")

    def search(self, query, top_k=5):
        """Search for similar documents.

        Args:
            query: Text to embed and look up.
            top_k: Number of results requested from the collection.

        Returns:
            The result object returned by ``self.collection.query``.
        """
        print(f"\n🔍 Searching for: '{query}'")
        # Create embedding for query; encode() takes a batch, so unwrap the
        # single result.
        query_embedding = self.model.encode([query])[0]
        # Search in collection
        return self.collection.query(
            query_embeddings=[self._to_list(query_embedding)],
            n_results=top_k,
        )

    def display_results(self, results):
        """Display search results in readable format.

        Args:
            results: Mapping with 'documents', 'distances' and 'metadatas'
                entries, each holding one inner list for the single query.
        """
        if not results or not results['documents'] or len(results['documents'][0]) == 0:
            print("❌ No results found")
            return

        print(f"\n✅ Found {len(results['documents'][0])} results:\n")
        rows = zip(
            results['documents'][0],
            results['distances'][0],
            results['metadatas'][0],
        )
        for rank, (doc, distance, metadata) in enumerate(rows, start=1):
            print(f"--- Result {rank} ---")
            print(f"Title: {metadata['title']}")
            print(f"Source: {metadata['source_file']}")
            # The collection uses cosine distance, so 1 - distance is reported
            # as the similarity score.
            print(f"Similarity Score: {1 - distance:.3f}")
            print(f"Preview: {doc[:200]}...")
            print()
# Main execution
def main():
    """Build the vector database from the processed documents and run sample searches.

    Loads "data/processed/processed_documents.json", embeds and stores every
    document, then prints the top three matches for a few test queries.
    """
    banner = "=" * 60

    print(banner)
    print("🚀 VECTOR DATABASE SETUP")
    print(banner)

    # Initialize vector database
    vdb = VectorDatabase()

    # Load documents
    documents = vdb.load_documents("data/processed/processed_documents.json")

    # Create embeddings
    embeddings = vdb.create_embeddings(documents)

    # Store in database
    vdb.store_documents(documents, embeddings)

    # Test search
    print("\n" + banner)
    print("🧪 TESTING SEARCH")
    print(banner)

    test_queries = [
        "How do I create a FastAPI endpoint?",
        "What is employee leave policy?",
        "How do I work remotely?",
    ]
    for query in test_queries:
        results = vdb.search(query, top_k=3)
        vdb.display_results(results)

    print("\n" + banner)
    print("✅ VECTOR DATABASE SETUP COMPLETE!")
    print(banner)


if __name__ == "__main__":
    main()
|