cc / src /services /openaiResponsesAccountService.js
hequ's picture
Upload 220 files
497686c verified
const { v4: uuidv4 } = require('uuid')
const crypto = require('crypto')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const LRUCache = require('../utils/lruCache')
class OpenAIResponsesAccountService {
constructor() {
// 加密相关常量
this.ENCRYPTION_ALGORITHM = 'aes-256-cbc'
this.ENCRYPTION_SALT = 'openai-responses-salt'
// Redis 键前缀
this.ACCOUNT_KEY_PREFIX = 'openai_responses_account:'
this.SHARED_ACCOUNTS_KEY = 'shared_openai_responses_accounts'
// 🚀 性能优化:缓存派生的加密密钥,避免每次重复计算
this._encryptionKeyCache = null
// 🔄 解密结果缓存,提高解密性能
this._decryptCache = new LRUCache(500)
// 🧹 定期清理缓存(每10分钟)
setInterval(
() => {
this._decryptCache.cleanup()
logger.info(
'🧹 OpenAI-Responses decrypt cache cleanup completed',
this._decryptCache.getStats()
)
},
10 * 60 * 1000
)
}
// 创建账户
async createAccount(options = {}) {
const {
name = 'OpenAI Responses Account',
description = '',
baseApi = '', // 必填:API 基础地址
apiKey = '', // 必填:API 密钥
userAgent = '', // 可选:自定义 User-Agent,空则透传原始请求
priority = 50, // 调度优先级 (1-100)
proxy = null,
isActive = true,
accountType = 'shared', // 'dedicated' or 'shared'
schedulable = true, // 是否可被调度
dailyQuota = 0, // 每日额度限制(美元),0表示不限制
quotaResetTime = '00:00', // 额度重置时间(HH:mm格式)
rateLimitDuration = 60 // 限流时间(分钟)
} = options
// 验证必填字段
if (!baseApi || !apiKey) {
throw new Error('Base API URL and API Key are required for OpenAI-Responses account')
}
// 规范化 baseApi(确保不以 / 结尾)
const normalizedBaseApi = baseApi.endsWith('/') ? baseApi.slice(0, -1) : baseApi
const accountId = uuidv4()
const accountData = {
id: accountId,
platform: 'openai-responses',
name,
description,
baseApi: normalizedBaseApi,
apiKey: this._encryptSensitiveData(apiKey),
userAgent,
priority: priority.toString(),
proxy: proxy ? JSON.stringify(proxy) : '',
isActive: isActive.toString(),
accountType,
schedulable: schedulable.toString(),
// ✅ 新增:账户订阅到期时间(业务字段,手动管理)
// 注意:OpenAI-Responses 使用 API Key 认证,没有 OAuth token,因此没有 expiresAt
subscriptionExpiresAt: options.subscriptionExpiresAt || null,
createdAt: new Date().toISOString(),
lastUsedAt: '',
status: 'active',
errorMessage: '',
// 限流相关
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitDuration: rateLimitDuration.toString(),
// 额度管理
dailyQuota: dailyQuota.toString(),
dailyUsage: '0',
lastResetDate: redis.getDateStringInTimezone(),
quotaResetTime,
quotaStoppedAt: ''
}
// 保存到 Redis
await this._saveAccount(accountId, accountData)
logger.success(`🚀 Created OpenAI-Responses account: ${name} (${accountId})`)
return {
...accountData,
apiKey: '***' // 返回时隐藏敏感信息
}
}
// 获取账户
async getAccount(accountId) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
const accountData = await client.hgetall(key)
if (!accountData || !accountData.id) {
return null
}
// 解密敏感数据
accountData.apiKey = this._decryptSensitiveData(accountData.apiKey)
// 解析 JSON 字段
if (accountData.proxy) {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
return accountData
}
// 更新账户
async updateAccount(accountId, updates) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
// 处理敏感字段加密
if (updates.apiKey) {
updates.apiKey = this._encryptSensitiveData(updates.apiKey)
}
// 处理 JSON 字段
if (updates.proxy !== undefined) {
updates.proxy = updates.proxy ? JSON.stringify(updates.proxy) : ''
}
// 规范化 baseApi
if (updates.baseApi) {
updates.baseApi = updates.baseApi.endsWith('/')
? updates.baseApi.slice(0, -1)
: updates.baseApi
}
// ✅ 直接保存 subscriptionExpiresAt(如果提供)
// OpenAI-Responses 使用 API Key,没有 token 刷新逻辑,不会覆盖此字段
if (updates.subscriptionExpiresAt !== undefined) {
// 直接保存,不做任何调整
}
// 更新 Redis
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
await client.hset(key, updates)
logger.info(`📝 Updated OpenAI-Responses account: ${account.name}`)
return { success: true }
}
// 删除账户
async deleteAccount(accountId) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 从共享账户列表中移除
await client.srem(this.SHARED_ACCOUNTS_KEY, accountId)
// 删除账户数据
await client.del(key)
logger.info(`🗑️ Deleted OpenAI-Responses account: ${accountId}`)
return { success: true }
}
// 获取所有账户
async getAllAccounts(includeInactive = false) {
const client = redis.getClientSafe()
const accountIds = await client.smembers(this.SHARED_ACCOUNTS_KEY)
const accounts = []
for (const accountId of accountIds) {
const account = await this.getAccount(accountId)
if (account) {
// 过滤非活跃账户
if (includeInactive || account.isActive === 'true') {
// 隐藏敏感信息
account.apiKey = '***'
// 获取限流状态信息(与普通OpenAI账号保持一致的格式)
const rateLimitInfo = this._getRateLimitInfo(account)
// 格式化 rateLimitStatus 为对象(与普通 OpenAI 账号一致)
account.rateLimitStatus = rateLimitInfo.isRateLimited
? {
isRateLimited: true,
rateLimitedAt: account.rateLimitedAt || null,
minutesRemaining: rateLimitInfo.remainingMinutes || 0
}
: {
isRateLimited: false,
rateLimitedAt: null,
minutesRemaining: 0
}
// 转换 schedulable 字段为布尔值(前端需要布尔值来判断)
account.schedulable = account.schedulable !== 'false'
// 转换 isActive 字段为布尔值
account.isActive = account.isActive === 'true'
// ✅ 前端显示订阅过期时间(业务字段)
account.expiresAt = account.subscriptionExpiresAt || null
account.platform = account.platform || 'openai-responses'
accounts.push(account)
}
}
}
// 直接从 Redis 获取所有账户(包括非共享账户)
const keys = await client.keys(`${this.ACCOUNT_KEY_PREFIX}*`)
for (const key of keys) {
const accountId = key.replace(this.ACCOUNT_KEY_PREFIX, '')
if (!accountIds.includes(accountId)) {
const accountData = await client.hgetall(key)
if (accountData && accountData.id) {
// 过滤非活跃账户
if (includeInactive || accountData.isActive === 'true') {
// 隐藏敏感信息
accountData.apiKey = '***'
// 解析 JSON 字段
if (accountData.proxy) {
try {
accountData.proxy = JSON.parse(accountData.proxy)
} catch (e) {
accountData.proxy = null
}
}
// 获取限流状态信息(与普通OpenAI账号保持一致的格式)
const rateLimitInfo = this._getRateLimitInfo(accountData)
// 格式化 rateLimitStatus 为对象(与普通 OpenAI 账号一致)
accountData.rateLimitStatus = rateLimitInfo.isRateLimited
? {
isRateLimited: true,
rateLimitedAt: accountData.rateLimitedAt || null,
minutesRemaining: rateLimitInfo.remainingMinutes || 0
}
: {
isRateLimited: false,
rateLimitedAt: null,
minutesRemaining: 0
}
// 转换 schedulable 字段为布尔值(前端需要布尔值来判断)
accountData.schedulable = accountData.schedulable !== 'false'
// 转换 isActive 字段为布尔值
accountData.isActive = accountData.isActive === 'true'
// ✅ 前端显示订阅过期时间(业务字段)
accountData.expiresAt = accountData.subscriptionExpiresAt || null
accountData.platform = accountData.platform || 'openai-responses'
accounts.push(accountData)
}
}
}
}
return accounts
}
// 标记账户限流
async markAccountRateLimited(accountId, duration = null) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
const rateLimitDuration = duration || parseInt(account.rateLimitDuration) || 60
const now = new Date()
const resetAt = new Date(now.getTime() + rateLimitDuration * 60000)
await this.updateAccount(accountId, {
rateLimitedAt: now.toISOString(),
rateLimitStatus: 'limited',
rateLimitResetAt: resetAt.toISOString(),
rateLimitDuration: rateLimitDuration.toString(),
status: 'rateLimited',
schedulable: 'false', // 防止被调度
errorMessage: `Rate limited until ${resetAt.toISOString()}`
})
logger.warn(
`⏳ Account ${account.name} marked as rate limited for ${rateLimitDuration} minutes (until ${resetAt.toISOString()})`
)
}
// 🚫 标记账户为未授权状态(401错误)
async markAccountUnauthorized(accountId, reason = 'OpenAI Responses账号认证失败(401错误)') {
const account = await this.getAccount(accountId)
if (!account) {
return
}
const now = new Date().toISOString()
const currentCount = parseInt(account.unauthorizedCount || '0', 10)
const unauthorizedCount = Number.isFinite(currentCount) ? currentCount + 1 : 1
await this.updateAccount(accountId, {
status: 'unauthorized',
schedulable: 'false',
errorMessage: reason,
unauthorizedAt: now,
unauthorizedCount: unauthorizedCount.toString()
})
logger.warn(
`🚫 OpenAI-Responses account ${account.name || accountId} marked as unauthorized due to 401 error`
)
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || accountId,
platform: 'openai',
status: 'unauthorized',
errorCode: 'OPENAI_UNAUTHORIZED',
reason,
timestamp: now
})
logger.info(
`📢 Webhook notification sent for OpenAI-Responses account ${account.name || accountId} unauthorized state`
)
} catch (webhookError) {
logger.error('Failed to send unauthorized webhook notification:', webhookError)
}
}
// 检查并清除过期的限流状态
async checkAndClearRateLimit(accountId) {
const account = await this.getAccount(accountId)
if (!account || account.rateLimitStatus !== 'limited') {
return false
}
const now = new Date()
let shouldClear = false
// 优先使用 rateLimitResetAt 字段
if (account.rateLimitResetAt) {
const resetAt = new Date(account.rateLimitResetAt)
shouldClear = now >= resetAt
} else {
// 如果没有 rateLimitResetAt,使用旧的逻辑
const rateLimitedAt = new Date(account.rateLimitedAt)
const rateLimitDuration = parseInt(account.rateLimitDuration) || 60
shouldClear = now - rateLimitedAt > rateLimitDuration * 60000
}
if (shouldClear) {
// 限流已过期,清除状态
await this.updateAccount(accountId, {
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitResetAt: '',
status: 'active',
schedulable: 'true', // 恢复调度
errorMessage: ''
})
logger.info(`✅ Rate limit cleared for account ${account.name}`)
return true
}
return false
}
// 切换调度状态
async toggleSchedulable(accountId) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const newSchedulableStatus = account.schedulable === 'true' ? 'false' : 'true'
await this.updateAccount(accountId, {
schedulable: newSchedulableStatus
})
logger.info(
`🔄 Toggled schedulable status for account ${account.name}: ${newSchedulableStatus}`
)
return {
success: true,
schedulable: newSchedulableStatus === 'true'
}
}
// 更新使用额度
async updateUsageQuota(accountId, amount) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
// 检查是否需要重置额度
const today = redis.getDateStringInTimezone()
if (account.lastResetDate !== today) {
// 重置额度
await this.updateAccount(accountId, {
dailyUsage: amount.toString(),
lastResetDate: today,
quotaStoppedAt: ''
})
} else {
// 累加使用额度
const currentUsage = parseFloat(account.dailyUsage) || 0
const newUsage = currentUsage + amount
const dailyQuota = parseFloat(account.dailyQuota) || 0
const updates = {
dailyUsage: newUsage.toString()
}
// 检查是否超出额度
if (dailyQuota > 0 && newUsage >= dailyQuota) {
updates.status = 'quotaExceeded'
updates.quotaStoppedAt = new Date().toISOString()
updates.errorMessage = `Daily quota exceeded: $${newUsage.toFixed(2)} / $${dailyQuota.toFixed(2)}`
logger.warn(`💸 Account ${account.name} exceeded daily quota`)
}
await this.updateAccount(accountId, updates)
}
}
// 更新账户使用统计(记录 token 使用量)
async updateAccountUsage(accountId, tokens = 0) {
const account = await this.getAccount(accountId)
if (!account) {
return
}
const updates = {
lastUsedAt: new Date().toISOString()
}
// 如果有 tokens 参数且大于0,同时更新使用统计
if (tokens > 0) {
const currentTokens = parseInt(account.totalUsedTokens) || 0
updates.totalUsedTokens = (currentTokens + tokens).toString()
}
await this.updateAccount(accountId, updates)
}
// 记录使用量(为了兼容性的别名)
async recordUsage(accountId, tokens = 0) {
return this.updateAccountUsage(accountId, tokens)
}
// 重置账户状态(清除所有异常状态)
async resetAccountStatus(accountId) {
const account = await this.getAccount(accountId)
if (!account) {
throw new Error('Account not found')
}
const updates = {
// 根据是否有有效的 apiKey 来设置 status
status: account.apiKey ? 'active' : 'created',
// 恢复可调度状态
schedulable: 'true',
// 清除错误相关字段
errorMessage: '',
rateLimitedAt: '',
rateLimitStatus: '',
rateLimitResetAt: '',
rateLimitDuration: ''
}
await this.updateAccount(accountId, updates)
logger.info(`✅ Reset all error status for OpenAI-Responses account ${accountId}`)
// 发送 Webhook 通知
try {
const webhookNotifier = require('../utils/webhookNotifier')
await webhookNotifier.sendAccountAnomalyNotification({
accountId,
accountName: account.name || accountId,
platform: 'openai-responses',
status: 'recovered',
errorCode: 'STATUS_RESET',
reason: 'Account status manually reset',
timestamp: new Date().toISOString()
})
logger.info(
`📢 Webhook notification sent for OpenAI-Responses account ${account.name} status reset`
)
} catch (webhookError) {
logger.error('Failed to send status reset webhook notification:', webhookError)
}
return { success: true, message: 'Account status reset successfully' }
}
// ⏰ 检查账户订阅是否已过期
isSubscriptionExpired(account) {
if (!account.subscriptionExpiresAt) {
return false // 未设置过期时间,视为永不过期
}
const expiryDate = new Date(account.subscriptionExpiresAt)
const now = new Date()
if (expiryDate <= now) {
logger.debug(
`⏰ OpenAI-Responses Account ${account.name} (${account.id}) subscription expired at ${account.subscriptionExpiresAt}`
)
return true
}
return false
}
// 获取限流信息
_getRateLimitInfo(accountData) {
if (accountData.rateLimitStatus !== 'limited') {
return { isRateLimited: false }
}
const now = new Date()
let willBeAvailableAt
let remainingMinutes
// 优先使用 rateLimitResetAt 字段
if (accountData.rateLimitResetAt) {
willBeAvailableAt = new Date(accountData.rateLimitResetAt)
remainingMinutes = Math.max(0, Math.ceil((willBeAvailableAt - now) / 60000))
} else {
// 如果没有 rateLimitResetAt,使用旧的逻辑
const rateLimitedAt = new Date(accountData.rateLimitedAt)
const rateLimitDuration = parseInt(accountData.rateLimitDuration) || 60
const elapsedMinutes = Math.floor((now - rateLimitedAt) / 60000)
remainingMinutes = Math.max(0, rateLimitDuration - elapsedMinutes)
willBeAvailableAt = new Date(rateLimitedAt.getTime() + rateLimitDuration * 60000)
}
return {
isRateLimited: remainingMinutes > 0,
remainingMinutes,
willBeAvailableAt
}
}
// 加密敏感数据
_encryptSensitiveData(text) {
if (!text) {
return ''
}
const key = this._getEncryptionKey()
const iv = crypto.randomBytes(16)
const cipher = crypto.createCipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let encrypted = cipher.update(text)
encrypted = Buffer.concat([encrypted, cipher.final()])
return `${iv.toString('hex')}:${encrypted.toString('hex')}`
}
// 解密敏感数据
_decryptSensitiveData(text) {
if (!text || text === '') {
return ''
}
// 检查缓存
const cacheKey = crypto.createHash('sha256').update(text).digest('hex')
const cached = this._decryptCache.get(cacheKey)
if (cached !== undefined) {
return cached
}
try {
const key = this._getEncryptionKey()
const [ivHex, encryptedHex] = text.split(':')
const iv = Buffer.from(ivHex, 'hex')
const encryptedText = Buffer.from(encryptedHex, 'hex')
const decipher = crypto.createDecipheriv(this.ENCRYPTION_ALGORITHM, key, iv)
let decrypted = decipher.update(encryptedText)
decrypted = Buffer.concat([decrypted, decipher.final()])
const result = decrypted.toString()
// 存入缓存(5分钟过期)
this._decryptCache.set(cacheKey, result, 5 * 60 * 1000)
return result
} catch (error) {
logger.error('Decryption error:', error)
return ''
}
}
// 获取加密密钥
_getEncryptionKey() {
if (!this._encryptionKeyCache) {
this._encryptionKeyCache = crypto.scryptSync(
config.security.encryptionKey,
this.ENCRYPTION_SALT,
32
)
}
return this._encryptionKeyCache
}
// 保存账户到 Redis
async _saveAccount(accountId, accountData) {
const client = redis.getClientSafe()
const key = `${this.ACCOUNT_KEY_PREFIX}${accountId}`
// 保存账户数据
await client.hset(key, accountData)
// 添加到共享账户列表
if (accountData.accountType === 'shared') {
await client.sadd(this.SHARED_ACCOUNTS_KEY, accountId)
}
}
}
module.exports = new OpenAIResponsesAccountService()