File size: 20,503 Bytes
69b897d 497686c 69b897d 497686c 69b897d 497686c 69b897d 497686c 69b897d 497686c 69b897d 497686c 69b897d 497686c 69b897d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 |
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const LRUCache = require('../utils/lruCache')
/**
 * Account store for "OpenAI Responses" upstream accounts.
 *
 * Every account is kept as a Redis hash under `ACCOUNT_KEY_PREFIX + id`; ids of
 * accounts whose `accountType` is `'shared'` are additionally tracked in the
 * `SHARED_ACCOUNTS_KEY` set. All hash fields are written as strings. The API
 * key is stored encrypted (AES-256-CBC, random IV per value).
 */
class OpenAIResponsesAccountService {
  constructor() {
    // Encryption constants
    this.ENCRYPTION_ALGORITHM = 'aes-256-cbc'
    this.ENCRYPTION_SALT = 'openai-responses-salt'

    // Redis key prefixes
    this.ACCOUNT_KEY_PREFIX = 'openai_responses_account:'
    this.SHARED_ACCOUNTS_KEY = 'shared_openai_responses_accounts'

    // Derived encryption key, computed lazily once (scrypt is expensive)
    this._encryptionKeyCache = null

    // Cache of decrypted values, keyed by a hash of the ciphertext
    this._decryptCache = new LRUCache(500)

    // Periodic cache cleanup (every 10 minutes)
    this._cleanupTimer = setInterval(
      () => {
        this._decryptCache.cleanup()
        logger.info(
          '🧹 OpenAI-Responses decrypt cache cleanup completed',
          this._decryptCache.getStats()
        )
      },
      10 * 60 * 1000
    )
    // A housekeeping timer must not be the only thing keeping the process alive
    // (scripts or tests that merely require this module would never exit).
    if (this._cleanupTimer && typeof this._cleanupTimer.unref === 'function') {
      this._cleanupTimer.unref()
    }
  }

  /**
   * Create a new account and persist it.
   *
   * @param {object} [options]
   * @param {string} [options.name]
   * @param {string} [options.description]
   * @param {string} options.baseApi - Required. Upstream API base URL.
   * @param {string} options.apiKey - Required. Upstream API key (stored encrypted).
   * @param {string} [options.userAgent] - Custom User-Agent; empty means pass the original through.
   * @param {number} [options.priority=50] - Scheduling priority (1-100).
   * @param {object|null} [options.proxy]
   * @param {boolean} [options.isActive=true]
   * @param {string} [options.accountType='shared'] - 'dedicated' or 'shared'.
   * @param {boolean} [options.schedulable=true]
   * @param {number} [options.dailyQuota=0] - Daily quota in USD; 0 means unlimited.
   * @param {string} [options.quotaResetTime='00:00'] - Quota reset time (HH:mm).
   * @param {number} [options.rateLimitDuration=60] - Rate-limit duration in minutes.
   * @param {string|null} [options.subscriptionExpiresAt] - Manually managed expiry.
   * @returns {Promise<object>} The stored record with `apiKey` masked as '***'.
   * @throws {Error} When `baseApi` or `apiKey` is missing.
   */
  async createAccount(options = {}) {
    const {
      name = 'OpenAI Responses Account',
      description = '',
      baseApi = '',
      apiKey = '',
      userAgent = '',
      priority = 50,
      proxy = null,
      isActive = true,
      accountType = 'shared',
      schedulable = true,
      dailyQuota = 0,
      quotaResetTime = '00:00',
      rateLimitDuration = 60
    } = options

    // Validate required fields
    if (!baseApi || !apiKey) {
      throw new Error('Base API URL and API Key are required for OpenAI-Responses account')
    }

    const accountId = uuidv4()
    const accountData = {
      id: accountId,
      platform: 'openai-responses',
      name,
      description,
      baseApi: this._normalizeBaseApi(baseApi),
      apiKey: this._encryptSensitiveData(apiKey),
      userAgent,
      priority: priority.toString(),
      proxy: proxy ? JSON.stringify(proxy) : '',
      isActive: isActive.toString(),
      accountType,
      schedulable: schedulable.toString(),
      // Subscription expiry is a manually managed business field. These accounts
      // authenticate with an API key, so there is no OAuth token expiry.
      subscriptionExpiresAt: options.subscriptionExpiresAt || null,
      createdAt: new Date().toISOString(),
      lastUsedAt: '',
      status: 'active',
      errorMessage: '',
      // Rate limiting
      rateLimitedAt: '',
      rateLimitStatus: '',
      rateLimitDuration: rateLimitDuration.toString(),
      // Quota management
      dailyQuota: dailyQuota.toString(),
      dailyUsage: '0',
      lastResetDate: redis.getDateStringInTimezone(),
      quotaResetTime,
      quotaStoppedAt: ''
    }

    await this._saveAccount(accountId, accountData)
    logger.success(`🚀 Created OpenAI-Responses account: ${name} (${accountId})`)

    // Never hand the (encrypted) key back to the caller
    return {
      ...accountData,
      apiKey: '***'
    }
  }

