import pg from 'pg';
import dotenv from 'dotenv';
dotenv.config();
const { Pool } = pg;

// Connection-string resolution: an explicit DATABASE_URL (injected via
// docker-compose in containerised runs) wins; otherwise default to a local
// Postgres instance for development.
const connectionString =
  process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/codex';

const pool = new Pool({
  connectionString,
  max: 20, // upper bound on pooled connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000, // allow up to 5s to establish a connection
});

// Flipped to true by initPG() once the database is reachable and initialised;
// consumers check this to decide between Postgres and the SQLite fallback.
export let isPgAvailable = false;
/**
 * Minimal retry helper: re-invokes `fn` until it resolves or `retries` is
 * exhausted, sleeping `delay` ms between attempts.
 *
 * Generic over the result type so callers keep `Promise<T>` instead of the
 * original `Promise<any>` (backward-compatible — same name, params, defaults).
 *
 * @param fn      async operation to attempt
 * @param retries additional attempts allowed after the first failure
 * @param delay   pause between attempts, in milliseconds
 * @returns the resolved value of `fn`
 * @throws the final error once all retries are exhausted
 */
const retry = async <T>(fn: () => Promise<T>, retries = 5, delay = 2000): Promise<T> => {
  try {
    return await fn();
  } catch (err: any) {
    if (retries > 0) {
      console.warn(`[PG] 连接失败,${delay / 1000}秒后重试... (剩余 ${retries} 次) - ${err.message}`);
      await new Promise(res => setTimeout(res, delay));
      return retry(fn, retries - 1, delay);
    }
    // Out of attempts: surface the last failure to the caller.
    throw err;
  }
};
/**
 * Initialise the PGVector extension, tables, indexes and the hybrid-search
 * SQL function, retrying while Postgres comes up.
 *
 * On success sets the module-level `isPgAvailable` flag to true; on total
 * failure it logs and leaves the flag false so callers fall back to SQLite.
 * Never throws to its caller.
 */
export const initPG = async () => {
  try {
    await retry(async () => {
      const client = await pool.connect();
      try {
        console.log(`[PG] 正在初始化 PostgreSQL + PGVector (${connectionString.includes('localhost') ? 'Local' : 'Remote'})...`);
        // 1. Enable the pgvector extension (idempotent).
        await client.query('CREATE EXTENSION IF NOT EXISTS vector');
        // 2. Knowledge-base table (mirrors the SQLite schema, rebuilt in PG).
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_bases (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'processing',
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
          )
        `);
        // 3. Chunk table — the core vector store.
        //    1024-dimensional embeddings (sized for the BGE-M3 model).
        //    Chunks are deleted automatically when their knowledge base is
        //    removed (ON DELETE CASCADE).
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_chunks (
            id TEXT PRIMARY KEY,
            kb_id TEXT NOT NULL,
            content TEXT NOT NULL,
            embedding vector(1024),
            metadata JSONB,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (kb_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
          )
        `);
        // 4. Full-text (GIN) index — keyword half of hybrid retrieval.
        //    Uses the 'simple' tokenizer so non-English text is at least
        //    indexable; swap in zhparser for proper Chinese segmentation if
        //    that extension is installed.
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_content_ts
          ON knowledge_chunks USING GIN (to_tsvector('simple', content));
        `);
        // 5. HNSW vector index — semantic half of hybrid retrieval
        //    (cosine distance operator class).
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding
          ON knowledge_chunks USING hnsw (embedding vector_cosine_ops);
        `);
        // 6. Hybrid-search function: cosine similarity + ts_rank_cd keyword
        //    score, fused with Reciprocal Rank Fusion (1/(k + rank), k = rrf_k).
        //    Each arm over-fetches (match_count * 2) before the RRF merge.
        // NOTE(review): the RETURNS TABLE column names (id, content, metadata,
        // similarity) also become plpgsql variables; depending on the PG
        // version/plpgsql.variable_conflict setting, the unqualified
        // `ORDER BY similarity DESC` inside the CTEs may raise an ambiguity
        // error — confirm against the target Postgres before relying on it.
        await client.query(`
          CREATE OR REPLACE FUNCTION hybrid_search(
            query_text TEXT,
            query_embedding vector(1024),
            match_threshold FLOAT,
            match_count INT,
            filter_kb_id TEXT DEFAULT NULL,
            rrf_k INT DEFAULT 60
          )
          RETURNS TABLE (
            id TEXT,
            content TEXT,
            metadata JSONB,
            similarity FLOAT,
            rank_score FLOAT
          ) LANGUAGE plpgsql AS $$
          BEGIN
          RETURN QUERY
          WITH semantic_search AS (
            SELECT
              kc.id,
              kc.content,
              kc.metadata,
              1 - (kc.embedding <=> query_embedding) AS similarity,
              ROW_NUMBER() OVER (ORDER BY kc.embedding <=> query_embedding) AS rank_seq
            FROM knowledge_chunks kc
            WHERE 1 - (kc.embedding <=> query_embedding) > match_threshold
            AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
            ORDER BY similarity DESC
            LIMIT match_count * 2
          ),
          keyword_search AS (
            SELECT
              kc.id,
              kc.content,
              kc.metadata,
              ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) AS similarity,
              ROW_NUMBER() OVER (ORDER BY ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) DESC) AS rank_seq
            FROM knowledge_chunks kc
            WHERE to_tsvector('simple', kc.content) @@ websearch_to_tsquery('simple', query_text)
            AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
            ORDER BY similarity DESC
            LIMIT match_count * 2
          )
          SELECT
            COALESCE(s.id, k.id) AS id,
            COALESCE(s.content, k.content) AS content,
            COALESCE(s.metadata, k.metadata) AS metadata,
            COALESCE(s.similarity, 0) AS similarity,
            (
              COALESCE(1.0 / (rrf_k + s.rank_seq), 0.0) +
              COALESCE(1.0 / (rrf_k + k.rank_seq), 0.0)
            ) AS rank_score
          FROM semantic_search s
          FULL OUTER JOIN keyword_search k ON s.id = k.id
          ORDER BY rank_score DESC
          LIMIT match_count;
          END;
          $$;
        `);
        console.log('[PG] PostgreSQL 初始化完成');
        isPgAvailable = true;
      } finally {
        client.release();
      }
    }, 5, 1000); // fewer, shorter retries here: fail fast so the SQLite fallback kicks in
  } catch (err: any) {
    console.error('[PG] 初始化彻底失败,将降级使用 SQLite:', err.message);
    isPgAvailable = false;
  }
};
// Shared connection pool (default export) — callers issue queries through it directly.
export default pool;