|
|
const axios = require('axios') |
|
|
const ProxyHelper = require('../utils/proxyHelper') |
|
|
const logger = require('../utils/logger') |
|
|
const openaiResponsesAccountService = require('./openaiResponsesAccountService') |
|
|
const apiKeyService = require('./apiKeyService') |
|
|
const unifiedOpenAIScheduler = require('./unifiedOpenAIScheduler') |
|
|
const config = require('../../config/config') |
|
|
const crypto = require('crypto') |
|
|
|
|
|
|
|
|
/**
 * Extract the cache-creation token count from an upstream usage payload.
 *
 * Different OpenAI-compatible backends report this value under different
 * keys, both nested (input_tokens_details / prompt_tokens_details) and at
 * the top level. The first candidate that is present and parses to a
 * number wins.
 *
 * @param {object} usageData - usage object from the upstream response
 * @returns {number} cache-creation tokens, or 0 when absent/unparseable
 */
function extractCacheCreationTokens(usageData) {
  if (!usageData || typeof usageData !== 'object') {
    return 0
  }

  const nested = usageData.input_tokens_details || usageData.prompt_tokens_details || {}

  const candidates = [
    nested.cache_creation_input_tokens,
    nested.cache_creation_tokens,
    usageData.cache_creation_input_tokens,
    usageData.cache_creation_tokens
  ]

  // First candidate that is present (not undefined/null/'') and numeric.
  const match = candidates.find(
    (value) =>
      value !== undefined && value !== null && value !== '' && !Number.isNaN(Number(value))
  )

  return match === undefined ? 0 : Number(match)
}
|
|
|
|
|
class OpenAIResponsesRelayService { |
|
|
constructor() { |
|
|
this.defaultTimeout = config.requestTimeout || 600000 |
|
|
} |
|
|
|
|
|
|
|
|
  /**
   * Relay a client request to an OpenAI-Responses upstream account.
   *
   * Loads the full account record, forwards the request via axios (streaming
   * or JSON depending on `req.body.stream`), and dispatches the result to the
   * stream/normal response handlers. Handles 429 (rate limit) and 401
   * (unauthorized) specially by marking the account with the scheduler.
   *
   * @param {object} req - Express request (body already parsed)
   * @param {object} res - Express response
   * @param {object} account - scheduler-selected account (may be trimmed; only id/name used directly)
   * @param {object} apiKeyData - API key record used for usage accounting downstream
   * @returns {Promise<*>} resolves when a response has been sent or delegated
   */
  async handleRequest(req, res, account, apiKeyData) {
    let abortController = null

    // Hash the session id (if any) so sticky-session bookkeeping never sees the raw id.
    const sessionId = req.headers['session_id'] || req.body?.session_id
    const sessionHash = sessionId
      ? crypto.createHash('sha256').update(sessionId).digest('hex')
      : null

    try {
      // Re-fetch the full account record (the passed-in account may be a partial view).
      const fullAccount = await openaiResponsesAccountService.getAccount(account.id)
      if (!fullAccount) {
        throw new Error('Account not found')
      }

      abortController = new AbortController()

      // Abort the in-flight upstream request as soon as the client goes away.
      const handleClientDisconnect = () => {
        logger.info('🔌 Client disconnected, aborting OpenAI-Responses request')
        if (abortController && !abortController.signal.aborted) {
          abortController.abort()
        }
      }

      req.once('close', handleClientDisconnect)
      res.once('close', handleClientDisconnect)

      const targetUrl = `${fullAccount.baseApi}${req.path}`
      logger.info(`🎯 Forwarding to: ${targetUrl}`)

      // Forward client headers minus hop-by-hop/auth ones, then inject our credentials.
      const headers = {
        ...this._filterRequestHeaders(req.headers),
        Authorization: `Bearer ${fullAccount.apiKey}`,
        'Content-Type': 'application/json'
      }

      if (fullAccount.userAgent) {
        // Account-level User-Agent override takes precedence.
        headers['User-Agent'] = fullAccount.userAgent
        logger.debug(`📱 Using custom User-Agent: ${fullAccount.userAgent}`)
      } else if (req.headers['user-agent']) {
        // Otherwise forward the client's original User-Agent.
        headers['User-Agent'] = req.headers['user-agent']
        logger.debug(`📱 Forwarding original User-Agent: ${req.headers['user-agent']}`)
      }

      const requestOptions = {
        method: req.method,
        url: targetUrl,
        headers,
        data: req.body,
        timeout: this.defaultTimeout,
        responseType: req.body?.stream ? 'stream' : 'json',
        // Never throw on HTTP status; error statuses are handled explicitly below.
        validateStatus: () => true,
        signal: abortController.signal
      }

      // Route through the account's proxy when one is configured.
      if (fullAccount.proxy) {
        const proxyAgent = ProxyHelper.createProxyAgent(fullAccount.proxy)
        if (proxyAgent) {
          requestOptions.httpAgent = proxyAgent
          requestOptions.httpsAgent = proxyAgent
          requestOptions.proxy = false
          logger.info(
            `🌐 Using proxy for OpenAI-Responses: ${ProxyHelper.getProxyDescription(fullAccount.proxy)}`
          )
        }
      }

      logger.info('📤 OpenAI-Responses relay request', {
        accountId: account.id,
        accountName: account.name,
        targetUrl,
        method: req.method,
        stream: req.body?.stream || false,
        model: req.body?.model || 'unknown',
        userAgent: headers['User-Agent'] || 'not set'
      })

      const response = await axios(requestOptions)

      // 429: mark the account rate-limited and surface the upstream payload when available.
      if (response.status === 429) {
        const { resetsInSeconds, errorData } = await this._handle429Error(
          account,
          response,
          req.body?.stream,
          sessionHash
        )

        const errorResponse = errorData || {
          error: {
            message: 'Rate limit exceeded',
            type: 'rate_limit_error',
            code: 'rate_limit_exceeded',
            resets_in_seconds: resetsInSeconds
          }
        }
        return res.status(429).json(errorResponse)
      }

      if (response.status >= 400) {
        // When responseType was 'stream', even error bodies arrive as a stream;
        // drain it so the actual payload can be parsed and logged.
        let errorData = response.data
        if (response.data && typeof response.data.pipe === 'function') {
          const chunks = []
          await new Promise((resolve) => {
            response.data.on('data', (chunk) => chunks.push(chunk))
            response.data.on('end', resolve)
            response.data.on('error', resolve)
            // Safety net: don't wait forever on a stalled error stream.
            setTimeout(resolve, 5000)
          })
          const fullResponse = Buffer.concat(chunks).toString()

          try {
            if (fullResponse.includes('data: ')) {
              // SSE-formatted error: take the first JSON "data:" event.
              const lines = fullResponse.split('\n')
              for (const line of lines) {
                if (line.startsWith('data: ')) {
                  const jsonStr = line.slice(6).trim()
                  if (jsonStr && jsonStr !== '[DONE]') {
                    errorData = JSON.parse(jsonStr)
                    break
                  }
                }
              }
            } else {
              // Plain JSON error body.
              errorData = JSON.parse(fullResponse)
            }
          } catch (e) {
            logger.error('Failed to parse error response:', e)
            errorData = { error: { message: fullResponse || 'Unknown error' } }
          }
        }

        logger.error('OpenAI-Responses API error', {
          status: response.status,
          statusText: response.statusText,
          errorData
        })

        if (response.status === 401) {
          // Build a human-readable reason string for the unauthorized marking.
          let reason = 'OpenAI Responses账号认证失败(401错误)'
          if (errorData) {
            if (typeof errorData === 'string' && errorData.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}`
            } else if (
              errorData.error &&
              typeof errorData.error.message === 'string' &&
              errorData.error.message.trim()
            ) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}`
            } else if (typeof errorData.message === 'string' && errorData.message.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}`
            }
          }

          // Take the account out of rotation; a marking failure must not block the response.
          try {
            await unifiedOpenAIScheduler.markAccountUnauthorized(
              account.id,
              'openai-responses',
              sessionHash,
              reason
            )
          } catch (markError) {
            logger.error(
              '❌ Failed to mark OpenAI-Responses account unauthorized after 401:',
              markError
            )
          }

          // Ensure the client receives a JSON-serializable body (not a stream/Buffer).
          let unauthorizedResponse = errorData
          if (
            !unauthorizedResponse ||
            typeof unauthorizedResponse !== 'object' ||
            unauthorizedResponse.pipe ||
            Buffer.isBuffer(unauthorizedResponse)
          ) {
            const fallbackMessage =
              typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized'
            unauthorizedResponse = {
              error: {
                message: fallbackMessage,
                type: 'unauthorized',
                code: 'unauthorized'
              }
            }
          }

          // Detach disconnect handlers before responding; the request is finished.
          req.removeListener('close', handleClientDisconnect)
          res.removeListener('close', handleClientDisconnect)

          return res.status(401).json(unauthorizedResponse)
        }

        // Other 4xx/5xx: relay upstream status and payload unchanged.
        req.removeListener('close', handleClientDisconnect)
        res.removeListener('close', handleClientDisconnect)

        return res.status(response.status).json(errorData)
      }

      // Success path: stamp the account's last-used time before relaying the body.
      await openaiResponsesAccountService.updateAccount(account.id, {
        lastUsedAt: new Date().toISOString()
      })

      if (req.body?.stream && response.data && typeof response.data.pipe === 'function') {
        return this._handleStreamResponse(
          response,
          res,
          account,
          apiKeyData,
          req.body?.model,
          handleClientDisconnect,
          req
        )
      }

      return this._handleNormalResponse(response, res, account, apiKeyData, req.body?.model)
    } catch (error) {
      // Abort any in-flight upstream request before handling the failure.
      if (abortController && !abortController.signal.aborted) {
        abortController.abort()
      }

      const errorInfo = {
        message: error.message,
        code: error.code,
        status: error.response?.status,
        statusText: error.response?.statusText
      }
      logger.error('OpenAI-Responses relay error:', errorInfo)

      // Persist connection-level failures on the account record.
      if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') {
        await openaiResponsesAccountService.updateAccount(account.id, {
          status: 'error',
          errorMessage: `Connection error: ${error.code}`
        })
      }

      // Headers already sent (e.g. mid-stream failure): just close the response.
      if (res.headersSent) {
        return res.end()
      }

      if (error.response) {
        // Upstream responded but axios still threw (e.g. abort/timeouts mid-body).
        const status = error.response.status || 500
        let errorData = {
          error: {
            message: error.response.statusText || 'Request failed',
            type: 'api_error',
            code: error.code || 'unknown'
          }
        }

        if (error.response.data) {
          // Prefer the upstream payload when it is a plain object or parseable string.
          if (typeof error.response.data === 'object' && !error.response.data.pipe) {
            errorData = error.response.data
          } else if (typeof error.response.data === 'string') {
            try {
              errorData = JSON.parse(error.response.data)
            } catch (e) {
              errorData.error.message = error.response.data
            }
          }
        }

        if (status === 401) {
          // Same unauthorized handling as the non-throwing path above.
          let reason = 'OpenAI Responses账号认证失败(401错误)'
          if (errorData) {
            if (typeof errorData === 'string' && errorData.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.trim()}`
            } else if (
              errorData.error &&
              typeof errorData.error.message === 'string' &&
              errorData.error.message.trim()
            ) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.error.message.trim()}`
            } else if (typeof errorData.message === 'string' && errorData.message.trim()) {
              reason = `OpenAI Responses账号认证失败(401错误):${errorData.message.trim()}`
            }
          }

          try {
            await unifiedOpenAIScheduler.markAccountUnauthorized(
              account.id,
              'openai-responses',
              sessionHash,
              reason
            )
          } catch (markError) {
            logger.error(
              '❌ Failed to mark OpenAI-Responses account unauthorized in catch handler:',
              markError
            )
          }

          // Ensure the client receives a JSON-serializable body (not a stream/Buffer).
          let unauthorizedResponse = errorData
          if (
            !unauthorizedResponse ||
            typeof unauthorizedResponse !== 'object' ||
            unauthorizedResponse.pipe ||
            Buffer.isBuffer(unauthorizedResponse)
          ) {
            const fallbackMessage =
              typeof errorData === 'string' && errorData.trim() ? errorData.trim() : 'Unauthorized'
            unauthorizedResponse = {
              error: {
                message: fallbackMessage,
                type: 'unauthorized',
                code: 'unauthorized'
              }
            }
          }

          return res.status(401).json(unauthorizedResponse)
        }

        return res.status(status).json(errorData)
      }

      // No upstream response at all (network failure, abort, bug): generic 500.
      return res.status(500).json({
        error: {
          message: 'Internal server error',
          type: 'internal_error',
          details: error.message
        }
      })
    }
  }
|
|
|
|
|
|
|
|
  /**
   * Relay an upstream SSE stream to the client while passively parsing the
   * events for usage data, the actual model name, and in-stream rate-limit
   * errors. Usage is recorded and rate limits are marked after the upstream
   * stream ends.
   *
   * @param {object} response - axios response with a readable stream body
   * @param {object} res - Express response to write SSE bytes to
   * @param {object} account - account the request was routed to
   * @param {object} apiKeyData - API key record for usage accounting
   * @param {string|undefined} requestedModel - model from the client request body
   * @param {Function} handleClientDisconnect - 'close' listener installed by handleRequest; removed when the stream finishes
   * @param {object} req - Express request (used for session id + teardown hooks)
   */
  async _handleStreamResponse(
    response,
    res,
    account,
    apiKeyData,
    requestedModel,
    handleClientDisconnect,
    req
  ) {
    // Standard SSE response headers; X-Accel-Buffering disables proxy buffering.
    res.setHeader('Content-Type', 'text/event-stream')
    res.setHeader('Cache-Control', 'no-cache')
    res.setHeader('Connection', 'keep-alive')
    res.setHeader('X-Accel-Buffering', 'no')

    // Closure state shared by the data/end/error handlers below.
    let usageData = null
    let actualModel = null
    let buffer = ''
    let rateLimitDetected = false
    let rateLimitResetsInSeconds = null
    let streamEnded = false

    // Scan one SSE event block for usage/model/rate-limit information.
    const parseSSEForUsage = (data) => {
      const lines = data.split('\n')

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          try {
            const jsonStr = line.slice(6)
            if (jsonStr === '[DONE]') {
              continue
            }

            const eventData = JSON.parse(jsonStr)

            // response.completed carries the authoritative model and usage totals.
            if (eventData.type === 'response.completed' && eventData.response) {
              if (eventData.response.model) {
                actualModel = eventData.response.model
                logger.debug(`📊 Captured actual model from response.completed: ${actualModel}`)
              }

              if (eventData.response.usage) {
                usageData = eventData.response.usage
                logger.info('📊 Successfully captured usage data from OpenAI-Responses:', {
                  input_tokens: usageData.input_tokens,
                  output_tokens: usageData.output_tokens,
                  total_tokens: usageData.total_tokens
                })
              }
            }

            // Rate-limit errors can arrive mid-stream; remember them so the
            // account can be marked once the stream finishes.
            if (eventData.error) {
              if (
                eventData.error.type === 'rate_limit_error' ||
                eventData.error.type === 'usage_limit_reached' ||
                eventData.error.type === 'rate_limit_exceeded'
              ) {
                rateLimitDetected = true
                if (eventData.error.resets_in_seconds) {
                  rateLimitResetsInSeconds = eventData.error.resets_in_seconds
                  logger.warn(
                    `🚫 Rate limit detected in stream, resets in ${rateLimitResetsInSeconds} seconds (${Math.ceil(rateLimitResetsInSeconds / 60)} minutes)`
                  )
                }
              }
            }
          } catch (e) {
            // Non-JSON data lines are expected in SSE traffic; ignore them.
          }
        }
      }
    }

    response.data.on('data', (chunk) => {
      try {
        const chunkStr = chunk.toString()

        // Forward bytes to the client immediately; parsing happens on a copy.
        if (!res.destroyed && !streamEnded) {
          res.write(chunk)
        }

        buffer += chunkStr

        // Complete SSE events are separated by a blank line; keep the trailing
        // partial event in the buffer for the next chunk.
        if (buffer.includes('\n\n')) {
          const events = buffer.split('\n\n')
          buffer = events.pop() || ''

          for (const event of events) {
            if (event.trim()) {
              parseSSEForUsage(event)
            }
          }
        }
      } catch (error) {
        logger.error('Error processing stream chunk:', error)
      }
    })

    response.data.on('end', async () => {
      streamEnded = true

      // Flush any partial event left in the buffer.
      if (buffer.trim()) {
        parseSSEForUsage(buffer)
      }

      if (usageData) {
        try {
          // Upstream input_tokens is inclusive of cached tokens.
          const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
          const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0

          const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
          const cacheCreateTokens = extractCacheCreationTokens(usageData)

          // Bill non-cached input separately from cache reads.
          const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens)

          const totalTokens =
            usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens
          const modelToRecord = actualModel || requestedModel || 'gpt-4'

          await apiKeyService.recordUsage(
            apiKeyData.id,
            actualInputTokens,
            outputTokens,
            cacheCreateTokens,
            cacheReadTokens,
            modelToRecord,
            account.id
          )

          logger.info(
            `📊 Recorded usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${modelToRecord}`
          )

          await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)

          // Track daily spend only for accounts with a quota configured.
          if (parseFloat(account.dailyQuota) > 0) {
            // Lazy require to avoid a potential circular dependency at module load.
            const CostCalculator = require('../utils/costCalculator')
            const costInfo = CostCalculator.calculateCost(
              {
                input_tokens: actualInputTokens,
                output_tokens: outputTokens,
                cache_creation_input_tokens: cacheCreateTokens,
                cache_read_input_tokens: cacheReadTokens
              },
              modelToRecord
            )
            await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total)
          }
        } catch (error) {
          logger.error('Failed to record usage:', error)
        }
      }

      if (rateLimitDetected) {
        // Recompute the session hash here; handleRequest's copy is not in scope.
        const sessionId = req.headers['session_id'] || req.body?.session_id
        const sessionHash = sessionId
          ? crypto.createHash('sha256').update(sessionId).digest('hex')
          : null

        await unifiedOpenAIScheduler.markAccountRateLimited(
          account.id,
          'openai-responses',
          sessionHash,
          rateLimitResetsInSeconds
        )

        logger.warn(
          `🚫 Processing rate limit for OpenAI-Responses account ${account.id} from stream`
        )
      }

      // The request is finished; detach the abort-on-disconnect handlers.
      req.removeListener('close', handleClientDisconnect)
      res.removeListener('close', handleClientDisconnect)

      if (!res.destroyed) {
        res.end()
      }

      logger.info('Stream response completed', {
        accountId: account.id,
        hasUsage: !!usageData,
        actualModel: actualModel || 'unknown'
      })
    })

    response.data.on('error', (error) => {
      streamEnded = true
      logger.error('Stream error:', error)

      req.removeListener('close', handleClientDisconnect)
      res.removeListener('close', handleClientDisconnect)

      // 502 if nothing was sent yet; otherwise just terminate the stream.
      if (!res.headersSent) {
        res.status(502).json({ error: { message: 'Upstream stream error' } })
      } else if (!res.destroyed) {
        res.end()
      }
    })

    // Tear down the upstream stream if the client disconnects first.
    const cleanup = () => {
      streamEnded = true
      try {
        response.data?.unpipe?.(res)
        response.data?.destroy?.()
      } catch (_) {
        // Best-effort cleanup; teardown errors are deliberately ignored.
      }
    }

    req.on('close', cleanup)
    req.on('aborted', cleanup)
  }
|
|
|
|
|
|
|
|
  /**
   * Handle a non-streaming upstream response: record usage, update the
   * account's counters and daily quota, then relay the JSON body with the
   * upstream status code.
   *
   * @param {object} response - axios response with an already-parsed JSON body
   * @param {object} res - Express response
   * @param {object} account - account the request was routed to
   * @param {object} apiKeyData - API key record for usage accounting
   * @param {string|undefined} requestedModel - model from the client request body
   */
  async _handleNormalResponse(response, res, account, apiKeyData, requestedModel) {
    const responseData = response.data

    // Usage/model may be at the top level or nested under `response`,
    // depending on the upstream implementation.
    const usageData = responseData?.usage || responseData?.response?.usage
    const actualModel =
      responseData?.model || responseData?.response?.model || requestedModel || 'gpt-4'

    if (usageData) {
      try {
        // Upstream input_tokens is inclusive of cached tokens.
        const totalInputTokens = usageData.input_tokens || usageData.prompt_tokens || 0
        const outputTokens = usageData.output_tokens || usageData.completion_tokens || 0

        const cacheReadTokens = usageData.input_tokens_details?.cached_tokens || 0
        const cacheCreateTokens = extractCacheCreationTokens(usageData)

        // Bill non-cached input separately from cache reads.
        const actualInputTokens = Math.max(0, totalInputTokens - cacheReadTokens)

        const totalTokens =
          usageData.total_tokens || totalInputTokens + outputTokens + cacheCreateTokens

        await apiKeyService.recordUsage(
          apiKeyData.id,
          actualInputTokens,
          outputTokens,
          cacheCreateTokens,
          cacheReadTokens,
          actualModel,
          account.id
        )

        logger.info(
          `📊 Recorded non-stream usage - Input: ${totalInputTokens}(actual:${actualInputTokens}+cached:${cacheReadTokens}), CacheCreate: ${cacheCreateTokens}, Output: ${outputTokens}, Total: ${totalTokens}, Model: ${actualModel}`
        )

        await openaiResponsesAccountService.updateAccountUsage(account.id, totalTokens)

        // Track daily spend only for accounts with a quota configured.
        if (parseFloat(account.dailyQuota) > 0) {
          // Lazy require to avoid a potential circular dependency at module load.
          const CostCalculator = require('../utils/costCalculator')
          const costInfo = CostCalculator.calculateCost(
            {
              input_tokens: actualInputTokens,
              output_tokens: outputTokens,
              cache_creation_input_tokens: cacheCreateTokens,
              cache_read_input_tokens: cacheReadTokens
            },
            actualModel
          )
          await openaiResponsesAccountService.updateUsageQuota(account.id, costInfo.costs.total)
        }
      } catch (error) {
        // Accounting failures must not prevent the client from getting its response.
        logger.error('Failed to record usage:', error)
      }
    }

    res.status(response.status).json(responseData)

    logger.info('Normal response completed', {
      accountId: account.id,
      status: response.status,
      hasUsage: !!usageData,
      model: actualModel
    })
  }
|
|
|
|
|
|
|
|
async _handle429Error(account, response, isStream = false, sessionHash = null) { |
|
|
let resetsInSeconds = null |
|
|
let errorData = null |
|
|
|
|
|
try { |
|
|
|
|
|
if (isStream && response.data && typeof response.data.pipe === 'function') { |
|
|
|
|
|
const chunks = [] |
|
|
await new Promise((resolve, reject) => { |
|
|
response.data.on('data', (chunk) => chunks.push(chunk)) |
|
|
response.data.on('end', resolve) |
|
|
response.data.on('error', reject) |
|
|
|
|
|
setTimeout(resolve, 5000) |
|
|
}) |
|
|
|
|
|
const fullResponse = Buffer.concat(chunks).toString() |
|
|
|
|
|
|
|
|
if (fullResponse.includes('data: ')) { |
|
|
const lines = fullResponse.split('\n') |
|
|
for (const line of lines) { |
|
|
if (line.startsWith('data: ')) { |
|
|
try { |
|
|
const jsonStr = line.slice(6).trim() |
|
|
if (jsonStr && jsonStr !== '[DONE]') { |
|
|
errorData = JSON.parse(jsonStr) |
|
|
break |
|
|
} |
|
|
} catch (e) { |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (!errorData) { |
|
|
try { |
|
|
errorData = JSON.parse(fullResponse) |
|
|
} catch (e) { |
|
|
logger.error('Failed to parse 429 error response:', e) |
|
|
logger.debug('Raw response:', fullResponse) |
|
|
} |
|
|
} |
|
|
} else if (response.data && typeof response.data !== 'object') { |
|
|
|
|
|
try { |
|
|
errorData = JSON.parse(response.data) |
|
|
} catch (e) { |
|
|
logger.error('Failed to parse 429 error response as JSON:', e) |
|
|
errorData = { error: { message: response.data } } |
|
|
} |
|
|
} else if (response.data && typeof response.data === 'object' && !response.data.pipe) { |
|
|
|
|
|
errorData = response.data |
|
|
} |
|
|
|
|
|
|
|
|
if (errorData && errorData.error) { |
|
|
if (errorData.error.resets_in_seconds) { |
|
|
resetsInSeconds = errorData.error.resets_in_seconds |
|
|
logger.info( |
|
|
`🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)` |
|
|
) |
|
|
} else if (errorData.error.resets_in) { |
|
|
|
|
|
resetsInSeconds = parseInt(errorData.error.resets_in) |
|
|
logger.info( |
|
|
`🕐 Rate limit will reset in ${resetsInSeconds} seconds (${Math.ceil(resetsInSeconds / 60)} minutes / ${Math.ceil(resetsInSeconds / 3600)} hours)` |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
if (!resetsInSeconds) { |
|
|
logger.warn('⚠️ Could not extract reset time from 429 response, using default 60 minutes') |
|
|
} |
|
|
} catch (e) { |
|
|
logger.error('⚠️ Failed to parse rate limit error:', e) |
|
|
} |
|
|
|
|
|
|
|
|
await unifiedOpenAIScheduler.markAccountRateLimited( |
|
|
account.id, |
|
|
'openai-responses', |
|
|
sessionHash, |
|
|
resetsInSeconds |
|
|
) |
|
|
|
|
|
logger.warn('OpenAI-Responses account rate limited', { |
|
|
accountId: account.id, |
|
|
accountName: account.name, |
|
|
resetsInSeconds: resetsInSeconds || 'unknown', |
|
|
resetInMinutes: resetsInSeconds ? Math.ceil(resetsInSeconds / 60) : 60, |
|
|
resetInHours: resetsInSeconds ? Math.ceil(resetsInSeconds / 3600) : 1 |
|
|
}) |
|
|
|
|
|
|
|
|
return { resetsInSeconds, errorData } |
|
|
} |
|
|
|
|
|
|
|
|
_filterRequestHeaders(headers) { |
|
|
const filtered = {} |
|
|
const skipHeaders = [ |
|
|
'host', |
|
|
'content-length', |
|
|
'authorization', |
|
|
'x-api-key', |
|
|
'x-cr-api-key', |
|
|
'connection', |
|
|
'upgrade', |
|
|
'sec-websocket-key', |
|
|
'sec-websocket-version', |
|
|
'sec-websocket-extensions' |
|
|
] |
|
|
|
|
|
for (const [key, value] of Object.entries(headers)) { |
|
|
if (!skipHeaders.includes(key.toLowerCase())) { |
|
|
filtered[key] = value |
|
|
} |
|
|
} |
|
|
|
|
|
return filtered |
|
|
} |
|
|
|
|
|
|
|
|
_estimateCost(model, inputTokens, outputTokens) { |
|
|
|
|
|
const rates = { |
|
|
'gpt-4': { input: 0.03, output: 0.06 }, |
|
|
'gpt-4-turbo': { input: 0.01, output: 0.03 }, |
|
|
'gpt-3.5-turbo': { input: 0.0005, output: 0.0015 }, |
|
|
'claude-3-opus': { input: 0.015, output: 0.075 }, |
|
|
'claude-3-sonnet': { input: 0.003, output: 0.015 }, |
|
|
'claude-3-haiku': { input: 0.00025, output: 0.00125 } |
|
|
} |
|
|
|
|
|
|
|
|
let rate = rates['gpt-3.5-turbo'] |
|
|
for (const [modelKey, modelRate] of Object.entries(rates)) { |
|
|
if (model.toLowerCase().includes(modelKey.toLowerCase())) { |
|
|
rate = modelRate |
|
|
break |
|
|
} |
|
|
} |
|
|
|
|
|
const inputCost = (inputTokens / 1000) * rate.input |
|
|
const outputCost = (outputTokens / 1000) * rate.output |
|
|
return inputCost + outputCost |
|
|
} |
|
|
} |
|
|
|
|
|
// Export a shared singleton so all routes reuse the same relay instance.
module.exports = new OpenAIResponsesRelayService()
|
|
|