File size: 6,041 Bytes
ae4ceef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import pg from 'pg';
import dotenv from 'dotenv';

// Load variables from .env before process.env is read below.
dotenv.config();

const { Pool } = pg;

// Prefer the environment variable; otherwise default to localhost (for local development).
// In the Docker environment, DATABASE_URL is injected explicitly via docker-compose.yml.
const connectionString = process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/codex';

// Shared connection pool for the whole module (also this file's default export).
const pool = new Pool({
  connectionString,
  max: 20, // maximum number of connections in the pool
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000, // connection timeout raised to 5s
});

// Set to true by initPG() once the schema/extension setup succeeds; consumers
// use it to decide between PG and the SQLite fallback. NOTE(review): this is a
// mutable exported binding — importers observe updates via live ESM bindings.
export let isPgAvailable = false;

// Simple retry helper: runs `fn`; on failure waits `delay` ms and recurses,
// allowing up to `retries` additional attempts (so at most retries + 1 calls).
//
// @param fn      async operation to attempt
// @param retries remaining extra attempts after a failure (default 5)
// @param delay   fixed wait in milliseconds between attempts (default 2000)
// @returns the resolved value of the first successful attempt
// @throws the last error once all attempts are exhausted
const retry = async <T>(fn: () => Promise<T>, retries = 5, delay = 2000): Promise<T> => {
  try {
    return await fn();
  } catch (err: unknown) {
    if (retries > 0) {
      // Narrow before reading .message — a thrown non-Error would otherwise
      // have made `err.message` undefined in the log line.
      const message = err instanceof Error ? err.message : String(err);
      console.warn(`[PG] 连接失败,${delay / 1000}秒后重试... (剩余 ${retries} 次) - ${message}`);
      await new Promise(res => setTimeout(res, delay));
      return retry(fn, retries - 1, delay);
    }
    throw err;
  }
};

// Initialize the PGVector extension and table schema.
// Runs the whole setup (extension, tables, indexes, hybrid_search function)
// inside a retry loop; on total failure it logs and leaves isPgAvailable=false
// so callers can fall back to SQLite. Never throws.
export const initPG = async () => {
  try {
    await retry(async () => {
      const client = await pool.connect();
      try {
        console.log(`[PG] 正在初始化 PostgreSQL + PGVector (${connectionString.includes('localhost') ? 'Local' : 'Remote'})...`);
        
        // 1. Enable the pgvector extension
        await client.query('CREATE EXTENSION IF NOT EXISTS vector');
        
        // 2. Create the knowledge-base table (kept consistent with the SQLite
        //    schema, but rebuilt here in PG)
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_bases (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            description TEXT,
            status TEXT DEFAULT 'processing',
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
          )
        `);

        // 3. Create the knowledge-chunk table (core: vector storage).
        //    Uses 1024-dimensional vectors (sized for the BGE-M3 model).
        await client.query(`
          CREATE TABLE IF NOT EXISTS knowledge_chunks (
            id TEXT PRIMARY KEY,
            kb_id TEXT NOT NULL,
            content TEXT NOT NULL,
            embedding vector(1024), 
            metadata JSONB,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (kb_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
          )
        `);

        // 4. Create the full-text index (keyword half of hybrid search).
        //    Uses the 'simple' parser to tolerate Chinese text (zhparser would
        //    be better if installed; 'simple' is the safe default here).
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_content_ts 
          ON knowledge_chunks USING GIN (to_tsvector('simple', content));
        `);

        // 5. Create the vector index (HNSW — semantic half of hybrid search)
        await client.query(`
          CREATE INDEX IF NOT EXISTS idx_knowledge_chunks_embedding 
          ON knowledge_chunks USING hnsw (embedding vector_cosine_ops);
        `);

        // 6. Create the hybrid-search function (Hybrid Search RPC).
        //    Combines vector similarity (cosine) + keyword matching (ts_rank)
        //    via RRF (Reciprocal Rank Fusion). Each branch over-fetches
        //    match_count * 2 candidates before fusing with 1/(rrf_k + rank).
        await client.query(`
          CREATE OR REPLACE FUNCTION hybrid_search(
            query_text TEXT, 
            query_embedding vector(1024), 
            match_threshold FLOAT, 
            match_count INT,
            filter_kb_id TEXT DEFAULT NULL,
            rrf_k INT DEFAULT 60
          )
          RETURNS TABLE (
            id TEXT,
            content TEXT,
            metadata JSONB,
            similarity FLOAT,
            rank_score FLOAT
          ) LANGUAGE plpgsql AS $$
          BEGIN
            RETURN QUERY
            WITH semantic_search AS (
              SELECT 
                kc.id, 
                kc.content, 
                kc.metadata, 
                1 - (kc.embedding <=> query_embedding) AS similarity,
                ROW_NUMBER() OVER (ORDER BY kc.embedding <=> query_embedding) AS rank_seq
              FROM knowledge_chunks kc
              WHERE 1 - (kc.embedding <=> query_embedding) > match_threshold
              AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
              ORDER BY similarity DESC
              LIMIT match_count * 2
            ),
            keyword_search AS (
              SELECT 
                kc.id, 
                kc.content, 
                kc.metadata, 
                ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) AS similarity,
                ROW_NUMBER() OVER (ORDER BY ts_rank_cd(to_tsvector('simple', kc.content), websearch_to_tsquery('simple', query_text)) DESC) AS rank_seq
              FROM knowledge_chunks kc
              WHERE to_tsvector('simple', kc.content) @@ websearch_to_tsquery('simple', query_text)
              AND (filter_kb_id IS NULL OR kc.kb_id = filter_kb_id)
              ORDER BY similarity DESC
              LIMIT match_count * 2
            )
            SELECT
              COALESCE(s.id, k.id) AS id,
              COALESCE(s.content, k.content) AS content,
              COALESCE(s.metadata, k.metadata) AS metadata,
              COALESCE(s.similarity, 0) AS similarity,
              (
                COALESCE(1.0 / (rrf_k + s.rank_seq), 0.0) + 
                COALESCE(1.0 / (rrf_k + k.rank_seq), 0.0)
              ) AS rank_score
            FROM semantic_search s
            FULL OUTER JOIN keyword_search k ON s.id = k.id
            ORDER BY rank_score DESC
            LIMIT match_count;
          END;
          $$;
        `);

        console.log('[PG] PostgreSQL 初始化完成');
        isPgAvailable = true;
      } finally {
        client.release();
      }
    }, 5, 1000); // shorter interval: fail fast so we can fall back to SQLite
                 // NOTE(review): comment claimed "fewer retries" but 5 equals
                 // the retry() default — only the delay was reduced (2s -> 1s)
  } catch (err: any) {
    console.error('[PG] 初始化彻底失败,将降级使用 SQLite:', err.message);
    isPgAvailable = false;
  }
};

// Default export: the shared pg connection pool configured above.
export default pool;