File size: 15,768 Bytes
562a3ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
package main

import (
	"bytes"   // 用于创建请求体
	"context" // 用于超时和取消
	"encoding/json"
	"errors"
	"fmt"
	"io" // 用于读取响应体
	"log/slog"
	"net/http" // 用于 HTTP 请求
	"os"
	"strings"
	"time"
	// 确保已导入
)

var (
	// JulepApiBaseURL is the upstream Julep API base URL, read from the
	// JULEP_API_BASE_URL environment variable during package init.
	JulepApiBaseURL string

	// apiClient is the package-level HTTP client shared by all upstream calls.
	apiClient *http.Client
)

// Upstream request time budgets.
const (
	defaultTimeout = 30 * time.Second // default per-request timeout of apiClient
	chatTimeout    = 90 * time.Second // chat requests may need considerably longer
)

// init configures the upstream connection for the whole package. It reads the
// Julep base URL from the JULEP_API_BASE_URL environment variable and builds
// the shared apiClient.
//
// The process exits with status 1 when the variable is unset or blank.
// Surrounding whitespace and any trailing slashes are stripped so callers can
// append paths such as "/agents/<id>" directly.
func init() {
	baseURL := strings.TrimRight(strings.TrimSpace(os.Getenv("JULEP_API_BASE_URL")), "/")
	if baseURL == "" {
		slog.Error("Fatal: JULEP_API_BASE_URL environment variable not set.")
		os.Exit(1)
	}
	JulepApiBaseURL = baseURL
	slog.Info("Julep API Base URL configured", "url", JulepApiBaseURL)

	// Start from a clone of http.DefaultTransport rather than a bare
	// &http.Transport{}. The bare struct has a nil Proxy (so HTTP(S)_PROXY /
	// NO_PROXY are ignored), no connect timeout and no TLS handshake timeout.
	var transport *http.Transport
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = base.Clone()
	} else {
		transport = &http.Transport{}
	}
	transport.MaxIdleConns = 100
	transport.MaxIdleConnsPerHost = 10
	transport.IdleConnTimeout = 90 * time.Second

	// One reusable client for every upstream call so connections are pooled.
	apiClient = &http.Client{
		Timeout:   defaultTimeout,
		Transport: transport,
	}
	slog.Info("HTTP client initialized")
}

// --- Julep Specific Request/Response Structs ---
// (These could live in models.go; keeping them here makes upstream.go more self-contained.)

// CreateAgentPayload is the JSON body POSTed to {base}/agents/{agentID} when
// callJulepChat creates its temporary agent.
type CreateAgentPayload struct {
	Name  string `json:"name"`
	About string `json:"about"`
	// Add other Julep Agent fields here as needed...
}

// CreateSessionPayload is the JSON body POSTed to {base}/sessions/{sessionID}
// to create a session linked to an existing agent.
type CreateSessionPayload struct {
	AgentID string `json:"agent"` // the Julep API names this field "agent"
	// Add other Julep Session fields here as needed...
}

// JulepMessage mirrors a single chat message inside Julep's chat payload and
// response. Name and ToolCallID are omitted from the JSON when nil, and
// ToolCalls when empty; Role and Content are always emitted.
type JulepMessage struct {
	Role       string          `json:"role"`
	Content    string          `json:"content"`
	Name       *string         `json:"name,omitempty"`
	ToolCallID *string         `json:"tool_call_id,omitempty"`
	ToolCalls  []JulepToolCall `json:"tool_calls,omitempty"`
}

// JulepToolCall mirrors one tool-call entry in Julep's message format.
// Only the function-call shape is modelled.
//
// NOTE: encoding/json never treats a non-pointer struct as "empty", so the
// omitempty option on Function has no effect and "function" is always emitted.
type JulepToolCall struct {
	ID       string        `json:"id"`
	Type     string        `json:"type"` // e.g., "function"
	Function JulepFunction `json:"function,omitempty"`
	// Add other Julep tool call types if needed
}

// JulepFunction mirrors the function part of a Julep tool call.
type JulepFunction struct {
	Name      string `json:"name"`
	Arguments string `json:"arguments"` // presumably JSON-encoded arguments, passed through as an opaque string
}

