| """ChromaDB persistence and LangChain ``Chroma`` vector store helpers. |
| |
| Collections are named per ingest target; documents are stored with UUID chunk ids. |
| Telemetry is disabled at the client level for quieter logs in production. |
| """ |
|
|
| from datetime import datetime, timezone |
| from pathlib import Path |
| from uuid import uuid4 |
|
|
| import chromadb |
| from chromadb.config import Settings |
| from langchain_chroma import Chroma |
| from langchain_core.documents import Document |
| from langchain_core.embeddings import Embeddings |
|
|
# Client settings reused by every Chroma client this module opens.
# Anonymized telemetry is switched off here.
_CHROMA_CLIENT_SETTINGS = Settings(anonymized_telemetry=False)
|
|
|
|
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    The value has whole-second precision, e.g. ``2024-01-31T12:34:56Z``.
    """
    current = datetime.now(timezone.utc)
    # timespec="seconds" truncates the sub-second part instead of rounding it.
    stamp = current.isoformat(timespec="seconds")
    # Swap the numeric UTC offset for the compact "Z" designator.
    return stamp.replace("+00:00", "Z")
|
|
|
|
def _chroma_client(persist_directory: str) -> chromadb.PersistentClient:
    """Open a persistent Chroma client rooted at ``persist_directory``.

    The directory, including any missing parents, is created before the
    client is constructed. The module-level client settings are passed
    through to the client.
    """
    storage_root = Path(persist_directory)
    storage_root.mkdir(parents=True, exist_ok=True)
    client = chromadb.PersistentClient(
        path=persist_directory,
        settings=_CHROMA_CLIENT_SETTINGS,
    )
    return client
|
|
|
|
def get_vector_store(
    persist_directory: str,
    collection_name: str,
    embedding_function: Embeddings,
) -> Chroma:
    """Return a LangChain ``Chroma`` store for a persisted collection.

    If looking up the collection raises, it is created with a ``created_at``
    metadata entry holding the current UTC timestamp. When the lookup
    succeeds, nothing is created or modified by this function.

    Args:
        persist_directory: Directory holding the Chroma data.
        collection_name: Name of the collection to open or create.
        embedding_function: Embedder handed to the returned store.

    Returns:
        A ``Chroma`` instance configured with the same directory, collection
        name and client settings.
    """
    chroma = _chroma_client(persist_directory)
    try:
        # Probe first so the creation metadata is only supplied for new collections.
        chroma.get_collection(name=collection_name)
    except Exception:
        creation_metadata = {"created_at": _utc_now_iso()}
        chroma.get_or_create_collection(
            name=collection_name,
            metadata=creation_metadata,
        )

    store = Chroma(
        collection_name=collection_name,
        embedding_function=embedding_function,
        persist_directory=persist_directory,
        client_settings=_CHROMA_CLIENT_SETTINGS,
    )
    return store
|
|
|
|
def add_documents(vector_store: Chroma, chunks: list[Document]) -> list[str]:
    """Embed and insert chunks; return the generated vector ids.

    Args:
        vector_store: Store that receives the chunks.
        chunks: Documents to insert. Each one is assigned a fresh UUID4
            string as its id.

    Returns:
        The generated ids, in the same order as ``chunks``. An empty
        ``chunks`` returns ``[]`` without calling the store.
    """
    if not chunks:
        # Nothing to insert: skip the store call entirely. Forwarding an
        # empty batch would hand an empty id list to the backend, which
        # Chroma's id validation is expected to reject.
        return []
    document_ids = [str(uuid4()) for _ in chunks]
    vector_store.add_documents(documents=chunks, ids=document_ids)
    return document_ids
|
|
|
|
def list_collection_names(persist_directory: str) -> list[str]:
    """Sorted list of collection names in the persist directory.

    Args:
        persist_directory: Directory holding the Chroma data.

    Returns:
        Collection names as plain strings, sorted ascending.
    """
    client = _chroma_client(persist_directory)
    # Chroma 0.6.x returns bare names (a ``str`` subclass) from
    # ``list_collections()``, and accessing ``.name`` on those raises.
    # Other releases return Collection objects exposing ``.name``.
    # Accept both shapes.
    return sorted(
        str(entry) if isinstance(entry, str) else entry.name
        for entry in client.list_collections()
    )
|
|
|
|
def delete_collection(persist_directory: str, collection_name: str) -> int:
    """Delete a collection and report how many documents it held.

    The count is taken before deletion and is best effort: if looking up or
    counting the collection raises, ``0`` is reported. The delete call itself
    is not guarded, so any error it raises propagates to the caller.

    Args:
        persist_directory: Directory holding the Chroma data.
        collection_name: Name of the collection to delete.

    Returns:
        The document count observed just before deletion, or ``0`` when it
        could not be determined.
    """
    client = _chroma_client(persist_directory)
    try:
        doc_total = int(client.get_collection(name=collection_name).count())
    except Exception:
        # Counting is informational only; fall back to zero.
        doc_total = 0
    client.delete_collection(name=collection_name)
    return doc_total
|
|
|
|
def collection_document_count(persist_directory: str, collection_name: str) -> int:
    """Return how many vectors a collection holds.

    Any failure while looking up or counting the collection (including the
    collection not existing) yields ``0`` instead of an exception.
    """
    client = _chroma_client(persist_directory)
    try:
        size = int(client.get_collection(name=collection_name).count())
    except Exception:
        size = 0
    return size
|
|
|
|
def collection_created_at(persist_directory: str, collection_name: str) -> str | None:
    """Read the creation timestamp stored in a collection's metadata.

    The ``created_at`` key is preferred; ``created`` is used when
    ``created_at`` is missing or falsy.

    Returns:
        The stored value converted to ``str``, or ``None`` when the
        collection cannot be read, its metadata is not a dict, or neither
        key yields a value.
    """
    client = _chroma_client(persist_directory)
    try:
        collection = client.get_collection(name=collection_name)
        metadata = getattr(collection, "metadata", None) or {}
        if not isinstance(metadata, dict):
            return None
        stamp = metadata.get("created_at") or metadata.get("created")
        return None if stamp is None else str(stamp)
    except Exception:
        # Lookup problems are treated as "no timestamp available".
        return None
|
|
|
|
def ensure_collection_created_at(
    persist_directory: str,
    collection_name: str,
    *,
    fallback: str | None = None,
) -> str | None:
    """Make sure a collection carries a ``created_at`` metadata entry.

    An existing ``created_at`` value (or ``created`` when ``created_at`` is
    missing or falsy) is returned unchanged. Otherwise ``created_at`` is
    written alongside the current metadata.

    Args:
        persist_directory: Directory holding the Chroma data.
        collection_name: Name of the collection to inspect.
        fallback: Timestamp to store when none exists; when omitted or
            empty, the current UTC time is used.

    Returns:
        The existing or newly stored timestamp, or ``None`` when the
        collection lookup raises.
    """
    client = _chroma_client(persist_directory)
    try:
        collection = client.get_collection(name=collection_name)
    except Exception:
        return None

    current = getattr(collection, "metadata", None) or {}
    if not isinstance(current, dict):
        current = {}

    existing = current.get("created_at") or current.get("created")
    if existing is not None:
        return str(existing)

    stamp = fallback if fallback else _utc_now_iso()
    # Carry over the existing entries so only ``created_at`` is added.
    collection.modify(metadata={**current, "created_at": stamp})
    return stamp
|
|
|
|