| const express = require('express') |
| const router = express.Router() |
| const logger = require('../utils/logger') |
| const { authenticateApiKey } = require('../middleware/auth') |
| const geminiAccountService = require('../services/geminiAccountService') |
| const { sendGeminiRequest, getAvailableModels } = require('../services/geminiRelayService') |
| const crypto = require('crypto') |
| const sessionHelper = require('../utils/sessionHelper') |
| const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') |
| const apiKeyService = require('../services/apiKeyService') |
| const { updateRateLimitCounters } = require('../utils/rateLimitHelper') |
| |
|
|
| |
/**
 * Build a session fingerprint for sticky account selection.
 *
 * The fingerprint is the SHA-256 hex digest of the user agent, the client IP and the
 * first 10 characters of the API key header (`x-api-key`, falling back to
 * `x-goog-api-key`), joined with ':'. Missing or empty parts are skipped.
 *
 * @param {object} req - Express request (reads `headers` and `ip`).
 * @returns {string} Hex-encoded SHA-256 digest.
 */
function generateSessionHash(req) {
  const { headers } = req
  const keyPrefix =
    headers['x-api-key']?.substring(0, 10) || headers['x-goog-api-key']?.substring(0, 10)

  // Only keep the parts that are actually present.
  const fingerprintParts = []
  for (const part of [headers['user-agent'], req.ip, keyPrefix]) {
    if (part) {
      fingerprintParts.push(part)
    }
  }

  const hasher = crypto.createHash('sha256')
  hasher.update(fingerprintParts.join(':'))
  return hasher.digest('hex')
}
|
|
| |
/**
 * Check whether an API key may use a given service.
 *
 * A key without a `permissions` value is treated as `'all'`.
 *
 * @param {object} apiKeyData - API key record (reads `permissions`).
 * @param {string} [requiredPermission='gemini'] - Permission the caller needs.
 * @returns {boolean} True when the key grants `'all'` or exactly the required permission.
 */
function checkPermissions(apiKeyData, requiredPermission = 'gemini') {
  const granted = apiKeyData.permissions ? apiKeyData.permissions : 'all'
  if (granted === 'all') {
    return true
  }
  return granted === requiredPermission
}
|
|
| |
/**
 * Guard used by the Gemini handlers: verify the request's API key may access Gemini.
 *
 * When the key lacks permission, a security log entry is written and a 403 JSON
 * error is sent on `res`; the caller must stop processing in that case.
 *
 * @param {object} req - Express request (reads `apiKey` and `originalUrl`).
 * @param {object} res - Express response.
 * @returns {boolean} True when access is allowed, false when a 403 was already sent.
 */
function ensureGeminiPermission(req, res) {
  const keyInfo = req.apiKey || {}
  const allowed = checkPermissions(keyInfo, 'gemini')

  if (!allowed) {
    logger.security(
      `🚫 API Key ${keyInfo.id || 'unknown'} 缺少 Gemini 权限,拒绝访问 ${req.originalUrl}`
    )

    const denial = {
      message: 'This API key does not have permission to access Gemini',
      type: 'permission_denied'
    }
    res.status(403).json({ error: denial })
  }

  return allowed
}
|
|
/**
 * Feed a request's token usage into the rate-limit counters, if the request carries any.
 *
 * Does nothing when `req.rateLimitInfo` is absent. Failures are logged and swallowed so
 * that usage accounting never breaks the response path.
 *
 * @param {object} req - Express request (reads `rateLimitInfo`).
 * @param {object} usageSummary - Token usage passed through to `updateRateLimitCounters`.
 * @param {string} model - Model name the usage belongs to.
 * @param {string} [context=''] - Optional tag appended to log messages.
 * @returns {Promise<void>}
 */