// JulepChatPayload is the JSON body POSTed to Julep's
// {base}/sessions/{sessionID}/chat endpoint. It is built from an OpenAI-style
// request by convertOpenaiToJulep. Optional parameters are pointers, slices or
// interfaces tagged omitempty, so unset values are left out of the JSON;
// Messages and Stream are always emitted.
type JulepChatPayload struct {
	Messages         []JulepMessage `json:"messages"`
	Model            *string        `json:"model,omitempty"` // optional; Julep may also take the model from the agent/session
	Stream           bool           `json:"stream"`          // convertOpenaiToJulep always sends false (non-streaming only)
	MaxTokens        *int           `json:"max_tokens,omitempty"`
	Temperature      *float64       `json:"temperature,omitempty"`
	TopP             *float64       `json:"top_p,omitempty"`
	Stop             []string       `json:"stop,omitempty"`
	PresencePenalty  *float64       `json:"presence_penalty,omitempty"`
	FrequencyPenalty *float64       `json:"frequency_penalty,omitempty"`
	Tools            []OpenAITool   `json:"tools,omitempty"`       // assumes Julep accepts OpenAI's tool format as-is — TODO confirm
	ToolChoice       any            `json:"tool_choice,omitempty"` // assumes OpenAI's tool_choice format — TODO confirm
	// Add other Julep specific parameters if needed
}

// JulepChatResponse is the non-streaming response from Julep's chat endpoint.
// makeJulepRequest decodes it and convertJulepToOpenai converts it.
// CreatedAt is decoded by time.Time's JSON unmarshaller, which expects an
// RFC 3339 string; if the field is absent it stays at the zero time.
type JulepChatResponse struct {
	ID        string        `json:"id"` // Julep's own response ID (may differ from the session ID)
	CreatedAt time.Time     `json:"created_at"`
	Choices   []JulepChoice `json:"choices"`
	Usage     *JulepUsage   `json:"usage,omitempty"`
	// Add other fields from Julep response
}

// JulepChoice is one completion choice inside a JulepChatResponse.
type JulepChoice struct {
	Index        int          `json:"index"`
	Message      JulepMessage `json:"message"`
	FinishReason string       `json:"finish_reason"`
}

// JulepUsage holds the token counts Julep reports for a chat call.
type JulepUsage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

// --- Conversion Functions ---

// convertOpenaiToJulep builds the Julep chat payload for an OpenAI-style
// request.
//
// Every message is copied field by field, including its tool calls, which are
// treated as function calls. Sampling parameters, stop sequences, tools and
// tool_choice are passed through unchanged. Stream is always false because
// only Julep's non-streaming response is handled. Model stays nil, and is thus
// omitted from the JSON, when the request does not name a model.
func convertOpenaiToJulep(openaiReq OpenAIRequest) JulepChatPayload {
	messages := make([]JulepMessage, 0, len(openaiReq.Messages))
	for _, src := range openaiReq.Messages {
		calls := make([]JulepToolCall, 0, len(src.ToolCalls))
		for _, call := range src.ToolCalls {
			calls = append(calls, JulepToolCall{
				ID:   call.ID,
				Type: call.Type,
				Function: JulepFunction{
					Name:      call.Function.Name,
					Arguments: call.Function.Arguments,
				},
			})
		}
		messages = append(messages, JulepMessage{
			Role:       src.Role,
			Content:    src.Content,
			Name:       src.Name,
			ToolCallID: src.ToolCallID,
			ToolCalls:  calls,
		})
	}

	// Only point at the model name when one was supplied; nil drops the field.
	var model *string
	if openaiReq.Model != "" {
		model = &openaiReq.Model
	}

	return JulepChatPayload{
		Messages:         messages,
		Model:            model,
		Stream:           false,
		MaxTokens:        openaiReq.MaxTokens,
		Temperature:      openaiReq.Temperature,
		TopP:             openaiReq.TopP,
		Stop:             openaiReq.Stop,
		PresencePenalty:  openaiReq.PresencePenalty,
		FrequencyPenalty: openaiReq.FrequencyPenalty,
		Tools:            openaiReq.Tools,
		ToolChoice:       openaiReq.ToolChoice,
	}
}

