import express from 'express'; import cors from 'cors'; import path from 'path'; import { fileURLToPath } from 'url'; import dotenv from 'dotenv'; import { StripeService } from './services/stripe.service.js'; import { AIService } from './services/ai.service.js'; import { WorkflowService } from './services/workflow.service.js'; import paymentRoutes from './routes/payment.js'; import authRoutes, { verifyToken } from './routes/auth.js'; import orderRoutes from './routes/order.js'; import adminRoutes from './routes/admin.js'; import knowledgeRoutes from './routes/knowledge.js'; import projectRoutes from './routes/project.js'; import workflowRoutes from './routes/workflow.js'; import userRoutes from './routes/user.js'; import apikeyRoutes from './routes/apikey.js'; import externalRoutes from './routes/external.js'; import concurrencyRoutes from './routes/concurrency.js'; import debugRoutes from './routes/debug.js'; import { setupWorkers } from './lib/queue.js'; import { ConcurrencyService } from './services/concurrency.service.js'; import { initDB, getDB } from './lib/db.js'; import { initPG } from './lib/pg.js'; import crypto from 'crypto'; import { aiCircuitBreaker } from './lib/circuit-breaker.js'; import { SystemService } from './lib/system.js'; import { initSocket } from './lib/socket.js'; import http from 'http'; import { rateLimit } from 'express-rate-limit'; import { QuotaService } from './services/quota.service.js'; dotenv.config(); const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const app = express(); const port = Number(process.env.PORT) || 7860; const startServer = async () => { try { console.log('[系统] 正在启动服务...'); // 1. 初始化数据库 await initDB(); const db = await getDB(); try { await initPG(); } catch (e) { console.warn('[Database] PostgreSQL 初始化跳过 (非致命):', e); } // 2. Stripe Webhook app.post('/api/payment/webhook', express.raw({ type: 'application/json' }), async (req, res) => { const sig = req.headers['stripe-signature'] as string; try { await StripeService.handleWebhook(sig, req.body); res.json({ received: true }); } catch (err: any) { res.status(400).send(`Webhook 错误: ${err.message}`); } }); // 3. 全局限流 const apiLimiter = rateLimit({ windowMs: 15 * 60 * 1000, max: 2000, message: { error: { message: "请求过于频繁,请稍后再试", type: "rate_limit_error", code: "429" } }, standardHeaders: true, legacyHeaders: false, }); app.use(express.json()); app.use(cors()); app.use('/api/', apiLimiter); // 4. 审计日志 app.use((req, _res, next) => { if (!req.url.includes('/api/debug/queue-status')) { console.log(`[${new Date().toISOString()}] ${req.method} ${req.url}`); } next(); }); // 5. 业务路由 app.use('/api/auth', authRoutes); app.use('/api/payment', paymentRoutes); app.use('/api/order', orderRoutes); app.use('/api/admin', adminRoutes); app.use('/api/knowledge', knowledgeRoutes); app.use('/api/project', projectRoutes); app.use('/api/workflow', workflowRoutes); app.use('/api/user', userRoutes); app.use('/api/apikey', apikeyRoutes); app.use('/api/v1', externalRoutes); app.use('/api/concurrency', concurrencyRoutes); app.use('/api/debug', debugRoutes); // --- AI 业务核心接口 --- app.post('/api/ai/chat', verifyToken, async (req: any, res) => { const { query, sessionId: incomingSessionId, knowledgeBaseId, model } = req.body; const userId = req.user.userId; if (!QuotaService.checkAndConsume(userId)) { return res.status(402).json({ error: { message: "配额不足", type: "insufficient_quota", code: "402" } }); } let sessionId = incomingSessionId || crypto.randomUUID(); if (!incomingSessionId) { db.prepare('INSERT INTO chat_sessions (id, user_id, title, knowledge_base_id) VALUES (?, ?, ?, ?)') .run(sessionId, userId, query.slice(0, 20) + (query.length > 20 ? '...' : ''), knowledgeBaseId || null); } try { const { stream, sources } = await aiCircuitBreaker.execute(async () => { return await AIService.chatWithKnowledge(userId, query, knowledgeBaseId, model); }); res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.write(`data: ${JSON.stringify({ sessionId, sources })}\n\n`); let assistantResponse = ''; for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { assistantResponse += content; res.write(`data: ${JSON.stringify({ content })}\n\n`); } } const userEmbedding = await AIService.getEmbedding(query); db.prepare('INSERT INTO chat_messages (id, session_id, role, content, embedding) VALUES (?, ?, ?, ?, ?)') .run(crypto.randomUUID(), sessionId, 'user', query, JSON.stringify(userEmbedding)); db.prepare('INSERT INTO chat_messages (id, session_id, role, content) VALUES (?, ?, ?, ?)') .run(crypto.randomUUID(), sessionId, 'assistant', assistantResponse); if (!incomingSessionId) { AIService.generateTitle(query, assistantResponse).then(smartTitle => { db.prepare('UPDATE chat_sessions SET title = ? WHERE id = ?').run(smartTitle, sessionId); }).catch(err => console.error('[AI] 标题生成失败:', err)); } SystemService.logAudit(userId, 'CHAT', 'SUCCESS', { sessionId }); res.end(); } catch (err: any) { console.error('[AI] 对话出错:', err.message); res.status(err.message.includes('[熔断]') ? 503 : 500).json({ error: err.message }); } }); app.get('/api/ai/messages/:sessionId', verifyToken, (req: any, res) => { const { sessionId } = req.params; const messages = db.prepare('SELECT * FROM chat_messages WHERE session_id = ? ORDER BY created_at ASC').all(sessionId); res.json({ success: true, messages }); }); app.get('/api/ai/sessions', verifyToken, (req: any, res) => { const userId = req.user.userId; const sessions = db.prepare('SELECT * FROM chat_sessions WHERE user_id = ? ORDER BY created_at DESC').all(userId); res.json({ success: true, sessions }); }); app.delete('/api/ai/sessions/:sessionId', verifyToken, (req: any, res) => { const { sessionId } = req.params; const userId = req.user.userId; const session = db.prepare('SELECT id FROM chat_sessions WHERE id = ? AND user_id = ?').get(sessionId, userId); if (!session) return res.status(403).json({ success: false, error: '权限不足' }); db.prepare('DELETE FROM chat_messages WHERE session_id = ?').run(sessionId); db.prepare('DELETE FROM chat_sessions WHERE id = ?').run(sessionId); res.json({ success: true }); }); app.patch('/api/ai/sessions/:sessionId', verifyToken, (req: any, res) => { const { sessionId } = req.params; const { title } = req.body; const userId = req.user.userId; const result = db.prepare('UPDATE chat_sessions SET title = ? WHERE id = ? AND user_id = ?').run(title, sessionId, userId); res.json({ success: result.changes > 0 }); }); app.get('/api/system/stats', verifyToken, (_req, res) => { res.json({ success: true, circuitBreaker: aiCircuitBreaker.getStatus(), serverTime: new Date().toISOString() }); }); // 6. 队列工作 setupWorkers( async (data) => WorkflowService.execute(data.workflow, data.input, data.userId), async (data) => AIService.processDocument(data) ); // 7. 静态文件 const isProd = process.env.NODE_ENV === 'production'; const distPath = isProd ? path.resolve(__dirname, '../../') : path.resolve(__dirname, '../dist'); app.use(express.static(distPath)); app.get('*', (req, res) => { if (req.path.startsWith('/api/')) return res.status(404).json({ error: 'API Not Found' }); res.sendFile(path.join(distPath, 'index.html')); }); // 8. 启动 const httpServer = http.createServer(app); initSocket(httpServer); const server = httpServer.listen(port, '0.0.0.0', async () => { await ConcurrencyService.loadData(); console.log(`[服务器] 全栈后端运行在: http://0.0.0.0:${port}`); }); // 9. 优雅退出 const shutdown = async (signal: string) => { console.log(`\n[${signal}] 正在存档...`); await ConcurrencyService.saveSnapshot(); server.close(() => process.exit(0)); }; process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGINT', () => shutdown('SIGINT')); } catch (err) { console.error('[系统] 启动失败:', err); process.exit(1); } }; startServer(); export default app;