File size: 15,768 Bytes
562a3ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
package main
import (
"bytes" // 用于创建请求体
"context" // 用于超时和取消
"encoding/json"
"errors"
"fmt"
"io" // 用于读取响应体
"log/slog"
"net/http" // 用于 HTTP 请求
"os"
"strings"
"time"
// 确保已导入
)
// Package-level upstream configuration, populated once by init.
var (
	// JulepApiBaseURL holds the upstream Julep API base URL, taken from the
	// JULEP_API_BASE_URL environment variable.
	JulepApiBaseURL string
	// apiClient is the shared HTTP client used for all upstream calls.
	apiClient *http.Client
)

// Upstream HTTP timeouts.
const (
	// defaultTimeout is the default HTTP request timeout.
	defaultTimeout = 30 * time.Second
	// chatTimeout is the budget for chat requests, which may take longer.
	chatTimeout = 90 * time.Second
)
// init loads JULEP_API_BASE_URL (terminating the process when it is unset or
// empty), strips one trailing "/" from it, and builds the shared apiClient
// with defaultTimeout and bounded idle-connection settings.
func init() {
	rawBaseURL := os.Getenv("JULEP_API_BASE_URL")
	if rawBaseURL == "" {
		slog.Error("Fatal: JULEP_API_BASE_URL environment variable not set.")
		os.Exit(1)
	}
	JulepApiBaseURL = strings.TrimSuffix(rawBaseURL, "/")
	slog.Info("Julep API Base URL configured", "url", JulepApiBaseURL)

	// One reusable client for every upstream call.
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
	}
	apiClient = &http.Client{Transport: transport, Timeout: defaultTimeout}
	slog.Info("HTTP client initialized")
}
// --- Julep-specific request/response types ---
// (These could live in models.go; keeping them here makes upstream.go more
// self-contained.)

type (
	// CreateAgentPayload is the JSON body posted to create a Julep agent.
	// Further Julep agent fields can be added as needed.
	CreateAgentPayload struct {
		Name  string `json:"name"`
		About string `json:"about"`
	}

	// CreateSessionPayload is the JSON body posted to create a Julep session.
	// The Julep API names the agent reference "agent". Further Julep session
	// fields can be added as needed.
	CreateSessionPayload struct {
		AgentID string `json:"agent"`
	}
)
// Wire types for Julep's session chat endpoint.
type (
	// JulepMessage mirrors one message inside Julep's chat payload/response.
	JulepMessage struct {
		Role       string          `json:"role"`
		Content    string          `json:"content"`
		Name       *string         `json:"name,omitempty"`
		ToolCallID *string         `json:"tool_call_id,omitempty"`
		ToolCalls  []JulepToolCall `json:"tool_calls,omitempty"`
	}

	// JulepToolCall mirrors a tool call in Julep's format. Type is e.g.
	// "function"; other Julep tool-call kinds can be added if needed.
	// NOTE(review): omitempty has no effect on the struct-typed Function field;
	// it is always encoded.
	JulepToolCall struct {
		ID       string        `json:"id"`
		Type     string        `json:"type"`
		Function JulepFunction `json:"function,omitempty"`
	}

	// JulepFunction mirrors the function part of a Julep tool call.
	// Arguments is carried as a string, assumed to be JSON-encoded — confirm
	// against the Julep API.
	JulepFunction struct {
		Name      string `json:"name"`
		Arguments string `json:"arguments"`
	}

	// JulepChatPayload is the body sent to Julep's /chat endpoint. Model is
	// optional (Julep may take the model from the agent/session). Stream is
	// always sent; the proxy itself only requests non-streaming replies. Tools
	// and ToolChoice are passed through in OpenAI format, assuming Julep
	// accepts that format directly.
	JulepChatPayload struct {
		Messages         []JulepMessage `json:"messages"`
		Model            *string        `json:"model,omitempty"`
		Stream           bool           `json:"stream"`
		MaxTokens        *int           `json:"max_tokens,omitempty"`
		Temperature      *float64       `json:"temperature,omitempty"`
		TopP             *float64       `json:"top_p,omitempty"`
		Stop             []string       `json:"stop,omitempty"`
		PresencePenalty  *float64       `json:"presence_penalty,omitempty"`
		FrequencyPenalty *float64       `json:"frequency_penalty,omitempty"`
		Tools            []OpenAITool   `json:"tools,omitempty"`
		ToolChoice       any            `json:"tool_choice,omitempty"`
	}

	// JulepChatResponse is the non-streaming reply from Julep's /chat
	// endpoint. ID is Julep's own response ID, which may differ from the
	// session ID.
	JulepChatResponse struct {
		ID        string        `json:"id"`
		CreatedAt time.Time     `json:"created_at"`
		Choices   []JulepChoice `json:"choices"`
		Usage     *JulepUsage   `json:"usage,omitempty"`
	}

	// JulepChoice is one completion choice in a Julep chat reply.
	JulepChoice struct {
		Index        int          `json:"index"`
		Message      JulepMessage `json:"message"`
		FinishReason string       `json:"finish_reason"`
	}

	// JulepUsage reports token accounting for a Julep chat reply.
	JulepUsage struct {
		PromptTokens     int `json:"prompt_tokens"`
		CompletionTokens int `json:"completion_tokens"`
		TotalTokens      int `json:"total_tokens"`
	}
)
// --- Conversion Functions ---

