File size: 9,098 Bytes
6bb2fbe
 
 
 
 
 
c66336a
 
662ac2a
ae4ceef
 
 
 
 
 
 
 
 
 
 
 
9a67481
716de21
ae4ceef
 
 
 
 
 
 
 
6bb2fbe
 
 
 
 
 
 
c4523a3
6bb2fbe
716de21
6bb2fbe
716de21
 
 
 
 
 
 
 
 
 
 
662ac2a
716de21
 
 
 
 
 
 
 
 
 
9a67481
716de21
 
 
 
 
 
 
ae4ceef
 
716de21
 
 
9a67481
716de21
 
 
 
 
 
ae4ceef
 
716de21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ae4ceef
 
716de21
 
 
 
 
c66336a
716de21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c66336a
716de21
 
 
 
 
6bb2fbe
716de21
 
 
 
 
6bb2fbe
716de21
 
 
 
 
 
 
 
 
6bb2fbe
716de21
 
 
 
 
 
 
ae4ceef
716de21
 
 
ae4ceef
716de21
 
 
 
 
 
 
 
 
 
 
 
 
 
1544b82
716de21
 
 
6bb2fbe
716de21
 
 
 
ae4ceef
716de21
 
 
 
 
 
 
 
6bb2fbe
9a67481
716de21
9a67481
 
 
 
716de21
9a67481
6bb2fbe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import express from 'express';
import cors from 'cors';
import path from 'path';
import { fileURLToPath } from 'url';
import dotenv from 'dotenv';
import { StripeService } from './services/stripe.service.js';
import { AIService } from './services/ai.service.js';
import { WorkflowService } from './services/workflow.service.js';
import paymentRoutes from './routes/payment.js';
import authRoutes, { verifyToken } from './routes/auth.js';
import orderRoutes from './routes/order.js';
import adminRoutes from './routes/admin.js';
import knowledgeRoutes from './routes/knowledge.js';
import projectRoutes from './routes/project.js';
import workflowRoutes from './routes/workflow.js';
import userRoutes from './routes/user.js';
import apikeyRoutes from './routes/apikey.js';
import externalRoutes from './routes/external.js';
import concurrencyRoutes from './routes/concurrency.js';
import debugRoutes from './routes/debug.js';
import { setupWorkers } from './lib/queue.js';
import { ConcurrencyService } from './services/concurrency.service.js';
import { initDB, getDB } from './lib/db.js';
import { initPG } from './lib/pg.js';
import crypto from 'crypto';
import { aiCircuitBreaker } from './lib/circuit-breaker.js';
import { SystemService } from './lib/system.js';
import { initSocket } from './lib/socket.js';
import http from 'http';
import { rateLimit } from 'express-rate-limit';
import { QuotaService } from './services/quota.service.js';

dotenv.config();

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const app = express();
const port = Number(process.env.PORT) || 7860;