// convertJulepToOpenai converts a Julep chat response into the OpenAI
// "chat.completion" response format.
//
// Parameters:
//   - julepResp: the decoded Julep response; must be non-nil.
//   - modelName: echoed back as the response model (the model the client requested).
//   - sessionID: used as the OpenAI response ID, following the JS reference example.
//
// Choices, their tool calls and token usage are copied field by field. Usage
// stays nil when Julep reported none. If Julep did not supply created_at (zero
// time), the current time is used so Created is never a pre-epoch timestamp.
func convertJulepToOpenai(julepResp *JulepChatResponse, modelName string, sessionID string) *OpenAIResponse {
	openaiChoices := make([]OpenAIChoice, len(julepResp.Choices))
	for i, choice := range julepResp.Choices {
		openaiToolCalls := make([]OpenAIToolCall, len(choice.Message.ToolCalls))
		for j, tc := range choice.Message.ToolCalls {
			openaiToolCalls[j] = OpenAIToolCall{
				ID:   tc.ID,
				Type: tc.Type,
				Function: OpenAIFunction{
					Name:      tc.Function.Name,
					Arguments: tc.Function.Arguments,
				},
			}
		}
		openaiChoices[i] = OpenAIChoice{
			Index: choice.Index,
			Message: OpenAIMessage{
				Role:      choice.Message.Role,
				Content:   choice.Message.Content,
				ToolCalls: openaiToolCalls,
			},
			FinishReason: choice.FinishReason,
		}
	}

	var openaiUsage *OpenAIUsage
	if usage := julepResp.Usage; usage != nil {
		openaiUsage = &OpenAIUsage{
			PromptTokens:     usage.PromptTokens,
			CompletionTokens: usage.CompletionTokens,
			TotalTokens:      usage.TotalTokens,
		}
	}

	// time.Time{}.Unix() is -62135596800; fall back to "now" when Julep omits created_at.
	created := julepResp.CreatedAt.Unix()
	if julepResp.CreatedAt.IsZero() {
		created = time.Now().Unix()
	}

	return &OpenAIResponse{
		ID:      sessionID, // the generated Julep session ID doubles as the OpenAI ID
		Object:  "chat.completion",
		Created: created,
		Model:   modelName, // report the model the client asked for
		Choices: openaiChoices,
		Usage:   openaiUsage,
	}
}

// --- API Call Functions ---

