codex-ai-platform / api /services /concurrency.service.ts
3v324v23's picture
chore: 彻底清理项目,符合 Hugging Face 部署规范
ae4ceef
/**
* 银行级稳健版核心业务逻辑
* 特性:AOF 实时流水 + 内存快照 + 优雅退出 + 竞态保护
*/
import fs from 'fs';
import path from 'path';
// 内存存储
const votes = new Map();
const userVoted = new Set();
const prizePool = [];
const winners = [];
// 文件路径
const SNAPSHOT_FILE = path.resolve(process.cwd(), 'concurrency_snapshot.json');
const AOF_FILE = path.resolve(process.cwd(), 'concurrency.aof');
// 流量控制
const MAX_QPS = 3000;
let currentQPS = 0;
let lastReset = Date.now();
export const ConcurrencyService = {
/**
* 流量与健康自检
*/
check() {
const now = Date.now();
if (now - lastReset > 1000) {
currentQPS = 0;
lastReset = now;
}
if (currentQPS >= MAX_QPS) throw new Error('BUSY: 触发系统过载保护');
currentQPS++;
},
/**
* 实时流水持久化 (AOF)
* 模拟数据库的 Binlog,确保数据零丢失
*/
async appendLog(action, data) {
const logEntry = JSON.stringify({ action, data, t: Date.now() }) + '\n';
try {
// 使用同步追加或带缓存的追加,确保极高性能
fs.appendFileSync(AOF_FILE, logEntry);
} catch (err) {
console.error('[AOF] 写入失败:', err);
}
},
/**
* 1. 投票 (零丢失版)
*/
async vote(candidateId, userId) {
this.check();
if (userVoted.has(userId)) {
throw new Error('您已经投过票了');
}
// 执行业务
userVoted.add(userId);
votes.set(candidateId, (votes.get(candidateId) || 0) + 1);
// 实时存盘
await this.appendLog('VOTE', { candidateId, userId });
return { success: true, newCount: votes.get(candidateId) };
},
/**
* 2. 抽奖 (零丢失版)
*/
async draw(userId) {
this.check();
if (prizePool.length === 0) {
return { win: false, message: '奖品已抽完' };
}
const prize = prizePool.pop();
winners.push({ userId, prize, timestamp: Date.now() });
// 实时存盘
await this.appendLog('DRAW', { userId, prize });
return { win: true, prize };
},
/**
* 定期生成快照 (用于加速启动)
*/
async saveSnapshot() {
const data = {
votes: Object.fromEntries(votes),
userVoted: Array.from(userVoted),
prizePool,
winners
};
fs.writeFileSync(SNAPSHOT_FILE, JSON.stringify(data));
// 快照保存后,可以清空 AOF 文件以节省空间 (类似 Redis BGREWRITEAOF)
fs.writeFileSync(AOF_FILE, '');
console.log('[Persistence] 全量快照已更新,日志已重写');
},
/**
* 极致恢复:快照 + AOF 日志重放
*/
async loadData() {
// 1. 先加载快照
if (fs.existsSync(SNAPSHOT_FILE)) {
try {
const content = fs.readFileSync(SNAPSHOT_FILE, 'utf-8');
if (content.trim()) {
const data = JSON.parse(content);
Object.entries(data.votes || {}).forEach(([k, v]) => votes.set(k, v));
(data.userVoted || []).forEach(u => userVoted.add(u));
prizePool.push(...(data.prizePool || []));
winners.push(...(data.winners || []));
console.log(`[Recovery] 快照加载完毕: ${userVoted.size} 条投票记录`);
}
} catch (err) {
console.warn('[Recovery] 快照解析失败,跳过快照恢复:', err.message);
}
}
// 2. 重放 AOF 日志 (恢复快照之后产生的数据)
if (fs.existsSync(AOF_FILE)) {
try {
const content = fs.readFileSync(AOF_FILE, 'utf-8');
const logs = content.split('\n').filter(Boolean);
for (const line of logs) {
try {
const { action, data } = JSON.parse(line);
if (action === 'VOTE') {
userVoted.add(data.userId);
votes.set(data.candidateId, (votes.get(data.candidateId) || 0) + 1);
} else if (action === 'DRAW') {
winners.push(data);
const idx = prizePool.indexOf(data.prize);
if (idx > -1) prizePool.splice(idx, 1);
}
} catch (lineErr) {
console.warn('[Recovery] AOF 行解析失败,跳过该行:', lineErr.message);
}
}
if (logs.length > 0) {
console.log(`[Recovery] AOF 日志重放完毕: ${logs.length} 条流水`);
}
} catch (err) {
console.warn('[Recovery] AOF 读取失败:', err.message);
}
}
},
async initPrizePool(prizes) {
prizePool.length = 0;
prizePool.push(...prizes);
await this.saveSnapshot();
},
async getVoteStats() {
return Object.fromEntries(votes);
},
getStats() {
return {
totalVotes: userVoted.size,
prizeRemaining: prizePool.length,
winnersCount: winners.length,
memoryUsage: `${(process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2)} MB`
};
}
};
// 依然保留 1 分钟一次的快照,用于清理 AOF 文件
setInterval(() => ConcurrencyService.saveSnapshot(), 60000);