async function applyRateLimitTracking(req, usageSummary, model, context = '') {
  const { rateLimitInfo } = req
  if (!rateLimitInfo) {
    return
  }

  let label = ''
  if (context) {
    label = ` (${context})`
  }

  try {
    const counters = await updateRateLimitCounters(rateLimitInfo, usageSummary, model)
    const { totalTokens, totalCost } = counters

    if (totalTokens > 0) {
      logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`)
    }

    const hasCost = typeof totalCost === 'number' && totalCost > 0
    if (hasCost) {
      logger.api(`💰 Updated rate limit cost count${label}: +$${totalCost.toFixed(6)}`)
    }
  } catch (error) {
    logger.error(`❌ Failed to update rate limit counters${label}:`, error)
  }
}
|
|
| |
/**
 * POST /messages
 *
 * Relays a chat-style request (`messages`, `model`, `temperature`, `max_tokens`,
 * `stream`) to Gemini using an account picked by the unified scheduler.
 * Streaming responses are written to the client as server-sent events;
 * non-streaming responses are returned as JSON.
 *
 * Error responses use the `{ error: { message, type } }` shape:
 *   403 - key lacks Gemini permission
 *   400 - `messages` missing or empty
 *   503 - no account could be selected / selected account not found
 *   otherwise `error.status` from the relay, or 500
 */
router.post('/messages', authenticateApiKey, async (req, res) => {
  const startTime = Date.now()
  let abortController = null
  // Account serving this request. Kept outside the try block so the error path can
  // flag it as rate limited when the upstream replies with HTTP 429.
  let selectedAccount = null

  try {
    const apiKeyData = req.apiKey

    // Reject keys that are scoped to another service.
    if (!checkPermissions(apiKeyData, 'gemini')) {
      return res.status(403).json({
        error: {
          message: 'This API key does not have permission to access Gemini',
          type: 'permission_denied'
        }
      })
    }

    // Request parameters with their defaults.
    const {
      messages,
      model = 'gemini-2.5-flash',
      temperature = 0.7,
      max_tokens = 4096,
      stream = false
    } = req.body

    // A non-empty messages array is mandatory.
    if (!messages || !Array.isArray(messages) || messages.length === 0) {
      return res.status(400).json({
        error: {
          message: 'Messages array is required',
          type: 'invalid_request_error'
        }
      })
    }

    // Session hash lets the scheduler keep a client on the same account.
    const sessionHash = generateSessionHash(req)

    // Pick an account; scheduler failures are reported as 503.
    let accountId
    try {
      const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey(
        apiKeyData,
        sessionHash,
        model
      )
      const { accountId: selectedAccountId } = schedulerResult
      accountId = selectedAccountId
    } catch (error) {
      logger.error('Failed to select Gemini account:', error)
      return res.status(503).json({
        error: {
          message: error.message || 'No available Gemini accounts',
          type: 'service_unavailable'
        }
      })
    }

    // Load the full account record for the selected id.
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    selectedAccount = account

    logger.info(`Using Gemini account: ${account.id} for API key: ${apiKeyData.id}`)

    await geminiAccountService.markAccountUsed(account.id)

    // Allow the upstream request to be cancelled when the client goes away.
    abortController = new AbortController()

    req.on('close', () => {
      if (abortController && !abortController.signal.aborted) {
        logger.info('Client disconnected, aborting Gemini request')
        abortController.abort()
      }
    })

    const geminiResponse = await sendGeminiRequest({
      messages,
      model,
      temperature,
      maxTokens: max_tokens,
      stream,
      accessToken: account.accessToken,
      proxy: account.proxy,
      apiKeyId: apiKeyData.id,
      signal: abortController.signal,
      projectId: account.projectId,
      accountId: account.id
    })

    if (stream) {
      // Server-sent events; X-Accel-Buffering disables proxy buffering (nginx).
      res.setHeader('Content-Type', 'text/event-stream')
      res.setHeader('Cache-Control', 'no-cache')
      res.setHeader('Connection', 'keep-alive')
      res.setHeader('X-Accel-Buffering', 'no')

      // Forward chunks until the stream ends or the request is aborted.
      for await (const chunk of geminiResponse) {
        if (abortController.signal.aborted) {
          break
        }
        res.write(chunk)
      }

      res.end()
    } else {
      res.json(geminiResponse)
    }

    const duration = Date.now() - startTime
    logger.info(`Gemini request completed in ${duration}ms`)
  } catch (error) {
    logger.error('Gemini request error:', error)

    // Upstream rate limit: flag the account that served this request. `req.account`
    // is kept as a fallback in case a middleware provides it.
    const rateLimitedAccount = selectedAccount || req.account
    if (error.status === 429 && rateLimitedAccount) {
      try {
        await geminiAccountService.setAccountRateLimited(rateLimitedAccount.id, true)
      } catch (markError) {
        // Bookkeeping failure must not prevent the client from getting a response.
        logger.error('Failed to mark Gemini account as rate limited:', markError)
      }
    }

    // Once streaming has started the status line is already on the wire; sending a
    // JSON error would throw, so just terminate the response.
    if (res.headersSent) {
      if (!res.writableEnded) {
        res.end()
      }
      return undefined
    }

    const status = error.status || 500
    const errorResponse = {
      error: error.error || {
        message: error.message || 'Internal server error',
        type: 'api_error'
      }
    }

    res.status(status).json(errorResponse)
  } finally {
    // Drop the controller so a late 'close' event becomes a no-op.
    abortController = null
  }
  return undefined
})
|
|
| |
/**
 * GET /models
 *
 * Lists the models available to the calling API key. When no Gemini account can be
 * selected (or the selected account cannot be loaded) a single default model entry
 * is returned instead of an error.
 */
router.get('/models', authenticateApiKey, async (req, res) => {
  try {
    const apiKeyData = req.apiKey

    // Reject keys that are scoped to another service.
    if (!checkPermissions(apiKeyData, 'gemini')) {
      return res.status(403).json({
        error: {
          message: 'This API key does not have permission to access Gemini',
          type: 'permission_denied'
        }
      })
    }

    // Account selection is best effort here: failures fall through to the default list.
    let account = null
    try {
      const accountSelection = await unifiedGeminiScheduler.selectAccountForApiKey(
        apiKeyData,
        null,
        null
      )
      account = await geminiAccountService.getAccount(accountSelection.accountId)
    } catch (error) {
      logger.warn('Failed to select Gemini account for models endpoint:', error)
    }

    if (!account) {
      // Default list used when no account is available.
      return res.json({
        object: 'list',
        data: [
          {
            id: 'gemini-2.5-flash',
            object: 'model',
            // Whole seconds: `created` is a Unix timestamp, not a fractional value.
            created: Math.floor(Date.now() / 1000),
            owned_by: 'google'
          }
        ]
      })
    }

    const models = await getAvailableModels(account.accessToken, account.proxy)

    res.json({
      object: 'list',
      data: models
    })
  } catch (error) {
    logger.error('Failed to get Gemini models:', error)
    res.status(500).json({
      error: {
        message: 'Failed to retrieve models',
        type: 'api_error'
      }
    })
  }
  return undefined
})
|
|
| |
/**
 * GET /usage
 *
 * Returns the token and request counters (total, daily, monthly) stored on the
 * authenticated API key. Any failure is reported as a 500 JSON error.
 */
router.get('/usage', authenticateApiKey, async (req, res) => {
  try {
    const { usage } = req.apiKey

    // Emit `<period>_tokens` / `<period>_requests` for each tracked period, in order.
    const report = { object: 'usage' }
    for (const period of ['total', 'daily', 'monthly']) {
      const counters = usage[period]
      report[`${period}_tokens`] = counters.tokens
      report[`${period}_requests`] = counters.requests
    }

    res.json(report)
  } catch (error) {
    logger.error('Failed to get usage stats:', error)
    const failure = {
      message: 'Failed to retrieve usage statistics',
      type: 'api_error'
    }
    res.status(500).json({ error: failure })
  }
})
|
|
| |
/**
 * GET /key-info
 *
 * Returns a summary of the authenticated API key: identity, permissions, token
 * budget, rate/concurrency limits and model restrictions. `tokens_remaining` is
 * null when the key has no positive token limit.
 */
router.get('/key-info', authenticateApiKey, async (req, res) => {
  try {
    const { apiKey: keyData } = req

    const summary = {
      id: keyData.id,
      name: keyData.name,
      permissions: keyData.permissions || 'all',
      token_limit: keyData.tokenLimit,
      tokens_used: keyData.usage.total.tokens
    }

    // Remaining budget only makes sense for keys with a positive limit.
    summary.tokens_remaining = null
    if (keyData.tokenLimit > 0) {
      summary.tokens_remaining = Math.max(0, keyData.tokenLimit - keyData.usage.total.tokens)
    }

    summary.rate_limit = {
      window: keyData.rateLimitWindow,
      requests: keyData.rateLimitRequests
    }
    summary.concurrency_limit = keyData.concurrencyLimit
    summary.model_restrictions = {
      enabled: keyData.enableModelRestriction,
      models: keyData.restrictedModels
    }

    res.json(summary)
  } catch (error) {
    logger.error('Failed to get key info:', error)
    const failure = {
      message: 'Failed to retrieve API key information',
      type: 'api_error'
    }
    res.status(500).json({ error: failure })
  }
})
|
|
| |
/**
 * Handle `loadCodeAssist` (registered on both the v1internal and v1beta routes).
 *
 * Selects a Gemini account for the API key, builds an OAuth client for it and
 * forwards the call. When the account has no configured project id and the
 * response carries `cloudaicompanionProject`, that value is cached as the
 * account's temporary project id.
 *
 * @param {object} req - Express request (reads `body`, `params`, `path`, `apiKey`).
 * @param {object} res - Express response.
 * @returns {Promise<undefined>}
 */
async function handleLoadCodeAssist(req, res) {
  try {
    if (!ensureGeminiPermission(req, res)) {
      return undefined
    }

    const sessionHash = sessionHelper.generateSessionHash(req.body)

    // Model comes from the body, then the route parameter, then the default.
    const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash'
    const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(
      req.apiKey,
      sessionHash,
      requestedModel
    )
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      // Without this guard the destructuring below throws and surfaces as an opaque 500.
      logger.error(`Selected Gemini account not found: ${accountId}`)
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    const { accessToken, refreshToken, projectId } = account

    const { metadata, cloudaicompanionProject } = req.body

    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.info(`LoadCodeAssist request (${version})`, {
      metadata: metadata || {},
      requestedProject: cloudaicompanionProject || null,
      accountProject: projectId || null,
      apiKeyId: req.apiKey?.id || 'unknown'
    })

    // The proxy may be stored as a JSON string; an unparsable value means "no proxy".
    let proxyConfig = null
    if (account.proxy) {
      try {
        proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn('Failed to parse proxy configuration:', e)
      }
    }

    const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig)

    // The account's configured project id wins over the one supplied in the request.
    const effectiveProjectId = projectId || cloudaicompanionProject || null

    logger.info('📋 loadCodeAssist项目ID处理逻辑', {
      accountProjectId: projectId,
      requestProjectId: cloudaicompanionProject,
      effectiveProjectId,
      decision: projectId
        ? '使用账户配置'
        : cloudaicompanionProject
          ? '使用请求参数'
          : '不使用项目ID'
    })

    const response = await geminiAccountService.loadCodeAssist(
      client,
      effectiveProjectId,
      proxyConfig
    )

    // Cache the project id from the response for accounts that have none configured.
    if (response.cloudaicompanionProject && !account.projectId) {
      await geminiAccountService.updateTempProjectId(accountId, response.cloudaicompanionProject)
      logger.info(
        `📋 Cached temporary projectId from loadCodeAssist: ${response.cloudaicompanionProject}`
      )
    }

    res.json(response)
  } catch (error) {
    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.error(`Error in loadCodeAssist endpoint (${version})`, { error: error.message })
    res.status(500).json({
      error: 'Internal server error',
      message: error.message
    })
  }
  return undefined
}
|
|
| |
/**
 * Handle `onboardUser` (registered on both the v1internal and v1beta routes).
 *
 * Selects a Gemini account for the API key and builds an OAuth client for it.
 * With a `tierId` in the body the call goes to `onboardUser`; without one it
 * goes to `setupUser`. The service response is returned as JSON.
 *
 * @param {object} req - Express request (reads `body`, `params`, `path`, `apiKey`).
 * @param {object} res - Express response.
 * @returns {Promise<undefined>}
 */
async function handleOnboardUser(req, res) {
  try {
    if (!ensureGeminiPermission(req, res)) {
      return undefined
    }

    const { tierId, cloudaicompanionProject, metadata } = req.body
    const sessionHash = sessionHelper.generateSessionHash(req.body)

    // Model comes from the body, then the route parameter, then the default.
    const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash'
    const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(
      req.apiKey,
      sessionHash,
      requestedModel
    )
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      // Without this guard the destructuring below throws and surfaces as an opaque 500.
      logger.error(`Selected Gemini account not found: ${accountId}`)
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    const { accessToken, refreshToken, projectId } = account

    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.info(`OnboardUser request (${version})`, {
      tierId: tierId || 'not provided',
      requestedProject: cloudaicompanionProject || null,
      accountProject: projectId || null,
      metadata: metadata || {},
      apiKeyId: req.apiKey?.id || 'unknown'
    })

    // The proxy may be stored as a JSON string; an unparsable value means "no proxy".
    let proxyConfig = null
    if (account.proxy) {
      try {
        proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn('Failed to parse proxy configuration:', e)
      }
    }

    const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig)

    // The account's configured project id wins over the one supplied in the request.
    const effectiveProjectId = projectId || cloudaicompanionProject || null

    logger.info('📋 onboardUser项目ID处理逻辑', {
      accountProjectId: projectId,
      requestProjectId: cloudaicompanionProject,
      effectiveProjectId,
      decision: projectId
        ? '使用账户配置'
        : cloudaicompanionProject
          ? '使用请求参数'
          : '不使用项目ID'
    })

    // An explicit tier goes through onboardUser; otherwise fall back to setupUser.
    const response = tierId
      ? await geminiAccountService.onboardUser(
          client,
          tierId,
          effectiveProjectId,
          metadata,
          proxyConfig
        )
      : await geminiAccountService.setupUser(client, effectiveProjectId, metadata, proxyConfig)

    res.json(response)
  } catch (error) {
    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.error(`Error in onboardUser endpoint (${version})`, { error: error.message })
    res.status(500).json({
      error: 'Internal server error',
      message: error.message
    })
  }
  return undefined
}
|
|
| |
/**
 * Handle `countTokens` (registered on both the v1internal and v1beta routes).
 *
 * Accepts the payload either wrapped in `body.request` or directly in the body,
 * validates that `contents` is an array, then counts tokens through an OAuth
 * client built for the selected Gemini account.
 *
 * @param {object} req - Express request (reads `body`, `params`, `path`, `apiKey`).
 * @param {object} res - Express response.
 * @returns {Promise<undefined>}
 */
async function handleCountTokens(req, res) {
  try {
    if (!ensureGeminiPermission(req, res)) {
      return undefined
    }

    // The payload may be nested under `request` or sent at the top level.
    const requestData = req.body.request || req.body
    const { contents } = requestData
    // Model comes from the payload, then the route parameter, then the default.
    const model = requestData.model || req.params.modelName || 'gemini-2.5-flash'
    const sessionHash = sessionHelper.generateSessionHash(req.body)

    if (!contents || !Array.isArray(contents)) {
      return res.status(400).json({
        error: {
          message: 'Contents array is required',
          type: 'invalid_request_error'
        }
      })
    }

    const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(
      req.apiKey,
      sessionHash,
      model
    )
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      // Without this guard the destructuring below throws and surfaces as an opaque 500.
      logger.error(`Selected Gemini account not found: ${accountId}`)
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    const { accessToken, refreshToken } = account

    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.info(`CountTokens request (${version})`, {
      model,
      contentsLength: contents.length,
      apiKeyId: req.apiKey?.id || 'unknown'
    })

    // The proxy may be stored as a JSON string; an unparsable value means "no proxy".
    let proxyConfig = null
    if (account.proxy) {
      try {
        proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn('Failed to parse proxy configuration:', e)
      }
    }

    const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig)
    const response = await geminiAccountService.countTokens(client, contents, model, proxyConfig)

    res.json(response)
  } catch (error) {
    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.error(`Error in countTokens endpoint (${version})`, { error: error.message })
    res.status(500).json({
      error: {
        message: error.message || 'Internal server error',
        type: 'api_error'
      }
    })
  }
  return undefined
}
|
|
| |
/**
 * Handle non-streaming `generateContent` (registered on both the v1internal and
 * v1beta routes).
 *
 * Accepts three body shapes: `{ request: {...} }`, an OpenAI-style `{ messages }`
 * body (converted to Gemini `contents`), or a body that carries `contents`
 * directly. Token usage from the response's `usageMetadata` is recorded against
 * the API key and the rate-limit counters. On v1beta paths only the inner
 * `response` object is returned; otherwise the full service response is returned.
 *
 * @param {object} req - Express request (reads `body`, `params`, `path`, `apiKey`).
 * @param {object} res - Express response.
 * @returns {Promise<undefined>}
 */
async function handleGenerateContent(req, res) {
  try {
    if (!ensureGeminiPermission(req, res)) {
      return undefined
    }

    const { project, user_prompt_id, request: requestData } = req.body
    // Model comes from the body, then the route parameter, then the default.
    const model = req.body.model || req.params.modelName || 'gemini-2.5-flash'
    const sessionHash = sessionHelper.generateSessionHash(req.body)

    // Normalise the body into a Gemini request object.
    let actualRequestData = requestData
    if (!requestData) {
      if (req.body.messages) {
        // OpenAI-style messages: 'assistant' maps to Gemini's 'model' role.
        actualRequestData = {
          contents: req.body.messages.map((msg) => ({
            role: msg.role === 'assistant' ? 'model' : msg.role,
            parts: [{ text: msg.content }]
          })),
          generationConfig: {
            temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7,
            maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096,
            topP: req.body.top_p !== undefined ? req.body.top_p : 0.95,
            topK: req.body.top_k !== undefined ? req.body.top_k : 40
          }
        }
      } else if (req.body.contents) {
        // The body already is a Gemini request.
        actualRequestData = req.body
      }
    }

    if (!actualRequestData || !actualRequestData.contents) {
      return res.status(400).json({
        error: {
          message: 'Request contents are required',
          type: 'invalid_request_error'
        }
      })
    }

    const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(
      req.apiKey,
      sessionHash,
      model
    )
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      // Without this guard the destructuring below throws and surfaces as an opaque 500.
      logger.error(`Selected Gemini account not found: ${accountId}`)
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    const { accessToken, refreshToken } = account

    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.info(`GenerateContent request (${version})`, {
      model,
      userPromptId: user_prompt_id,
      projectId: project || account.projectId,
      apiKeyId: req.apiKey?.id || 'unknown'
    })

    // The proxy may be stored as a JSON string; an unparsable value means "no proxy".
    let proxyConfig = null
    if (account.proxy) {
      try {
        proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn('Failed to parse proxy configuration:', e)
      }
    }

    const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig)

    // The account's configured project id wins over the one supplied in the request.
    const effectiveProjectId = account.projectId || project || null

    logger.info('📋 项目ID处理逻辑', {
      accountProjectId: account.projectId,
      requestProjectId: project,
      effectiveProjectId,
      decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID'
    })

    const response = await geminiAccountService.generateContent(
      client,
      { model, request: actualRequestData },
      user_prompt_id,
      effectiveProjectId,
      req.apiKey?.id,
      proxyConfig
    )

    // Usage accounting is best effort: a failure here must not fail the request.
    if (response?.response?.usageMetadata) {
      try {
        const usage = response.response.usageMetadata
        await apiKeyService.recordUsage(
          req.apiKey.id,
          usage.promptTokenCount || 0,
          usage.candidatesTokenCount || 0,
          0,
          0,
          model,
          account.id
        )
        logger.info(
          `📊 Recorded Gemini usage - Input: ${usage.promptTokenCount}, Output: ${usage.candidatesTokenCount}, Total: ${usage.totalTokenCount}`
        )

        await applyRateLimitTracking(
          req,
          {
            inputTokens: usage.promptTokenCount || 0,
            outputTokens: usage.candidatesTokenCount || 0,
            cacheCreateTokens: 0,
            cacheReadTokens: 0
          },
          model,
          'gemini-non-stream'
        )
      } catch (error) {
        logger.error('Failed to record Gemini usage:', error)
      }
    }

    // v1beta callers get the unwrapped inner response.
    res.json(version === 'v1beta' ? response.response : response)
  } catch (error) {
    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    // Log upstream HTTP details when the error carries them.
    logger.error(`Error in generateContent endpoint (${version})`, {
      message: error.message,
      status: error.response?.status,
      statusText: error.response?.statusText,
      responseData: error.response?.data,
      requestUrl: error.config?.url,
      requestMethod: error.config?.method,
      stack: error.stack
    })
    res.status(500).json({
      error: {
        message: error.message || 'Internal server error',
        type: 'api_error'
      }
    })
  }
  return undefined
}
|
|
| |
/**
 * Handle streaming `streamGenerateContent` (registered on both the v1internal and
 * v1beta routes).
 *
 * Accepts the same three body shapes as the non-streaming handler (`{ request }`,
 * OpenAI-style `{ messages }`, or a body carrying `contents`). The upstream stream
 * is relayed as server-sent events:
 *   - v1internal paths: chunks are forwarded verbatim;
 *   - v1beta paths: each `data:` event is re-emitted with the inner `response`
 *     object unwrapped (events without `response` are passed through as-is).
 * The last `usageMetadata` seen in the stream is recorded against the API key and
 * the rate-limit counters when the stream ends.
 *
 * @param {object} req - Express request (reads `body`, `params`, `path`, `apiKey`).
 * @param {object} res - Express response.
 * @returns {Promise<undefined>}
 */
async function handleStreamGenerateContent(req, res) {
  // This function returns as soon as the stream listeners are attached, so the
  // controller must stay reachable afterwards for the 'close' handler to use it.
  const abortController = new AbortController()
  // Set once the stream has ended or failed (or setup threw); a later 'close' event
  // is then part of normal teardown rather than a client disconnect.
  let streamSettled = false

  try {
    if (!ensureGeminiPermission(req, res)) {
      return undefined
    }

    const { project, user_prompt_id, request: requestData } = req.body
    // Model comes from the body, then the route parameter, then the default.
    const model = req.body.model || req.params.modelName || 'gemini-2.5-flash'
    const sessionHash = sessionHelper.generateSessionHash(req.body)

    // Normalise the body into a Gemini request object.
    let actualRequestData = requestData
    if (!requestData) {
      if (req.body.messages) {
        // OpenAI-style messages: 'assistant' maps to Gemini's 'model' role.
        actualRequestData = {
          contents: req.body.messages.map((msg) => ({
            role: msg.role === 'assistant' ? 'model' : msg.role,
            parts: [{ text: msg.content }]
          })),
          generationConfig: {
            temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7,
            maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096,
            topP: req.body.top_p !== undefined ? req.body.top_p : 0.95,
            topK: req.body.top_k !== undefined ? req.body.top_k : 40
          }
        }
      } else if (req.body.contents) {
        // The body already is a Gemini request.
        actualRequestData = req.body
      }
    }

    if (!actualRequestData || !actualRequestData.contents) {
      return res.status(400).json({
        error: {
          message: 'Request contents are required',
          type: 'invalid_request_error'
        }
      })
    }

    const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(
      req.apiKey,
      sessionHash,
      model
    )
    const account = await geminiAccountService.getAccount(accountId)
    if (!account) {
      // Without this guard the destructuring below throws and surfaces as an opaque 500.
      logger.error(`Selected Gemini account not found: ${accountId}`)
      return res.status(503).json({
        error: {
          message: 'Selected account not found',
          type: 'service_unavailable'
        }
      })
    }
    const { accessToken, refreshToken } = account

    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    logger.info(`StreamGenerateContent request (${version})`, {
      model,
      userPromptId: user_prompt_id,
      projectId: project || account.projectId,
      apiKeyId: req.apiKey?.id || 'unknown'
    })

    // Cancel the upstream request when the client goes away mid-stream.
    req.on('close', () => {
      if (!streamSettled && !abortController.signal.aborted) {
        logger.info('Client disconnected, aborting stream request')
        abortController.abort()
      }
    })

    // The proxy may be stored as a JSON string; an unparsable value means "no proxy".
    let proxyConfig = null
    if (account.proxy) {
      try {
        proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy
      } catch (e) {
        logger.warn('Failed to parse proxy configuration:', e)
      }
    }

    const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig)

    // The account's configured project id wins over the one supplied in the request.
    const effectiveProjectId = account.projectId || project || null

    logger.info('📋 流式请求项目ID处理逻辑', {
      accountProjectId: account.projectId,
      requestProjectId: project,
      effectiveProjectId,
      decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID'
    })

    const streamResponse = await geminiAccountService.generateContentStream(
      client,
      { model, request: actualRequestData },
      user_prompt_id,
      effectiveProjectId,
      req.apiKey?.id,
      abortController.signal,
      proxyConfig
    )

    // Server-sent events; X-Accel-Buffering disables proxy buffering (nginx).
    res.setHeader('Content-Type', 'text/event-stream')
    res.setHeader('Cache-Control', 'no-cache')
    res.setHeader('Connection', 'keep-alive')
    res.setHeader('X-Accel-Buffering', 'no')

    // Classify one SSE line: 'data' (parsed JSON), 'control' (empty or [DONE]),
    // 'invalid' (unparsable JSON) or 'other' (not a data line).
    const parseSSELine = (line) => {
      if (!line.startsWith('data: ')) {
        return { type: 'other', line, data: null }
      }

      const jsonStr = line.substring(6).trim()

      if (!jsonStr || jsonStr === '[DONE]') {
        return { type: 'control', line, data: null, jsonStr }
      }

      try {
        const data = JSON.parse(jsonStr)
        return { type: 'data', line, data, jsonStr }
      } catch (e) {
        return { type: 'invalid', line, data: null, jsonStr, error: e }
      }
    }

    // Holds the trailing partial line between chunks.
    let streamBuffer = ''
    let totalUsage = {
      promptTokenCount: 0,
      candidatesTokenCount: 0,
      totalTokenCount: 0
    }

    // Inspect one complete line: capture usage metadata and, for v1beta, return the
    // rewritten line to emit (null when nothing should be emitted for this line).
    const processLine = (line) => {
      if (!line.trim()) {
        return null
      }

      const parsed = parseSSELine(line)

      if (parsed.type === 'data' && parsed.data?.response?.usageMetadata) {
        totalUsage = parsed.data.response.usageMetadata
        logger.debug('📊 Captured Gemini usage data:', totalUsage)
      }

      if (version !== 'v1beta') {
        return null
      }
      if (parsed.type === 'data') {
        // Unwrap the inner response when present; otherwise pass the event through.
        return `data: ${JSON.stringify(parsed.data?.response || parsed.data)}`
      }
      if (parsed.type === 'control') {
        return line
      }
      return null
    }

    streamResponse.on('data', (chunk) => {
      try {
        const chunkStr = chunk.toString()
        if (!chunkStr) {
          return
        }

        // Whitespace-only chunks are kept on purpose: they may carry the newline
        // that terminates a buffered line or separates two SSE events.
        streamBuffer += chunkStr
        const lines = streamBuffer.split('\n')
        streamBuffer = lines.pop() || ''

        const processedLines = []
        for (const line of lines) {
          const output = processLine(line)
          if (output !== null) {
            processedLines.push(output)
          }
        }

        if (version === 'v1beta') {
          for (const line of processedLines) {
            if (!res.destroyed) {
              res.write(`${line}\n\n`)
            }
          }
        } else if (!res.destroyed) {
          // v1internal: forward the raw chunk untouched.
          res.write(chunkStr)
        }
      } catch (error) {
        logger.error('Error processing stream chunk:', error)
      }
    })

    streamResponse.on('end', async () => {
      streamSettled = true
      logger.info('Stream completed successfully')

      // Flush a final line that arrived without a trailing newline. For v1internal
      // it was already forwarded raw, so this only captures its usage metadata.
      if (streamBuffer) {
        const tail = streamBuffer
        streamBuffer = ''
        try {
          const output = processLine(tail)
          if (output !== null && !res.destroyed) {
            res.write(`${output}\n\n`)
          }
        } catch (error) {
          logger.error('Error processing stream chunk:', error)
        }
      }

      // Usage accounting is best effort: a failure here must not break the response.
      if (totalUsage.totalTokenCount > 0) {
        try {
          await apiKeyService.recordUsage(
            req.apiKey.id,
            totalUsage.promptTokenCount || 0,
            totalUsage.candidatesTokenCount || 0,
            0,
            0,
            model,
            account.id
          )
          logger.info(
            `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}`
          )

          await applyRateLimitTracking(
            req,
            {
              inputTokens: totalUsage.promptTokenCount || 0,
              outputTokens: totalUsage.candidatesTokenCount || 0,
              cacheCreateTokens: 0,
              cacheReadTokens: 0
            },
            model,
            'gemini-stream'
          )
        } catch (error) {
          logger.error('Failed to record Gemini usage:', error)
        }
      }

      res.end()
    })

    streamResponse.on('error', (error) => {
      streamSettled = true
      logger.error('Stream error:', error)
      if (!res.headersSent) {
        res.status(500).json({
          error: {
            message: error.message || 'Stream error',
            type: 'api_error'
          }
        })
      } else {
        res.end()
      }
    })
  } catch (error) {
    streamSettled = true
    const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal'
    // Log upstream HTTP details when the error carries them.
    logger.error(`Error in streamGenerateContent endpoint (${version})`, {
      message: error.message,
      status: error.response?.status,
      statusText: error.response?.statusText,
      responseData: error.response?.data,
      requestUrl: error.config?.url,
      requestMethod: error.config?.method,
      stack: error.stack
    })

    if (!res.headersSent) {
      res.status(500).json({
        error: {
          message: error.message || 'Internal server error',
          type: 'api_error'
        }
      })
    }
  }
  return undefined
}
|
|
| |
| |
// Code Assist actions and their handlers. Each action is exposed twice: as a
// v1internal route and as a per-model v1beta route. The colon is escaped so the
// router treats it as a literal character rather than a parameter marker.
const codeAssistHandlers = [
  ['loadCodeAssist', handleLoadCodeAssist],
  ['onboardUser', handleOnboardUser],
  ['countTokens', handleCountTokens],
  ['generateContent', handleGenerateContent],
  ['streamGenerateContent', handleStreamGenerateContent]
]

// v1internal routes, e.g. POST /v1internal:generateContent
for (const [action, handler] of codeAssistHandlers) {
  router.post(`/v1internal\\:${action}`, authenticateApiKey, handler)
}

// v1beta routes, e.g. POST /v1beta/models/<model>:generateContent
for (const [action, handler] of codeAssistHandlers) {
  router.post(`/v1beta/models/:modelName\\:${action}`, authenticateApiKey, handler)
}
|
|
| |
| module.exports = router |
| module.exports.handleLoadCodeAssist = handleLoadCodeAssist |
| module.exports.handleOnboardUser = handleOnboardUser |
| module.exports.handleCountTokens = handleCountTokens |
| module.exports.handleGenerateContent = handleGenerateContent |
| module.exports.handleStreamGenerateContent = handleStreamGenerateContent |
|
|