// makeJulepRequest sends one JSON request to a Julep endpoint. When the
// response is 2xx and responseTarget is non-nil, it decodes the body into
// responseTarget.
//
// Parameters:
//   - ctx: cancellation and deadline for the call.
//     - If ctx's deadline is later than apiClient.Timeout, the client-level
//       timeout is lifted for this call, so the longer context budget applies.
//     - Otherwise both limits apply and the earlier one wins.
//   - logger, reqID: structured logging; reqID is attached to every entry.
//   - method, url: HTTP method and absolute endpoint URL.
//   - headers: only the Authorization header is forwarded upstream.
//   - requestBody: marshalled to JSON when non-nil, and Content-Type is then
//     set to application/json.
//   - responseTarget: pointer to decode a 2xx JSON body into; may be nil.
//
// Returns the upstream status code and an error:
//   - The status is 0 when no response was received (marshal, request
//     construction or transport error).
//   - The status is http.StatusGatewayTimeout when the call timed out.
//   - For non-2xx responses the error text contains the raw upstream body.
func makeJulepRequest(ctx context.Context, logger *slog.Logger, method, url string, headers http.Header, requestBody any, responseTarget any, reqID string) (int, error) {
	logAttrs := []any{"request_id", reqID, "method", method, "url", url}
	logger.Debug("Making Julep API request...", logAttrs...)

	var reqBodyReader io.Reader
	if requestBody != nil {
		jsonBody, err := json.Marshal(requestBody)
		if err != nil {
			logger.Error("Failed to marshal Julep request body", append(logAttrs, "error", err)...)
			return 0, fmt.Errorf("failed to marshal request body: %w", err)
		}
		reqBodyReader = bytes.NewReader(jsonBody)
		logAttrs = append(logAttrs, "body_size", len(jsonBody))
	}

	httpReq, err := http.NewRequestWithContext(ctx, method, url, reqBodyReader)
	if err != nil {
		logger.Error("Failed to create Julep HTTP request", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	// Forward only what Julep needs. Headers such as Host and Content-Length
	// from the incoming request must not be copied.
	if auth := headers.Get("Authorization"); auth != "" {
		httpReq.Header.Set("Authorization", auth)
	}
	if requestBody != nil {
		httpReq.Header.Set("Content-Type", "application/json")
	}

	// http.Client.Timeout bounds the whole exchange regardless of ctx. Without
	// this, a caller that grants a longer budget (e.g. a chat call) would still
	// be cut off at apiClient.Timeout. The copy shares apiClient's Transport,
	// so connections are still pooled.
	client := apiClient
	if deadline, ok := ctx.Deadline(); ok && client.Timeout > 0 && time.Until(deadline) > client.Timeout {
		relaxed := *client
		relaxed.Timeout = 0
		client = &relaxed
	}

	startTime := time.Now()
	httpResp, err := client.Do(httpReq)
	logAttrs = append(logAttrs, "duration_ms", time.Since(startTime).Milliseconds())
	if err != nil {
		// An expired ctx deadline matches context.DeadlineExceeded. An expired
		// http.Client.Timeout surfaces as an error whose Timeout() reports true.
		var timeoutErr interface{ Timeout() bool }
		if errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &timeoutErr) && timeoutErr.Timeout()) {
			logger.Error("Julep API request timed out", append(logAttrs, "error", err)...)
			return http.StatusGatewayTimeout, fmt.Errorf("request to %s timed out: %w", url, err)
		}
		logger.Error("Julep API request failed", append(logAttrs, "error", err)...)
		return 0, fmt.Errorf("failed to send request to %s: %w", url, err)
	}
	defer httpResp.Body.Close()

	logAttrs = append(logAttrs, "status_code", httpResp.StatusCode)

	// Read the body regardless of status so error responses can be reported.
	respBodyBytes, readErr := io.ReadAll(httpResp.Body)
	if readErr != nil {
		logger.Warn("Failed to read Julep response body", append(logAttrs, "read_error", readErr)...)
	} else {
		logAttrs = append(logAttrs, "response_size", len(respBodyBytes))
	}

	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		errMsg := fmt.Sprintf("Julep API returned error status %d", httpResp.StatusCode)
		if len(respBodyBytes) > 0 {
			errMsg = fmt.Sprintf("%s: %s", errMsg, string(respBodyBytes))
		}
		logger.Error("Julep API request returned non-2xx status", logAttrs...)
		// errors.New, not fmt.Errorf(errMsg): the upstream body may contain '%'
		// sequences that would otherwise be parsed as format verbs.
		return httpResp.StatusCode, errors.New(errMsg)
	}

	// Without this check, a truncated body would be decoded, or skipped if it
	// is empty, and the caller would silently get an incomplete result.
	if responseTarget != nil && readErr != nil {
		return httpResp.StatusCode, fmt.Errorf("failed to read Julep response body: %w", readErr)
	}

	if responseTarget != nil && len(respBodyBytes) > 0 {
		if err := json.Unmarshal(respBodyBytes, responseTarget); err != nil {
			logger.Error("Failed to unmarshal Julep response body", append(logAttrs, "error", err, "body_preview", string(respBodyBytes[:min(len(respBodyBytes), 100)]))...)
			return httpResp.StatusCode, fmt.Errorf("failed to decode Julep response: %w", err)
		}
		logger.Debug("Successfully decoded Julep response", logAttrs...)
	} else {
		logger.Debug("Julep request successful, no response body expected or decoded.", logAttrs...)
	}

	return httpResp.StatusCode, nil
}

