| """Database initialization.""" | |
| from sqlalchemy import text | |
| from src.db.postgres.connection import engine, Base | |
| from src.db.postgres.models import ( | |
| ChatMessage, | |
| DatabaseClient, | |
| Document, | |
| MessageSource, | |
| Room, | |
| User, | |
| ) | |
async def init_db():
    """Initialize extensions, application tables, migrations and search indexes.

    All steps run inside the single transaction opened by ``engine.begin()``:

    1. Take a transaction-scoped advisory lock, then create the pgvector
       extension if it is missing.
    2. Create application tables via ``Base.metadata.create_all``.
       NOTE(review): the model classes imported at module top are presumably
       imported only so they register on ``Base.metadata``; confirm before
       removing any of them.
    3. Apply idempotent schema migrations that ``create_all`` cannot perform,
       because it never alters existing tables.
    4. Create the HNSW (vector) and GIN (full-text) indexes on
       ``langchain_pg_embedding``, but only if that table already exists.

    Every DDL step is guarded (``IF NOT EXISTS`` or ``create_all``'s
    existence check), so this is safe to run on every startup. The advisory
    lock is held until the transaction ends, so concurrent callers (e.g.
    several workers booting at once) run this routine one at a time.
    """
    async with engine.begin() as conn:
        # Serialize concurrent initializers before CREATE EXTENSION, because
        # parallel "CREATE EXTENSION IF NOT EXISTS" calls can still collide.
        # The lock is transaction-scoped, so it covers the rest of init_db too.
        # NOTE(review): this key appears to mirror the one langchain_postgres
        # uses for its own extension creation; keep the two in sync.
        #
        # These must stay two separate execute() calls. asyncpg rejects
        # multi-statement prepared statements, which is presumably why
        # langchain_postgres' combined statement is disabled elsewhere
        # (create_extension=False) and reproduced here.
        await conn.execute(text("SELECT pg_advisory_xact_lock(1573678846307946496)"))
        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))

        # Application tables; existing tables are left as they are.
        await conn.run_sync(Base.metadata.create_all)

        # Schema migrations (idempotent; safe to run on every startup).
        # create_all does not add columns to existing tables, so databases
        # created before rooms.status existed get the column here.
        await conn.execute(text(
            "ALTER TABLE rooms ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL DEFAULT 'active'"
        ))

        # The indexes below target langchain_pg_embedding. PGVector creates
        # that table on first use (create_all does not), so on a fresh
        # database they only appear on a later startup.
        #
        # to_regclass() resolves the unqualified name through search_path,
        # exactly like the CREATE INDEX statements do. It returns NULL when
        # the table does not exist, so the guards cannot match a same-named
        # table in some other schema.
        #
        # Both indexes are built with plain CREATE INDEX: CONCURRENTLY is not
        # allowed inside a transaction or DO block. Building them on a large
        # existing table blocks writes to it until the build finishes.

        # HNSW index for approximate vector similarity search. HNSW requires
        # a column with explicit dimensions: atttypmod > 0 means the column
        # was declared with one (e.g. vector(1536)), while atttypmod = -1
        # means dimensionless, and HNSW would fail with "column does not have
        # dimensions".
        await conn.execute(text("""
DO $$
BEGIN
    IF EXISTS (
        SELECT FROM pg_attribute a
        WHERE a.attrelid = to_regclass('langchain_pg_embedding')
          AND a.attname = 'embedding'
          AND a.atttypmod > 0
    ) THEN
        CREATE INDEX IF NOT EXISTS idx_langchain_pg_embedding_hnsw
            ON langchain_pg_embedding USING hnsw (embedding vector_cosine_ops);
    END IF;
END $$
"""))

        # GIN expression index for full-text search over the stored chunks.
        # Queries must use the identical expression,
        # to_tsvector('english', document), for the planner to use it.
        await conn.execute(text("""
DO $$
BEGIN
    IF to_regclass('langchain_pg_embedding') IS NOT NULL THEN
        CREATE INDEX IF NOT EXISTS idx_langchain_pg_embedding_fts
            ON langchain_pg_embedding USING GIN (to_tsvector('english', document));
    END IF;
END $$
"""))