// julep2api / upstream.go
// Hosting-page header captured with this file: "pauper-tarot-chain's picture",
// "Upload 9 files", commit 562a3ac (verified).
package main
import (
"bytes" // 用于创建请求体
"context" // 用于超时和取消
"encoding/json"
"errors"
"fmt"
"io" // 用于读取响应体
"log/slog"
"net/http" // 用于 HTTP 请求
"os"
"strings"
"time"
// 确保已导入
)
// Shared upstream state for this package.
var (
	// JulepApiBaseURL holds the base URL of the upstream API, read from an
	// environment variable.
	JulepApiBaseURL string

	// apiClient is the package-level HTTP client.
	apiClient *http.Client
)

// Time limits for upstream HTTP requests.
const (
	// defaultTimeout is the default HTTP request timeout.
	defaultTimeout time.Duration = 30 * time.Second

	// chatTimeout is the longer limit for chat requests, which may need more
	// time to complete.
	chatTimeout time.Duration = 90 * time.Second
)
// init loads the upstream configuration and builds the shared HTTP client.
//
// It reads JULEP_API_BASE_URL and terminates the process with exit code 1 when
// the variable is empty. One trailing "/" is stripped from the value before it
// is stored in JulepApiBaseURL. It then assigns apiClient a client with a
// pooled transport.
func init() {
	JulepApiBaseURL = os.Getenv("JULEP_API_BASE_URL")
	if JulepApiBaseURL == "" {
		slog.Error("Fatal: JULEP_API_BASE_URL environment variable not set.")
		os.Exit(1)
	}
	JulepApiBaseURL = strings.TrimSuffix(JulepApiBaseURL, "/")
	slog.Info("Julep API Base URL configured", "url", JulepApiBaseURL)

	// Create the reusable HTTP client.
	apiClient = &http.Client{
		// http.Client.Timeout bounds the whole exchange (connect, redirects
		// and reading the body) and cannot be extended by a request context.
		// It therefore has to be the longest budget any call may use:
		// with defaultTimeout here, chat requests were cut off at 30s even
		// when given a chatTimeout context. Shorter limits are the
		// responsibility of the per-request context.
		Timeout: chatTimeout,
		Transport: &http.Transport{
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 10,
			IdleConnTimeout:     90 * time.Second,
		},
	}
	slog.Info("HTTP client initialized")
}
// --- Julep Specific Request/Response Structs ---
// (These could live in models.go; keeping them here makes upstream.go more self-contained.)
// CreateAgentPayload is the JSON request body used when creating a Julep agent.
type CreateAgentPayload struct {
// Name is serialized as "name".
Name string `json:"name"`
// About is serialized as "about".
About string `json:"about"`
// Add other Julep agent fields here as needed.
}
// CreateSessionPayload is the JSON request body used when creating a Julep
// session.
type CreateSessionPayload struct {
AgentID string `json:"agent"` // The Julep API uses the "agent" field for the agent ID.
// Add other Julep session fields here as needed.
}
// JulepMessage mirrors a single message inside Julep's chat payload and chat
// response.
type JulepMessage struct {
// Role and Content are always serialized.
Role string `json:"role"`
Content string `json:"content"`
// The remaining fields are optional and are left out of the JSON when nil or
// empty (omitempty).
Name *string `json:"name,omitempty"`
ToolCallID *string `json:"tool_call_id,omitempty"`
ToolCalls []JulepToolCall `json:"tool_calls,omitempty"`
}
// JulepToolCall mirrors one tool call entry in Julep's message format.
type JulepToolCall struct {
ID string `json:"id"`
Type string `json:"type"` // e.g., "function"
// NOTE(review): omitempty has no effect on a struct-typed (non-pointer)
// field in encoding/json, so "function" is always emitted.
Function JulepFunction `json:"function,omitempty"`
// Add other Julep tool call types if needed
}
// JulepFunction mirrors the function part of a Julep tool call: the function
// name plus its arguments.
type JulepFunction struct {
Name string `json:"name"`
Arguments string `json:"arguments"` // Assumed to be a JSON-encoded string; TODO confirm against the Julep API
}
// JulepChatPayload represents the JSON body sent to Julep's /chat endpoint.
// Pointer and slice fields tagged omitempty are left out of the JSON when nil
// or empty; Messages and Stream are always serialized.
type JulepChatPayload struct {
Messages []JulepMessage `json:"messages"`
Model *string `json:"model,omitempty"` // Julep might use the model from the agent/session instead
Stream bool `json:"stream"` // Julep doesn't stream, but the payload might accept the flag
MaxTokens *int `json:"max_tokens,omitempty"`
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
Stop []string `json:"stop,omitempty"`
PresencePenalty *float64 `json:"presence_penalty,omitempty"`
FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"`
Tools []OpenAITool `json:"tools,omitempty"` // Assumes Julep accepts the OpenAI tool format directly
ToolChoice any `json:"tool_choice,omitempty"` // Assumes Julep accepts the OpenAI format
// Add other Julep-specific parameters if needed
}
// JulepChatResponse represents the non-streaming JSON response from Julep's
// /chat endpoint.
type JulepChatResponse struct {
ID string `json:"id"` // Julep's own response ID (might differ from the session ID)
// CreatedAt stays at the zero time.Time when "created_at" is absent from the
// response.
CreatedAt time.Time `json:"created_at"`
Choices []JulepChoice `json:"choices"`
// Usage is nil when the response carries no "usage" object.
Usage *JulepUsage `json:"usage,omitempty"`
// Add other fields from the Julep response
}
// JulepChoice is one entry of JulepChatResponse.Choices: a message together
// with its index and finish reason.
type JulepChoice struct {
Index int `json:"index"`
Message JulepMessage `json:"message"`
FinishReason string `json:"finish_reason"`
}
// JulepUsage holds the token counts carried in the "usage" object of a Julep
// chat response.
type JulepUsage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
}
// --- Conversion Functions ---
// convertOpenaiToJulep builds the Julep chat payload for an OpenAI-style
// request.
//
// Messages and their tool calls are copied field by field, and the sampling
// parameters, tools and tool choice are carried over unchanged. Stream is
// always false in the result, and Model is nil when the request names no
// model.
func convertOpenaiToJulep(openaiReq OpenAIRequest) JulepChatPayload {
	// Only forward a model pointer when the client actually supplied one.
	var model *string
	if openaiReq.Model != "" {
		model = &openaiReq.Model
	}

	messages := make([]JulepMessage, 0, len(openaiReq.Messages))
	for _, src := range openaiReq.Messages {
		// Only the function form of a tool call is translated for now.
		calls := make([]JulepToolCall, 0, len(src.ToolCalls))
		for _, call := range src.ToolCalls {
			fn := JulepFunction{
				Name:      call.Function.Name,
				Arguments: call.Function.Arguments,
			}
			calls = append(calls, JulepToolCall{ID: call.ID, Type: call.Type, Function: fn})
		}

		messages = append(messages, JulepMessage{
			Role:       src.Role,
			Content:    src.Content,
			Name:       src.Name,
			ToolCallID: src.ToolCallID,
			ToolCalls:  calls,
		})
	}

	return JulepChatPayload{
		Messages:         messages,
		Model:            model,
		Stream:           false, // Julep does not support a streaming response
		MaxTokens:        openaiReq.MaxTokens,
		Temperature:      openaiReq.Temperature,
		TopP:             openaiReq.TopP,
		Stop:             openaiReq.Stop,
		PresencePenalty:  openaiReq.PresencePenalty,
		FrequencyPenalty: openaiReq.FrequencyPenalty,
		Tools:            openaiReq.Tools,
		ToolChoice:       openaiReq.ToolChoice,
	}
}
// convertJulepToOpenai converts a Julep chat response into the OpenAI
// chat-completion response format.
//
// julepResp must be non-nil. modelName is reported back as the response model
// (the model the client requested), and sessionID is used as the OpenAI
// response ID, as per the JS example. Usage is copied only when Julep supplied
// it. Created is the Unix time of julepResp.CreatedAt; when that timestamp is
// the zero value (for example because the upstream response carried no
// "created_at"), the current time is used instead so clients never receive the
// large negative number that the zero time converts to.
func convertJulepToOpenai(julepResp *JulepChatResponse, modelName string, sessionID string) *OpenAIResponse {
	openaiChoices := make([]OpenAIChoice, len(julepResp.Choices))
	for i, choice := range julepResp.Choices {
		openaiToolCalls := make([]OpenAIToolCall, len(choice.Message.ToolCalls))
		for j, tc := range choice.Message.ToolCalls {
			openaiToolCalls[j] = OpenAIToolCall{
				ID:   tc.ID,
				Type: tc.Type,
				Function: OpenAIFunction{
					Name:      tc.Function.Name,
					Arguments: tc.Function.Arguments,
				},
			}
		}
		openaiChoices[i] = OpenAIChoice{
			Index: choice.Index,
			Message: OpenAIMessage{
				Role:      choice.Message.Role,
				Content:   choice.Message.Content,
				ToolCalls: openaiToolCalls,
			},
			FinishReason: choice.FinishReason,
		}
	}

	var openaiUsage *OpenAIUsage
	if julepResp.Usage != nil {
		openaiUsage = &OpenAIUsage{
			PromptTokens:     julepResp.Usage.PromptTokens,
			CompletionTokens: julepResp.Usage.CompletionTokens,
			TotalTokens:      julepResp.Usage.TotalTokens,
		}
	}

	// time.Time{}.Unix() is -62135596800; substitute "now" for a missing
	// creation timestamp.
	created := julepResp.CreatedAt
	if created.IsZero() {
		created = time.Now()
	}

	return &OpenAIResponse{
		ID:      sessionID, // Use the generated session ID as the OpenAI ID
		Object:  "chat.completion",
		Created: created.Unix(),
		Model:   modelName, // Use the model requested by the client
		Choices: openaiChoices,
		Usage:   openaiUsage,
	}
}
// --- API Call Functions ---
// makeJulepRequest performs one HTTP exchange with a Julep endpoint using the
// shared apiClient.
//
// requestBody, when non-nil, is JSON-encoded and sent with a
// "Content-Type: application/json" header. Of the supplied headers only
// Authorization is forwarded. When the upstream answers with a 2xx status,
// responseTarget is non-nil and the body is non-empty, the body is
// JSON-decoded into responseTarget. reqID is attached to every log record.
//
// It returns the upstream HTTP status code and an error describing any
// failure:
//   - 0 when the request could not be built or sent;
//   - http.StatusGatewayTimeout when sending failed because a deadline expired;
//   - the upstream status, with an error embedding the response body, for any
//     non-2xx reply;
//   - the upstream (2xx) status with an error when a requested response body
//     could not be read or decoded.
func makeJulepRequest(ctx context.Context, logger *slog.Logger, method, url string, headers http.Header, requestBody any, responseTarget any, reqID string) (int, error) {
	logAttrs := []any{"request_id", reqID, "method", method, "url", url}
	logger.Debug("Making Julep API request...", logAttrs...)

	var reqBodyReader io.Reader
	if requestBody != nil {
		jsonBody, err := json.Marshal(requestBody)
		if err != nil {
			logger.Error("Failed to marshal Julep request body", append(logAttrs, "error", err)...)
			return 0, fmt.Errorf("failed to marshal request body: %w", err)
		}
		reqBodyReader = bytes.NewBuffer(jsonBody)
		logAttrs = append(logAttrs, "body_size", len(jsonBody)) // Log body size
	}

	httpReq, err := http.NewRequestWithContext(ctx, method, url, reqBodyReader)
	if err != nil {
		logger.Error("Failed to create Julep HTTP request", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	// Forward only the essential headers; Host, Content-Length etc. must not
	// be copied from the inbound request.
	if auth := headers.Get("Authorization"); auth != "" {
		httpReq.Header.Set("Authorization", auth)
	}
	// Only set Content-Type if we have a body.
	if requestBody != nil {
		httpReq.Header.Set("Content-Type", "application/json")
	}

	startTime := time.Now()
	httpResp, err := apiClient.Do(httpReq)
	logAttrs = append(logAttrs, "duration_ms", time.Since(startTime).Milliseconds())
	if err != nil {
		// Report an expired deadline distinctly from other transport errors.
		if errors.Is(err, context.DeadlineExceeded) {
			logger.Error("Julep API request timed out", append(logAttrs, "error", err)...)
			return http.StatusGatewayTimeout, fmt.Errorf("request to %s timed out: %w", url, err)
		}
		logger.Error("Julep API request failed", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to send request to %s: %w", url, err)
	}
	defer httpResp.Body.Close()
	logAttrs = append(logAttrs, "status_code", httpResp.StatusCode)

	// Read the body regardless of status code: error replies usually carry a
	// message worth surfacing. A read failure is not fatal yet, because a
	// non-2xx status can still be reported with whatever was read.
	respBodyBytes, readErr := io.ReadAll(httpResp.Body)
	if readErr != nil {
		logger.Warn("Failed to read Julep response body", append(logAttrs, "read_error", readErr)...)
	} else {
		logAttrs = append(logAttrs, "response_size", len(respBodyBytes))
	}

	// Check for non-successful status codes.
	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		errMsg := fmt.Sprintf("Julep API returned error status %d", httpResp.StatusCode)
		if len(respBodyBytes) > 0 {
			errMsg = fmt.Sprintf("%s: %s", errMsg, string(respBodyBytes))
		}
		logger.Error("Julep API request returned non-2xx status", logAttrs...)
		// errors.New, not fmt.Errorf: the message contains upstream-controlled
		// text and must not be interpreted as a format string.
		return httpResp.StatusCode, errors.New(errMsg)
	}

	// The caller asked for a decoded reply but the body could not be read in
	// full; report that instead of returning success with an empty target.
	if responseTarget != nil && readErr != nil {
		return httpResp.StatusCode, fmt.Errorf("failed to read Julep response body: %w", readErr)
	}

	if responseTarget == nil || len(respBodyBytes) == 0 {
		logger.Debug("Julep request successful, no response body expected or decoded.", logAttrs...)
		return httpResp.StatusCode, nil
	}

	if err := json.Unmarshal(respBodyBytes, responseTarget); err != nil {
		logger.Error("Failed to unmarshal Julep response body", append(logAttrs, "error", err, "body_preview", string(respBodyBytes[:min(len(respBodyBytes), 100)]))...)
		return httpResp.StatusCode, fmt.Errorf("failed to decode Julep response: %w", err)
	}
	logger.Debug("Successfully decoded Julep response", logAttrs...)
	return httpResp.StatusCode, nil
}
// callJulepChat orchestrates the three upstream calls behind one chat
// completion: create a temporary agent, create a session linked to it, then
// post the converted request to that session's chat endpoint.
//
// headers are the inbound request headers handed to makeJulepRequest, and
// requestID is attached to every log record. When ctx carries no deadline,
// each setup call is limited to defaultTimeout and the chat call to
// chatTimeout; these limits are derived from ctx, so caller cancellation still
// applies. The shared HTTP client's own Timeout remains an upper bound on
// every call.
//
// On success it returns the converted response, http.StatusOK and a nil error.
// On failure the response is nil and the int is the status to send to the
// client: an upstream 4xx status is passed through, and every other failure
// (no response, timeout, 5xx, or an error reported alongside a status below
// 400 such as an undecodable 200 reply) is reported as http.StatusBadGateway.
// The temporary agent and session are not deleted afterwards.
func callJulepChat(ctx context.Context, logger *slog.Logger, headers http.Header, openaiReq OpenAIRequest, requestID string) (*OpenAIResponse, int, error) {
	reqLogger := logger.With("request_id", requestID)

	// bounded returns a context limited to the given duration unless the
	// caller already imposed a deadline, in which case ctx is used as is.
	_, hasDeadline := ctx.Deadline()
	bounded := func(limit time.Duration) (context.Context, context.CancelFunc) {
		if hasDeadline {
			return ctx, func() {}
		}
		return context.WithTimeout(ctx, limit)
	}

	// upstreamFault reports whether a failed call should be presented to the
	// client as 502. Only 4xx statuses are the client's own problem; a status
	// below 400 combined with an error means the upstream reply was unusable.
	upstreamFault := func(code int) bool {
		return code < http.StatusBadRequest || code >= http.StatusInternalServerError
	}

	// --- 1. Create Agent ---
	agentID := generateUUID()
	agentURL := fmt.Sprintf("%s/agents/%s", JulepApiBaseURL, agentID)
	agentPayload := CreateAgentPayload{
		Name:  fmt.Sprintf("temp-agent-%s", agentID),
		About: "Temporary agent created for a chat session via proxy.",
	}
	reqLogger.Info("Creating temporary Julep agent", "agent_id", agentID)
	agentCtx, cancelAgent := bounded(defaultTimeout)
	statusCode, err := makeJulepRequest(agentCtx, reqLogger, http.MethodPost, agentURL, headers, agentPayload, nil, requestID)
	cancelAgent()
	if err != nil {
		reqLogger.Error("Failed to create Julep agent", "error", err, "status_code", statusCode)
		if upstreamFault(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (agent creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create agent: %w", err)
	}
	reqLogger.Info("Julep agent created successfully", "agent_id", agentID)

	// --- 2. Create Session ---
	// The session ID is part of the URL path, so it is generated here.
	sessionID := generateUUID()
	sessionURL := fmt.Sprintf("%s/sessions/%s", JulepApiBaseURL, sessionID)
	sessionPayload := CreateSessionPayload{
		AgentID: agentID, // Link to the agent created above
	}
	reqLogger.Info("Creating temporary Julep session", "session_id", sessionID, "linked_agent_id", agentID)
	sessionCtx, cancelSession := bounded(defaultTimeout)
	statusCode, err = makeJulepRequest(sessionCtx, reqLogger, http.MethodPost, sessionURL, headers, sessionPayload, nil, requestID)
	cancelSession()
	if err != nil {
		reqLogger.Error("Failed to create Julep session", "error", err, "status_code", statusCode)
		// The agent created above is left in place; cleanup is not implemented.
		if upstreamFault(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (session creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create session: %w", err)
	}
	reqLogger.Info("Julep session created successfully", "session_id", sessionID)

	// --- 3. Call Chat Endpoint ---
	chatURL := fmt.Sprintf("%s/sessions/%s/chat", JulepApiBaseURL, sessionID)
	julepPayload := convertOpenaiToJulep(openaiReq)
	reqLogger.Info("Calling Julep chat endpoint", "url", chatURL)

	// The chat call gets the longer budget. It is derived from ctx rather than
	// context.Background() so that cancellation by the caller (for example a
	// disconnected client) still aborts the upstream request.
	chatCtx, cancelChat := bounded(chatTimeout)
	defer cancelChat()
	if !hasDeadline {
		reqLogger.Debug("Applying specific timeout for chat request", "timeout", chatTimeout)
	}

	var julepResponse JulepChatResponse
	statusCode, err = makeJulepRequest(chatCtx, reqLogger, http.MethodPost, chatURL, headers, julepPayload, &julepResponse, requestID)
	if err != nil {
		reqLogger.Error("Julep chat request failed", "error", err, "status_code", statusCode)
		if upstreamFault(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("upstream API error during chat: %w", err)
		}
		// Propagate 4xx replies from Julep to the client.
		return nil, statusCode, fmt.Errorf("julep chat API error: %w", err)
	}
	reqLogger.Info("Julep chat request successful")

	// --- 4. Convert Julep Response to OpenAI Response ---
	openaiResponse := convertJulepToOpenai(&julepResponse, openaiReq.Model, sessionID)
	return openaiResponse, http.StatusOK, nil
}
// min returns the smaller of two ints; when they are equal, that value is
// returned. It is used to cap the length of byte-slice previews in log output.
// Being declared at package level, it takes precedence over the built-in min
// of Go 1.21+ within this package.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}