Spaces:
Sleeping
Sleeping
| """pgvector operations: insert, search, and index management.""" | |
| from uuid import UUID | |
| from typing import Optional | |
| from sqlalchemy import text | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
| logger = get_logger(__name__) | |
| def _vector_literal(embedding: list[float]) -> str: | |
| """Encode a Python list as a pgvector text literal: '[v1,v2,...]'. | |
| asyncpg has no native adapter for pgvector, so casting `:param AS vector` | |
| requires the string form. Passing a Python list errors with | |
| `expected str, got list`. | |
| """ | |
| return "[" + ",".join(repr(float(x)) for x in embedding) + "]" | |
async def insert_embedding(
    session: AsyncSession,
    chunk_id: UUID,
    embedding: list[float],
    model_name: str,
) -> None:
    """Upsert the embedding row for a single chunk.

    A row is inserted into ``chunk_embeddings``; if one already exists for
    ``chunk_id``, its vector and model name are overwritten and ``created_at``
    is reset to ``NOW()``. The statement is only executed on ``session`` --
    this function does not commit.
    """
    upsert = text("""
        INSERT INTO chunk_embeddings (chunk_id, embedding, embedding_model)
        VALUES (:chunk_id, CAST(:embedding AS vector), :model)
        ON CONFLICT (chunk_id) DO UPDATE
        SET embedding = EXCLUDED.embedding,
            embedding_model = EXCLUDED.embedding_model,
            created_at = NOW()
    """)
    bind_values = {
        "chunk_id": chunk_id,
        # Bound as a string literal and cast to vector inside the SQL.
        "embedding": _vector_literal(embedding),
        "model": model_name,
    }
    await session.execute(upsert, bind_values)
async def search_similar_chunks(
    session: AsyncSession,
    query_embedding: list[float],
    limit: int = 10,
    document_id: Optional[UUID] = None,
) -> list[dict]:
    """Return the chunks closest to ``query_embedding`` by cosine distance.

    Each result is a plain dict holding the selected ``chunks`` columns plus a
    ``similarity`` value computed as ``1 - cosine distance``. Rows are ordered
    nearest first and capped at ``limit``. When ``document_id`` is given, only
    chunks of that document are considered.
    """
    params: dict = {
        "embedding": _vector_literal(query_embedding),
        "limit": limit,
    }

    # Optional per-document restriction; the fragment is a fixed string and
    # the value itself is always passed as a bind parameter.
    filters = ""
    if document_id:
        params["document_id"] = document_id
        filters = "AND c.document_id = :document_id"

    statement = text(f"""
        SELECT c.id, c.document_id, c.sequence_id, c.markdown, c.plain_text,
               c.page_start, c.page_end, c.chunk_type,
               1 - (ce.embedding <=> CAST(:embedding AS vector)) AS similarity
        FROM chunk_embeddings ce
        JOIN chunks c ON c.id = ce.chunk_id
        WHERE 1=1 {filters}
        ORDER BY ce.embedding <=> CAST(:embedding AS vector)
        LIMIT :limit
    """)
    result = await session.execute(statement, params)
    return [dict(row) for row in result.mappings().all()]
async def search_chunks_fulltext(
    session: AsyncSession,
    query: str,
    limit: int = 10,
    document_id: Optional[UUID] = None,
) -> list[dict]:
    """Keyword search over chunks using Postgres full-text search.

    This is the literal-match counterpart to vector search: exact terms such
    as equation numbers, acronyms, author names or dataset names, which
    embeddings tend to blur, are matched as written.

    ``websearch_to_tsquery`` parses arbitrary user input safely, so a query
    never fails with a tsquery syntax error. The ``to_tsvector`` expression
    matches the GIN index created at startup.

    Each result is a plain dict with the selected ``chunks`` columns plus
    ``fts_rank``; rows are ordered by descending rank and capped at ``limit``.
    When ``document_id`` is given, only that document's chunks are searched.
    """
    params: dict = {"q": query, "limit": limit}

    # Optional per-document restriction; the value is passed as a bind parameter.
    filters = ""
    if document_id:
        params["document_id"] = document_id
        filters = "AND c.document_id = :document_id"

    statement = text(f"""
        SELECT c.id, c.document_id, c.sequence_id, c.markdown, c.plain_text,
               c.page_start, c.page_end, c.chunk_type,
               ts_rank(
                   to_tsvector('english', coalesce(c.plain_text, '')),
                   websearch_to_tsquery('english', :q)
               ) AS fts_rank
        FROM chunks c
        WHERE to_tsvector('english', coalesce(c.plain_text, ''))
              @@ websearch_to_tsquery('english', :q)
              {filters}
        ORDER BY fts_rank DESC
        LIMIT :limit
    """)
    result = await session.execute(statement, params)
    matches = result.mappings().all()
    return [dict(match) for match in matches]