// callJulepChat serves one OpenAI-style chat completion through Julep. It makes
// three upstream calls in order:
//  1. create a temporary agent (POST {base}/agents/{agentID}),
//  2. create a session linked to that agent (POST {base}/sessions/{sessionID}),
//  3. post the converted conversation to {base}/sessions/{sessionID}/chat.
//
// Parameters:
//   - ctx: parent context; its cancellation propagates to every upstream call.
//   - logger: base logger; a child tagged with request_id is used throughout.
//   - headers: incoming request headers, handed to makeJulepRequest.
//   - openaiReq: the client's chat request.
//   - requestID: correlation ID for logs.
//
// On success it returns the converted response and http.StatusOK. On failure
// it returns a nil response, the status to send to the client, and a wrapped
// error:
//   - Upstream 4xx statuses are passed through.
//   - Everything else becomes http.StatusBadGateway. This covers no response
//     (0), timeouts, 5xx, and a 2xx whose body could not be read or decoded.
//
// The temporary agent and session are never deleted, including when a later
// step fails.
func callJulepChat(ctx context.Context, logger *slog.Logger, headers http.Header, openaiReq OpenAIRequest, requestID string) (*OpenAIResponse, int, error) {
	reqLogger := logger.With("request_id", requestID)

	// upstreamFailed reports whether a failed call's status should be hidden
	// behind 502. Only a 4xx means Julep rejected what we sent. A 2xx can
	// arrive together with an error (undecodable body) and must never be
	// relayed to the client as "200 + error".
	upstreamFailed := func(status int) bool {
		return status < http.StatusBadRequest || status >= http.StatusInternalServerError
	}

	// --- 1. Create Agent ---
	agentID := generateUUID()
	agentURL := fmt.Sprintf("%s/agents/%s", JulepApiBaseURL, agentID)
	agentPayload := CreateAgentPayload{
		Name:  fmt.Sprintf("temp-agent-%s", agentID),
		About: "Temporary agent created for a chat session via proxy.",
	}
	reqLogger.Info("Creating temporary Julep agent", "agent_id", agentID)
	// The agent-creation response body is not used, so no decode target is passed.
	statusCode, err := makeJulepRequest(ctx, reqLogger, http.MethodPost, agentURL, headers, agentPayload, nil, requestID)
	if err != nil {
		reqLogger.Error("Failed to create Julep agent", "error", err, "status_code", statusCode)
		if upstreamFailed(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (agent creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create agent: %w", err)
	}
	reqLogger.Info("Julep agent created successfully", "agent_id", agentID)

	// --- 2. Create Session ---
	// Julep takes the session ID in the URL path, so it is generated here.
	sessionID := generateUUID()
	sessionURL := fmt.Sprintf("%s/sessions/%s", JulepApiBaseURL, sessionID)
	sessionPayload := CreateSessionPayload{
		AgentID: agentID,
	}
	reqLogger.Info("Creating temporary Julep session", "session_id", sessionID, "linked_agent_id", agentID)
	statusCode, err = makeJulepRequest(ctx, reqLogger, http.MethodPost, sessionURL, headers, sessionPayload, nil, requestID)
	if err != nil {
		reqLogger.Error("Failed to create Julep session", "error", err, "status_code", statusCode)
		if upstreamFailed(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("failed to initialize session (session creation failed): %w", err)
		}
		return nil, statusCode, fmt.Errorf("failed to create session: %w", err)
	}
	reqLogger.Info("Julep session created successfully", "session_id", sessionID)

	// --- 3. Call Chat Endpoint ---
	chatURL := fmt.Sprintf("%s/sessions/%s/chat", JulepApiBaseURL, sessionID)
	julepPayload := convertOpenaiToJulep(openaiReq)
	reqLogger.Info("Calling Julep chat endpoint", "url", chatURL)

	// Chat calls can be slow, so give them chatTimeout when the caller set no
	// deadline. Derive from ctx (not context.Background) so cancelling the
	// incoming request still aborts the upstream call.
	chatCtx := ctx
	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		var cancel context.CancelFunc
		chatCtx, cancel = context.WithTimeout(ctx, chatTimeout)
		defer cancel()
		reqLogger.Debug("Applying specific timeout for chat request", "timeout", chatTimeout)
	}

	var julepResponse JulepChatResponse
	statusCode, err = makeJulepRequest(chatCtx, reqLogger, http.MethodPost, chatURL, headers, julepPayload, &julepResponse, requestID)
	if err != nil {
		reqLogger.Error("Julep chat request failed", "error", err, "status_code", statusCode)
		if upstreamFailed(statusCode) {
			return nil, http.StatusBadGateway, fmt.Errorf("upstream API error during chat: %w", err)
		}
		return nil, statusCode, fmt.Errorf("julep chat API error: %w", err)
	}
	reqLogger.Info("Julep chat request successful")

	// --- 4. Convert Julep Response to OpenAI Response ---
	openaiResponse := convertJulepToOpenai(&julepResponse, openaiReq.Model, sessionID)
	return openaiResponse, http.StatusOK, nil
}

// min returns the smaller of a and b. It caps the length of the response-body
// previews that are logged.
//
// NOTE: since Go 1.21 this package-level definition shadows the builtin min,
// which behaves identically for two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}