aiclient-2-api / src /providers /forward /forward-core.js
Jaasomn
Initial deployment
ceb3821
import axios from 'axios';
import * as http from 'http';
import * as https from 'https';
import { configureAxiosProxy } from '../../utils/proxy-utils.js';
import { isRetryableNetworkError } from '../../utils/common.js';
/**
* ForwardApiService - A provider that forwards requests to a specified API endpoint.
* Transparently passes all parameters and includes an API key in the headers.
*/
export class ForwardApiService {
constructor(config) {
if (!config.FORWARD_API_KEY) {
throw new Error("API Key is required for ForwardApiService (FORWARD_API_KEY).");
}
if (!config.FORWARD_BASE_URL) {
throw new Error("Base URL is required for ForwardApiService (FORWARD_BASE_URL).");
}
this.config = config;
this.apiKey = config.FORWARD_API_KEY;
this.baseUrl = config.FORWARD_BASE_URL;
this.useSystemProxy = config?.USE_SYSTEM_PROXY_FORWARD ?? false;
this.headerName = config?.FORWARD_HEADER_NAME || 'Authorization';
this.headerValuePrefix = config?.FORWARD_HEADER_VALUE_PREFIX || 'Bearer ';
console.log(`[Forward] Base URL: ${this.baseUrl}, System proxy ${this.useSystemProxy ? 'enabled' : 'disabled'}`);
const httpAgent = new http.Agent({
keepAlive: true,
maxSockets: 100,
maxFreeSockets: 5,
timeout: 120000,
});
const httpsAgent = new https.Agent({
keepAlive: true,
maxSockets: 100,
maxFreeSockets: 5,
timeout: 120000,
});
const headers = {
'Content-Type': 'application/json'
};
headers[this.headerName] = `${this.headerValuePrefix}${this.apiKey}`;
const axiosConfig = {
baseURL: this.baseUrl,
httpAgent,
httpsAgent,
headers,
};
if (!this.useSystemProxy) {
axiosConfig.proxy = false;
}
configureAxiosProxy(axiosConfig, config, 'forward-custom');
this.axiosInstance = axios.create(axiosConfig);
}
async callApi(endpoint, body, isRetry = false, retryCount = 0) {
const maxRetries = this.config.REQUEST_MAX_RETRIES || 3;
const baseDelay = this.config.REQUEST_BASE_DELAY || 1000;
try {
const response = await this.axiosInstance.post(endpoint, body);
return response.data;
} catch (error) {
const status = error.response?.status;
const data = error.response?.data;
const errorCode = error.code;
const errorMessage = error.message || '';
const isNetworkError = isRetryableNetworkError(error);
if (status === 401 || status === 403) {
console.error(`[Forward API] Received ${status}. API Key might be invalid or expired.`);
throw error;
}
if ((status === 429 || (status >= 500 && status < 600) || isNetworkError) && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Forward API] Error ${status || errorCode}. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
return this.callApi(endpoint, body, isRetry, retryCount + 1);
}
console.error(`[Forward API] Error calling API (Status: ${status}, Code: ${errorCode}):`, data || error.message);
throw error;
}
}
async *streamApi(endpoint, body, isRetry = false, retryCount = 0) {
const maxRetries = this.config.REQUEST_MAX_RETRIES || 3;
const baseDelay = this.config.REQUEST_BASE_DELAY || 1000;
try {
const response = await this.axiosInstance.post(endpoint, body, {
responseType: 'stream'
});
const stream = response.data;
let buffer = '';
for await (const chunk of stream) {
buffer += chunk.toString();
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.substring(0, newlineIndex).trim();
buffer = buffer.substring(newlineIndex + 1);
if (line.startsWith('data: ')) {
const jsonData = line.substring(6).trim();
if (jsonData === '[DONE]') {
return;
}
try {
const parsedChunk = JSON.parse(jsonData);
yield parsedChunk;
} catch (e) {
// If it's not JSON, it might be a different format, but for a forwarder we try to parse common SSE formats
console.warn("[ForwardApiService] Failed to parse stream chunk JSON:", e.message, "Data:", jsonData);
}
}
}
}
} catch (error) {
const status = error.response?.status;
const errorCode = error.code;
const isNetworkError = isRetryableNetworkError(error);
if ((status === 429 || (status >= 500 && status < 600) || isNetworkError) && retryCount < maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount);
console.log(`[Forward API] Stream error ${status || errorCode}. Retrying in ${delay}ms... (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
yield* this.streamApi(endpoint, body, isRetry, retryCount + 1);
return;
}
throw error;
}
}
async generateContent(model, requestBody) {
// Transparently pass the endpoint if provided in requestBody, otherwise use default
const endpoint = requestBody.endpoint || '';
return this.callApi(endpoint, requestBody);
}
async *generateContentStream(model, requestBody) {
const endpoint = requestBody.endpoint || '';
yield* this.streamApi(endpoint, requestBody);
}
async listModels() {
try {
const response = await this.axiosInstance.get('/models');
return response.data;
} catch (error) {
console.error(`Error listing Forward models:`, error.message);
return { data: [] };
}
}
}