| package chat |
|
|
| import ( |
| "crypto/subtle" |
| "encoding/json" |
| "net/http" |
| "os" |
| "strconv" |
| "strings" |
| "time" |
|
|
| "ds2api/internal/auth" |
| "ds2api/internal/config" |
| "ds2api/internal/promptcompat" |
| "ds2api/internal/util" |
|
|
| "github.com/google/uuid" |
| ) |
|
|
| func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) { |
| if !config.IsVercel() { |
| http.NotFound(w, r) |
| return |
| } |
| h.sweepExpiredStreamLeases() |
| internalSecret := vercelInternalSecret() |
| internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) |
| if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { |
| writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") |
| return |
| } |
|
|
| a, err := h.Auth.Determine(r) |
| if err != nil { |
| status := http.StatusUnauthorized |
| if err == auth.ErrNoAccount { |
| status = http.StatusTooManyRequests |
| } |
| writeOpenAIError(w, status, err.Error()) |
| return |
| } |
| leased := false |
| defer func() { |
| if !leased { |
| h.Auth.Release(a) |
| } |
| }() |
| r = r.WithContext(auth.WithAuth(r.Context(), a)) |
|
|
| var req map[string]any |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| writeOpenAIError(w, http.StatusBadRequest, "invalid json") |
| return |
| } |
| if err := h.preprocessInlineFileInputs(r.Context(), a, req); err != nil { |
| writeOpenAIInlineFileError(w, err) |
| return |
| } |
| if !util.ToBool(req["stream"]) { |
| writeOpenAIError(w, http.StatusBadRequest, "stream must be true") |
| return |
| } |
| stdReq, err := promptcompat.NormalizeOpenAIChatRequest(h.Store, req, requestTraceID(r)) |
| if err != nil { |
| writeOpenAIError(w, http.StatusBadRequest, err.Error()) |
| return |
| } |
| if !stdReq.Stream { |
| writeOpenAIError(w, http.StatusBadRequest, "stream must be true") |
| return |
| } |
| stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq) |
| if err != nil { |
| status, message := mapCurrentInputFileError(err) |
| writeOpenAIError(w, status, message) |
| return |
| } |
|
|
| sessionID, err := h.DS.CreateSession(r.Context(), a, 3) |
| if err != nil { |
| if a.UseConfigToken { |
| writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") |
| } else { |
| writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") |
| } |
| return |
| } |
| powHeader, err := h.DS.GetPow(r.Context(), a, 3) |
| if err != nil { |
| writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).") |
| return |
| } |
| if strings.TrimSpace(a.DeepSeekToken) == "" { |
| writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") |
| return |
| } |
|
|
| payload := stdReq.CompletionPayload(sessionID) |
| leaseID := h.holdStreamLease(a) |
| if leaseID == "" { |
| writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") |
| return |
| } |
| leased = true |
| writeJSON(w, http.StatusOK, map[string]any{ |
| "session_id": sessionID, |
| "lease_id": leaseID, |
| "model": stdReq.ResponseModel, |
| "final_prompt": stdReq.FinalPrompt, |
| "thinking_enabled": stdReq.Thinking, |
| "search_enabled": stdReq.Search, |
| "tool_names": stdReq.ToolNames, |
| "deepseek_token": a.DeepSeekToken, |
| "pow_header": powHeader, |
| "payload": payload, |
| }) |
| } |
|
|
| func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) { |
| if !config.IsVercel() { |
| http.NotFound(w, r) |
| return |
| } |
| h.sweepExpiredStreamLeases() |
| internalSecret := vercelInternalSecret() |
| internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) |
| if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { |
| writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") |
| return |
| } |
|
|
| var req map[string]any |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| writeOpenAIError(w, http.StatusBadRequest, "invalid json") |
| return |
| } |
| leaseID, _ := req["lease_id"].(string) |
| leaseID = strings.TrimSpace(leaseID) |
| if leaseID == "" { |
| writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") |
| return |
| } |
| if !h.releaseStreamLease(leaseID) { |
| writeOpenAIError(w, http.StatusNotFound, "stream lease not found") |
| return |
| } |
| writeJSON(w, http.StatusOK, map[string]any{"success": true}) |
| } |
|
|
| func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) { |
| if !config.IsVercel() { |
| http.NotFound(w, r) |
| return |
| } |
| internalSecret := vercelInternalSecret() |
| internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) |
| if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { |
| writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") |
| return |
| } |
|
|
| var req map[string]any |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| writeOpenAIError(w, http.StatusBadRequest, "invalid json") |
| return |
| } |
| leaseID, _ := req["lease_id"].(string) |
| leaseID = strings.TrimSpace(leaseID) |
| if leaseID == "" { |
| writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") |
| return |
| } |
| leaseAuth := h.lookupStreamLeaseAuth(leaseID) |
| if leaseAuth == nil { |
| writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired") |
| return |
| } |
| powHeader, err := h.DS.GetPow(r.Context(), leaseAuth, 3) |
| if err != nil { |
| writeOpenAIError(w, http.StatusInternalServerError, "Failed to get PoW.") |
| return |
| } |
| writeJSON(w, http.StatusOK, map[string]any{ |
| "pow_header": powHeader, |
| }) |
| } |
|
|
// isVercelStreamPrepareRequest reports whether r carries the query parameter
// "__stream_prepare" with the value "1" (surrounding whitespace in the value
// is ignored).
//
// A nil request, or a request whose URL field is nil (for example a
// hand-constructed &http.Request{}), is reported as false instead of being
// dereferenced.
func isVercelStreamPrepareRequest(r *http.Request) bool {
	if r == nil || r.URL == nil {
		return false
	}
	return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
}
|
|
// isVercelStreamReleaseRequest reports whether r carries the query parameter
// "__stream_release" with the value "1" (surrounding whitespace in the value
// is ignored).
//
// A nil request, or a request whose URL field is nil (for example a
// hand-constructed &http.Request{}), is reported as false instead of being
// dereferenced.
func isVercelStreamReleaseRequest(r *http.Request) bool {
	if r == nil || r.URL == nil {
		return false
	}
	return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
}
|
|
// isVercelStreamPowRequest reports whether r carries the query parameter
// "__stream_pow" with the value "1" (surrounding whitespace in the value is
// ignored).
//
// A nil request, or a request whose URL field is nil (for example a
// hand-constructed &http.Request{}), is reported as false instead of being
// dereferenced.
func isVercelStreamPowRequest(r *http.Request) bool {
	if r == nil || r.URL == nil {
		return false
	}
	return strings.TrimSpace(r.URL.Query().Get("__stream_pow")) == "1"
}
|
|
// vercelInternalSecret returns the shared secret expected in the
// X-Ds2-Internal-Token header of internal Vercel requests.
//
// The first non-blank (after trimming) value among the environment variables
// DS2API_VERCEL_INTERNAL_SECRET and DS2API_ADMIN_KEY wins, in that order. When
// neither is set the literal "admin" is returned, so the result is never
// empty.
//
// NOTE(review): the "admin" fallback is a guessable secret for deployments
// that configure neither variable; confirm this default is intended before
// relying on it in production.
func vercelInternalSecret() string {
	candidates := [...]string{
		"DS2API_VERCEL_INTERNAL_SECRET",
		"DS2API_ADMIN_KEY",
	}
	for _, name := range candidates {
		secret := strings.TrimSpace(os.Getenv(name))
		if secret == "" {
			continue
		}
		return secret
	}
	return "admin"
}
|
|
| func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { |
| if a == nil { |
| return "" |
| } |
| now := time.Now() |
| ttl := streamLeaseTTL() |
| if ttl <= 0 { |
| ttl = 15 * time.Minute |
| } |
|
|
| h.leaseMu.Lock() |
| expired := h.popExpiredLeasesLocked(now) |
| if h.streamLeases == nil { |
| h.streamLeases = make(map[string]streamLease) |
| } |
| leaseID := newLeaseID() |
| h.streamLeases[leaseID] = streamLease{ |
| Auth: a, |
| ExpiresAt: now.Add(ttl), |
| } |
| h.leaseMu.Unlock() |
| h.releaseExpiredAuths(expired) |
| return leaseID |
| } |
|
|
| func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { |
| leaseID = strings.TrimSpace(leaseID) |
| if leaseID == "" { |
| return nil |
| } |
| h.leaseMu.Lock() |
| lease, ok := h.streamLeases[leaseID] |
| h.leaseMu.Unlock() |
| if !ok || time.Now().After(lease.ExpiresAt) { |
| return nil |
| } |
| return lease.Auth |
| } |
|
|
| func (h *Handler) releaseStreamLease(leaseID string) bool { |
| leaseID = strings.TrimSpace(leaseID) |
| if leaseID == "" { |
| return false |
| } |
|
|
| h.leaseMu.Lock() |
| expired := h.popExpiredLeasesLocked(time.Now()) |
| lease, ok := h.streamLeases[leaseID] |
| if ok { |
| delete(h.streamLeases, leaseID) |
| } |
| h.leaseMu.Unlock() |
| h.releaseExpiredAuths(expired) |
|
|
| if !ok { |
| return false |
| } |
| if h.Auth != nil { |
| h.Auth.Release(lease.Auth) |
| } |
| return true |
| } |
|
|
| func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth { |
| if len(h.streamLeases) == 0 { |
| return nil |
| } |
| expired := make([]*auth.RequestAuth, 0) |
| for leaseID, lease := range h.streamLeases { |
| if now.After(lease.ExpiresAt) { |
| delete(h.streamLeases, leaseID) |
| expired = append(expired, lease.Auth) |
| } |
| } |
| return expired |
| } |
|
|
| func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) { |
| if h.Auth == nil || len(expired) == 0 { |
| return |
| } |
| for _, a := range expired { |
| h.Auth.Release(a) |
| } |
| } |
|
|
| func (h *Handler) sweepExpiredStreamLeases() { |
| h.leaseMu.Lock() |
| expired := h.popExpiredLeasesLocked(time.Now()) |
| h.leaseMu.Unlock() |
| h.releaseExpiredAuths(expired) |
| } |
|
|
// streamLeaseTTL returns how long a stream lease stays valid.
//
// The value is read from the environment variable
// DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS, interpreted as a whole number of
// seconds. The default of 15 minutes is returned when the variable is unset
// or blank, is not an integer, is zero or negative, or is so large that it
// cannot be represented as a time.Duration. The result is therefore always
// positive.
func streamLeaseTTL() time.Duration {
	const defaultTTL = 15 * time.Minute
	// Largest whole number of seconds that fits in a time.Duration, which is
	// an int64 count of nanoseconds.
	const maxSeconds = int64(time.Duration(1<<63-1) / time.Second)

	raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS"))
	if raw == "" {
		return defaultTTL
	}
	seconds, err := strconv.Atoi(raw)
	if err != nil || seconds <= 0 {
		return defaultTTL
	}
	// Without this bound the multiplication below would wrap around silently
	// on 64-bit platforms and yield a negative or arbitrary duration.
	if int64(seconds) > maxSeconds {
		return defaultTTL
	}
	return time.Duration(seconds) * time.Second
}
|
|
| func newLeaseID() string { |
| return strings.ReplaceAll(uuid.NewString(), "-", "") |
| } |
|
|