|
|
""" |
|
|
Client for LanceDB vector store operations with lazy loading. |
|
|
|
|
|
This module provides an optimized client for LanceDB with automatic |
|
|
connection management and lazy table initialization. |
|
|
""" |
|
|
|
|
|
import lancedb |
|
|
import asyncio |
|
|
from typing import List, Dict, Any, Optional |
|
|
from datetime import datetime |
|
|
from loguru import logger |
|
|
|
|
|
|
|
|
class VectorStoreClient:
    """
    Client for a LanceDB vector store with lazy loading.

    Features:
        - Lazy connection and table initialization
        - Document validation and timestamp enrichment
        - Search with metadata filtering

    Attributes:
        uri: Database URI path
        table_name: Name of the table to use

    Examples:
        >>> client = VectorStoreClient(uri="./lancedb")
        >>> # No connection yet - happens on first use
        >>> client.add_documents([{"text": "...", "vector": [...]}])
        >>> # Connection established automatically
    """

    # Fields every document must carry before it is written to the table.
    _REQUIRED_FIELDS = ("text", "vector", "source", "language", "doc_type")

    def __init__(self, uri: str, table_name: str = "a11y_expert"):
        """
        Initialize client with database URI and table name.

        No connection is opened here; see the ``db`` and ``table`` properties.

        Args:
            uri: Path to LanceDB database
            table_name: Name of the table (default: "a11y_expert")
        """
        self.uri = uri
        self.table_name = table_name
        self._db = None
        self._table = None

    # ------------------------------------------------------------------
    # Lazy handles
    # ------------------------------------------------------------------

    @property
    def db(self) -> Any:
        """
        Lazy database connection property.

        Connects to the database on first access and returns the cached
        connection on every later access.

        Returns:
            LanceDB database connection
        """
        if self._db is None:
            logger.info(f"Connecting to LanceDB at: {self.uri}")
            self._db = lancedb.connect(self.uri)
            logger.info("✅ Connected to LanceDB")
        return self._db

    @property
    def table(self) -> Any:
        """
        Lazy table handle property.

        Opens the table on first access if it already exists. The table is
        never created here; creation happens in ``add_documents``.

        Returns:
            LanceDB table, or None if the table doesn't exist yet
        """
        if self._table is None:
            if self.table_name in self.db.table_names():
                logger.debug(f"Opening existing table: '{self.table_name}'")
                self._table = self.db.open_table(self.table_name)
            else:
                logger.debug(f"Table '{self.table_name}' doesn't exist yet")
                return None
        return self._table

    def connect(self) -> None:
        """
        Explicitly connect to database (optional - happens automatically).

        Provided for backward compatibility. Connection happens automatically
        when first accessing the db or table properties.
        """
        _ = self.db
        table = self.table
        if table is not None:
            logger.info(f"Table '{self.table_name}' ready ({len(table)} docs)")
        else:
            logger.info(f"Table '{self.table_name}' will be created on first insert")

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------

    @classmethod
    def _prepare_document(cls, doc: Dict[str, Any], now: Any):
        """
        Validate one document and return a timestamped copy of it.

        The input is copied so the caller's dict is never modified.

        Args:
            doc: Candidate document
            now: Value used for ``created_at`` / ``updated_at`` when they are
                absent or None

        Returns:
            Tuple ``(record, missing)``. ``record`` is the enriched copy, or
            None when required fields are missing; ``missing`` is the set of
            absent required field names.
        """
        record = dict(doc)
        missing = {field for field in cls._REQUIRED_FIELDS if field not in record}
        if missing:
            return None, missing

        for stamp in ("created_at", "updated_at"):
            if record.get(stamp) is None:
                record[stamp] = now
        return record, missing

    @staticmethod
    def _quote_literal(value: Any) -> str:
        """Render a value as a single-quoted SQL string literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _build_where_clause(
        cls,
        language: Optional[str] = None,
        doc_type: Optional[str] = None,
        source: Optional[str] = None
    ) -> str:
        """
        Build the AND-joined filter expression for the given metadata values.

        Falsy values (None, empty string) contribute no condition.

        Returns:
            Filter expression, or "" when no filter was requested
        """
        filters = (("language", language), ("doc_type", doc_type), ("source", source))
        return " AND ".join(
            f"{column} = {cls._quote_literal(value)}"
            for column, value in filters
            if value
        )

    @staticmethod
    def _empty_statistics() -> Dict[str, Any]:
        """Return the statistics payload describing an empty or missing table."""
        return {
            "total_documents": 0,
            "languages": {},
            "doc_types": {},
            "sources": [],
            "earliest_document": None,
            "latest_document": None,
        }

    @staticmethod
    def _run_search(table: Any, query_embedding: List[float], where: str, top_k: int) -> Any:
        """Run a vector search on ``table`` with an optional filter and return a DataFrame."""
        query = table.search(query_embedding)
        if where:
            query = query.where(where)
        # NOTE(review): newer LanceDB releases appear to prefer to_pandas() over
        # to_df() - confirm against the pinned version before switching.
        return query.limit(top_k).to_df()

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    def add_documents(self, documents: List[Dict[str, Any]]) -> None:
        """
        Add documents to the table with automatic validation.

        Validates required fields, adds timestamps, and creates the table if
        needed. Invalid documents are skipped and counted, not raised. The
        caller's dicts are left untouched; timestamps are added to copies.

        Args:
            documents: List of dicts with required keys:
                - text (str): Document text
                - vector (List[float]): Embedding vector
                - source (str): Source identifier
                - language (str): Language code (en/pl)
                - doc_type (str): Document type

        Raises:
            Exception: Any error raised by LanceDB while creating the table or
                appending rows is logged and re-raised.

        Examples:
            >>> client.add_documents([{
            ...     "text": "Content",
            ...     "vector": [0.1, 0.2, ...],
            ...     "source": "wcag",
            ...     "language": "en",
            ...     "doc_type": "specification"
            ... }])
        """
        valid_docs = []
        now = datetime.now()
        skipped_count = 0

        for doc in documents:
            try:
                record, missing = self._prepare_document(doc, now)
            except Exception as e:
                # Per-item boundary: one malformed entry (e.g. not a mapping)
                # must not abort the whole batch.
                logger.error(f"Failed to process document: {e}")
                skipped_count += 1
                continue

            if missing:
                logger.warning(f"Skipping document with missing fields: {missing}")
                skipped_count += 1
                continue

            valid_docs.append(record)

        if not valid_docs:
            logger.warning(f"No valid documents to add (skipped: {skipped_count})")
            return

        try:
            logger.info(f"Adding {len(valid_docs)} documents to '{self.table_name}'")

            if self.table_name not in self.db.table_names():
                # First insert: the table is created from this batch.
                self._table = self.db.create_table(self.table_name, data=valid_docs)
                logger.info(f"✅ Created table '{self.table_name}' with {len(valid_docs)} docs")
            else:
                self._table = self.db.open_table(self.table_name)
                self._table.add(valid_docs)
                logger.info(f"✅ Added {len(valid_docs)} documents to '{self.table_name}'")

            if skipped_count > 0:
                logger.warning(f"Skipped {skipped_count} invalid documents")

        except Exception as e:
            logger.error(f"Failed to add documents to LanceDB: {e}")
            raise

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    def search(
        self,
        query_embedding: List[float],
        where: str = "",
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Search for documents using vector similarity.

        Args:
            query_embedding: Query vector embedding
            where: Optional SQL-like filter (e.g., "language = 'en'"). It is
                passed to LanceDB as-is, so the caller must escape values.
            top_k: Number of results to return

        Returns:
            List of matching documents as dicts; an empty list when the table
            doesn't exist or the search fails (the failure is logged)

        Examples:
            >>> results = client.search(embedding, where="language = 'pl'", top_k=3)
            >>> len(results)
            3
        """
        table = self.table
        if table is None:
            logger.error(f"Table '{self.table_name}' doesn't exist")
            return []

        try:
            logger.debug(f"Searching for {top_k} documents" + (f" where: {where}" if where else ""))
            results = self._run_search(table, query_embedding, where, top_k)
            logger.debug(f"Found {len(results)} documents")
            return results.to_dict("records")
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return []

    def count_documents(self) -> int:
        """
        Return total number of documents in table.

        Returns:
            Document count or 0 if table doesn't exist
        """
        table = self.table
        return 0 if table is None else len(table)

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get database statistics.

        Returns:
            Dict with keys ``total_documents``, ``languages``, ``doc_types``,
            ``sources``, ``earliest_document`` and ``latest_document``. When
            reading the table fails, the same keys are returned with empty
            values plus an ``error`` key holding the error message.
        """
        if self._db is None:
            self.connect()

        if self.table_name not in self._db.table_names():
            logger.warning(f"Table '{self.table_name}' does not exist yet")
            return self._empty_statistics()

        try:
            df = self._db.open_table(self.table_name).to_pandas()
            columns = df.columns

            stats = {
                "total_documents": len(df),
                "languages": df["language"].value_counts().to_dict() if "language" in columns else {},
                "doc_types": df["doc_type"].value_counts().to_dict() if "doc_type" in columns else {},
                "sources": df["source"].unique().tolist() if "source" in columns else [],
                "earliest_document": str(df["created_at"].min()) if "created_at" in columns else None,
                "latest_document": str(df["created_at"].max()) if "created_at" in columns else None,
            }

            logger.info(f"Database stats: {stats['total_documents']} documents")
            return stats
        except Exception as e:
            logger.error(f"Failed to get statistics: {e}")
            # Keep the regular keys so callers can read them unconditionally;
            # "error" is still present for callers that check for it.
            return {**self._empty_statistics(), "error": str(e)}

    def get_recent_documents(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recently added documents sorted by creation time.

        When the table has no ``created_at`` column, the first ``limit`` rows
        are returned in table order instead.

        Args:
            limit: Maximum number of documents to return

        Returns:
            List of recent documents; an empty list when the table doesn't
            exist or reading it fails (the failure is logged)
        """
        table = self.table
        if table is None:
            logger.warning(f"Table '{self.table_name}' doesn't exist")
            return []

        try:
            df = table.to_pandas()
            if "created_at" in df.columns:
                df = df.sort_values("created_at", ascending=False)
            return df.head(limit).to_dict("records")
        except Exception as e:
            logger.error(f"Failed to get recent documents: {e}")
            return []

    def search_with_filters(
        self,
        query_embedding: List[float],
        language: Optional[str] = None,
        doc_type: Optional[str] = None,
        source: Optional[str] = None,
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Search with optional metadata filters.

        Filter values are quoted as SQL string literals, so values containing
        a single quote are matched literally.

        Args:
            query_embedding: Query vector embedding
            language: Filter by language code (e.g., 'en', 'pl')
            doc_type: Filter by document type (e.g., 'specification')
            source: Filter by source (e.g., 'wcag')
            top_k: Number of results to return

        Returns:
            List of matching documents; an empty list when the table doesn't
            exist or the search fails (the failure is logged)

        Examples:
            >>> results = client.search_with_filters(
            ...     embedding,
            ...     language='pl',
            ...     doc_type='specification',
            ...     top_k=5
            ... )
        """
        table = self.table
        if table is None:
            logger.warning(f"Table '{self.table_name}' doesn't exist")
            return []

        where_clause = self._build_where_clause(language, doc_type, source)

        try:
            results = self._run_search(table, query_embedding, where_clause, top_k)
            logger.debug(f"Found {len(results)} documents with filters")
            return results.to_dict("records")
        except Exception as e:
            logger.error(f"Search with filters failed: {e}")
            return []

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------

    def close(self) -> None:
        """
        Drop the cached database and table handles.

        Call this method when shutting down the application. No explicit
        close call is made on the connection; the references are cleared so
        the handles can be garbage collected. Calling it on a client that
        never connected is a no-op, and the client reconnects lazily if it
        is used again afterwards.
        """
        if self._db is not None:
            self._table = None
            self._db = None
            logger.info("VectorStoreClient resources cleared")
|
|
|