package main import ( "bytes" // 用于创建请求体 "context" // 用于超时和取消 "encoding/json" "errors" "fmt" "io" // 用于读取响应体 "log/slog" "net/http" // 用于 HTTP 请求 "os" "strings" "time" // 确保已导入 ) // JulepApiBaseURL 存储从环境变量读取的上游 API 基础 URL var JulepApiBaseURL string var apiClient *http.Client // 包级别的 HTTP 客户端 const ( defaultTimeout = 30 * time.Second // 默认 HTTP 请求超时 chatTimeout = 90 * time.Second // Chat 请求可能需要更长时间 ) func init() { JulepApiBaseURL = os.Getenv("JULEP_API_BASE_URL") if JulepApiBaseURL == "" { slog.Error("Fatal: JULEP_API_BASE_URL environment variable not set.") os.Exit(1) } JulepApiBaseURL = strings.TrimSuffix(JulepApiBaseURL, "/") slog.Info("Julep API Base URL configured", "url", JulepApiBaseURL) // 创建可复用的 HTTP 客户端 apiClient = &http.Client{ Timeout: defaultTimeout, // 设置默认超时 Transport: &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 10, IdleConnTimeout: 90 * time.Second, }, } slog.Info("HTTP client initialized") } // --- Julep Specific Request/Response Structs --- // (可以放在 models.go 或这里,放在这里让 upstream.go 更独立) type CreateAgentPayload struct { Name string `json:"name"` About string `json:"about"` // 添加 Julep Agent 的其他字段... } type CreateSessionPayload struct { AgentID string `json:"agent"` // Julep API 使用 "agent" 字段 // 添加 Julep Session 的其他字段... } // JulepMessage mirrors the structure within Julep's chat payload/response type JulepMessage struct { Role string `json:"role"` Content string `json:"content"` Name *string `json:"name,omitempty"` ToolCallID *string `json:"tool_call_id,omitempty"` ToolCalls []JulepToolCall `json:"tool_calls,omitempty"` } // JulepToolCall mirrors the tool call structure within Julep's format type JulepToolCall struct { ID string `json:"id"` Type string `json:"type"` // e.g., "function" Function JulepFunction `json:"function,omitempty"` // Add other Julep tool call types if needed } // JulepFunction mirrors the function structure within Julep's tool call type JulepFunction struct { Name string `json:"name"` Arguments string `json:"arguments"` // Assuming arguments are a JSON string } // JulepChatPayload represents the body sent to Julep's /chat endpoint type JulepChatPayload struct { Messages []JulepMessage `json:"messages"` Model *string `json:"model,omitempty"` // Julep might use model from agent/session Stream bool `json:"stream"` // Julep doesn't stream, but payload might accept it MaxTokens *int `json:"max_tokens,omitempty"` Temperature *float64 `json:"temperature,omitempty"` TopP *float64 `json:"top_p,omitempty"` Stop []string `json:"stop,omitempty"` PresencePenalty *float64 `json:"presence_penalty,omitempty"` FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"` Tools []OpenAITool `json:"tools,omitempty"` // Assuming Julep uses OpenAI tool format directly ToolChoice any `json:"tool_choice,omitempty"` // Assuming Julep uses OpenAI format // Add other Julep specific parameters if needed } // JulepChatResponse represents the non-streaming response from Julep's /chat endpoint type JulepChatResponse struct { ID string `json:"id"` // Julep's own response ID (might differ from session ID) CreatedAt time.Time `json:"created_at"` Choices []JulepChoice `json:"choices"` Usage *JulepUsage `json:"usage,omitempty"` // Add other fields from Julep response } type JulepChoice struct { Index int `json:"index"` Message JulepMessage `json:"message"` FinishReason string `json:"finish_reason"` } type JulepUsage struct { PromptTokens int `json:"prompt_tokens"` CompletionTokens int `json:"completion_tokens"` TotalTokens int `json:"total_tokens"` } // --- Conversion Functions --- // convertOpenaiToJulep converts OpenAI request payload to Julep chat payload func convertOpenaiToJulep(openaiReq OpenAIRequest) JulepChatPayload { julepMessages := make([]JulepMessage, len(openaiReq.Messages)) for i, msg := range openaiReq.Messages { julepToolCalls := make([]JulepToolCall, len(msg.ToolCalls)) for j, tc := range msg.ToolCalls { julepToolCalls[j] = JulepToolCall{ ID: tc.ID, Type: tc.Type, Function: JulepFunction{ // Assuming only function type for now Name: tc.Function.Name, Arguments: tc.Function.Arguments, }, } } julepMessages[i] = JulepMessage{ Role: msg.Role, Content: msg.Content, Name: msg.Name, ToolCallID: msg.ToolCallID, ToolCalls: julepToolCalls, } } payload := JulepChatPayload{ Messages: julepMessages, Model: &openaiReq.Model, // Pass model if Julep expects it here Stream: false, // Force false as Julep doesn't support streaming response MaxTokens: openaiReq.MaxTokens, Temperature: openaiReq.Temperature, TopP: openaiReq.TopP, Stop: openaiReq.Stop, PresencePenalty: openaiReq.PresencePenalty, FrequencyPenalty: openaiReq.FrequencyPenalty, Tools: openaiReq.Tools, ToolChoice: openaiReq.ToolChoice, } // Clean up nil model pointer if model string is empty if openaiReq.Model == "" { payload.Model = nil } return payload } // convertJulepToOpenai converts Julep chat response to OpenAI response format // Takes sessionID to use it as the OpenAI response ID, as per JS example. func convertJulepToOpenai(julepResp *JulepChatResponse, modelName string, sessionID string) *OpenAIResponse { openaiChoices := make([]OpenAIChoice, len(julepResp.Choices)) for i, choice := range julepResp.Choices { openaiToolCalls := make([]OpenAIToolCall, len(choice.Message.ToolCalls)) for j, tc := range choice.Message.ToolCalls { openaiToolCalls[j] = OpenAIToolCall{ ID: tc.ID, Type: tc.Type, Function: OpenAIFunction{ Name: tc.Function.Name, Arguments: tc.Function.Arguments, }, } } openaiChoices[i] = OpenAIChoice{ Index: choice.Index, Message: OpenAIMessage{ Role: choice.Message.Role, Content: choice.Message.Content, ToolCalls: openaiToolCalls, }, FinishReason: choice.FinishReason, } } var openaiUsage *OpenAIUsage if julepResp.Usage != nil { openaiUsage = &OpenAIUsage{ PromptTokens: julepResp.Usage.PromptTokens, CompletionTokens: julepResp.Usage.CompletionTokens, TotalTokens: julepResp.Usage.TotalTokens, } } return &OpenAIResponse{ ID: sessionID, // Use the generated Session ID as OpenAI ID Object: "chat.completion", Created: julepResp.CreatedAt.Unix(), Model: modelName, // Use the model requested by the client Choices: openaiChoices, Usage: openaiUsage, } } // --- API Call Functions --- // makeJulepRequest performs the actual HTTP request to a Julep endpoint. // It handles request creation, sending, and basic response/error handling. func makeJulepRequest(ctx context.Context, logger *slog.Logger, method, url string, headers http.Header, requestBody any, responseTarget any, reqID string) (int, error) { logAttrs := []any{"request_id", reqID, "method", method, "url", url} logger.Debug("Making Julep API request...", logAttrs...) var reqBodyReader io.Reader if requestBody != nil { jsonBody, err := json.Marshal(requestBody) if err != nil { logger.Error("Failed to marshal Julep request body", append(logAttrs, "error", err)...) return 0, fmt.Errorf("failed to marshal request body: %w", err) } reqBodyReader = bytes.NewBuffer(jsonBody) logAttrs = append(logAttrs, "body_size", len(jsonBody)) // Log body size } httpReq, err := http.NewRequestWithContext(ctx, method, url, reqBodyReader) if err != nil { logger.Error("Failed to create Julep HTTP request", append(logAttrs, "error", err)...) return 0, fmt.Errorf("failed to create HTTP request: %w", err) } // Copy essential headers (Authorization, Content-Type if body exists) // Avoid copying Host, Content-Length etc. if auth := headers.Get("Authorization"); auth != "" { httpReq.Header.Set("Authorization", auth) } // Only set Content-Type if we have a body if requestBody != nil { httpReq.Header.Set("Content-Type", "application/json") } // Add other necessary headers if Julep requires them startTime := time.Now() httpResp, err := apiClient.Do(httpReq) duration := time.Since(startTime) logAttrs = append(logAttrs, "duration_ms", duration.Milliseconds()) if err != nil { // Handle context deadline exceeded specifically if errors.Is(err, context.DeadlineExceeded) { logger.Error("Julep API request timed out", append(logAttrs, "error", err)...) return http.StatusGatewayTimeout, fmt.Errorf("request to %s timed out: %w", url, err) } logger.Error("Julep API request failed", append(logAttrs, "error", err)...) return 0, fmt.Errorf("failed to send request to %s: %w", url, err) } defer httpResp.Body.Close() logAttrs = append(logAttrs, "status_code", httpResp.StatusCode) // Read the body regardless of status code for potential error messages respBodyBytes, readErr := io.ReadAll(httpResp.Body) if readErr != nil { logger.Warn("Failed to read Julep response body", append(logAttrs, "read_error", readErr)...) // Continue processing status code error if possible } else { logAttrs = append(logAttrs, "response_size", len(respBodyBytes)) // Log trimmed response body for debugging (be careful with sensitive data) // logger.Debug("Julep response body", append(logAttrs, "body", string(respBodyBytes))...) } // Check for non-successful status codes if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { errMsg := fmt.Sprintf("Julep API returned error status %d", httpResp.StatusCode) if len(respBodyBytes) > 0 { errMsg = fmt.Sprintf("%s: %s", errMsg, string(respBodyBytes)) } logger.Error("Julep API request returned non-2xx status", logAttrs...) // Return the status code and an error containing the message return httpResp.StatusCode, fmt.Errorf(errMsg) } // If successful and a response target is provided, decode the body if responseTarget != nil && len(respBodyBytes) > 0 { if err := json.Unmarshal(respBodyBytes, responseTarget); err != nil { logger.Error("Failed to unmarshal Julep response body", append(logAttrs, "error", err, "body_preview", string(respBodyBytes[:min(len(respBodyBytes), 100)]))...) return httpResp.StatusCode, fmt.Errorf("failed to decode Julep response: %w", err) } logger.Debug("Successfully decoded Julep response", logAttrs...) } else { logger.Debug("Julep request successful, no response body expected or decoded.", logAttrs...) } return httpResp.StatusCode, nil } // callJulepChat orchestrates the calls to Julep: create agent, create session, then chat. func callJulepChat(ctx context.Context, logger *slog.Logger, headers http.Header, openaiReq OpenAIRequest, requestID string) (*OpenAIResponse, int, error) { reqLogger := logger.With("request_id", requestID) // --- 1. Create Agent --- agentID := generateUUID() agentURL := fmt.Sprintf("%s/agents/%s", JulepApiBaseURL, agentID) agentPayload := CreateAgentPayload{ Name: fmt.Sprintf("temp-agent-%s", agentID), About: "Temporary agent created for a chat session via proxy.", } reqLogger.Info("Creating temporary Julep agent", "agent_id", agentID) statusCode, err := makeJulepRequest(ctx, reqLogger, http.MethodPost, agentURL, headers, agentPayload, nil, requestID) // No response body needed for agent creation? Adjust if needed. if err != nil { reqLogger.Error("Failed to create Julep agent", "error", err, "status_code", statusCode) // Map status code for client response if statusCode == 0 || statusCode >= 500 { return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (agent creation failed): %w", err) } return nil, statusCode, fmt.Errorf("failed to create agent: %w", err) // Propagate client-side errors if needed } reqLogger.Info("Julep agent created successfully", "agent_id", agentID) // --- 2. Create Session --- sessionID := generateUUID() // Julep uses UUID in path, so generate one here sessionURL := fmt.Sprintf("%s/sessions/%s", JulepApiBaseURL, sessionID) sessionPayload := CreateSessionPayload{ AgentID: agentID, // Link to the created agent } reqLogger.Info("Creating temporary Julep session", "session_id", sessionID, "linked_agent_id", agentID) statusCode, err = makeJulepRequest(ctx, reqLogger, http.MethodPost, sessionURL, headers, sessionPayload, nil, requestID) // No response body needed? Adjust if needed. if err != nil { reqLogger.Error("Failed to create Julep session", "error", err, "status_code", statusCode) // Maybe cleanup agent here if session fails? Omitted for simplicity. if statusCode == 0 || statusCode >= 500 { return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (session creation failed): %w", err) } return nil, statusCode, fmt.Errorf("failed to create session: %w", err) } reqLogger.Info("Julep session created successfully", "session_id", sessionID) // --- 3. Call Chat Endpoint --- chatURL := fmt.Sprintf("%s/sessions/%s/chat", JulepApiBaseURL, sessionID) julepPayload := convertOpenaiToJulep(openaiReq) reqLogger.Info("Calling Julep chat endpoint", "url", chatURL) // Use a longer timeout context specifically for the chat call if needed chatCtx := ctx // Use original context by default if _, ok := ctx.Deadline(); !ok { // If no deadline set on original context, apply chat timeout var cancel context.CancelFunc chatCtx, cancel = context.WithTimeout(context.Background(), chatTimeout) defer cancel() reqLogger.Debug("Applying specific timeout for chat request", "timeout", chatTimeout) } var julepResponse JulepChatResponse statusCode, err = makeJulepRequest(chatCtx, reqLogger, http.MethodPost, chatURL, headers, julepPayload, &julepResponse, requestID) if err != nil { reqLogger.Error("Julep chat request failed", "error", err, "status_code", statusCode) // Map Julep error status codes to appropriate client responses if statusCode == 0 || statusCode >= 500 || statusCode == http.StatusGatewayTimeout || statusCode == http.StatusServiceUnavailable { return nil, http.StatusBadGateway, fmt.Errorf("upstream API error during chat: %w", err) } // Propagate other errors (e.g., 4xx from Julep) return nil, statusCode, fmt.Errorf("julep chat API error: %w", err) } reqLogger.Info("Julep chat request successful") // --- 4. Convert Julep Response to OpenAI Response --- openaiResponse := convertJulepToOpenai(&julepResponse, openaiReq.Model, sessionID) // Optional: Consider deleting the temporary agent/session here // reqLogger.Info("Skipping temporary agent/session cleanup for now.") // Return the converted response, final status (OK), and no error return openaiResponse, http.StatusOK, nil } // Helper for logging byte slices func min(a, b int) int { if a < b { return a } return b }