Spaces:
Sleeping
Sleeping
| package cron | |
| import ( | |
| "crypto/rand" | |
| "encoding/hex" | |
| "encoding/json" | |
| "fmt" | |
| "log" | |
| "os" | |
| "path/filepath" | |
| "sync" | |
| "time" | |
| "github.com/adhocore/gronx" | |
| ) | |
| type CronSchedule struct { | |
| Kind string `json:"kind"` | |
| AtMS *int64 `json:"atMs,omitempty"` | |
| EveryMS *int64 `json:"everyMs,omitempty"` | |
| Expr string `json:"expr,omitempty"` | |
| TZ string `json:"tz,omitempty"` | |
| } | |
| type CronPayload struct { | |
| Kind string `json:"kind"` | |
| Message string `json:"message"` | |
| Command string `json:"command,omitempty"` | |
| Deliver bool `json:"deliver"` | |
| Channel string `json:"channel,omitempty"` | |
| To string `json:"to,omitempty"` | |
| } | |
| type CronJobState struct { | |
| NextRunAtMS *int64 `json:"nextRunAtMs,omitempty"` | |
| LastRunAtMS *int64 `json:"lastRunAtMs,omitempty"` | |
| LastStatus string `json:"lastStatus,omitempty"` | |
| LastError string `json:"lastError,omitempty"` | |
| } | |
| type CronJob struct { | |
| ID string `json:"id"` | |
| Name string `json:"name"` | |
| Enabled bool `json:"enabled"` | |
| Schedule CronSchedule `json:"schedule"` | |
| Payload CronPayload `json:"payload"` | |
| State CronJobState `json:"state"` | |
| CreatedAtMS int64 `json:"createdAtMs"` | |
| UpdatedAtMS int64 `json:"updatedAtMs"` | |
| DeleteAfterRun bool `json:"deleteAfterRun"` | |
| } | |
| type CronStore struct { | |
| Version int `json:"version"` | |
| Jobs []CronJob `json:"jobs"` | |
| } | |
| type JobHandler func(job *CronJob) (string, error) | |
| type CronService struct { | |
| storePath string | |
| store *CronStore | |
| onJob JobHandler | |
| mu sync.RWMutex | |
| running bool | |
| stopChan chan struct{} | |
| gronx *gronx.Gronx | |
| } | |
| func NewCronService(storePath string, onJob JobHandler) *CronService { | |
| cs := &CronService{ | |
| storePath: storePath, | |
| onJob: onJob, | |
| gronx: gronx.New(), | |
| } | |
| // Initialize and load store on creation | |
| cs.loadStore() | |
| return cs | |
| } | |
| func (cs *CronService) Start() error { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| if cs.running { | |
| return nil | |
| } | |
| if err := cs.loadStore(); err != nil { | |
| return fmt.Errorf("failed to load store: %w", err) | |
| } | |
| cs.recomputeNextRuns() | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| return fmt.Errorf("failed to save store: %w", err) | |
| } | |
| cs.stopChan = make(chan struct{}) | |
| cs.running = true | |
| go cs.runLoop(cs.stopChan) | |
| return nil | |
| } | |
| func (cs *CronService) Stop() { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| if !cs.running { | |
| return | |
| } | |
| cs.running = false | |
| if cs.stopChan != nil { | |
| close(cs.stopChan) | |
| cs.stopChan = nil | |
| } | |
| } | |
| func (cs *CronService) runLoop(stopChan chan struct{}) { | |
| ticker := time.NewTicker(1 * time.Second) | |
| defer ticker.Stop() | |
| for { | |
| select { | |
| case <-stopChan: | |
| return | |
| case <-ticker.C: | |
| cs.checkJobs() | |
| } | |
| } | |
| } | |
| func (cs *CronService) checkJobs() { | |
| cs.mu.Lock() | |
| if !cs.running { | |
| cs.mu.Unlock() | |
| return | |
| } | |
| now := time.Now().UnixMilli() | |
| var dueJobIDs []string | |
| // Collect jobs that are due (we need to copy them to execute outside lock) | |
| for i := range cs.store.Jobs { | |
| job := &cs.store.Jobs[i] | |
| if job.Enabled && job.State.NextRunAtMS != nil && *job.State.NextRunAtMS <= now { | |
| dueJobIDs = append(dueJobIDs, job.ID) | |
| } | |
| } | |
| // Reset next run for due jobs before unlocking to avoid duplicate execution. | |
| dueMap := make(map[string]bool, len(dueJobIDs)) | |
| for _, jobID := range dueJobIDs { | |
| dueMap[jobID] = true | |
| } | |
| for i := range cs.store.Jobs { | |
| if dueMap[cs.store.Jobs[i].ID] { | |
| cs.store.Jobs[i].State.NextRunAtMS = nil | |
| } | |
| } | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| log.Printf("[cron] failed to save store: %v", err) | |
| } | |
| cs.mu.Unlock() | |
| // Execute jobs outside lock. | |
| for _, jobID := range dueJobIDs { | |
| cs.executeJobByID(jobID) | |
| } | |
| } | |
| func (cs *CronService) executeJobByID(jobID string) { | |
| startTime := time.Now().UnixMilli() | |
| cs.mu.RLock() | |
| var callbackJob *CronJob | |
| for i := range cs.store.Jobs { | |
| job := &cs.store.Jobs[i] | |
| if job.ID == jobID { | |
| jobCopy := *job | |
| callbackJob = &jobCopy | |
| break | |
| } | |
| } | |
| cs.mu.RUnlock() | |
| if callbackJob == nil { | |
| return | |
| } | |
| var err error | |
| if cs.onJob != nil { | |
| _, err = cs.onJob(callbackJob) | |
| } | |
| // Now acquire lock to update state | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| var job *CronJob | |
| for i := range cs.store.Jobs { | |
| if cs.store.Jobs[i].ID == jobID { | |
| job = &cs.store.Jobs[i] | |
| break | |
| } | |
| } | |
| if job == nil { | |
| log.Printf("[cron] job %s disappeared before state update", jobID) | |
| return | |
| } | |
| job.State.LastRunAtMS = &startTime | |
| job.UpdatedAtMS = time.Now().UnixMilli() | |
| if err != nil { | |
| job.State.LastStatus = "error" | |
| job.State.LastError = err.Error() | |
| } else { | |
| job.State.LastStatus = "ok" | |
| job.State.LastError = "" | |
| } | |
| // Compute next run time | |
| if job.Schedule.Kind == "at" { | |
| if job.DeleteAfterRun { | |
| cs.removeJobUnsafe(job.ID) | |
| } else { | |
| job.Enabled = false | |
| job.State.NextRunAtMS = nil | |
| } | |
| } else { | |
| nextRun := cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) | |
| job.State.NextRunAtMS = nextRun | |
| } | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| log.Printf("[cron] failed to save store: %v", err) | |
| } | |
| } | |
| func (cs *CronService) computeNextRun(schedule *CronSchedule, nowMS int64) *int64 { | |
| if schedule.Kind == "at" { | |
| if schedule.AtMS != nil && *schedule.AtMS > nowMS { | |
| return schedule.AtMS | |
| } | |
| return nil | |
| } | |
| if schedule.Kind == "every" { | |
| if schedule.EveryMS == nil || *schedule.EveryMS <= 0 { | |
| return nil | |
| } | |
| next := nowMS + *schedule.EveryMS | |
| return &next | |
| } | |
| if schedule.Kind == "cron" { | |
| if schedule.Expr == "" { | |
| return nil | |
| } | |
| // Use gronx to calculate next run time | |
| now := time.UnixMilli(nowMS) | |
| nextTime, err := gronx.NextTickAfter(schedule.Expr, now, false) | |
| if err != nil { | |
| log.Printf("[cron] failed to compute next run for expr '%s': %v", schedule.Expr, err) | |
| return nil | |
| } | |
| nextMS := nextTime.UnixMilli() | |
| return &nextMS | |
| } | |
| return nil | |
| } | |
| func (cs *CronService) recomputeNextRuns() { | |
| now := time.Now().UnixMilli() | |
| for i := range cs.store.Jobs { | |
| job := &cs.store.Jobs[i] | |
| if job.Enabled { | |
| job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, now) | |
| } | |
| } | |
| } | |
| func (cs *CronService) getNextWakeMS() *int64 { | |
| var nextWake *int64 | |
| for _, job := range cs.store.Jobs { | |
| if job.Enabled && job.State.NextRunAtMS != nil { | |
| if nextWake == nil || *job.State.NextRunAtMS < *nextWake { | |
| nextWake = job.State.NextRunAtMS | |
| } | |
| } | |
| } | |
| return nextWake | |
| } | |
| func (cs *CronService) Load() error { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| return cs.loadStore() | |
| } | |
| func (cs *CronService) SetOnJob(handler JobHandler) { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| cs.onJob = handler | |
| } | |
| func (cs *CronService) loadStore() error { | |
| cs.store = &CronStore{ | |
| Version: 1, | |
| Jobs: []CronJob{}, | |
| } | |
| data, err := os.ReadFile(cs.storePath) | |
| if err != nil { | |
| if os.IsNotExist(err) { | |
| return nil | |
| } | |
| return err | |
| } | |
| return json.Unmarshal(data, cs.store) | |
| } | |
| func (cs *CronService) saveStoreUnsafe() error { | |
| dir := filepath.Dir(cs.storePath) | |
| if err := os.MkdirAll(dir, 0755); err != nil { | |
| return err | |
| } | |
| data, err := json.MarshalIndent(cs.store, "", " ") | |
| if err != nil { | |
| return err | |
| } | |
| return os.WriteFile(cs.storePath, data, 0600) | |
| } | |
| func (cs *CronService) AddJob(name string, schedule CronSchedule, message string, deliver bool, channel, to string) (*CronJob, error) { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| now := time.Now().UnixMilli() | |
| // One-time tasks (at) should be deleted after execution | |
| deleteAfterRun := (schedule.Kind == "at") | |
| job := CronJob{ | |
| ID: generateID(), | |
| Name: name, | |
| Enabled: true, | |
| Schedule: schedule, | |
| Payload: CronPayload{ | |
| Kind: "agent_turn", | |
| Message: message, | |
| Deliver: deliver, | |
| Channel: channel, | |
| To: to, | |
| }, | |
| State: CronJobState{ | |
| NextRunAtMS: cs.computeNextRun(&schedule, now), | |
| }, | |
| CreatedAtMS: now, | |
| UpdatedAtMS: now, | |
| DeleteAfterRun: deleteAfterRun, | |
| } | |
| cs.store.Jobs = append(cs.store.Jobs, job) | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| return nil, err | |
| } | |
| return &job, nil | |
| } | |
| func (cs *CronService) UpdateJob(job *CronJob) error { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| for i := range cs.store.Jobs { | |
| if cs.store.Jobs[i].ID == job.ID { | |
| cs.store.Jobs[i] = *job | |
| cs.store.Jobs[i].UpdatedAtMS = time.Now().UnixMilli() | |
| return cs.saveStoreUnsafe() | |
| } | |
| } | |
| return fmt.Errorf("job not found") | |
| } | |
| func (cs *CronService) RemoveJob(jobID string) bool { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| return cs.removeJobUnsafe(jobID) | |
| } | |
| func (cs *CronService) removeJobUnsafe(jobID string) bool { | |
| before := len(cs.store.Jobs) | |
| var jobs []CronJob | |
| for _, job := range cs.store.Jobs { | |
| if job.ID != jobID { | |
| jobs = append(jobs, job) | |
| } | |
| } | |
| cs.store.Jobs = jobs | |
| removed := len(cs.store.Jobs) < before | |
| if removed { | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| log.Printf("[cron] failed to save store after remove: %v", err) | |
| } | |
| } | |
| return removed | |
| } | |
| func (cs *CronService) EnableJob(jobID string, enabled bool) *CronJob { | |
| cs.mu.Lock() | |
| defer cs.mu.Unlock() | |
| for i := range cs.store.Jobs { | |
| job := &cs.store.Jobs[i] | |
| if job.ID == jobID { | |
| job.Enabled = enabled | |
| job.UpdatedAtMS = time.Now().UnixMilli() | |
| if enabled { | |
| job.State.NextRunAtMS = cs.computeNextRun(&job.Schedule, time.Now().UnixMilli()) | |
| } else { | |
| job.State.NextRunAtMS = nil | |
| } | |
| if err := cs.saveStoreUnsafe(); err != nil { | |
| log.Printf("[cron] failed to save store after enable: %v", err) | |
| } | |
| return job | |
| } | |
| } | |
| return nil | |
| } | |
| func (cs *CronService) ListJobs(includeDisabled bool) []CronJob { | |
| cs.mu.RLock() | |
| defer cs.mu.RUnlock() | |
| if includeDisabled { | |
| return cs.store.Jobs | |
| } | |
| var enabled []CronJob | |
| for _, job := range cs.store.Jobs { | |
| if job.Enabled { | |
| enabled = append(enabled, job) | |
| } | |
| } | |
| return enabled | |
| } | |
| func (cs *CronService) Status() map[string]interface{} { | |
| cs.mu.RLock() | |
| defer cs.mu.RUnlock() | |
| var enabledCount int | |
| for _, job := range cs.store.Jobs { | |
| if job.Enabled { | |
| enabledCount++ | |
| } | |
| } | |
| return map[string]interface{}{ | |
| "enabled": cs.running, | |
| "jobs": len(cs.store.Jobs), | |
| "nextWakeAtMS": cs.getNextWakeMS(), | |
| } | |
| } | |
| func generateID() string { | |
| // Use crypto/rand for better uniqueness under concurrent access | |
| b := make([]byte, 8) | |
| if _, err := rand.Read(b); err != nil { | |
| // Fallback to time-based if crypto/rand fails | |
| return fmt.Sprintf("%d", time.Now().UnixNano()) | |
| } | |
| return hex.EncodeToString(b) | |
| } | |