| package orchestrator |
|
|
| import ( |
| "encoding/json" |
| "fmt" |
| "log/slog" |
| "net/http" |
| "net/url" |
| "strings" |
| "time" |
| ) |
|
|
// Tuning knobs for the startup health check performed by
// (*Orchestrator).monitor.
const (
	// instanceHealthPollInterval is the delay observed before each round of
	// /health probes while an instance is starting.
	instanceHealthPollInterval time.Duration = 500 * time.Millisecond

	// instanceStartupTimeout bounds how long monitor keeps probing before it
	// reports a health check timeout.
	instanceStartupTimeout time.Duration = 45 * time.Second
)
|
|
| func (o *Orchestrator) monitor(inst *InstanceInternal) { |
| healthy := false |
| exitedEarly := false |
| lastProbe := "no response" |
| resolvedURL := "" |
| waitCh := make(chan error, 1) |
| go func() { |
| waitCh <- inst.cmd.Wait() |
| }() |
| var waitErr error |
| started := time.Now() |
| for time.Since(started) < instanceStartupTimeout { |
| select { |
| case waitErr = <-waitCh: |
| exitedEarly = true |
| default: |
| } |
| if exitedEarly { |
| break |
| } |
| time.Sleep(instanceHealthPollInterval) |
|
|
| for _, baseURL := range instanceBaseURLs(inst.URL, inst.Port) { |
| baseParsed, parseErr := url.Parse(baseURL) |
| if parseErr != nil { |
| lastProbe = fmt.Sprintf("%s -> %s", baseURL, parseErr.Error()) |
| continue |
| } |
| target := &url.URL{Scheme: baseParsed.Scheme, Host: baseParsed.Host, Path: "/health"} |
| req, reqErr := http.NewRequest(http.MethodGet, target.String(), nil) |
| if reqErr != nil { |
| lastProbe = fmt.Sprintf("%s -> %s", baseURL, reqErr.Error()) |
| continue |
| } |
| o.applyInstanceAuth(req, inst) |
| resp, err := o.client.Do(req) |
| if err == nil { |
| _ = resp.Body.Close() |
| lastProbe = fmt.Sprintf("%s -> HTTP %d", baseURL, resp.StatusCode) |
| if isInstanceHealthyStatus(resp.StatusCode) { |
| healthy = true |
| resolvedURL = baseURL |
| break |
| } |
| } else { |
| lastProbe = fmt.Sprintf("%s -> %s", baseURL, err.Error()) |
| } |
| } |
| if healthy { |
| break |
| } |
| } |
|
|
| o.mu.Lock() |
| var eventType string |
| switch inst.Status { |
| case "stopping", "stopped": |
| default: |
| if healthy { |
| inst.Status = "running" |
| if resolvedURL != "" { |
| inst.URL = resolvedURL |
| inst.Instance.URL = resolvedURL |
| } |
| o.syncInstanceToManager(&inst.Instance) |
| eventType = "instance.started" |
| slog.Info("instance ready", "id", inst.ID, "port", inst.Port) |
| } else if exitedEarly { |
| inst.Status = "error" |
| if waitErr != nil { |
| inst.Error = "process exited before health check: " + waitErr.Error() |
| } else { |
| inst.Error = "process exited before health check succeeded" |
| } |
| if tail := tailLogLine(inst.logBuf.String()); tail != "" { |
| inst.Error += " | " + tail |
| } |
| eventType = "instance.error" |
| slog.Error("instance exited before ready", "id", inst.ID) |
| } else { |
| inst.Status = "error" |
| inst.Error = fmt.Errorf("health check timeout after %s (%s)", instanceStartupTimeout, lastProbe).Error() |
| if tail := tailLogLine(inst.logBuf.String()); tail != "" { |
| inst.Error += " | " + tail |
| } |
| eventType = "instance.error" |
| slog.Error("instance failed to start", "id", inst.ID) |
| } |
| } |
| instCopy := inst.Instance |
| o.mu.Unlock() |
| if eventType != "" { |
| o.emitEvent(eventType, &instCopy) |
| } |
|
|
| if !exitedEarly { |
| <-waitCh |
| } |
| o.mu.Lock() |
| wasStopped := false |
| if inst.Status == "running" || inst.Status == "stopping" { |
| inst.Status = "stopped" |
| wasStopped = true |
| } |
| instCopy = inst.Instance |
| o.mu.Unlock() |
| if wasStopped { |
| o.emitEvent("instance.stopped", &instCopy) |
| } |
| slog.Info("instance exited", "id", inst.ID) |
| } |
|
|
// Wire types for the JSON payloads that fetchTabs and fetchMetrics decode
// from an instance's HTTP API.
type (
	// remoteTab is one element of the "tabs" array in the /tabs response.
	remoteTab struct {
		ID    string `json:"id"`
		URL   string `json:"url"`
		Title string `json:"title"`
	}

	// remoteMetrics is the top-level object of the /metrics response.
	// Memory stays nil when the payload carries no "memory" key.
	remoteMetrics struct {
		Memory *memoryMetrics `json:"memory,omitempty"`
	}

	// memoryMetrics is the "memory" section of a /metrics response.
	// NOTE(review): the MB suffix suggests megabytes and the remaining
	// fields look like object counts; confirm against the instance API.
	memoryMetrics struct {
		JSHeapUsedMB  float64 `json:"jsHeapUsedMB"`
		JSHeapTotalMB float64 `json:"jsHeapTotalMB"`
		Documents     int64   `json:"documents"`
		Frames        int64   `json:"frames"`
		Nodes         int64   `json:"nodes"`
		Listeners     int64   `json:"listeners"`
	}
)
|
|
| func (o *Orchestrator) fetchTabs(inst *InstanceInternal) ([]remoteTab, error) { |
| target, err := o.instancePathURL(inst, "/tabs", "") |
| if err != nil { |
| return nil, err |
| } |
| req, err := http.NewRequest(http.MethodGet, target.String(), nil) |
| if err != nil { |
| return nil, err |
| } |
| o.applyInstanceAuth(req, inst) |
|
|
| resp, err := o.client.Do(req) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { _ = resp.Body.Close() }() |
|
|
| if resp.StatusCode != http.StatusOK { |
| return nil, fmt.Errorf("fetch tabs: status %d", resp.StatusCode) |
| } |
|
|
| var result struct { |
| Tabs []remoteTab `json:"tabs"` |
| } |
| if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { |
| return nil, err |
| } |
| return result.Tabs, nil |
| } |
|
|
| func (o *Orchestrator) fetchMetrics(inst *InstanceInternal) (*memoryMetrics, error) { |
| target, err := o.instancePathURL(inst, "/metrics", "") |
| if err != nil { |
| return nil, err |
| } |
| req, err := http.NewRequest(http.MethodGet, target.String(), nil) |
| if err != nil { |
| return nil, err |
| } |
| o.applyInstanceAuth(req, inst) |
|
|
| resp, err := o.client.Do(req) |
| if err != nil { |
| return nil, err |
| } |
| defer func() { _ = resp.Body.Close() }() |
|
|
| if resp.StatusCode != 200 { |
| return nil, nil |
| } |
|
|
| var result remoteMetrics |
| if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { |
| return nil, err |
| } |
| return result.Memory, nil |
| } |
|
|
// isInstanceHealthyStatus reports whether an HTTP status code counts as a
// healthy probe response: any positive code below 500. Client-error (4xx)
// codes are therefore accepted; only 5xx and non-positive values are not.
func isInstanceHealthyStatus(code int) bool {
	if code <= 0 {
		return false
	}
	return code < http.StatusInternalServerError
}
|
|
// instanceBaseURLs lists the base URLs to try when contacting an instance.
//
// A non-empty rawURL is used on its own, with trailing slashes removed.
// Otherwise the instance is assumed to listen locally on port, and the
// candidates are the IPv4 loopback, the IPv6 loopback and "localhost", in
// that order.
func instanceBaseURLs(rawURL, port string) []string {
	if rawURL == "" {
		loopbackHosts := [...]string{"127.0.0.1", "[::1]", "localhost"}
		candidates := make([]string, 0, len(loopbackHosts))
		for _, host := range loopbackHosts {
			candidates = append(candidates, fmt.Sprintf("http://%s:%s", host, port))
		}
		return candidates
	}
	return []string{strings.TrimRight(rawURL, "/")}
}
|
|