File size: 4,805 Bytes
6a7089a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | package scheduler
import (
"crypto/rand"
"encoding/hex"
"fmt"
"sync"
"time"
)
// TaskState represents the current state of a task.
type TaskState string
const (
StateQueued TaskState = "queued"
StateAssigned TaskState = "assigned"
StateRunning TaskState = "running"
StateDone TaskState = "done"
StateFailed TaskState = "failed"
StateCancelled TaskState = "cancelled"
StateRejected TaskState = "rejected"
)
// IsTerminal returns true for states that will not change.
func (s TaskState) IsTerminal() bool {
switch s {
case StateDone, StateFailed, StateCancelled, StateRejected:
return true
}
return false
}
// Task represents a scheduled unit of work dispatched to the executor.
type Task struct {
mu sync.RWMutex
ID string `json:"taskId"`
AgentID string `json:"agentId"`
Action string `json:"action"`
TabID string `json:"tabId,omitempty"`
Selector string `json:"selector,omitempty"`
Ref string `json:"ref,omitempty"` // deprecated: use Selector
Params map[string]any `json:"params,omitempty"`
Priority int `json:"priority"`
State TaskState `json:"state"`
Deadline time.Time `json:"deadline,omitempty"`
CreatedAt time.Time `json:"createdAt"`
StartedAt time.Time `json:"startedAt,omitempty"`
CompletedAt time.Time `json:"completedAt,omitempty"`
LatencyMs int64 `json:"latencyMs,omitempty"`
Result any `json:"result,omitempty"`
Error string `json:"error,omitempty"`
// CallbackURL receives a POST with the task snapshot on completion.
CallbackURL string `json:"callbackUrl,omitempty"`
// position is the queue position at submission time.
Position int `json:"position,omitempty"`
}
// SetState transitions the task to the given state. Returns an error if
// the transition is invalid (e.g. terminal → anything).
func (t *Task) SetState(next TaskState) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.State.IsTerminal() {
return fmt.Errorf("cannot transition from terminal state %q to %q", t.State, next)
}
switch {
case t.State == StateQueued && (next == StateAssigned || next == StateCancelled || next == StateFailed || next == StateRejected):
case t.State == StateAssigned && (next == StateRunning || next == StateCancelled):
case t.State == StateRunning && (next == StateDone || next == StateFailed || next == StateCancelled):
default:
return fmt.Errorf("invalid state transition: %q → %q", t.State, next)
}
now := time.Now()
t.State = next
switch next {
case StateAssigned:
t.StartedAt = now
case StateRunning:
if t.StartedAt.IsZero() {
t.StartedAt = now
}
case StateDone, StateFailed, StateCancelled:
t.CompletedAt = now
if !t.StartedAt.IsZero() {
t.LatencyMs = now.Sub(t.StartedAt).Milliseconds()
}
}
return nil
}
// GetState returns the current task state.
func (t *Task) GetState() TaskState {
t.mu.RLock()
defer t.mu.RUnlock()
return t.State
}
// Snapshot returns a read-consistent copy of the task for serialization.
func (t *Task) Snapshot() *Task {
t.mu.RLock()
defer t.mu.RUnlock()
return &Task{
ID: t.ID,
AgentID: t.AgentID,
Action: t.Action,
TabID: t.TabID,
Selector: t.Selector,
Ref: t.Ref,
Params: t.Params,
Priority: t.Priority,
State: t.State,
Deadline: t.Deadline,
CreatedAt: t.CreatedAt,
StartedAt: t.StartedAt,
CompletedAt: t.CompletedAt,
LatencyMs: t.LatencyMs,
Result: t.Result,
Error: t.Error,
CallbackURL: t.CallbackURL,
Position: t.Position,
}
}
// SubmitRequest is the JSON body for POST /tasks.
type SubmitRequest struct {
AgentID string `json:"agentId"`
Action string `json:"action"`
TabID string `json:"tabId,omitempty"`
Selector string `json:"selector,omitempty"`
Ref string `json:"ref,omitempty"` // deprecated: use Selector
Params map[string]any `json:"params,omitempty"`
Priority int `json:"priority,omitempty"`
Deadline string `json:"deadline,omitempty"`
CallbackURL string `json:"callbackUrl,omitempty"`
}
// Validate checks that the request has the minimum required fields.
func (r *SubmitRequest) Validate() error {
if r.AgentID == "" {
return fmt.Errorf("missing required field 'agentId'")
}
if r.Action == "" {
return fmt.Errorf("missing required field 'action'")
}
return nil
}
// generateTaskID produces a random task ID in the format tsk_XXXXXXXX.
func generateTaskID() string {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
// Fallback to timestamp-based if crypto/rand fails.
return fmt.Sprintf("tsk_%08x", time.Now().UnixNano()&0xFFFFFFFF)
}
return "tsk_" + hex.EncodeToString(b)
}
|