const startServer = async () => {
  try {
    console.log('[系统] 正在启动服务...');
    
    // 1. 初始化数据库
    await initDB();
    const db = await getDB();
    
    try {
      await initPG();
    } catch (e) {
      console.warn('[Database] PostgreSQL 初始化跳过 (非致命):', e);
    }

    // 2. Stripe Webhook
    app.post('/api/payment/webhook', express.raw({ type: 'application/json' }), async (req, res) => {
      const sig = req.headers['stripe-signature'] as string;
      try {
        await StripeService.handleWebhook(sig, req.body);
        res.json({ received: true });
      } catch (err: any) {
        res.status(400).send(`Webhook 错误: ${err.message}`);
      }
    });

    // 3. 全局限流
    const apiLimiter = rateLimit({
      windowMs: 15 * 60 * 1000,
      max: 2000,
      message: { error: { message: "请求过于频繁,请稍后再试", type: "rate_limit_error", code: "429" } },
      standardHeaders: true,
      legacyHeaders: false,
    });

    app.use(express.json());
    app.use(cors());
    app.use('/api/', apiLimiter);

    // 4. 审计日志
    app.use((req, _res, next) => {
      if (!req.url.includes('/api/debug/queue-status')) {
        console.log(`[${new Date().toISOString()}] ${req.method} ${req.url}`);
      }
      next();
    });

    // 5. 业务路由
    app.use('/api/auth', authRoutes);
    app.use('/api/payment', paymentRoutes);
    app.use('/api/order', orderRoutes);
    app.use('/api/admin', adminRoutes);
    app.use('/api/knowledge', knowledgeRoutes);
    app.use('/api/project', projectRoutes);
    app.use('/api/workflow', workflowRoutes);
    app.use('/api/user', userRoutes);
    app.use('/api/apikey', apikeyRoutes);
    app.use('/api/v1', externalRoutes);
    app.use('/api/concurrency', concurrencyRoutes);
    app.use('/api/debug', debugRoutes);

    // --- AI 业务核心接口 ---
    app.post('/api/ai/chat', verifyToken, async (req: any, res) => {
      const { query, sessionId: incomingSessionId, knowledgeBaseId, model } = req.body;
      const userId = req.user.userId;

      if (!QuotaService.checkAndConsume(userId)) {
        return res.status(402).json({ error: { message: "配额不足", type: "insufficient_quota", code: "402" } });
      }

      let sessionId = incomingSessionId || crypto.randomUUID();
      if (!incomingSessionId) {
        db.prepare('INSERT INTO chat_sessions (id, user_id, title, knowledge_base_id) VALUES (?, ?, ?, ?)')
          .run(sessionId, userId, query.slice(0, 20) + (query.length > 20 ? '...' : ''), knowledgeBaseId || null);
      }

      try {
        const { stream, sources } = await aiCircuitBreaker.execute(async () => {
          return await AIService.chatWithKnowledge(userId, query, knowledgeBaseId, model);
        });
        
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        res.write(`data: ${JSON.stringify({ sessionId, sources })}\n\n`);

        let assistantResponse = '';
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content || '';
          if (content) {
            assistantResponse += content;
            res.write(`data: ${JSON.stringify({ content })}\n\n`);
          }
        }

        const userEmbedding = await AIService.getEmbedding(query);
        db.prepare('INSERT INTO chat_messages (id, session_id, role, content, embedding) VALUES (?, ?, ?, ?, ?)')
          .run(crypto.randomUUID(), sessionId, 'user', query, JSON.stringify(userEmbedding));
        db.prepare('INSERT INTO chat_messages (id, session_id, role, content) VALUES (?, ?, ?, ?)')
          .run(crypto.randomUUID(), sessionId, 'assistant', assistantResponse);

        if (!incomingSessionId) {
          AIService.generateTitle(query, assistantResponse).then(smartTitle => {
            db.prepare('UPDATE chat_sessions SET title = ? WHERE id = ?').run(smartTitle, sessionId);
          }).catch(err => console.error('[AI] 标题生成失败:', err));
        }

        SystemService.logAudit(userId, 'CHAT', 'SUCCESS', { sessionId });
        res.end();
      } catch (err: any) {
        console.error('[AI] 对话出错:', err.message);
        res.status(err.message.includes('[熔断]') ? 503 : 500).json({ error: err.message });
      }
    });

    app.get('/api/ai/messages/:sessionId', verifyToken, (req: any, res) => {
      const { sessionId } = req.params;
      const messages = db.prepare('SELECT * FROM chat_messages WHERE session_id = ? ORDER BY created_at ASC').all(sessionId);
      res.json({ success: true, messages });
    });

    app.get('/api/ai/sessions', verifyToken, (req: any, res) => {
      const userId = req.user.userId;
      const sessions = db.prepare('SELECT * FROM chat_sessions WHERE user_id = ? ORDER BY created_at DESC').all(userId);
      res.json({ success: true, sessions });
    });

    app.delete('/api/ai/sessions/:sessionId', verifyToken, (req: any, res) => {
      const { sessionId } = req.params;
      const userId = req.user.userId;
      const session = db.prepare('SELECT id FROM chat_sessions WHERE id = ? AND user_id = ?').get(sessionId, userId);
      if (!session) return res.status(403).json({ success: false, error: '权限不足' });
      db.prepare('DELETE FROM chat_messages WHERE session_id = ?').run(sessionId);
      db.prepare('DELETE FROM chat_sessions WHERE id = ?').run(sessionId);
      res.json({ success: true });
    });

    app.patch('/api/ai/sessions/:sessionId', verifyToken, (req: any, res) => {
      const { sessionId } = req.params;
      const { title } = req.body;
      const userId = req.user.userId;
      const result = db.prepare('UPDATE chat_sessions SET title = ? WHERE id = ? AND user_id = ?').run(title, sessionId, userId);
      res.json({ success: result.changes > 0 });
    });

    app.get('/api/system/stats', verifyToken, (_req, res) => {
      res.json({ success: true, circuitBreaker: aiCircuitBreaker.getStatus(), serverTime: new Date().toISOString() });
    });

    // 6. 队列工作
    setupWorkers(
      async (data) => WorkflowService.execute(data.workflow, data.input, data.userId),
      async (data) => AIService.processDocument(data)
    );

    // 7. 静态文件
    const isProd = process.env.NODE_ENV === 'production';
    const distPath = isProd ? path.resolve(__dirname, '../../') : path.resolve(__dirname, '../dist');
    app.use(express.static(distPath));
    app.get('*', (req, res) => {
      if (req.path.startsWith('/api/')) return res.status(404).json({ error: 'API Not Found' });
      res.sendFile(path.join(distPath, 'index.html'));
    });

    // 8. 启动
    const httpServer = http.createServer(app);
    initSocket(httpServer);

    const server = httpServer.listen(port, '0.0.0.0', async () => {
      await ConcurrencyService.loadData();
      console.log(`[服务器] 全栈后端运行在: http://0.0.0.0:${port}`);
    });

    // 9. 优雅退出
    const shutdown = async (signal: string) => {
      console.log(`\n[${signal}] 正在存档...`);
      await ConcurrencyService.saveSnapshot();
      server.close(() => process.exit(0));
    };
    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));

  } catch (err) {
    console.error('[系统] 启动失败:', err);
    process.exit(1);
  }
};

startServer();

export default app;