  /**
   * Load one account, with `apiKey` decrypted and `proxy` parsed from JSON.
   *
   * @param {string} accountId
   * @returns {Promise<object|null>} The account, or null when the hash has no `id`.
   */
  async getAccount(accountId) {
    const client = redis.getClientSafe()
    const accountData = await client.hgetall(`${this.ACCOUNT_KEY_PREFIX}${accountId}`)
    if (!accountData || !accountData.id) {
      return null
    }

    accountData.apiKey = this._decryptSensitiveData(accountData.apiKey)
    this._parseProxyField(accountData)
    return accountData
  }

  /**
   * Apply a partial update to an existing account.
   *
   * `apiKey` is encrypted, `proxy` is JSON-encoded and `baseApi` is normalized
   * before writing; every other field (including `subscriptionExpiresAt`) is
   * stored as given. The caller's `updates` object is not modified.
   *
   * @param {string} accountId
   * @param {object} updates
   * @returns {Promise<{success: boolean}>}
   * @throws {Error} When the account does not exist.
   */
  async updateAccount(accountId, updates) {
    const account = await this.getAccount(accountId)
    if (!account) {
      throw new Error('Account not found')
    }

    // Work on a shallow copy so the transformations below do not leak
    // ciphertext / JSON strings back into the caller's object.
    const fields = { ...updates }

    if (fields.apiKey) {
      fields.apiKey = this._encryptSensitiveData(fields.apiKey)
    }
    if (fields.proxy !== undefined) {
      fields.proxy = fields.proxy ? JSON.stringify(fields.proxy) : ''
    }
    if (fields.baseApi) {
      fields.baseApi = this._normalizeBaseApi(fields.baseApi)
    }

    // HSET with no field/value pairs is a Redis error, so skip the write entirely.
    if (Object.keys(fields).length > 0) {
      const client = redis.getClientSafe()
      await client.hset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, fields)
    }

