File size: 5,475 Bytes
2986042 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | package app
import (
"bytes"
"net/http"
"strings"
"sync"
"time"
"ccLoad/internal/protocol"
"ccLoad/internal/util"
"github.com/bytedance/sonic"
)
// Codex Responses API 的 prompt 缓存需要 `prompt_cache_key` 请求体字段与 `Session_id` 请求头配合,
// 仅当稳定分桶时 OpenAI 才能稳定命中缓存。ccLoad 需在 Anthropic/OpenAI 客户端转换到 Codex 上游时补齐,
// 策略参考 CLIProxyAPI internal/runtime/executor/codex_executor.go:cacheHelper。
type codexSessionEntry struct {
id string
expire time.Time
}
const (
codexSessionTTL = time.Hour
codexSessionCleanupInterval = 15 * time.Minute
)
var (
codexSessionMap = make(map[string]codexSessionEntry)
codexSessionMu sync.RWMutex
codexSessionOnce sync.Once
)
// getOrCreateCodexSessionID 返回同一 cacheKey 下的稳定 UUID,命中即续期 TTL。
func getOrCreateCodexSessionID(cacheKey string) string {
if cacheKey == "" {
return ""
}
codexSessionOnce.Do(startCodexSessionCleanup)
now := time.Now()
codexSessionMu.Lock()
defer codexSessionMu.Unlock()
if entry, ok := codexSessionMap[cacheKey]; ok && entry.id != "" && entry.expire.After(now) {
entry.expire = now.Add(codexSessionTTL)
codexSessionMap[cacheKey] = entry
return entry.id
}
id := util.NewUUIDv4()
codexSessionMap[cacheKey] = codexSessionEntry{id: id, expire: now.Add(codexSessionTTL)}
return id
}
// codexSessionIDForOpenAIKey 基于 API Key 生成确定性 UUID(v5 + OID namespace)。
// 不同 Key 之间得到不同桶;同一 Key 的连续请求稳定命中同一桶。
func codexSessionIDForOpenAIKey(apiKey string) string {
apiKey = strings.TrimSpace(apiKey)
if apiKey == "" {
return ""
}
return util.NewUUIDv5(util.NameSpaceOID, "ccload:codex:prompt-cache:"+apiKey)
}
// resolveCodexSessionHint 仅在 Codex 上游场景下返回稳定的会话 ID;否则返回空。
// - Anthropic 客户端:优先 metadata.user_id(model-userID 内存缓存)→ X-Claude-Code-Session-Id 头 → apiKey 确定性 UUID
// - Codex 客户端:读 body 内已有的 prompt_cache_key(不主动创建)
// - OpenAI 客户端:基于 apiKey 生成确定性 UUID
// - 其他协议:返回空
func resolveCodexSessionHint(reqCtx *requestContext, translatedBody []byte, apiKey string, header http.Header) string {
if reqCtx == nil || runtimeUpstreamProtocol(reqCtx, nil) != string(protocol.Codex) {
return ""
}
switch reqCtx.clientProtocol {
case protocol.Anthropic:
if userID := extractAnthropicUserID(reqCtx.originalBody); userID != "" {
model := strings.TrimSpace(reqCtx.originalModel)
if model == "" {
model = "unknown"
}
return getOrCreateCodexSessionID(model + "-" + userID)
}
if sid := strings.TrimSpace(header.Get("X-Claude-Code-Session-Id")); sid != "" {
return util.NewUUIDv5(util.NameSpaceOID, "ccload:codex:prompt-cache:session:"+sid)
}
return codexSessionIDForOpenAIKey(apiKey)
case protocol.Codex:
return readCodexPromptCacheKey(translatedBody)
case protocol.OpenAI:
return codexSessionIDForOpenAIKey(apiKey)
}
return ""
}
// injectCodexPromptCacheKey 在 body 顶层写入 prompt_cache_key;已有非空值则保留。
// 非 JSON 对象或解析失败时原样返回。
func injectCodexPromptCacheKey(body []byte, id string) []byte {
if len(body) == 0 || id == "" {
return body
}
if readCodexPromptCacheKey(body) != "" {
return body
}
var payload map[string]any
if err := sonic.Unmarshal(body, &payload); err != nil || payload == nil {
return body
}
encodedID, err := sonic.Marshal(id)
if err != nil {
return body
}
end := len(body)
for end > 0 {
switch body[end-1] {
case ' ', '\n', '\r', '\t':
end--
default:
goto foundEnd
}
}
foundEnd:
if end == 0 || body[end-1] != '}' {
return body
}
start := 0
for start < end {
switch body[start] {
case ' ', '\n', '\r', '\t':
start++
default:
goto foundStart
}
}
foundStart:
if start >= end || body[start] != '{' {
return body
}
hasFields := len(bytes.TrimSpace(body[start+1:end-1])) > 0
insertLen := len(`"prompt_cache_key":`) + len(encodedID)
if hasFields {
insertLen++
}
out := make([]byte, 0, len(body)+insertLen)
out = append(out, body[:end-1]...)
if hasFields {
out = append(out, ',')
}
out = append(out, `"prompt_cache_key":`...)
out = append(out, encodedID...)
out = append(out, body[end-1:]...)
return out
}
func extractAnthropicUserID(body []byte) string {
if len(body) == 0 {
return ""
}
var payload struct {
Metadata struct {
UserID string `json:"user_id"`
} `json:"metadata"`
}
if err := sonic.Unmarshal(body, &payload); err != nil {
return ""
}
return strings.TrimSpace(payload.Metadata.UserID)
}
func readCodexPromptCacheKey(body []byte) string {
if len(body) == 0 {
return ""
}
var payload struct {
PromptCacheKey string `json:"prompt_cache_key"`
}
if err := sonic.Unmarshal(body, &payload); err != nil {
return ""
}
return strings.TrimSpace(payload.PromptCacheKey)
}
func startCodexSessionCleanup() {
go func() {
t := time.NewTicker(codexSessionCleanupInterval)
defer t.Stop()
for range t.C {
now := time.Now()
codexSessionMu.Lock()
for k, v := range codexSessionMap {
if !v.expire.After(now) {
delete(codexSessionMap, k)
}
}
codexSessionMu.Unlock()
}
}()
}
// UUID v4/v5 已统一到 internal/util/uuid_local.go(util.NewUUIDv4 / util.NewUUIDv5 / util.NameSpaceOID)。
|