File size: 7,325 Bytes
80ffd2e 644c352 80ffd2e 644c352 80ffd2e 644c352 80ffd2e 644c352 80ffd2e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 | package handler
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"opus-api/internal/converter"
"opus-api/internal/logger"
"opus-api/internal/model"
"opus-api/internal/stream"
"opus-api/internal/tokenizer"
"opus-api/internal/types"
"strings"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// HandleMessages handles POST /v1/messages
func HandleMessages(c *gin.Context) {
// Generate request ID
requestID := uuid.New().String()[:8]
// Rotate logs before creating new folder
if types.DebugMode {
logger.RotateLogs()
}
// Parse Claude request
var claudeReq types.ClaudeRequest
if err := c.ShouldBindJSON(&claudeReq); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request format"})
return
}
// 验证模型是否支持
if claudeReq.Model == "" {
claudeReq.Model = types.DefaultModel
}
if !types.IsModelSupported(claudeReq.Model) {
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("Model '%s' is not supported. Supported models: %v", claudeReq.Model, types.SupportedModels),
})
return
}
// Log Point 1: Claude request
var logFolder string
if types.DebugMode {
logFolder, _ = logger.CreateLogFolder(requestID)
logger.WriteJSONLog(logFolder, "1_claude_request.json", claudeReq)
}
// Convert to Morph format
morphReq := converter.ClaudeToMorph(claudeReq)
// Log Point 2: Morph request
if types.DebugMode && logFolder != "" {
logger.WriteJSONLog(logFolder, "2_morph_request.json", morphReq)
}
// Send request to MorphLLM API
morphReqJSON, err := json.Marshal(morphReq)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal request"})
return
}
req, err := http.NewRequest("POST", types.MorphAPIURL, bytes.NewReader(morphReqJSON))
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create request"})
return
}
// Set headers
headers := make(map[string]string)
for key, value := range types.MorphHeaders {
headers[key] = value
}
// 如果启用了 Cookie 轮询器,使用轮询的 Cookie
if types.CookieRotatorInstance != nil {
cookieInterface, err := types.CookieRotatorInstance.NextCookie()
if err == nil && cookieInterface != nil {
// 类型断言为 *model.MorphCookie
if cookie, ok := cookieInterface.(*model.MorphCookie); ok {
headers["cookie"] = cookie.APIKey
log.Printf("[INFO] Using rotated cookie (ID: %d, Priority: %d)", cookie.ID, cookie.Priority)
} else {
log.Printf("[WARN] Cookie type assertion failed, using default")
}
} else {
log.Printf("[WARN] Failed to get rotated cookie: %v, using default", err)
}
}
// 应用所有请求头
for key, value := range headers {
req.Header.Set(key, value)
}
// Log Point 3: Upstream request with headers
if types.DebugMode && logFolder != "" {
var reqLog strings.Builder
reqLog.WriteString(fmt.Sprintf("%s %s\n", req.Method, req.URL))
for k, v := range req.Header {
reqLog.WriteString(fmt.Sprintf("%s: %s\n", k, strings.Join(v, ", ")))
}
reqLog.WriteString("\n")
reqLog.Write(morphReqJSON)
logger.WriteTextLog(logFolder, "3_upstream_request.txt", reqLog.String())
}
// Send request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to connect to upstream API"})
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
if types.DebugMode && logFolder != "" {
logger.WriteTextLog(logFolder, "error.txt", fmt.Sprintf("Error: %d %s\n%s", resp.StatusCode, resp.Status, string(bodyBytes)))
}
c.JSON(http.StatusInternalServerError, gin.H{
"error": "Failed to connect to upstream API",
"status": resp.StatusCode,
})
return
}
// Set SSE headers
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
// Create response writer that captures output for logging
var clientResponseWriter io.Writer = io.Discard
if types.DebugMode && logFolder != "" {
logger.WriteTextLog(logFolder, "5_client_response.txt", "")
clientResponseWriter = &logWriter{logFolder: logFolder, fileName: "5_client_response.txt"}
}
onChunk := func(chunk string) {
if types.DebugMode {
clientResponseWriter.Write([]byte(chunk))
}
}
// Calculate input tokens from request
inputTokens := calculateInputTokens(claudeReq)
// Create a pipe for streaming
pr, pw := io.Pipe()
// Start goroutine to transform stream
go func() {
defer pw.Close()
// Log Point 4: Upstream response
var morphResponseWriter io.Writer = io.Discard
if types.DebugMode && logFolder != "" {
logger.WriteTextLog(logFolder, "4_upstream_response.txt", "")
morphResponseWriter = &logWriter{logFolder: logFolder, fileName: "4_upstream_response.txt"}
}
// Tee the response body
teeReader := io.TeeReader(resp.Body, morphResponseWriter)
// Transform stream
if err := stream.TransformMorphToClaudeStream(teeReader, claudeReq.Model, inputTokens, pw, onChunk); err != nil {
log.Printf("[ERROR] Stream transformation error: %v", err)
}
}()
// Stream response to client
c.Stream(func(w io.Writer) bool {
buf := make([]byte, 4096)
n, err := pr.Read(buf)
if n > 0 {
w.Write(buf[:n])
}
return err == nil
})
}
// logWriter writes to log file
type logWriter struct {
logFolder string
fileName string
}
func (w *logWriter) Write(p []byte) (n int, err error) {
if types.DebugMode && w.logFolder != "" {
logger.AppendLog(w.logFolder, w.fileName, string(p))
}
return len(p), nil
}
// calculateInputTokens calculates the total input tokens from a Claude request
func calculateInputTokens(req types.ClaudeRequest) int {
var totalText strings.Builder
// Add system prompt
if req.System != nil {
if sysStr, ok := req.System.(string); ok {
totalText.WriteString(sysStr)
} else if sysList, ok := req.System.([]interface{}); ok {
for _, item := range sysList {
if itemMap, ok := item.(map[string]interface{}); ok {
if text, ok := itemMap["text"].(string); ok {
totalText.WriteString(text)
}
}
}
}
}
// Add messages content
for _, msg := range req.Messages {
if content, ok := msg.Content.(string); ok {
totalText.WriteString(content)
} else if contentBlocks, ok := msg.Content.([]types.ClaudeContentBlock); ok {
for _, block := range contentBlocks {
if textBlock, ok := block.(types.ClaudeContentBlockText); ok {
totalText.WriteString(textBlock.Text)
} else if toolResult, ok := block.(types.ClaudeContentBlockToolResult); ok {
if resultStr, ok := toolResult.Content.(string); ok {
totalText.WriteString(resultStr)
}
}
}
}
}
// Add tools definitions
for _, tool := range req.Tools {
totalText.WriteString(tool.Name)
totalText.WriteString(tool.Description)
if tool.InputSchema != nil {
schemaBytes, _ := json.Marshal(tool.InputSchema)
totalText.Write(schemaBytes)
}
}
return tokenizer.CountTokens(totalText.String())
} |