| package httpclient
|
|
|
| import (
|
| "bytes"
|
| "context"
|
| "errors"
|
| "fmt"
|
| "io"
|
| "net"
|
| "net/http"
|
| "net/url"
|
| "time"
|
|
|
| "github.com/looplj/axonhub/internal/log"
|
| "github.com/looplj/axonhub/llm/streams"
|
| )
|
|
|
|
|
| type HttpClient struct {
|
| client *http.Client
|
| proxyConfig *ProxyConfig
|
| }
|
|
|
|
|
| func NewHttpClientWithProxy(proxyConfig *ProxyConfig) *HttpClient {
|
| transport := &http.Transport{
|
| Proxy: getProxyFunc(proxyConfig),
|
| DialContext: (&net.Dialer{
|
| Timeout: 30 * time.Second,
|
| KeepAlive: 30 * time.Second,
|
| }).DialContext,
|
| ForceAttemptHTTP2: true,
|
| MaxIdleConns: 100,
|
| IdleConnTimeout: 90 * time.Second,
|
| TLSHandshakeTimeout: 10 * time.Second,
|
| ExpectContinueTimeout: 1 * time.Second,
|
| }
|
|
|
| return &HttpClient{
|
| client: &http.Client{
|
| Transport: transport,
|
| },
|
| proxyConfig: proxyConfig,
|
| }
|
| }
|
|
|
|
|
| func getProxyFunc(config *ProxyConfig) func(*http.Request) (*url.URL, error) {
|
|
|
| if config == nil {
|
| return http.ProxyFromEnvironment
|
| }
|
|
|
| switch config.Type {
|
| case ProxyTypeDisabled:
|
|
|
| return func(*http.Request) (*url.URL, error) {
|
| return nil, nil
|
| }
|
|
|
| case ProxyTypeEnvironment:
|
|
|
| return http.ProxyFromEnvironment
|
|
|
| case ProxyTypeURL:
|
|
|
| if config.URL == "" {
|
| return func(*http.Request) (*url.URL, error) {
|
| return nil, errors.New("proxy URL is required when type is 'url'")
|
| }
|
| }
|
|
|
| proxyURL, err := url.Parse(config.URL)
|
| if err != nil {
|
| return func(_ *http.Request) (*url.URL, error) {
|
| return nil, fmt.Errorf("invalid proxy URL: %w", err)
|
| }
|
| }
|
|
|
| if config.Username != "" && config.Password != "" {
|
| proxyURL.User = url.UserPassword(config.Username, config.Password)
|
| }
|
|
|
| log.Debug(context.Background(), "use custom proxy", log.Any("proxy_url", proxyURL.Redacted()))
|
|
|
| return http.ProxyURL(proxyURL)
|
|
|
| default:
|
|
|
| return http.ProxyFromEnvironment
|
| }
|
| }
|
|
|
|
|
| func NewHttpClient() *HttpClient {
|
| return &HttpClient{
|
| client: &http.Client{},
|
| }
|
| }
|
|
|
|
|
| func NewHttpClientWithClient(client *http.Client) *HttpClient {
|
| return &HttpClient{
|
| client: client,
|
| }
|
| }
|
|
|
|
|
| func (hc *HttpClient) Do(ctx context.Context, request *Request) (*Response, error) {
|
| log.Debug(ctx, "execute http request", log.Any("request", request), log.Any("proxy", hc.proxyConfig))
|
|
|
| rawReq, err := hc.BuildHttpRequest(ctx, request)
|
| if err != nil {
|
| return nil, fmt.Errorf("failed to build HTTP request: %w", err)
|
| }
|
|
|
| rawReq.Header.Set("Accept", "application/json")
|
|
|
| rawResp, err := hc.client.Do(rawReq)
|
| if err != nil {
|
| return nil, fmt.Errorf("HTTP request failed: %w", err)
|
| }
|
|
|
| defer func() {
|
| err := rawResp.Body.Close()
|
| if err != nil {
|
| log.Warn(ctx, "failed to close HTTP response body", log.Cause(err))
|
| }
|
| }()
|
|
|
| body, err := io.ReadAll(rawResp.Body)
|
| if err != nil {
|
| return nil, fmt.Errorf("failed to read response body: %w", err)
|
| }
|
|
|
| if rawResp.StatusCode >= 400 {
|
| if log.DebugEnabled(ctx) {
|
| log.Debug(ctx, "HTTP request failed",
|
| log.String("method", rawReq.Method),
|
| log.String("url", rawReq.URL.String()),
|
| log.Any("status_code", rawResp.StatusCode),
|
| log.String("body", string(body)))
|
| }
|
|
|
| return nil, &Error{
|
| Method: rawReq.Method,
|
| URL: rawReq.URL.String(),
|
| StatusCode: rawResp.StatusCode,
|
| Status: rawResp.Status,
|
| Body: body,
|
| }
|
| }
|
|
|
| if log.DebugEnabled(ctx) {
|
| log.Debug(ctx, "HTTP request success",
|
| log.String("method", rawReq.Method),
|
| log.String("url", rawReq.URL.String()),
|
| log.Any("status_code", rawResp.StatusCode),
|
| log.String("body", string(body)))
|
| }
|
|
|
|
|
| response := &Response{
|
| StatusCode: rawResp.StatusCode,
|
| Headers: rawResp.Header,
|
| Body: body,
|
| RawResponse: rawResp,
|
| Stream: nil,
|
| Request: request,
|
| RawRequest: rawReq,
|
| }
|
|
|
| return response, nil
|
| }
|
|
|
|
|
| func (hc *HttpClient) DoStream(ctx context.Context, request *Request) (streams.Stream[*StreamEvent], error) {
|
| log.Debug(ctx, "execute stream request", log.Any("request", request))
|
|
|
| rawReq, err := hc.BuildHttpRequest(ctx, request)
|
| if err != nil {
|
| return nil, fmt.Errorf("failed to build HTTP request: %w", err)
|
| }
|
|
|
|
|
| rawReq.Header.Set("Accept", "text/event-stream")
|
| rawReq.Header.Set("Cache-Control", "no-cache")
|
| rawReq.Header.Set("Connection", "keep-alive")
|
|
|
|
|
| rawResp, err := hc.client.Do(rawReq)
|
| if err != nil {
|
| return nil, fmt.Errorf("HTTP stream request failed: %w", err)
|
| }
|
|
|
|
|
| if rawResp.StatusCode >= 400 {
|
| defer func() {
|
| err := rawResp.Body.Close()
|
| if err != nil {
|
| log.Warn(ctx, "failed to close HTTP response body", log.Cause(err))
|
| }
|
| }()
|
|
|
|
|
| body, err := io.ReadAll(rawResp.Body)
|
| if err != nil {
|
| return nil, err
|
| }
|
|
|
| if log.DebugEnabled(ctx) {
|
| log.Debug(ctx, "HTTP stream request failed",
|
| log.String("method", rawReq.Method),
|
| log.String("url", rawReq.URL.String()),
|
| log.Any("status_code", rawResp.StatusCode),
|
| log.String("body", string(body)))
|
| }
|
|
|
| return nil, &Error{
|
| Method: rawReq.Method,
|
| URL: rawReq.URL.String(),
|
| StatusCode: rawResp.StatusCode,
|
| Status: rawResp.Status,
|
| Body: body,
|
| }
|
| }
|
|
|
|
|
| contentType := rawResp.Header.Get("Content-Type")
|
| if contentType == "" {
|
| contentType = "text/event-stream"
|
| }
|
|
|
|
|
| decoderFactory, exists := GetDecoder(contentType)
|
| if !exists {
|
|
|
| log.Debug(ctx, "no decoder found for content type, using default SSE", log.String("content_type", contentType))
|
|
|
| decoderFactory = NewDefaultSSEDecoder
|
| }
|
|
|
| stream := decoderFactory(ctx, rawResp.Body)
|
|
|
| return stream, nil
|
| }
|
|
|
|
|
| func BuildHttpRequest(
|
| ctx context.Context,
|
| request *Request,
|
| ) (*http.Request, error) {
|
| var body io.Reader
|
| if len(request.Body) > 0 {
|
| body = bytes.NewReader(request.Body)
|
| }
|
|
|
| httpReq, err := http.NewRequestWithContext(ctx, request.Method, request.URL, body)
|
| if err != nil {
|
| return nil, err
|
| }
|
|
|
| httpReq.Header = request.Headers
|
| if httpReq.Header == nil {
|
| httpReq.Header = make(http.Header)
|
| }
|
|
|
| if httpReq.Header.Get("User-Agent") == "" {
|
| httpReq.Header.Set("User-Agent", "axonhub/1.0")
|
| }
|
|
|
| for k := range libManagedHeaders {
|
| httpReq.Header.Del(k)
|
| }
|
|
|
| if request.Auth != nil {
|
| err = applyAuth(httpReq.Header, request.Auth)
|
| if err != nil {
|
| return nil, fmt.Errorf("failed to apply authentication: %w", err)
|
| }
|
| }
|
|
|
| if len(request.Query) > 0 {
|
| if httpReq.URL.RawQuery != "" {
|
| httpReq.URL.RawQuery += "&"
|
| }
|
|
|
| httpReq.URL.RawQuery += request.Query.Encode()
|
| }
|
|
|
| return httpReq, nil
|
| }
|
|
|
|
|
| func (hc *HttpClient) BuildHttpRequest(
|
| ctx context.Context,
|
| request *Request,
|
| ) (*http.Request, error) {
|
| return BuildHttpRequest(ctx, request)
|
| }
|
|
|
|
|
| func applyAuth(headers http.Header, auth *AuthConfig) error {
|
| switch auth.Type {
|
| case "bearer":
|
| if auth.APIKey == "" {
|
| return fmt.Errorf("bearer token is required")
|
| }
|
|
|
| headers.Set("Authorization", "Bearer "+auth.APIKey)
|
| case "api_key":
|
| if auth.HeaderKey == "" {
|
| return fmt.Errorf("header key is required")
|
| }
|
|
|
| headers.Set(auth.HeaderKey, auth.APIKey)
|
| default:
|
| return fmt.Errorf("unsupported auth type: %s", auth.Type)
|
| }
|
|
|
| return nil
|
| }
|
|
|
|
|
| func (hc *HttpClient) extractHeaders(headers http.Header) map[string]string {
|
| result := make(map[string]string)
|
|
|
| for key, values := range headers {
|
| if len(values) > 0 {
|
| result[key] = values[0]
|
| }
|
| }
|
|
|
| return result
|
| }
|
|
|