import pg from 'pg';
import dotenv from 'dotenv';

dotenv.config();

const { Pool } = pg;

// Prefer the DATABASE_URL environment variable; otherwise fall back to localhost (local development).
// In Docker, DATABASE_URL is injected explicitly via docker-compose.yml.
const connectionString =
  process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/codex';

const pool = new Pool({
  connectionString,
  max: 20, // maximum number of pooled connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000, // connection timeout raised to 5s
});

// node-postgres emits 'error' on the pool when an idle client loses its connection
// (e.g. the database restarts). An 'error' event with no listener crashes the Node
// process, so log it instead; the pool discards the broken client on its own.
pool.on('error', (err: Error) => {
  console.error('[PG] 空闲连接出错:', err.message);
});

/**
 * True once `initPG()` has finished creating the extension, tables, indexes and
 * the `hybrid_search` function; false before that or if initialization failed.
 */
export let isPgAvailable = false;

/** Extracts a printable message from an arbitrary thrown value. */
const errorMessage = (err: unknown): string =>
  err instanceof Error ? err.message : String(err);

/**
 * Runs `fn`, retrying on rejection with a fixed delay between attempts.
 *
 * @param fn      Async operation to attempt.
 * @param retries Number of additional attempts after the first (total attempts = retries + 1).
 * @param delay   Milliseconds to wait between attempts.
 * @returns The value `fn` resolves with on the first successful attempt.
 * @throws The last error from `fn` once all retries are exhausted.
 */
async function retry<T>(fn: () => Promise<T>, retries = 5, delay = 2000): Promise<T> {
  for (let remaining = retries; ; remaining--) {
    try {
      return await fn();
    } catch (err: unknown) {
      if (remaining <= 0) {
        throw err;
      }
      console.warn(`[PG] 连接失败,${delay / 1000}秒后重试... (剩余 ${remaining} 次) - ${errorMessage(err)}`);
      await new Promise((res) => setTimeout(res, delay));
    }
  }
}

/**
 * Initializes PostgreSQL + pgvector: enables the `vector` extension, creates the
 * `knowledge_bases` / `knowledge_chunks` tables and their indexes, and (re)defines the
 * `hybrid_search` SQL function. All statements are idempotent (`IF NOT EXISTS` /
 * `CREATE OR REPLACE`), so the whole sequence is safe to re-run on retry.
 *
 * Never throws: on final failure it logs and leaves `isPgAvailable = false`, so callers
 * can fall back to SQLite.
 */
export const initPG = async (): Promise<void> => {
  try {
    await retry(async () => {
      const client = await pool.connect();
      try {
        console.log(`[PG] 正在初始化 PostgreSQL + PGVector (${connectionString.includes('localhost') ? 'Local' : 'Remote'})...`);

        // 1. Enable the pgvector extension.
        await client.query('CREATE EXTENSION IF NOT EXISTS vector');

        // 2. Knowledge-base table (mirrors the SQLite schema, recreated in PG).
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_bases (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'processing',
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
          )
        `);

        // 3. Knowledge-chunk table (vector storage). Embeddings are 1024-dimensional,
        //    matching the BGE-M3 model per the original author; the dimension must agree
        //    with whatever model produces the embeddings.
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_chunks (
            id TEXT PRIMARY KEY,
            kb_id TEXT NOT NULL,
            content TEXT NOT NULL,
            embedding vector(1024),
            metadata JSONB,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (kb_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
          )
        `);

        // 4. Full-text index for the keyword half of hybrid search.
        //    NOTE(review): the 'simple' config does not segment Chinese words, so keyword
        //    matching on CJK text is coarse; consider zhparser if it is installed.
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_content_ts
          ON knowledge_chunks USING GIN (to_tsvector('simple', content));
        `);

        // 5. HNSW vector index (cosine distance) for the semantic half of hybrid search.
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding
          ON knowledge_chunks USING hnsw (embedding vector_cosine_ops);
        `);

        // 6. Hybrid search function: cosine similarity + full-text ts_rank_cd, fused with
        //    Reciprocal Rank Fusion (score = sum of 1 / (rrf_k + rank)).
        //    The explicit ::FLOAT casts are required: 1.0 / (int + bigint) is NUMERIC, and
        //    PL/pgSQL RETURN QUERY rejects rows whose column types differ from the declared
        //    RETURNS TABLE types ("structure of query does not match function result type").
        await client.query(`
          CREATE OR REPLACE FUNCTION hybrid_search(
            query_text TEXT,
            query_embedding vector(1024),
            match_threshold FLOAT,
            match_count INT,
            filter_kb_id TEXT DEFAULT NULL,
            rrf_k INT DEFAULT 60
          )
          RETURNS TABLE (
            id TEXT,
            content TEXT,
            metadata JSONB,
            similarity FLOAT,
            rank_score FLOAT
          )
          LANGUAGE plpgsql
          AS $$
          BEGIN
            RETURN QUERY
            WITH semantic_search AS (
              SELECT
                kc.id,
                kc.content,
                kc.metadata,
                1 - (kc.embedding <=> query_embedding) AS similarity,
                ROW_NUMBER() OVER (ORDER BY kc.embedding <=> query_embedding) AS rank_seq
              FROM knowledge_chunks kc
              WHERE 1 - (kc.embedding <=> query_embedding) > match_threshold
                AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
              ORDER BY similarity DESC
              LIMIT match_count * 2
            ),
            keyword_search AS (
              SELECT
                kc.id,
                kc.content,
                kc.metadata,
                ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) AS similarity,
                ROW_NUMBER() OVER (ORDER BY ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) DESC) AS rank_seq
              FROM knowledge_chunks kc
              WHERE to_tsvector('simple', kc.content) @@ websearch_to_tsquery('simple', query_text)
                AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
              ORDER BY similarity DESC
              LIMIT match_count * 2
            )
            SELECT
              COALESCE(s.id, k.id) AS id,
              COALESCE(s.content, k.content) AS content,
              COALESCE(s.metadata, k.metadata) AS metadata,
              COALESCE(s.similarity, 0)::FLOAT AS similarity,
              (
                COALESCE(1.0 / (rrf_k + s.rank_seq), 0.0) +
                COALESCE(1.0 / (rrf_k + k.rank_seq), 0.0)
              )::FLOAT AS rank_score
            FROM semantic_search s
            FULL OUTER JOIN keyword_search k ON s.id = k.id
            ORDER BY rank_score DESC
            LIMIT match_count;
          END;
          $$;
        `);

        console.log('[PG] PostgreSQL 初始化完成');
        isPgAvailable = true;
      } finally {
        client.release();
      }
    }, 5, 1000); // shorter retry delay than retry()'s 2s default, so startup fails fast and falls back to SQLite
  } catch (err: unknown) {
    console.error('[PG] 初始化彻底失败,将降级使用 SQLite:', errorMessage(err));
    isPgAvailable = false;
  }
};

export default pool;