File size: 4,583 Bytes
// Package watcher watches config/auth files and triggers hot reloads.
// It supports cross-platform fsnotify event handling.
package watcher
import (
"context"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"gopkg.in/yaml.v3"
sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth"
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
log "github.com/sirupsen/logrus"
)
// storePersister captures persistence-capable token store methods used by the watcher.
// NewWatcher type-asserts the global token store against this interface and, when the
// assertion succeeds, keeps the store in Watcher.storePersister.
type storePersister interface {
// PersistConfig is expected to persist the configuration; the exact semantics live
// in the token store implementation and are not visible from this file.
PersistConfig(ctx context.Context) error
// PersistAuthFiles is expected to persist the auth files at the given paths,
// annotated with message (presumably a change description — confirm against the store).
PersistAuthFiles(ctx context.Context, message string, paths ...string) error
}
// authDirProvider is an optional token-store capability. When the global token
// store implements it, NewWatcher records the whitespace-trimmed AuthDir value
// as the mirrored auth directory, provided that value is non-empty.
type authDirProvider interface {
// AuthDir returns the auth directory reported by the store.
AuthDir() string
}
// Watcher manages file watching for configuration and authentication files.
// Create instances with NewWatcher. The struct holds mutexes by value, so it
// must be used through a pointer and never copied.
type Watcher struct {
// configPath and authDir are the locations passed to NewWatcher.
configPath string
authDir string
// config is the current configuration. SetConfig writes it and
// SnapshotCoreAuths reads it while holding clientsMutex.
config *config.Config
clientsMutex sync.RWMutex
// configReloadMu and configReloadTimer presumably back the debounced config
// reload (see configReloadDebounce) — confirm against the reload code.
configReloadMu sync.Mutex
configReloadTimer *time.Timer
// reloadCallback is the callback passed to NewWatcher.
reloadCallback func(*config.Config)
// watcher is the underlying fsnotify watcher; Stop closes it.
watcher *fsnotify.Watcher
// lastAuthHashes is allocated by NewWatcher. lastRemoveTimes is NOT allocated
// there, so any writer must allocate it before first use.
lastAuthHashes map[string]string
lastRemoveTimes map[string]time.Time
lastConfigHash string
// authQueue is the send-only channel for AuthUpdate values
// (configured through SetAuthUpdateQueue).
authQueue chan<- AuthUpdate
// currentAuths and runtimeAuths map string keys to auth entries
// (presumably keyed by auth ID — verify against the code that fills them).
currentAuths map[string]*coreauth.Auth
runtimeAuths map[string]*coreauth.Auth
// dispatchMu is the lock behind dispatchCond (wired up in NewWatcher).
// pendingUpdates, pendingOrder and dispatchCancel appear to belong to the
// same dispatch machinery — confirm in the dispatch code.
dispatchMu sync.Mutex
dispatchCond *sync.Cond
pendingUpdates map[string]AuthUpdate
pendingOrder []string
dispatchCancel context.CancelFunc
// storePersister is set by NewWatcher when the global token store implements
// the storePersister interface; otherwise it stays nil.
storePersister storePersister
// mirroredAuthDir is set by NewWatcher from the token store's AuthDir()
// when the store provides a non-empty value.
mirroredAuthDir string
// oldConfigYaml is the YAML rendering of config produced in SetConfig.
oldConfigYaml []byte
}
// AuthUpdateAction represents the type of change detected in auth sources.
// It is a string type; the valid values are the constants declared below.
type AuthUpdateAction string
// Supported AuthUpdateAction values.
const (
// AuthUpdateActionAdd is the "add" action.
AuthUpdateActionAdd AuthUpdateAction = "add"
// AuthUpdateActionModify is the "modify" action.
AuthUpdateActionModify AuthUpdateAction = "modify"
// AuthUpdateActionDelete is the "delete" action.
AuthUpdateActionDelete AuthUpdateAction = "delete"
)
// AuthUpdate describes an incremental change to auth configuration.
// Values of this type are what the watcher sends on its auth update queue.
type AuthUpdate struct {
// Action is the kind of change (add, modify or delete).
Action AuthUpdateAction
// ID identifies the affected entry (presumably the auth ID — confirm with producers).
ID string
// Auth is the associated auth entry.
// NOTE(review): may be nil for delete updates — confirm with producers.
Auth *coreauth.Auth
}
// Timing constants used by the watcher.
const (
// replaceCheckDelay is a short delay to allow atomic replace (rename) to settle
// before deciding whether a Remove event indicates a real deletion.
replaceCheckDelay = 50 * time.Millisecond
// configReloadDebounce is 150ms; by its name it is the debounce interval for
// config reloads (usage is outside this section — confirm).
configReloadDebounce = 150 * time.Millisecond
// authRemoveDebounceWindow is 1s; by its name it is the window for debouncing
// auth file Remove events (usage is outside this section — confirm).
authRemoveDebounceWindow = 1 * time.Second
)
// NewWatcher creates a new file watcher instance.
//
// It opens an fsnotify watcher, stores the given config path, auth directory
// and reload callback, allocates the auth-hash map and wires the dispatch
// condition variable to dispatchMu. If a global token store is registered, the
// store is additionally probed for two optional capabilities:
//   - storePersister: kept so the watcher can propagate persisted changes;
//   - authDirProvider: a non-blank AuthDir() becomes the mirrored auth directory.
//
// The only error returned is the one from fsnotify.NewWatcher.
func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) (*Watcher, error) {
	fsw, errNewWatcher := fsnotify.NewWatcher()
	if errNewWatcher != nil {
		return nil, errNewWatcher
	}

	w := &Watcher{
		configPath:     configPath,
		authDir:        authDir,
		reloadCallback: reloadCallback,
		watcher:        fsw,
		lastAuthHashes: make(map[string]string),
	}
	w.dispatchCond = sync.NewCond(&w.dispatchMu)

	// Without a token store there are no optional capabilities to probe.
	store := sdkAuth.GetTokenStore()
	if store == nil {
		return w, nil
	}

	if persister, ok := store.(storePersister); ok {
		w.storePersister = persister
		log.Debug("persistence-capable token store detected; watcher will propagate persisted changes")
	}

	provider, ok := store.(authDirProvider)
	if !ok {
		return w, nil
	}
	// Blank or whitespace-only directories are ignored.
	fixed := strings.TrimSpace(provider.AuthDir())
	if fixed == "" {
		return w, nil
	}
	w.mirroredAuthDir = fixed
	log.Debugf("mirrored auth directory locked to %s", fixed)
	return w, nil
}
// Start begins watching the configuration file and authentication directory.
// It is a thin exported wrapper: the work is done by the unexported start
// method, and that method's error (if any) is returned unchanged.
func (w *Watcher) Start(ctx context.Context) error {
return w.start(ctx)
}
// Stop stops the file watcher.
//
// It first stops the dispatch machinery and the config reload timer, then
// closes the underlying fsnotify watcher and returns the error from that close.
func (w *Watcher) Stop() error {
	// Tear down internal helpers before releasing the fsnotify handle.
	w.stopDispatch()
	w.stopConfigReloadTimer()

	if errClose := w.watcher.Close(); errClose != nil {
		return errClose
	}
	return nil
}
// SetConfig updates the current configuration and refreshes the cached YAML
// rendering of it (oldConfigYaml). Both writes happen under clientsMutex.
//
// If the configuration cannot be marshalled to YAML, the failure is logged
// instead of being silently dropped. The cached YAML is then left nil, and the
// configuration pointer itself is still updated.
func (w *Watcher) SetConfig(cfg *config.Config) {
	w.clientsMutex.Lock()
	defer w.clientsMutex.Unlock()

	w.config = cfg

	snapshot, errMarshal := yaml.Marshal(cfg)
	if errMarshal != nil {
		// Best-effort: keep running with no cached YAML, but make the failure visible.
		log.Warnf("failed to marshal config snapshot: %v", errMarshal)
		snapshot = nil
	}
	w.oldConfigYaml = snapshot
}
// SetAuthUpdateQueue sets the queue used to emit auth updates.
// The channel is send-only from the watcher's side. The call is forwarded to
// the unexported setAuthUpdateQueue method.
func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) {
w.setAuthUpdateQueue(queue)
}
// DispatchRuntimeAuthUpdate allows external runtime providers (e.g., websocket-driven auths)
// to push auth updates through the same queue used by file/config watchers.
// Returns true if the update was enqueued; false if no queue is configured.
// The call is forwarded to the unexported dispatchRuntimeAuthUpdate method and
// its boolean result is returned as is.
func (w *Watcher) DispatchRuntimeAuthUpdate(update AuthUpdate) bool {
return w.dispatchRuntimeAuthUpdate(update)
}
// SnapshotCoreAuths converts current clients snapshot into core auth entries.
//
// The configuration pointer is read under the clientsMutex read lock. The lock
// is released before the pointer and the auth directory are handed to
// snapshotCoreAuths, so the conversion itself runs without the lock held.
func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth {
	current := func() *config.Config {
		w.clientsMutex.RLock()
		defer w.clientsMutex.RUnlock()
		return w.config
	}()
	return snapshotCoreAuths(current, w.authDir)
}
|