// convertOpenaiToJulep maps an OpenAI chat-completion request onto the body
// for Julep's session chat endpoint.
//
// Every message is copied, including its tool calls, which are all treated as
// function calls. Sampling parameters, tools and tool choice are passed
// through unchanged. Stream is always false because the upstream call is made
// non-streaming. Model is nil when the request does not name a model.
func convertOpenaiToJulep(openaiReq OpenAIRequest) JulepChatPayload {
	messages := make([]JulepMessage, 0, len(openaiReq.Messages))
	for _, m := range openaiReq.Messages {
		calls := make([]JulepToolCall, 0, len(m.ToolCalls))
		for _, call := range m.ToolCalls {
			calls = append(calls, JulepToolCall{
				ID:   call.ID,
				Type: call.Type,
				Function: JulepFunction{
					Name:      call.Function.Name,
					Arguments: call.Function.Arguments,
				},
			})
		}
		messages = append(messages, JulepMessage{
			Role:       m.Role,
			Content:    m.Content,
			Name:       m.Name,
			ToolCallID: m.ToolCallID,
			ToolCalls:  calls,
		})
	}

	// An empty model name is sent as "absent" rather than as "".
	var model *string
	if openaiReq.Model != "" {
		model = &openaiReq.Model
	}

	return JulepChatPayload{
		Messages:         messages,
		Model:            model,
		Stream:           false,
		MaxTokens:        openaiReq.MaxTokens,
		Temperature:      openaiReq.Temperature,
		TopP:             openaiReq.TopP,
		Stop:             openaiReq.Stop,
		PresencePenalty:  openaiReq.PresencePenalty,
		FrequencyPenalty: openaiReq.FrequencyPenalty,
		Tools:            openaiReq.Tools,
		ToolChoice:       openaiReq.ToolChoice,
	}
}
// convertJulepToOpenai converts a decoded Julep chat response into an OpenAI
// "chat.completion" response.
//
// modelName is echoed back as Model (the model the client requested), and
// sessionID is used as the response ID, following the JS reference
// implementation. Choices (with their tool calls) and token usage are copied
// across; Usage stays nil when Julep reported none. If the Julep response
// carries no creation time, the current time is used for Created.
func convertJulepToOpenai(julepResp *JulepChatResponse, modelName string, sessionID string) *OpenAIResponse {
	openaiChoices := make([]OpenAIChoice, len(julepResp.Choices))
	for i, choice := range julepResp.Choices {
		openaiToolCalls := make([]OpenAIToolCall, len(choice.Message.ToolCalls))
		for j, tc := range choice.Message.ToolCalls {
			openaiToolCalls[j] = OpenAIToolCall{
				ID:   tc.ID,
				Type: tc.Type,
				Function: OpenAIFunction{
					Name:      tc.Function.Name,
					Arguments: tc.Function.Arguments,
				},
			}
		}
		openaiChoices[i] = OpenAIChoice{
			Index: choice.Index,
			Message: OpenAIMessage{
				Role:      choice.Message.Role,
				Content:   choice.Message.Content,
				ToolCalls: openaiToolCalls,
			},
			FinishReason: choice.FinishReason,
		}
	}

	var openaiUsage *OpenAIUsage
	if julepResp.Usage != nil {
		openaiUsage = &OpenAIUsage{
			PromptTokens:     julepResp.Usage.PromptTokens,
			CompletionTokens: julepResp.Usage.CompletionTokens,
			TotalTokens:      julepResp.Usage.TotalTokens,
		}
	}

	// A zero CreatedAt (created_at missing from the decoded JSON, or an empty
	// 2xx body that was never decoded) would otherwise be reported as the
	// negative Unix timestamp of year 1; fall back to "now" instead.
	created := julepResp.CreatedAt.Unix()
	if julepResp.CreatedAt.IsZero() {
		created = time.Now().Unix()
	}

	return &OpenAIResponse{
		ID:      sessionID, // the Julep session ID doubles as the OpenAI response ID
		Object:  "chat.completion",
		Created: created,
		Model:   modelName, // echo the model the client asked for
		Choices: openaiChoices,
		Usage:   openaiUsage,
	}
}
// --- API Call Functions ---