async def ensure_vector_dimension(session: AsyncSession) -> bool:
    """Sync the chunk_embeddings column to settings.vector_dimension.

    Returns True when a migration happened: existing embeddings (computed at a
    different dimension) are dropped and the column is re-typed; the caller is
    then responsible for re-queueing embedding jobs. Section summaries and
    figure descriptions are cached by prompt-hash, so a re-embed does NOT
    re-run any expensive summarization.

    Returns False when there is nothing to do: the table or column does not
    exist yet, the column is unconstrained, the dimension already matches, or
    the catalog lookup failed (the failure is logged).

    The statements are only executed on ``session``; this function does not
    commit.
    """
    try:
        # to_regclass() yields NULL for a missing relation, whereas
        # 'chunk_embeddings'::regclass raises. On PostgreSQL a failed statement
        # aborts the surrounding transaction, so on a fresh database the cast
        # form would leave the session unusable until a rollback. With
        # to_regclass() the lookup simply returns no row.
        result = await session.execute(
            text("""
                SELECT atttypmod FROM pg_attribute
                WHERE attrelid = to_regclass('chunk_embeddings')
                  AND attname = 'embedding' AND NOT attisdropped
            """)
        )
        current = result.scalar_one_or_none()
    except Exception:
        # Kept for backward compatibility (callers never saw an exception from
        # this lookup), but no longer silent.
        logger.warning(
            "Could not read the chunk_embeddings.embedding dimension; "
            "skipping vector-dimension sync.",
            exc_info=True,
        )
        return False

    target = settings.vector_dimension

    # pgvector stores the dimension directly as the type modifier (-1 = unconstrained).
    # `current is None` covers both a missing table and a missing column.
    if current is None or current <= 0 or current == target:
        return False

    count = (await session.execute(text("SELECT COUNT(*) FROM chunk_embeddings"))).scalar_one()
    logger.warning(
        "Embedding column is vector(%d) but VECTOR_DIMENSION=%d. Dropping %d stored "
        "embeddings, re-typing the column, and re-queueing embedding jobs. "
        "(Summaries/figure descriptions are cached and will not re-run.)",
        current, target, count,
    )

    # Order matters: drop the index built for the old dimension, then remove
    # the old-dimension rows, then re-type the column.
    await session.execute(text("DROP INDEX IF EXISTS idx_chunk_embeddings_hnsw"))
    await session.execute(text("DELETE FROM chunk_embeddings"))
    # A type modifier cannot be passed as a bind parameter, hence the f-string.
    await session.execute(
        text(f"ALTER TABLE chunk_embeddings ALTER COLUMN embedding TYPE vector({target})")
    )
    return True
async def create_vector_index(session: AsyncSession) -> None:
    """Ensure the search indexes exist: full-text GIN and, if possible, HNSW.

    HNSW is used instead of IVFFlat. IVFFlat with a fixed ``lists`` and the
    default ``ivfflat.probes = 1`` scans only one of the ``lists`` partitions
    and silently drops relevant rows, which on small/medium corpora returns 0
    hits for queries that clearly match. HNSW gives high recall out of the box
    with no per-query tuning and no dependency on row count.

    pgvector's HNSW implementation has a hard 2000-dimension limit. Embeddings
    are truncated/renormalized to settings.vector_dimension (MRL), so the
    index applies as long as that value stays ≤ 2000. Above it, the HNSW index
    is skipped with a loud warning and searches fall back to exact brute-force
    scans.
    """
    drop_legacy_index = text("DROP INDEX IF EXISTS idx_chunk_embeddings_vector")
    create_fts_index = text("""
        CREATE INDEX IF NOT EXISTS idx_chunks_fts
        ON chunks
        USING gin (to_tsvector('english', coalesce(plain_text, '')))
    """)
    create_hnsw_index = text("""
        CREATE INDEX IF NOT EXISTS idx_chunk_embeddings_hnsw
        ON chunk_embeddings
        USING hnsw (embedding vector_cosine_ops)
    """)

    # Drop the legacy IVFFlat index a previous build may have left behind.
    await session.execute(drop_legacy_index)

    # Keyword leg of hybrid retrieval. The indexed expression has to match the
    # one used by search_chunks_fulltext exactly, or the planner will not use it.
    await session.execute(create_fts_index)

    if settings.vector_dimension > 2000:
        logger.warning(
            "Vector dimension %d exceeds pgvector HNSW limit (2000). "
            "Skipping HNSW index — exact brute-force search will be used. "
            "Set VECTOR_DIMENSION to 1024 (or ≤ 2000) to enable the index.",
            settings.vector_dimension,
        )
    else:
        await session.execute(create_hnsw_index)
        logger.info("Search indexes (HNSW + full-text GIN) created/verified")