    logger.info(`📝 Updated OpenAI-Responses account: ${account.name}`)
    return { success: true }
  }

  /**
   * Delete an account hash and drop its id from the shared-accounts set.
   *
   * @param {string} accountId
   * @returns {Promise<{success: boolean}>}
   */
  async deleteAccount(accountId) {
    const client = redis.getClientSafe()
    await client.srem(this.SHARED_ACCOUNTS_KEY, accountId)
    await client.del(`${this.ACCOUNT_KEY_PREFIX}${accountId}`)
    logger.info(`🗑️ Deleted OpenAI-Responses account: ${accountId}`)
    return { success: true }
  }

  /**
   * List accounts in the shape used for display: API key masked, booleans
   * converted, rate-limit status expanded into an object.
   *
   * Shared accounts (from the shared set) come first, followed by any other
   * account hashes found by key pattern.
   *
   * @param {boolean} [includeInactive=false] - Also return accounts whose `isActive` is not 'true'.
   * @returns {Promise<object[]>}
   */
  async getAllAccounts(includeInactive = false) {
    const client = redis.getClientSafe()
    const sharedIds = await client.smembers(this.SHARED_ACCOUNTS_KEY)
    const allKeys = await client.keys(`${this.ACCOUNT_KEY_PREFIX}*`)

    // Build the ordered list of hash keys to read, de-duplicating by account id.
    const seenIds = new Set(sharedIds)
    const hashKeys = sharedIds.map((id) => `${this.ACCOUNT_KEY_PREFIX}${id}`)
    for (const key of allKeys) {
      const id = key.replace(this.ACCOUNT_KEY_PREFIX, '')
      if (!seenIds.has(id)) {
        seenIds.add(id)
        hashKeys.push(key)
      }
    }

    const accounts = []
    for (const key of hashKeys) {
      // Read the raw hash: the API key is masked below, so decrypting it would be wasted work.
      const accountData = await client.hgetall(key)
      if (!accountData || !accountData.id) {
        continue
      }
      if (!includeInactive && accountData.isActive !== 'true') {
        continue
      }
      accounts.push(this._formatAccountForList(accountData))
    }
    return accounts
  }

  /**
   * Mark an account as rate limited and take it out of scheduling.
   *
   * @param {string} accountId
   * @param {number|null} [duration=null] - Minutes; falls back to the account's
   *   stored `rateLimitDuration`, then to 60.
   * @returns {Promise<void>} Resolves without action when the account is missing.
   */
  async markAccountRateLimited(accountId, duration = null) {
    const account = await this.getAccount(accountId)
    if (!account) {
      return
    }

    const rateLimitDuration = duration || parseInt(account.rateLimitDuration, 10) || 60
    const now = new Date()
    const resetAt = new Date(now.getTime() + rateLimitDuration * 60000)

    await this.updateAccount(accountId, {
      rateLimitedAt: now.toISOString(),
      rateLimitStatus: 'limited',
      rateLimitResetAt: resetAt.toISOString(),
      rateLimitDuration: rateLimitDuration.toString(),
      status: 'rateLimited',
      schedulable: 'false', // keep the scheduler away from this account
      errorMessage: `Rate limited until ${resetAt.toISOString()}`
    })

    logger.warn(
      `⏳ Account ${account.name} marked as rate limited for ${rateLimitDuration} minutes (until ${resetAt.toISOString()})`
    )
  }

  /**
   * Mark an account as unauthorized (HTTP 401), disable scheduling, bump the
   * unauthorized counter and send a best-effort webhook notification.
   *
   * @param {string} accountId
   * @param {string} [reason]
   * @returns {Promise<void>} Resolves without action when the account is missing.
   */
  async markAccountUnauthorized(accountId, reason = 'OpenAI Responses账号认证失败(401错误)') {
    const account = await this.getAccount(accountId)
    if (!account) {
      return
    }

    const now = new Date().toISOString()
    const currentCount = parseInt(account.unauthorizedCount || '0', 10)
    const unauthorizedCount = Number.isFinite(currentCount) ? currentCount + 1 : 1

    await this.updateAccount(accountId, {
      status: 'unauthorized',
      schedulable: 'false',
      errorMessage: reason,
      unauthorizedAt: now,
      unauthorizedCount: unauthorizedCount.toString()
    })

    logger.warn(
      `🚫 OpenAI-Responses account ${account.name || accountId} marked as unauthorized due to 401 error`
    )

    // Notification failures are logged, never propagated.
    try {
      const webhookNotifier = require('../utils/webhookNotifier')
      await webhookNotifier.sendAccountAnomalyNotification({
        accountId,
        accountName: account.name || accountId,
        // NOTE(review): the status-reset notification reports 'openai-responses'
        // here; left as 'openai' in case webhook consumers depend on it — confirm.
        platform: 'openai',
        status: 'unauthorized',
        errorCode: 'OPENAI_UNAUTHORIZED',
        reason,
        timestamp: now
      })
      logger.info(
        `📢 Webhook notification sent for OpenAI-Responses account ${account.name || accountId} unauthorized state`
      )
    } catch (webhookError) {
      logger.error('Failed to send unauthorized webhook notification:', webhookError)
    }
  }

  /**
   * Clear the rate-limit state of an account once it has expired.
   *
   * Uses `rateLimitResetAt` when present, otherwise the legacy
   * `rateLimitedAt + rateLimitDuration` rule. An unparseable timestamp is
   * treated as expired so a corrupt record cannot stay limited forever.
   *
   * @param {string} accountId
   * @returns {Promise<boolean>} true when the state was cleared.
   */
  async checkAndClearRateLimit(accountId) {
    const account = await this.getAccount(accountId)
    if (!account || account.rateLimitStatus !== 'limited') {
      return false
    }

    const now = new Date()
    let shouldClear
    if (account.rateLimitResetAt) {
      const resetAt = new Date(account.rateLimitResetAt)
      shouldClear = Number.isNaN(resetAt.getTime()) || now >= resetAt
    } else {
      // Legacy records without rateLimitResetAt
      const rateLimitedAt = new Date(account.rateLimitedAt)
      const rateLimitDuration = parseInt(account.rateLimitDuration, 10) || 60
      shouldClear =
        Number.isNaN(rateLimitedAt.getTime()) || now - rateLimitedAt > rateLimitDuration * 60000
    }

    if (!shouldClear) {
      return false
    }

    // NOTE(review): scheduling is re-enabled unconditionally, even if it had been
    // switched off manually before the rate limit was recorded.
    await this.updateAccount(accountId, {
      rateLimitedAt: '',
      rateLimitStatus: '',
      rateLimitResetAt: '',
      status: 'active',
      schedulable: 'true',
      errorMessage: ''
    })
    logger.info(`✅ Rate limit cleared for account ${account.name}`)
    return true
  }

  /**
   * Flip the `schedulable` flag of an account.
   *
   * A record without a stored flag counts as schedulable, matching how
   * `getAllAccounts` reports it (`schedulable !== 'false'`).
   *
   * @param {string} accountId
   * @returns {Promise<{success: boolean, schedulable: boolean}>}
   * @throws {Error} When the account does not exist.
   */
  async toggleSchedulable(accountId) {
    const account = await this.getAccount(accountId)
    if (!account) {
      throw new Error('Account not found')
    }

    const currentlySchedulable = account.schedulable !== 'false'
    const newSchedulableStatus = currentlySchedulable ? 'false' : 'true'
    await this.updateAccount(accountId, {
      schedulable: newSchedulableStatus
    })

    logger.info(
      `🔄 Toggled schedulable status for account ${account.name}: ${newSchedulableStatus}`
    )
    return {
      success: true,
      schedulable: newSchedulableStatus === 'true'
    }
  }

  /**
   * Add `amount` (USD) to the account's daily usage, restarting the counter
   * when the stored `lastResetDate` differs from today's date, and flag the
   * account as `quotaExceeded` once the daily quota is reached.
   *
   * @param {string} accountId
   * @param {number} amount - Cost to add; non-numeric values count as 0.
   * @returns {Promise<void>} Resolves without action when the account is missing.
   */
  async updateUsageQuota(accountId, amount) {
    const account = await this.getAccount(accountId)
    if (!account) {
      return
    }

    // Coerce defensively: a NaN or string amount must not corrupt the stored total.
    const numericAmount = Number(amount)
    const increment = Number.isFinite(numericAmount) ? numericAmount : 0

    const today = redis.getDateStringInTimezone()
    const isNewDay = account.lastResetDate !== today
    const previousUsage = isNewDay ? 0 : parseFloat(account.dailyUsage) || 0
    const newUsage = previousUsage + increment
    const dailyQuota = parseFloat(account.dailyQuota) || 0

    const updates = {
      dailyUsage: newUsage.toString()
    }
    if (isNewDay) {
      updates.lastResetDate = today
      updates.quotaStoppedAt = ''
    }

    // Checked on the reset path too: the first request of a day can already exceed the quota.
    if (dailyQuota > 0 && newUsage >= dailyQuota) {
      updates.status = 'quotaExceeded'
      updates.quotaStoppedAt = new Date().toISOString()
      updates.errorMessage = `Daily quota exceeded: $${newUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`
      logger.warn(`💸 Account ${account.name} exceeded daily quota`)
    }

    await this.updateAccount(accountId, updates)
  }

  /**
   * Record that the account was used: refresh `lastUsedAt` and, when
   * `tokens > 0`, add them to `totalUsedTokens`.
   *
   * @param {string} accountId
   * @param {number} [tokens=0]
   * @returns {Promise<void>} Resolves without action when the account is missing.
   */
  async updateAccountUsage(accountId, tokens = 0) {
    const account = await this.getAccount(accountId)
    if (!account) {
      return
    }

    const updates = {
      lastUsedAt: new Date().toISOString()
    }
    if (tokens > 0) {
      const currentTokens = parseInt(account.totalUsedTokens, 10) || 0
      updates.totalUsedTokens = (currentTokens + tokens).toString()
    }

    await this.updateAccount(accountId, updates)
  }

  /**
   * Compatibility alias for {@link OpenAIResponsesAccountService#updateAccountUsage}.
   *
   * @param {string} accountId
   * @param {number} [tokens=0]
   * @returns {Promise<void>}
   */
  async recordUsage(accountId, tokens = 0) {
    return this.updateAccountUsage(accountId, tokens)
  }

  /**
   * Reset the error / rate-limit state of an account, make it schedulable
   * again and send a best-effort webhook notification.
   *
   * @param {string} accountId
   * @returns {Promise<{success: boolean, message: string}>}
   * @throws {Error} When the account does not exist.
   */
  async resetAccountStatus(accountId) {
    const account = await this.getAccount(accountId)
    if (!account) {
      throw new Error('Account not found')
    }

    const updates = {
      // 'active' when a usable API key is present, otherwise 'created'
      status: account.apiKey ? 'active' : 'created',
      schedulable: 'true',
      errorMessage: '',
      rateLimitedAt: '',
      rateLimitStatus: '',
      rateLimitResetAt: '',
      // NOTE(review): this also wipes the duration configured at creation time;
      // later rate limits then fall back to the 60-minute default.
      rateLimitDuration: ''
    }
    await this.updateAccount(accountId, updates)
    logger.info(`✅ Reset all error status for OpenAI-Responses account ${accountId}`)

    // Notification failures are logged, never propagated.
    try {
      const webhookNotifier = require('../utils/webhookNotifier')
      await webhookNotifier.sendAccountAnomalyNotification({
        accountId,
        accountName: account.name || accountId,
        platform: 'openai-responses',
        status: 'recovered',
        errorCode: 'STATUS_RESET',
        reason: 'Account status manually reset',
        timestamp: new Date().toISOString()
      })
      logger.info(
        `📢 Webhook notification sent for OpenAI-Responses account ${account.name} status reset`
      )
    } catch (webhookError) {
      logger.error('Failed to send status reset webhook notification:', webhookError)
    }

    return { success: true, message: 'Account status reset successfully' }
  }

  /**
   * Tell whether the account's subscription has expired.
   *
   * @param {object} account
   * @returns {boolean} false when no expiry is set (treated as never expiring).
   */
  isSubscriptionExpired(account) {
    if (!account.subscriptionExpiresAt) {
      return false
    }

    const expiryDate = new Date(account.subscriptionExpiresAt)
    if (expiryDate <= new Date()) {
      logger.debug(
        `⏰ OpenAI-Responses Account ${account.name} (${account.id}) subscription expired at ${account.subscriptionExpiresAt}`
      )
      return true
    }
    return false
  }

  /**
   * Compute the current rate-limit state from the raw string fields.
   *
   * @param {object} accountData - Must still carry the raw `rateLimitStatus` string.
   * @returns {{isRateLimited: boolean, remainingMinutes?: number, willBeAvailableAt?: Date}}
   */
  _getRateLimitInfo(accountData) {
    if (accountData.rateLimitStatus !== 'limited') {
      return { isRateLimited: false }
    }

    const now = new Date()
    let willBeAvailableAt
    let remainingMinutes

    if (accountData.rateLimitResetAt) {
      willBeAvailableAt = new Date(accountData.rateLimitResetAt)
      remainingMinutes = Math.max(0, Math.ceil((willBeAvailableAt - now) / 60000))
    } else {
      // Legacy records without rateLimitResetAt
      const rateLimitedAt = new Date(accountData.rateLimitedAt)
      const rateLimitDuration = parseInt(accountData.rateLimitDuration, 10) || 60
      const elapsedMinutes = Math.floor((now - rateLimitedAt) / 60000)
      remainingMinutes = Math.max(0, rateLimitDuration - elapsedMinutes)
      willBeAvailableAt = new Date(rateLimitedAt.getTime() + rateLimitDuration * 60000)
    }

    // Math.max(0, NaN) is NaN; report unparseable timestamps as "no time remaining".
    if (Number.isNaN(remainingMinutes)) {
      remainingMinutes = 0
    }

    return {
      isRateLimited: remainingMinutes > 0,
      remainingMinutes,
      willBeAvailableAt
    }
  }

  /**
   * Convert a raw account hash into the list/display shape (mutates and returns it).
   *
   * @param {object} accountData - Raw hash as read from Redis.
   * @returns {object}
   */
  _formatAccountForList(accountData) {
    // Hide the secret
    accountData.apiKey = '***'
    this._parseProxyField(accountData)

    // Must run before rateLimitStatus is replaced by the object below.
    const rateLimitInfo = this._getRateLimitInfo(accountData)
    accountData.rateLimitStatus = rateLimitInfo.isRateLimited
      ? {
          isRateLimited: true,
          rateLimitedAt: accountData.rateLimitedAt || null,
          minutesRemaining: rateLimitInfo.remainingMinutes || 0
        }
      : {
          isRateLimited: false,
          rateLimitedAt: null,
          minutesRemaining: 0
        }

    // Booleans instead of the stored strings
    accountData.schedulable = accountData.schedulable !== 'false'
    accountData.isActive = accountData.isActive === 'true'

    // Expose the subscription expiry under `expiresAt`
    accountData.expiresAt = accountData.subscriptionExpiresAt || null
    accountData.platform = accountData.platform || 'openai-responses'
    return accountData
  }

  /**
   * Replace a non-empty JSON `proxy` string with the parsed value, or null when
   * it is not valid JSON. Empty values are left untouched.
   *
   * @param {object} accountData
   */
  _parseProxyField(accountData) {
    if (!accountData.proxy) {
      return
    }
    try {
      accountData.proxy = JSON.parse(accountData.proxy)
    } catch (parseError) {
      accountData.proxy = null
    }
  }

  /**
   * Remove every trailing '/' from a base URL.
   *
   * @param {string} baseApi
   * @returns {string}
   */
  _normalizeBaseApi(baseApi) {
    let end = baseApi.length
    while (end > 0 && baseApi[end - 1] === '/') {
      end -= 1
    }
    return baseApi.slice(0, end)
  }

  /**
   * Encrypt a value with a fresh random IV.
   *
   * @param {string} text
   * @returns {string} `<iv hex>:<ciphertext hex>`, or '' for empty input.
   */
  _encryptSensitiveData(text) {
    if (!text) {
      return ''
    }
    const iv = crypto.randomBytes(16)
    const cipher = crypto.createCipheriv(this.ENCRYPTION_ALGORITHM, this._getEncryptionKey(), iv)
    const encrypted = Buffer.concat([cipher.update(text), cipher.final()])
    return `${iv.toString('hex')}:${encrypted.toString('hex')}`
  }

  /**
   * Decrypt a value produced by `_encryptSensitiveData`.
   *
   * Successful results are cached by a SHA-256 hash of the ciphertext.
   *
   * @param {string} text - `<iv hex>:<ciphertext hex>`
   * @returns {string} The plaintext, or '' for empty input or on any decryption error.
   */
  _decryptSensitiveData(text) {
    if (!text) {
      return ''
    }

    const cacheKey = crypto.createHash('sha256').update(text).digest('hex')
    const cached = this._decryptCache.get(cacheKey)
    if (cached !== undefined) {
      return cached
    }

    try {
      const [ivHex, encryptedHex] = text.split(':')
      const decipher = crypto.createDecipheriv(
        this.ENCRYPTION_ALGORITHM,
        this._getEncryptionKey(),
        Buffer.from(ivHex, 'hex')
      )
      const result = Buffer.concat([
        decipher.update(Buffer.from(encryptedHex, 'hex')),
        decipher.final()
      ]).toString()

      // Cache entry lifetime: 5 minutes
      this._decryptCache.set(cacheKey, result, 5 * 60 * 1000)
      return result
    } catch (error) {
      logger.error('Decryption error:', error)
      return ''
    }
  }

  /**
   * Derive (once) and return the 32-byte encryption key via scrypt from
   * `config.security.encryptionKey` and the fixed salt.
   *
   * @returns {Buffer}
   */
  _getEncryptionKey() {
    if (!this._encryptionKeyCache) {
      this._encryptionKeyCache = crypto.scryptSync(
        config.security.encryptionKey,
        this.ENCRYPTION_SALT,
        32
      )
    }
    return this._encryptionKeyCache
  }

  /**
   * Write a full account hash and, for shared accounts, register the id in the
   * shared-accounts set.
   *
   * @param {string} accountId
   * @param {object} accountData
   * @returns {Promise<void>}
   */
  async _saveAccount(accountId, accountData) {
    const client = redis.getClientSafe()
    await client.hset(`${this.ACCOUNT_KEY_PREFIX}${accountId}`, accountData)
    if (accountData.accountType === 'shared') {
      await client.sadd(this.SHARED_ACCOUNTS_KEY, accountId)
    }
  }
}
// Export one service instance, created when this module is first loaded.
// Its constructor allocates the decrypt cache and schedules the periodic
// cache-cleanup interval.
module.exports = new OpenAIResponsesAccountService()
|