Spaces:
Paused
Paused
| import { PassThrough } from 'stream'; | |
| import { createLogger } from '../utils/logger.js'; | |
| const logger = createLogger('StreamManager'); | |
| /** | |
| * 流管理器 - 负责管理和跟踪活跃的流 | |
| */ | |
| export class StreamManager { | |
| constructor() { | |
| this.activeStreams = new Map(); | |
| } | |
| /** | |
| * 创建新的流 | |
| * @returns {PassThrough} 新创建的流 | |
| */ | |
| createStream() { | |
| const stream = new PassThrough(); | |
| let streamClosed = false; | |
| // 重写stream.end方法,确保安全关闭 | |
| const originalEnd = stream.end; | |
| stream.end = function(...args) { | |
| if (streamClosed) return; | |
| streamClosed = true; | |
| return originalEnd.apply(this, args); | |
| }; | |
| // 添加状态检查方法 | |
| stream.isClosed = () => streamClosed; | |
| return stream; | |
| } | |
| /** | |
| * 注册并管理流 | |
| * @param {string} clientId - 客户端ID | |
| * @param {Stream} stream - 要管理的流 | |
| * @returns {Stream} 返回被管理的流 | |
| */ | |
| register(clientId, stream) { | |
| // 如果该客户端已有活跃流,先关闭它 | |
| if (this.activeStreams.has(clientId)) { | |
| this.close(clientId); | |
| } | |
| // 注册新流 | |
| this.activeStreams.set(clientId, stream); | |
| logger.debug(`注册客户端 ${clientId} 的新流`); | |
| // 设置流事件监听器 | |
| stream.on('end', () => { | |
| if (this.activeStreams.get(clientId) === stream) { | |
| this.activeStreams.delete(clientId); | |
| logger.debug(`客户端 ${clientId} 的流已结束并移除`); | |
| } | |
| }); | |
| stream.on('error', (error) => { | |
| logger.error(`客户端 ${clientId} 的流错误: ${error.message}`); | |
| if (this.activeStreams.get(clientId) === stream) { | |
| this.activeStreams.delete(clientId); | |
| } | |
| }); | |
| return stream; | |
| } | |
| /** | |
| * 关闭指定客户端的流 | |
| * @param {string} clientId - 客户端ID | |
| */ | |
| close(clientId) { | |
| const stream = this.activeStreams.get(clientId); | |
| if (stream) { | |
| try { | |
| logger.debug(`关闭客户端 ${clientId} 的流`); | |
| stream.end(); | |
| this.activeStreams.delete(clientId); | |
| } catch (error) { | |
| logger.error(`关闭流时出错: ${error.message}`); | |
| } | |
| } | |
| } | |
| /** | |
| * 获取指定客户端的流 | |
| * @param {string} clientId - 客户端ID | |
| * @returns {Stream|null} 流对象或null | |
| */ | |
| get(clientId) { | |
| return this.activeStreams.get(clientId) || null; | |
| } | |
| /** | |
| * 检查客户端是否有活跃流 | |
| * @param {string} clientId - 客户端ID | |
| * @returns {boolean} | |
| */ | |
| has(clientId) { | |
| return this.activeStreams.has(clientId); | |
| } | |
| /** | |
| * 获取活跃流的数量 | |
| * @returns {number} | |
| */ | |
| getActiveCount() { | |
| return this.activeStreams.size; | |
| } | |
| /** | |
| * 关闭所有流 | |
| */ | |
| closeAll() { | |
| logger.info(`关闭所有活跃流 (共 ${this.activeStreams.size} 个)`); | |
| for (const [clientId, stream] of this.activeStreams) { | |
| this.close(clientId); | |
| } | |
| } | |
| /** | |
| * 安全写入数据到流 | |
| * @param {Stream} stream - 目标流 | |
| * @param {string|Buffer} data - 要写入的数据 | |
| * @returns {boolean} 写入是否成功 | |
| */ | |
| safeWrite(stream, data) { | |
| if (!stream || stream.destroyed || (stream.isClosed && stream.isClosed())) { | |
| return false; | |
| } | |
| try { | |
| return stream.write(data); | |
| } catch (error) { | |
| logger.error(`流写入错误: ${error.message}`); | |
| return false; | |
| } | |
| } | |
| } | |
| // 创建全局流管理器实例 | |
| export const streamManager = new StreamManager(); | |