| package agent
|
|
|
| import (
|
| "bufio"
|
| "bytes"
|
| "context"
|
| "encoding/binary"
|
| "encoding/json"
|
| "errors"
|
| "fmt"
|
| "io"
|
| "log/slog"
|
| "net"
|
| "net/http"
|
| "net/url"
|
| "os"
|
| "path"
|
| "regexp"
|
| "sort"
|
| "strconv"
|
| "strings"
|
| "sync"
|
| "time"
|
|
|
| "github.com/henrygd/beszel/agent/deltatracker"
|
| "github.com/henrygd/beszel/agent/utils"
|
| "github.com/henrygd/beszel/internal/entities/container"
|
| "github.com/henrygd/beszel/internal/entities/system"
|
|
|
| "github.com/blang/semver"
|
| )
|
|
|
|
|
|
|
// Regular expressions compiled once at package initialisation.
var (
	// ansiEscapePattern matches terminal escape sequences: CSI sequences
	// (ESC [ ... letter), OSC sequences terminated by BEL, and two-character
	// ESC sequences. getLogs uses it to strip them from log output.
	ansiEscapePattern = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b\][^\x07]*\x07|\x1b[@-Z\\-_]`)

	// dockerContainerIDPattern accepts 12 to 64 hexadecimal characters and
	// nothing else. validateContainerID uses it to vet container IDs.
	dockerContainerIDPattern = regexp.MustCompile(`^[a-fA-F0-9]{12,64}$`)
)
|
|
|
const (
	// dockerTimeoutMs is the default HTTP client timeout in milliseconds,
	// used by newDockerManager unless DOCKER_TIMEOUT overrides it.
	dockerTimeoutMs = 2100

	// maxNetworkSpeedBps is the highest per-second network rate (computed
	// from byte counters) that calculateNetworkStats accepts; anything above
	// is logged as a bad delta and reported as zero.
	maxNetworkSpeedBps uint64 = 5_000_000_000

	// maxMemoryUsage is the upper bound (100 TiB) for a plausible container
	// memory reading; calculateMemoryUsage rejects larger values.
	maxMemoryUsage uint64 = 100 << 40

	// dockerLogsTail is the number of trailing log lines requested by getLogs.
	dockerLogsTail = 200

	// maxLogFrameSize is the largest single multiplexed log frame (1 MiB)
	// that is accepted when detecting or decoding a log stream.
	maxLogFrameSize = 1 << 20

	// maxTotalLogSize caps the amount of log data (5 MiB) collected per
	// request by decodeDockerLogStream.
	maxTotalLogSize = 5 << 20
)
|
| // dockerManager polls a Docker-compatible HTTP API for container stats and
| // keeps the state that has to survive between polls.
| type dockerManager struct {
| // agent is notified by setIsPodman when Podman is detected; may be nil.
| agent *Agent
| // client performs every API request.
| client *http.Client
| // wg tracks the stats goroutines started by getDockerStats.
| wg sync.WaitGroup
| // sem bounds concurrent stats requests; queue/dequeue only use it when goodDockerVersion is true.
| sem chan struct{}
| // containerStatsMutex is held while updating containerStatsMap and the tracking maps below.
| containerStatsMutex sync.RWMutex
| // apiContainerList is the reused decode target for the container listing.
| apiContainerList []*container.ApiInfo
| // containerStatsMap holds the latest stats per short container ID.
| containerStatsMap map[string]*container.Stats
| // validIds is the set of short IDs listed (and not excluded) in the latest poll.
| validIds map[string]struct{}
| // goodDockerVersion is set for Docker major versions above 24 and for Podman.
| goodDockerVersion bool
| // dockerVersionChecked records that the engine version was determined successfully.
| dockerVersionChecked bool
| // isWindows is true when the listing's Server header contains "windows".
| isWindows bool
| // buf is the reusable response buffer created lazily by decode.
| buf *bytes.Buffer
| // decoder reads JSON from buf; it is created together with buf.
| decoder *json.Decoder
| // apiStats is the reused decode target for a container stats response.
| apiStats *container.ApiStats
| // excludeContainers holds the path.Match patterns parsed from EXCLUDE_CONTAINERS.
| excludeContainers []string
| // usingPodman is true once the engine has been identified as Podman.
| usingPodman bool
|
| // CPU tracking. The outer key is the cache time in milliseconds and the
| // inner key is the short container ID. lastCpuContainer and lastCpuSystem
| // hold the previous usage counters; lastCpuReadTime holds the read
| // timestamp of the previous stats sample.
|
|
| lastCpuContainer map[uint16]map[string]uint64
|
| lastCpuSystem map[uint16]map[string]uint64
|
| lastCpuReadTime map[uint16]map[string]time.Time
|
| // Network tracking, keyed by cache time in milliseconds. The trackers
| // record cumulative sent/received bytes per short container ID;
| // lastNetworkReadTime holds the wall-clock time of the previous update
| // per short container ID.
|
|
| networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
| networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
| lastNetworkReadTime map[uint16]map[string]time.Time
|
| }
|
|
|
| // userAgentRoundTripper wraps an http.RoundTripper and sets a fixed
| // User-Agent header on every request before forwarding it.
| type userAgentRoundTripper struct {
| // rt is the wrapped transport that performs the request.
| rt http.RoundTripper
| // userAgent is the User-Agent header value to send.
| userAgent string
|
| }
|
|
|
|
| // dockerVersionResponse holds the fields read from the /version API response.
| type dockerVersionResponse struct {
| // Version is parsed as a semantic version by applyDockerVersionInfo.
| Version string `json:"Version"`
| // Components lists engine components; a name starting with "Podman" marks a Podman engine.
| Components []struct {
|
| Name string `json:"Name"`
|
| } `json:"Components"`
|
| }
|
|
|
|
|
| func (u *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
| req.Header.Set("User-Agent", u.userAgent)
|
| return u.rt.RoundTrip(req)
|
| }
|
|
|
|
|
| func (d *dockerManager) queue() {
|
| d.wg.Add(1)
|
| if d.goodDockerVersion {
|
| d.sem <- struct{}{}
|
| }
|
| }
|
|
|
|
|
| func (d *dockerManager) dequeue() {
|
| d.wg.Done()
|
| if d.goodDockerVersion {
|
| <-d.sem
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) shouldExcludeContainer(name string) bool {
|
| if len(dm.excludeContainers) == 0 {
|
| return false
|
| }
|
| for _, pattern := range dm.excludeContainers {
|
| if match, _ := path.Match(pattern, name); match {
|
| return true
|
| }
|
| }
|
| return false
|
| }
|
|
|
|
|
| func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, error) {
|
| resp, err := dm.client.Get("http://localhost/containers/json")
|
| if err != nil {
|
| return nil, err
|
| }
|
|
|
| dm.apiContainerList = dm.apiContainerList[:0]
|
| if err := dm.decode(resp, &dm.apiContainerList); err != nil {
|
| return nil, err
|
| }
|
|
|
|
|
| serverHeader := resp.Header.Get("Server")
|
| if !dm.usingPodman && detectPodmanFromHeader(serverHeader) {
|
| dm.setIsPodman()
|
| }
|
| dm.isWindows = strings.Contains(serverHeader, "windows")
|
|
|
| dm.ensureDockerVersionChecked()
|
|
|
| containersLength := len(dm.apiContainerList)
|
|
|
|
|
| if dm.validIds == nil {
|
| dm.validIds = make(map[string]struct{}, containersLength)
|
| } else {
|
| clear(dm.validIds)
|
| }
|
|
|
| var failedContainers []*container.ApiInfo
|
|
|
| for _, ctr := range dm.apiContainerList {
|
| ctr.IdShort = ctr.Id[:12]
|
|
|
|
|
| if dm.shouldExcludeContainer(ctr.Names[0][1:]) {
|
| slog.Debug("Excluding container", "name", ctr.Names[0][1:])
|
| continue
|
| }
|
|
|
| dm.validIds[ctr.IdShort] = struct{}{}
|
|
|
|
|
| if strings.Contains(ctr.Status, "second") {
|
|
|
| dm.deleteContainerStatsSync(ctr.IdShort)
|
| }
|
| dm.queue()
|
| go func(ctr *container.ApiInfo) {
|
| defer dm.dequeue()
|
| err := dm.updateContainerStats(ctr, cacheTimeMs)
|
|
|
| if err != nil {
|
| dm.containerStatsMutex.Lock()
|
| delete(dm.containerStatsMap, ctr.IdShort)
|
| failedContainers = append(failedContainers, ctr)
|
| dm.containerStatsMutex.Unlock()
|
| }
|
| }(ctr)
|
| }
|
|
|
| dm.wg.Wait()
|
|
|
|
|
| if len(failedContainers) > 0 {
|
| slog.Debug("Retrying failed containers", "count", len(failedContainers))
|
| for i := range failedContainers {
|
| ctr := failedContainers[i]
|
| dm.queue()
|
| go func(ctr *container.ApiInfo) {
|
| defer dm.dequeue()
|
| if err2 := dm.updateContainerStats(ctr, cacheTimeMs); err2 != nil {
|
| slog.Error("Error getting container stats", "err", err2)
|
| }
|
| }(ctr)
|
| }
|
| dm.wg.Wait()
|
| }
|
|
|
|
|
| stats := make([]*container.Stats, 0, containersLength)
|
| for id, v := range dm.containerStatsMap {
|
| if _, exists := dm.validIds[id]; !exists {
|
| delete(dm.containerStatsMap, id)
|
| } else {
|
| stats = append(stats, v)
|
| }
|
| }
|
|
|
|
|
| dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
|
|
|
| return stats, nil
|
| }
|
|
|
|
|
| func (dm *dockerManager) initializeCpuTracking(cacheTimeMs uint16) {
|
|
|
| if dm.lastCpuContainer[cacheTimeMs] == nil {
|
| dm.lastCpuContainer[cacheTimeMs] = make(map[string]uint64)
|
| }
|
| if dm.lastCpuSystem[cacheTimeMs] == nil {
|
| dm.lastCpuSystem[cacheTimeMs] = make(map[string]uint64)
|
| }
|
|
|
| if dm.lastCpuReadTime == nil {
|
| dm.lastCpuReadTime = make(map[uint16]map[string]time.Time)
|
| }
|
| if dm.lastCpuReadTime[cacheTimeMs] == nil {
|
| dm.lastCpuReadTime[cacheTimeMs] = make(map[string]time.Time)
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) getCpuPreviousValues(cacheTimeMs uint16, containerId string) (uint64, uint64) {
|
| return dm.lastCpuContainer[cacheTimeMs][containerId], dm.lastCpuSystem[cacheTimeMs][containerId]
|
| }
|
|
|
|
|
| func (dm *dockerManager) setCpuCurrentValues(cacheTimeMs uint16, containerId string, cpuContainer, cpuSystem uint64) {
|
| dm.lastCpuContainer[cacheTimeMs][containerId] = cpuContainer
|
| dm.lastCpuSystem[cacheTimeMs][containerId] = cpuSystem
|
| }
|
|
|
|
|
| func calculateMemoryUsage(apiStats *container.ApiStats, isWindows bool) (uint64, error) {
|
| if isWindows {
|
| return apiStats.MemoryStats.PrivateWorkingSet, nil
|
| }
|
|
|
| memCache := apiStats.MemoryStats.Stats.InactiveFile
|
| if memCache == 0 {
|
| memCache = apiStats.MemoryStats.Stats.Cache
|
| }
|
|
|
| usedDelta := apiStats.MemoryStats.Usage - memCache
|
| if usedDelta <= 0 || usedDelta > maxMemoryUsage {
|
| return 0, fmt.Errorf("bad memory stats")
|
| }
|
|
|
| return usedDelta, nil
|
| }
|
|
|
|
|
| func (dm *dockerManager) getNetworkTracker(cacheTimeMs uint16, isSent bool) *deltatracker.DeltaTracker[string, uint64] {
|
| var trackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
|
| if isSent {
|
| trackers = dm.networkSentTrackers
|
| } else {
|
| trackers = dm.networkRecvTrackers
|
| }
|
|
|
| if trackers[cacheTimeMs] == nil {
|
| trackers[cacheTimeMs] = deltatracker.NewDeltaTracker[string, uint64]()
|
| }
|
|
|
| return trackers[cacheTimeMs]
|
| }
|
|
|
|
|
| func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
|
| if dm.networkSentTrackers[cacheTimeMs] != nil {
|
| dm.networkSentTrackers[cacheTimeMs].Cycle()
|
| }
|
| if dm.networkRecvTrackers[cacheTimeMs] != nil {
|
| dm.networkRecvTrackers[cacheTimeMs].Cycle()
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, name string, cacheTimeMs uint16) (uint64, uint64) {
|
| var total_sent, total_recv uint64
|
| for _, v := range apiStats.Networks {
|
| total_sent += v.TxBytes
|
| total_recv += v.RxBytes
|
| }
|
|
|
|
|
| sentTracker := dm.getNetworkTracker(cacheTimeMs, true)
|
| recvTracker := dm.getNetworkTracker(cacheTimeMs, false)
|
|
|
|
|
| sentTracker.Set(ctr.IdShort, total_sent)
|
| recvTracker.Set(ctr.IdShort, total_recv)
|
|
|
|
|
| sent_delta_raw := sentTracker.Delta(ctr.IdShort)
|
| recv_delta_raw := recvTracker.Delta(ctr.IdShort)
|
|
|
|
|
|
|
| var sent_delta, recv_delta uint64
|
| if prevReadTime, ok := dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort]; ok {
|
| millisecondsElapsed := uint64(time.Since(prevReadTime).Milliseconds())
|
| if millisecondsElapsed > 0 {
|
| if sent_delta_raw > 0 {
|
| sent_delta = sent_delta_raw * 1000 / millisecondsElapsed
|
| if sent_delta > maxNetworkSpeedBps {
|
| slog.Warn("Bad network delta", "container", name)
|
| sent_delta = 0
|
| }
|
| }
|
| if recv_delta_raw > 0 {
|
| recv_delta = recv_delta_raw * 1000 / millisecondsElapsed
|
| if recv_delta > maxNetworkSpeedBps {
|
| slog.Warn("Bad network delta", "container", name)
|
| recv_delta = 0
|
| }
|
| }
|
| }
|
| }
|
|
|
| return sent_delta, recv_delta
|
| }
|
|
|
|
|
// validateCpuPercentage returns an error naming the container when cpuPct is
// greater than 100, and nil otherwise.
func validateCpuPercentage(cpuPct float64, containerName string) error {
	var err error
	if cpuPct > 100 {
		err = fmt.Errorf("%s cpu pct greater than 100: %+v", containerName, cpuPct)
	}
	return err
}
|
|
|
|
|
| func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
|
| stats.Cpu = utils.TwoDecimals(cpuPct)
|
| stats.Mem = utils.BytesToMegabytes(float64(usedMemory))
|
| stats.Bandwidth = [2]uint64{sent_delta, recv_delta}
|
|
|
| stats.NetworkSent = utils.BytesToMegabytes(float64(sent_delta))
|
| stats.NetworkRecv = utils.BytesToMegabytes(float64(recv_delta))
|
| stats.PrevReadTime = readTime
|
| }
|
|
|
|
|
|
|
| func convertContainerPortsToString(ctr *container.ApiInfo) string {
|
| if len(ctr.Ports) == 0 {
|
| return ""
|
| }
|
| sort.Slice(ctr.Ports, func(i, j int) bool {
|
| return ctr.Ports[i].PublicPort < ctr.Ports[j].PublicPort
|
| })
|
| var builder strings.Builder
|
| seenPorts := make(map[uint16]struct{})
|
| for _, p := range ctr.Ports {
|
| _, ok := seenPorts[p.PublicPort]
|
| if p.PublicPort == 0 || ok {
|
| continue
|
| }
|
| seenPorts[p.PublicPort] = struct{}{}
|
| if builder.Len() > 0 {
|
| builder.WriteString(", ")
|
| }
|
| switch p.IP {
|
| case "0.0.0.0", "::":
|
| default:
|
| builder.WriteString(p.IP)
|
| builder.WriteByte(':')
|
| }
|
| builder.WriteString(strconv.Itoa(int(p.PublicPort)))
|
| }
|
|
|
| ctr.Ports = nil
|
| return builder.String()
|
| }
|
|
|
| func parseDockerStatus(status string) (string, container.DockerHealth) {
|
| trimmed := strings.TrimSpace(status)
|
| if trimmed == "" {
|
| return "", container.DockerHealthNone
|
| }
|
|
|
|
|
| trimmed = strings.Replace(trimmed, "About ", "", 1)
|
|
|
| openIdx := strings.LastIndex(trimmed, "(")
|
| if openIdx == -1 || !strings.HasSuffix(trimmed, ")") {
|
| return trimmed, container.DockerHealthNone
|
| }
|
|
|
| statusText := strings.TrimSpace(trimmed[:openIdx])
|
| if statusText == "" {
|
| statusText = trimmed
|
| }
|
|
|
| healthText := strings.TrimSpace(strings.TrimSuffix(trimmed[openIdx+1:], ")"))
|
|
|
|
|
| if colonIdx := strings.IndexRune(healthText, ':'); colonIdx != -1 {
|
| prefix := strings.ToLower(strings.TrimSpace(healthText[:colonIdx]))
|
| if prefix == "health" || prefix == "health status" {
|
| healthText = strings.TrimSpace(healthText[colonIdx+1:])
|
| }
|
| }
|
| if health, ok := parseDockerHealthStatus(healthText); ok {
|
| return statusText, health
|
| }
|
|
|
| return trimmed, container.DockerHealthNone
|
| }
|
|
|
|
|
| func parseDockerHealthStatus(status string) (container.DockerHealth, bool) {
|
| health, ok := container.DockerHealthStrings[strings.ToLower(strings.TrimSpace(status))]
|
| return health, ok
|
| }
|
|
|
|
|
|
|
|
|
| func (dm *dockerManager) getPodmanContainerHealth(containerID string) (container.DockerHealth, error) {
|
| resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/json", url.PathEscape(containerID)))
|
| if err != nil {
|
| return container.DockerHealthNone, err
|
| }
|
| defer resp.Body.Close()
|
|
|
| if resp.StatusCode != http.StatusOK {
|
| return container.DockerHealthNone, fmt.Errorf("container inspect request failed: %s", resp.Status)
|
| }
|
|
|
| var inspectInfo struct {
|
| State struct {
|
| Health struct {
|
| Status string
|
| }
|
| }
|
| }
|
| if err := json.NewDecoder(resp.Body).Decode(&inspectInfo); err != nil {
|
| return container.DockerHealthNone, err
|
| }
|
|
|
| if health, ok := parseDockerHealthStatus(inspectInfo.State.Health.Status); ok {
|
| return health, nil
|
| }
|
|
|
| return container.DockerHealthNone, nil
|
| }
|
|
|
|
|
| func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeMs uint16) error {
|
| name := ctr.Names[0][1:]
|
|
|
| resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/stats?stream=0&one-shot=1", ctr.IdShort))
|
| if err != nil {
|
| return err
|
| }
|
|
|
| statusText, health := parseDockerStatus(ctr.Status)
|
|
|
|
|
|
|
|
|
| if ctr.Health.Status != "" {
|
| if h, ok := parseDockerHealthStatus(ctr.Health.Status); ok {
|
| health = h
|
| }
|
| } else if dm.usingPodman {
|
| if podmanHealth, err := dm.getPodmanContainerHealth(ctr.IdShort); err == nil {
|
| health = podmanHealth
|
| }
|
| }
|
|
|
| dm.containerStatsMutex.Lock()
|
| defer dm.containerStatsMutex.Unlock()
|
|
|
|
|
| stats, initialized := dm.containerStatsMap[ctr.IdShort]
|
| if !initialized {
|
| stats = &container.Stats{Name: name, Id: ctr.IdShort, Image: ctr.Image}
|
| dm.containerStatsMap[ctr.IdShort] = stats
|
| }
|
|
|
| stats.Id = ctr.IdShort
|
| stats.Status = statusText
|
| stats.Health = health
|
|
|
| if len(ctr.Ports) > 0 {
|
| stats.Ports = convertContainerPortsToString(ctr)
|
| }
|
|
|
|
|
| stats.Cpu = 0
|
| stats.Mem = 0
|
| stats.Bandwidth = [2]uint64{0, 0}
|
|
|
| stats.NetworkSent = 0
|
| stats.NetworkRecv = 0
|
|
|
| res := dm.apiStats
|
| res.Networks = nil
|
| if err := dm.decode(resp, res); err != nil {
|
| return err
|
| }
|
|
|
|
|
| dm.initializeCpuTracking(cacheTimeMs)
|
|
|
|
|
| prevCpuContainer, prevCpuSystem := dm.getCpuPreviousValues(cacheTimeMs, ctr.IdShort)
|
|
|
|
|
| var cpuPct float64
|
| if dm.isWindows {
|
| prevRead := dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort]
|
| cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, prevRead)
|
| } else {
|
| cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem)
|
| }
|
|
|
|
|
| usedMemory, err := calculateMemoryUsage(res, dm.isWindows)
|
| if err != nil {
|
| return fmt.Errorf("%s - %w - see https://github.com/henrygd/beszel/issues/144", name, err)
|
| }
|
|
|
|
|
| currentCpuContainer := res.CPUStats.CPUUsage.TotalUsage
|
| currentCpuSystem := res.CPUStats.SystemUsage
|
| dm.setCpuCurrentValues(cacheTimeMs, ctr.IdShort, currentCpuContainer, currentCpuSystem)
|
|
|
|
|
| if err := validateCpuPercentage(cpuPct, name); err != nil {
|
| return err
|
| }
|
|
|
|
|
| sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, name, cacheTimeMs)
|
|
|
|
|
| if dm.lastNetworkReadTime[cacheTimeMs] == nil {
|
| dm.lastNetworkReadTime[cacheTimeMs] = make(map[string]time.Time)
|
| }
|
| dm.lastNetworkReadTime[cacheTimeMs][ctr.IdShort] = time.Now()
|
|
|
|
|
| var total_sent, total_recv uint64
|
| for _, v := range res.Networks {
|
| total_sent += v.TxBytes
|
| total_recv += v.RxBytes
|
| }
|
| stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv
|
|
|
|
|
| updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
|
|
|
| dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read
|
|
|
| return nil
|
| }
|
|
|
|
|
| func (dm *dockerManager) deleteContainerStatsSync(id string) {
|
| dm.containerStatsMutex.Lock()
|
| defer dm.containerStatsMutex.Unlock()
|
| delete(dm.containerStatsMap, id)
|
| for ct := range dm.lastCpuContainer {
|
| delete(dm.lastCpuContainer[ct], id)
|
| }
|
| for ct := range dm.lastCpuSystem {
|
| delete(dm.lastCpuSystem[ct], id)
|
| }
|
| for ct := range dm.lastCpuReadTime {
|
| delete(dm.lastCpuReadTime[ct], id)
|
| }
|
| for ct := range dm.lastNetworkReadTime {
|
| delete(dm.lastNetworkReadTime[ct], id)
|
| }
|
| }
|
|
|
|
|
| func newDockerManager(agent *Agent) *dockerManager {
|
| dockerHost, exists := utils.GetEnv("DOCKER_HOST")
|
| if exists {
|
|
|
| if dockerHost == "" {
|
| return nil
|
| }
|
| } else {
|
| dockerHost = getDockerHost()
|
| }
|
|
|
| parsedURL, err := url.Parse(dockerHost)
|
| if err != nil {
|
| os.Exit(1)
|
| }
|
|
|
| transport := &http.Transport{
|
| DisableCompression: true,
|
| MaxConnsPerHost: 0,
|
| }
|
|
|
| switch parsedURL.Scheme {
|
| case "unix":
|
| transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
|
| return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path)
|
| }
|
| case "tcp", "http", "https":
|
| transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
|
| return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host)
|
| }
|
| default:
|
| slog.Error("Invalid DOCKER_HOST", "scheme", parsedURL.Scheme)
|
| os.Exit(1)
|
| }
|
|
|
|
|
| timeout := time.Millisecond * time.Duration(dockerTimeoutMs)
|
| if t, set := utils.GetEnv("DOCKER_TIMEOUT"); set {
|
| timeout, err = time.ParseDuration(t)
|
| if err != nil {
|
| slog.Error(err.Error())
|
| os.Exit(1)
|
| }
|
| slog.Info("DOCKER_TIMEOUT", "timeout", timeout)
|
| }
|
|
|
|
|
| userAgentTransport := &userAgentRoundTripper{
|
| rt: transport,
|
| userAgent: "Docker-Client/",
|
| }
|
|
|
|
|
| var excludeContainers []string
|
| if excludeStr, set := utils.GetEnv("EXCLUDE_CONTAINERS"); set && excludeStr != "" {
|
| parts := strings.SplitSeq(excludeStr, ",")
|
| for part := range parts {
|
| trimmed := strings.TrimSpace(part)
|
| if trimmed != "" {
|
| excludeContainers = append(excludeContainers, trimmed)
|
| }
|
| }
|
| slog.Info("EXCLUDE_CONTAINERS", "patterns", excludeContainers)
|
| }
|
|
|
| manager := &dockerManager{
|
| agent: agent,
|
| client: &http.Client{
|
| Timeout: timeout,
|
| Transport: userAgentTransport,
|
| },
|
| containerStatsMap: make(map[string]*container.Stats),
|
| sem: make(chan struct{}, 5),
|
| apiContainerList: []*container.ApiInfo{},
|
| apiStats: &container.ApiStats{},
|
| excludeContainers: excludeContainers,
|
|
|
|
|
| lastCpuContainer: make(map[uint16]map[string]uint64),
|
| lastCpuSystem: make(map[uint16]map[string]uint64),
|
| lastCpuReadTime: make(map[uint16]map[string]time.Time),
|
| networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
| networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
|
| lastNetworkReadTime: make(map[uint16]map[string]time.Time),
|
| }
|
|
|
|
|
|
|
| _, _ = manager.checkDockerVersion()
|
|
|
| return manager
|
| }
|
|
|
|
|
|
|
| func (dm *dockerManager) checkDockerVersion() (bool, error) {
|
| resp, err := dm.client.Get("http://localhost/version")
|
| if err != nil {
|
| return false, err
|
| }
|
| if resp.StatusCode != http.StatusOK {
|
| status := resp.Status
|
| resp.Body.Close()
|
| return false, fmt.Errorf("docker version request failed: %s", status)
|
| }
|
|
|
| var versionInfo dockerVersionResponse
|
| serverHeader := resp.Header.Get("Server")
|
| if err := dm.decode(resp, &versionInfo); err != nil {
|
| return false, err
|
| }
|
|
|
| dm.applyDockerVersionInfo(serverHeader, &versionInfo)
|
| dm.dockerVersionChecked = true
|
| return true, nil
|
| }
|
|
|
|
|
|
|
| func (dm *dockerManager) ensureDockerVersionChecked() {
|
| if dm.dockerVersionChecked {
|
| return
|
| }
|
| if _, err := dm.checkDockerVersion(); err != nil {
|
| slog.Debug("Failed to get Docker version", "err", err)
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) applyDockerVersionInfo(serverHeader string, versionInfo *dockerVersionResponse) {
|
| if detectPodmanEngine(serverHeader, versionInfo) {
|
| dm.setIsPodman()
|
| return
|
| }
|
|
|
| if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 {
|
| dm.goodDockerVersion = true
|
| } else {
|
| slog.Info(fmt.Sprintf("Docker %s is outdated. Upgrade if possible. See https://github.com/henrygd/beszel/issues/58", versionInfo.Version))
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) decode(resp *http.Response, d any) error {
|
| if dm.buf == nil {
|
|
|
| dm.buf = bytes.NewBuffer(make([]byte, 0, 1024*256))
|
| dm.decoder = json.NewDecoder(dm.buf)
|
| }
|
| defer resp.Body.Close()
|
| defer dm.buf.Reset()
|
| _, err := dm.buf.ReadFrom(resp.Body)
|
| if err != nil {
|
| return err
|
| }
|
| return dm.decoder.Decode(d)
|
| }
|
|
|
|
|
// getDockerHost returns a unix:// URL for the first socket that exists: the
// Docker socket at /var/run/docker.sock, then the current user's Podman
// socket. If neither exists the Docker socket URL is returned.
func getDockerHost() string {
	const scheme = "unix://"
	dockerSock := "/var/run/docker.sock"
	podmanSock := fmt.Sprintf("/run/user/%v/podman/podman.sock", os.Getuid())

	for _, candidate := range [...]string{dockerSock, podmanSock} {
		_, err := os.Stat(candidate)
		if err == nil {
			return scheme + candidate
		}
	}
	return scheme + dockerSock
}
|
|
|
| func validateContainerID(containerID string) error {
|
| if !dockerContainerIDPattern.MatchString(containerID) {
|
| return fmt.Errorf("invalid container id")
|
| }
|
| return nil
|
| }
|
|
|
| func buildDockerContainerEndpoint(containerID, action string, query url.Values) (string, error) {
|
| if err := validateContainerID(containerID); err != nil {
|
| return "", err
|
| }
|
| u := &url.URL{
|
| Scheme: "http",
|
| Host: "localhost",
|
| Path: fmt.Sprintf("/containers/%s/%s", url.PathEscape(containerID), action),
|
| }
|
| if len(query) > 0 {
|
| u.RawQuery = query.Encode()
|
| }
|
| return u.String(), nil
|
| }
|
|
|
|
|
| func (dm *dockerManager) getContainerInfo(ctx context.Context, containerID string) ([]byte, error) {
|
| endpoint, err := buildDockerContainerEndpoint(containerID, "json", nil)
|
| if err != nil {
|
| return nil, err
|
| }
|
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
| if err != nil {
|
| return nil, err
|
| }
|
|
|
| resp, err := dm.client.Do(req)
|
| if err != nil {
|
| return nil, err
|
| }
|
| defer resp.Body.Close()
|
|
|
| if resp.StatusCode != http.StatusOK {
|
| body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
| return nil, fmt.Errorf("container info request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
|
| }
|
|
|
|
|
| var containerInfo map[string]any
|
| if err := json.NewDecoder(resp.Body).Decode(&containerInfo); err != nil {
|
| return nil, err
|
| }
|
| if config, ok := containerInfo["Config"].(map[string]any); ok {
|
| delete(config, "Env")
|
| }
|
|
|
| return json.Marshal(containerInfo)
|
| }
|
|
|
|
|
| func (dm *dockerManager) getLogs(ctx context.Context, containerID string) (string, error) {
|
| query := url.Values{
|
| "stdout": []string{"1"},
|
| "stderr": []string{"1"},
|
| "tail": []string{fmt.Sprintf("%d", dockerLogsTail)},
|
| }
|
| endpoint, err := buildDockerContainerEndpoint(containerID, "logs", query)
|
| if err != nil {
|
| return "", err
|
| }
|
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
| if err != nil {
|
| return "", err
|
| }
|
|
|
| resp, err := dm.client.Do(req)
|
| if err != nil {
|
| return "", err
|
| }
|
| defer resp.Body.Close()
|
|
|
| if resp.StatusCode != http.StatusOK {
|
| body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
| return "", fmt.Errorf("logs request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
|
| }
|
|
|
| var builder strings.Builder
|
| contentType := resp.Header.Get("Content-Type")
|
| multiplexed := strings.HasSuffix(contentType, "multiplexed-stream")
|
| logReader := io.Reader(resp.Body)
|
| if !multiplexed {
|
|
|
|
|
| bufferedReader := bufio.NewReaderSize(resp.Body, 8)
|
| multiplexed = detectDockerMultiplexedStream(bufferedReader)
|
| logReader = bufferedReader
|
| }
|
| if err := decodeDockerLogStream(logReader, &builder, multiplexed); err != nil {
|
| return "", err
|
| }
|
|
|
|
|
| logs := builder.String()
|
| if strings.Contains(logs, "\x1b") {
|
| logs = ansiEscapePattern.ReplaceAllString(logs, "")
|
| }
|
| return logs, nil
|
| }
|
|
|
| func detectDockerMultiplexedStream(reader *bufio.Reader) bool {
|
| const headerSize = 8
|
| header, err := reader.Peek(headerSize)
|
| if err != nil {
|
| return false
|
| }
|
| if header[0] != 0x01 && header[0] != 0x02 {
|
| return false
|
| }
|
|
|
| if header[1] != 0 || header[2] != 0 || header[3] != 0 {
|
| return false
|
| }
|
| frameLen := binary.BigEndian.Uint32(header[4:])
|
| return frameLen <= maxLogFrameSize
|
| }
|
|
|
| func decodeDockerLogStream(reader io.Reader, builder *strings.Builder, multiplexed bool) error {
|
| if !multiplexed {
|
| _, err := io.Copy(builder, io.LimitReader(reader, maxTotalLogSize))
|
| return err
|
| }
|
| const headerSize = 8
|
| var header [headerSize]byte
|
| totalBytesRead := 0
|
|
|
| for {
|
| if _, err := io.ReadFull(reader, header[:]); err != nil {
|
| if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
| return nil
|
| }
|
| return err
|
| }
|
|
|
| frameLen := binary.BigEndian.Uint32(header[4:])
|
| if frameLen == 0 {
|
| continue
|
| }
|
|
|
|
|
| if frameLen > maxLogFrameSize {
|
| return fmt.Errorf("log frame size (%d) exceeds maximum (%d)", frameLen, maxLogFrameSize)
|
| }
|
|
|
|
|
| if totalBytesRead+int(frameLen) > maxTotalLogSize {
|
|
|
| _, _ = io.CopyN(io.Discard, reader, int64(frameLen))
|
| slog.Debug("Truncating logs: limit reached", "read", totalBytesRead, "limit", maxTotalLogSize)
|
| return nil
|
| }
|
|
|
| n, err := io.CopyN(builder, reader, int64(frameLen))
|
| if err != nil {
|
| if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
| return nil
|
| }
|
| return err
|
| }
|
| totalBytesRead += int(n)
|
| }
|
| }
|
|
|
|
|
| func (dm *dockerManager) GetHostInfo() (info container.HostInfo, err error) {
|
| resp, err := dm.client.Get("http://localhost/info")
|
| if err != nil {
|
| return info, err
|
| }
|
| defer resp.Body.Close()
|
|
|
| if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
|
| return info, err
|
| }
|
|
|
| return info, nil
|
| }
|
|
|
| func (dm *dockerManager) IsPodman() bool {
|
| return dm.usingPodman
|
| }
|
|
|
|
|
| func (dm *dockerManager) setIsPodman() {
|
| if dm.usingPodman {
|
| return
|
| }
|
| dm.usingPodman = true
|
| dm.goodDockerVersion = true
|
| dm.dockerVersionChecked = true
|
|
|
|
|
| if dm.agent != nil {
|
| dm.agent.updateSystemDetails(func(details *system.Details) {
|
| details.Podman = true
|
| })
|
| }
|
| }
|
|
|
|
|
// detectPodmanFromHeader reports whether a Server header value starts with
// "Libpod", which identifies a Podman engine.
func detectPodmanFromHeader(server string) bool {
	const podmanServerPrefix = "Libpod"
	return strings.HasPrefix(server, podmanServerPrefix)
}
|
|
|
|
|
| func detectPodmanFromVersion(versionInfo *dockerVersionResponse) bool {
|
| if versionInfo == nil {
|
| return false
|
| }
|
| for _, component := range versionInfo.Components {
|
| if strings.HasPrefix(component.Name, "Podman") {
|
| return true
|
| }
|
| }
|
| return false
|
| }
|
|
|
|
|
| func detectPodmanEngine(serverHeader string, versionInfo *dockerVersionResponse) bool {
|
| if detectPodmanFromHeader(serverHeader) {
|
| return true
|
| }
|
| return detectPodmanFromVersion(versionInfo)
|
| }
|
|
|