| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
| import chromadb |
| from chromadb.config import Settings |
|
|
| from src.chunker import Chunk |
|
|
|
|
class ChromaVectorStore:
    """
    Thin wrapper around ChromaDB.

    This makes the rest of the app independent from Chroma-specific code.

    Attributes:
        persist_directory: Directory handed to the persistent Chroma client.
        collection_name: Name of the collection this store reads and writes.
        embedding_model_name: Stored in the collection metadata under the
            "embedding_model" key.
        client: The ``chromadb.PersistentClient`` created in ``__init__``.
        collection: Handle to the current collection; replaced by
            ``reset_collection``.
    """

    def __init__(
        self,
        persist_directory: Path,
        collection_name: str,
        embedding_model_name: str,
    ) -> None:
        """
        Create a persistent ChromaDB client and open the collection.

        persistent directory:
        - stores the vector database on disk
        - allows reuse after app restart

        Args:
            persist_directory: Where the database is stored. A plain string
                path is accepted as well; it is converted to ``Path``.
            collection_name: Collection to open, created if missing.
            embedding_model_name: Recorded in the collection metadata.
        """
        # Coerce so that callers passing a str path do not fail on .mkdir().
        self.persist_directory = Path(persist_directory)
        self.collection_name = collection_name
        self.embedding_model_name = embedding_model_name

        self.persist_directory.mkdir(parents=True, exist_ok=True)

        self.client = chromadb.PersistentClient(
            path=str(self.persist_directory),
            settings=Settings(anonymized_telemetry=False),
        )

        self.collection = self._open_collection()

    def _open_collection(self) -> Any:
        """Get or create the configured collection with its standard metadata."""
        return self.client.get_or_create_collection(
            name=self.collection_name,
            metadata={
                "description": "KnowFlow AI document knowledge base",
                "embedding_model": self.embedding_model_name,
            },
        )

    def reset_collection(self) -> None:
        """
        Delete and recreate the collection.

        Good for demos and development.

        Production alternative:
        - upsert changed documents only
        - delete old chunks for changed files
        - maintain document versions

        Deletion is best-effort: a failure is logged as a warning and the
        collection is still (re)opened afterwards.
        """
        import logging

        try:
            self.client.delete_collection(self.collection_name)
        except Exception as exc:
            # Deliberately broad: the reset must not crash just because the
            # delete failed, but the failure should not vanish silently.
            logging.getLogger(__name__).warning(
                "Could not delete collection %r before reset: %s",
                self.collection_name,
                exc,
            )

        self.collection = self._open_collection()

    def count(self) -> int:
        """
        Return the number of vectors stored.
        """
        return self.collection.count()

    def add_chunks(
        self,
        chunks: "List[Chunk]",
        embeddings: List[List[float]],
    ) -> None:
        """
        Add chunks and their embeddings into ChromaDB.

        Metadata is important because it allows the final answer to show:
        - source file
        - chunk number
        - character count

        Args:
            chunks: Chunks to store; each must expose ``id``, ``text``,
                ``source``, ``chunk_index`` and ``character_count``.
            embeddings: One embedding per chunk, in the same order.

        Raises:
            ValueError: If ``chunks`` and ``embeddings`` differ in length.
        """
        if not chunks:
            return

        # Fail early with a clear message instead of storing misaligned data
        # or relying on a backend-specific error.
        if len(chunks) != len(embeddings):
            raise ValueError(
                "chunks and embeddings must have the same length "
                f"(got {len(chunks)} chunks and {len(embeddings)} embeddings)"
            )

        self.collection.add(
            ids=[chunk.id for chunk in chunks],
            documents=[chunk.text for chunk in chunks],
            metadatas=[
                {
                    "source": chunk.source,
                    "chunk_index": chunk.chunk_index,
                    "character_count": chunk.character_count,
                }
                for chunk in chunks
            ],
            embeddings=embeddings,
        )

    def query(
        self,
        query_embedding: List[float],
        top_k: int,
    ) -> List[Dict[str, Any]]:
        """
        Query the vector database using a query embedding.

        Args:
            query_embedding: Embedding of the search query.
            top_k: Number of results requested from the collection.

        Returns:
            A list of retrieved chunks with metadata and distance. Each item
            has the keys "rank" (starting at 1), "text", "source",
            "chunk_index", "character_count" and "distance".

        Distance:
            Lower usually means more similar.
        """
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=[
                "documents",
                "metadatas",
                "distances",
            ],
        )

        # Only one query embedding is sent, so only the first result row is
        # relevant. `or [[]]` also covers a field that is present but None.
        documents = (results.get("documents") or [[]])[0]
        metadatas = (results.get("metadatas") or [[]])[0]
        distances = (results.get("distances") or [[]])[0]

        retrieved = []
        for rank, (text, metadata, distance) in enumerate(
            zip(documents, metadatas, distances),
            start=1,
        ):
            # A record stored without metadata may come back as None; fall
            # back to the defaults below instead of raising AttributeError.
            metadata = metadata or {}
            retrieved.append(
                {
                    "rank": rank,
                    "text": text,
                    "source": metadata.get("source", "unknown"),
                    "chunk_index": metadata.get("chunk_index", -1),
                    "character_count": metadata.get("character_count", 0),
                    "distance": float(distance),
                }
            )

        return retrieved