// makeJulepRequest performs one HTTP request against a Julep endpoint.
//
// If requestBody is non-nil it is JSON-encoded and sent with
// "Content-Type: application/json". Of the incoming headers only
// Authorization is forwarded; Host, Content-Length and similar headers are
// intentionally not copied. If responseTarget is non-nil and the upstream
// replies 2xx with a non-empty body, that body is JSON-decoded into it.
//
// The returned int is the upstream status code. It is 0 when no response was
// received, or http.StatusGatewayTimeout when the request timed out. The error
// is non-nil on any failure, including non-2xx replies, whose error message
// embeds the raw response body.
//
// apiClient's Timeout normally bounds the request. A request context cannot
// extend http.Client.Timeout. So when ctx carries a deadline further away than
// that Timeout, the request is sent with a copy of the client that has no
// Timeout, and the context deadline governs instead. This lets a caller grant
// a longer budget, for example chatTimeout.
func makeJulepRequest(ctx context.Context, logger *slog.Logger, method, url string, headers http.Header, requestBody any, responseTarget any, reqID string) (int, error) {
	logAttrs := []any{"request_id", reqID, "method", method, "url", url}
	logger.Debug("Making Julep API request...", logAttrs...)

	var reqBodyReader io.Reader
	if requestBody != nil {
		jsonBody, err := json.Marshal(requestBody)
		if err != nil {
			logger.Error("Failed to marshal Julep request body", append(logAttrs, "error", err)...)
			return 0, fmt.Errorf("failed to marshal request body: %w", err)
		}
		reqBodyReader = bytes.NewReader(jsonBody)
		logAttrs = append(logAttrs, "body_size", len(jsonBody))
	}

	httpReq, err := http.NewRequestWithContext(ctx, method, url, reqBodyReader)
	if err != nil {
		logger.Error("Failed to create Julep HTTP request", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to create HTTP request: %w", err)
	}
	// Forward only what Julep needs; framing headers are recomputed by net/http.
	if auth := headers.Get("Authorization"); auth != "" {
		httpReq.Header.Set("Authorization", auth)
	}
	if requestBody != nil {
		httpReq.Header.Set("Content-Type", "application/json")
	}
	// Add other headers here if Julep requires them.

	// Let a longer caller deadline win over the client's hard Timeout cap. The
	// copy shares apiClient's Transport, so connection pooling is unaffected.
	client := apiClient
	if deadline, ok := ctx.Deadline(); ok && client.Timeout > 0 && time.Until(deadline) > client.Timeout {
		relaxed := *client
		relaxed.Timeout = 0
		client = &relaxed
	}

	startTime := time.Now()
	httpResp, err := client.Do(httpReq)
	logAttrs = append(logAttrs, "duration_ms", time.Since(startTime).Milliseconds())
	if err != nil {
		// Context deadlines wrap context.DeadlineExceeded; http.Client.Timeout
		// expiry is reported through a Timeout() bool method instead.
		var timeoutErr interface{ Timeout() bool }
		if errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &timeoutErr) && timeoutErr.Timeout()) {
			logger.Error("Julep API request timed out", append(logAttrs, "error", err)...)
			return http.StatusGatewayTimeout, fmt.Errorf("request to %s timed out: %w", url, err)
		}
		logger.Error("Julep API request failed", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to send request to %s: %w", url, err)
	}
	defer httpResp.Body.Close()
	logAttrs = append(logAttrs, "status_code", httpResp.StatusCode)

	// Read the body regardless of status so error replies can be surfaced.
	respBodyBytes, readErr := io.ReadAll(httpResp.Body)
	if readErr != nil {
		logger.Warn("Failed to read Julep response body", append(logAttrs, "read_error", readErr)...)
	} else {
		logAttrs = append(logAttrs, "response_size", len(respBodyBytes))
		// Log trimmed response body for debugging (be careful with sensitive data)
		// logger.Debug("Julep response body", append(logAttrs, "body", string(respBodyBytes))...)
	}

	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		errMsg := fmt.Sprintf("Julep API returned error status %d", httpResp.StatusCode)
		if len(respBodyBytes) > 0 {
			errMsg = fmt.Sprintf("%s: %s", errMsg, string(respBodyBytes))
		}
		logger.Error("Julep API request returned non-2xx status", logAttrs...)
		// errors.New, not fmt.Errorf: the upstream body may contain '%', which
		// would otherwise be interpreted as formatting verbs.
		return httpResp.StatusCode, errors.New(errMsg)
	}

	// A 2xx whose body could not be read completely must not be reported as a
	// success when the caller needs that body: it may be truncated or missing.
	if readErr != nil && responseTarget != nil {
		return httpResp.StatusCode, fmt.Errorf("failed to read Julep response body: %w", readErr)
	}

	if responseTarget != nil && len(respBodyBytes) > 0 {
		if err := json.Unmarshal(respBodyBytes, responseTarget); err != nil {
			logger.Error("Failed to unmarshal Julep response body", append(logAttrs, "error", err, "body_preview", string(respBodyBytes[:min(len(respBodyBytes), 100)]))...)
			return httpResp.StatusCode, fmt.Errorf("failed to decode Julep response: %w", err)
		}
		logger.Debug("Successfully decoded Julep response", logAttrs...)
	} else {
		logger.Debug("Julep request successful, no response body expected or decoded.", logAttrs...)
	}
	return httpResp.StatusCode, nil
}
// callJulepChat orchestrates the calls to Julep: create agent, create session, then chat.
//
// It creates a temporary agent and a session bound to it, both under
// client-generated UUIDs. It then posts the converted OpenAI conversation to
// that session's chat endpoint and converts the reply back to OpenAI format.
//
// On success it returns the response and http.StatusOK. On failure it returns
// a nil response, the status the proxy should send to its own client, and the
// error. Transport failures (status 0) and upstream 5xx are reported as
// http.StatusBadGateway; other upstream statuses (e.g. 4xx) pass through.
//
// When ctx has no deadline, the chat call is bounded by chatTimeout. The
// timeout is derived from ctx, so cancellation of ctx still aborts the call.
// The temporary agent and session are not deleted afterwards.
func callJulepChat(ctx context.Context, logger *slog.Logger, headers http.Header, openaiReq OpenAIRequest, requestID string) (*OpenAIResponse, int, error) {
	reqLogger := logger.With("request_id", requestID)

	// --- 1. Create a temporary agent ---
	agentID := generateUUID()
	agentURL := fmt.Sprintf("%s/agents/%s", JulepApiBaseURL, agentID)
	agentPayload := CreateAgentPayload{
		Name:  fmt.Sprintf("temp-agent-%s", agentID),
		About: "Temporary agent created for a chat session via proxy.",
	}
	reqLogger.Info("Creating temporary Julep agent", "agent_id", agentID)
	// No response body is decoded; TODO confirm Julep returns nothing needed here.
	statusCode, err := makeJulepRequest(ctx, reqLogger, http.MethodPost, agentURL, headers, agentPayload, nil, requestID)
	if err != nil {
		reqLogger.Error("Failed to create Julep agent", "error", err, "status_code", statusCode)
		if statusCode == 0 || statusCode >= 500 {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (agent creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create agent: %w", err)
	}
	reqLogger.Info("Julep agent created successfully", "agent_id", agentID)

	// --- 2. Create a session linked to that agent ---
	// Julep addresses sessions by a UUID in the path, so generate one here.
	sessionID := generateUUID()
	sessionURL := fmt.Sprintf("%s/sessions/%s", JulepApiBaseURL, sessionID)
	sessionPayload := CreateSessionPayload{AgentID: agentID}
	reqLogger.Info("Creating temporary Julep session", "session_id", sessionID, "linked_agent_id", agentID)
	statusCode, err = makeJulepRequest(ctx, reqLogger, http.MethodPost, sessionURL, headers, sessionPayload, nil, requestID)
	if err != nil {
		reqLogger.Error("Failed to create Julep session", "error", err, "status_code", statusCode)
		// TODO: consider deleting the agent created above when this fails.
		if statusCode == 0 || statusCode >= 500 {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (session creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create session: %w", err)
	}
	reqLogger.Info("Julep session created successfully", "session_id", sessionID)

	// --- 3. Call the chat endpoint ---
	chatURL := fmt.Sprintf("%s/sessions/%s/chat", JulepApiBaseURL, sessionID)
	julepPayload := convertOpenaiToJulep(openaiReq)
	reqLogger.Info("Calling Julep chat endpoint", "url", chatURL)

	chatCtx := ctx
	if _, ok := ctx.Deadline(); !ok {
		// Derive from ctx, not context.Background(), so that cancelling the
		// incoming request (e.g. a client disconnect) also aborts this call.
		var cancel context.CancelFunc
		chatCtx, cancel = context.WithTimeout(ctx, chatTimeout)
		defer cancel()
		reqLogger.Debug("Applying specific timeout for chat request", "timeout", chatTimeout)
	}

	var julepResponse JulepChatResponse
	statusCode, err = makeJulepRequest(chatCtx, reqLogger, http.MethodPost, chatURL, headers, julepPayload, &julepResponse, requestID)
	if err != nil {
		reqLogger.Error("Julep chat request failed", "error", err, "status_code", statusCode)
		// 0 means no response arrived; every 5xx (503 and 504 included) is an
		// upstream fault from the client's point of view.
		if statusCode == 0 || statusCode >= 500 {
			return nil, http.StatusBadGateway, fmt.Errorf("upstream API error during chat: %w", err)
		}
		// Propagate other errors (e.g. 4xx from Julep) unchanged.
		return nil, statusCode, fmt.Errorf("julep chat API error: %w", err)
	}
	reqLogger.Info("Julep chat request successful")

	// --- 4. Convert the Julep reply to an OpenAI response ---
	// TODO: optionally delete the temporary agent/session here.
	openaiResponse := convertJulepToOpenai(&julepResponse, openaiReq.Model, sessionID)
	return openaiResponse, http.StatusOK, nil
}
// min returns the smaller of a and b. It bounds the length of the
// response-body preview included in error logs.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
|