|
|
|
|
|
|
|
|
package watcher |
|
|
|
|
|
import ( |
|
|
"context" |
|
|
"strings" |
|
|
"sync" |
|
|
"time" |
|
|
|
|
|
"github.com/fsnotify/fsnotify" |
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" |
|
|
"gopkg.in/yaml.v3" |
|
|
|
|
|
sdkAuth "github.com/router-for-me/CLIProxyAPI/v6/sdk/auth" |
|
|
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" |
|
|
log "github.com/sirupsen/logrus" |
|
|
) |
|
|
|
|
|
|
|
|
// storePersister is the optional capability NewWatcher looks for on the
// token store: a store that can persist the configuration and auth files.
type storePersister interface {
	// PersistConfig persists the configuration, returning any failure.
	PersistConfig(ctx context.Context) error

	// PersistAuthFiles persists the auth files at the given paths, passing
	// message along with them.
	// NOTE(review): message is presumably a change description (e.g. a
	// commit message) — confirm against the implementing store.
	PersistAuthFiles(ctx context.Context, message string, paths ...string) error
}
|
|
|
|
|
// authDirProvider is the optional capability NewWatcher looks for on the
// token store: a store that can report an auth directory of its own.
type authDirProvider interface {
	// AuthDir returns the store's auth directory. NewWatcher trims the
	// value and ignores it when the result is empty.
	AuthDir() string
}
|
|
|
|
|
|
|
|
type Watcher struct { |
|
|
configPath string |
|
|
authDir string |
|
|
config *config.Config |
|
|
clientsMutex sync.RWMutex |
|
|
configReloadMu sync.Mutex |
|
|
configReloadTimer *time.Timer |
|
|
reloadCallback func(*config.Config) |
|
|
watcher *fsnotify.Watcher |
|
|
lastAuthHashes map[string]string |
|
|
lastRemoveTimes map[string]time.Time |
|
|
lastConfigHash string |
|
|
authQueue chan<- AuthUpdate |
|
|
currentAuths map[string]*coreauth.Auth |
|
|
runtimeAuths map[string]*coreauth.Auth |
|
|
dispatchMu sync.Mutex |
|
|
dispatchCond *sync.Cond |
|
|
pendingUpdates map[string]AuthUpdate |
|
|
pendingOrder []string |
|
|
dispatchCancel context.CancelFunc |
|
|
storePersister storePersister |
|
|
mirroredAuthDir string |
|
|
oldConfigYaml []byte |
|
|
} |
|
|
|
|
|
|
|
|
// AuthUpdateAction names the kind of change carried by an AuthUpdate.
type AuthUpdateAction string

// Values an AuthUpdateAction can take.
const (
	// AuthUpdateActionAdd has the string value "add".
	AuthUpdateActionAdd = AuthUpdateAction("add")
	// AuthUpdateActionModify has the string value "modify".
	AuthUpdateActionModify = AuthUpdateAction("modify")
	// AuthUpdateActionDelete has the string value "delete".
	AuthUpdateActionDelete = AuthUpdateAction("delete")
)
|
|
|
|
|
|
|
|
type AuthUpdate struct { |
|
|
Action AuthUpdateAction |
|
|
ID string |
|
|
Auth *coreauth.Auth |
|
|
} |
|
|
|
|
|
// Timing constants for the watcher.
// NOTE(review): the code that consumes these values is outside this view;
// the per-constant descriptions below are inferred from the names and should
// be confirmed at the call sites.
const (
	// replaceCheckDelay is 50ms; presumably the pause before re-checking a
	// file that may have been replaced.
	replaceCheckDelay time.Duration = 50 * time.Millisecond
	// configReloadDebounce is 150ms; presumably the quiet period before a
	// config reload fires.
	configReloadDebounce time.Duration = 150 * time.Millisecond
	// authRemoveDebounceWindow is 1s; presumably the window in which repeated
	// auth file removals are coalesced.
	authRemoveDebounceWindow time.Duration = time.Second
)
|
|
|
|
|
|
|
|
func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) (*Watcher, error) { |
|
|
watcher, errNewWatcher := fsnotify.NewWatcher() |
|
|
if errNewWatcher != nil { |
|
|
return nil, errNewWatcher |
|
|
} |
|
|
w := &Watcher{ |
|
|
configPath: configPath, |
|
|
authDir: authDir, |
|
|
reloadCallback: reloadCallback, |
|
|
watcher: watcher, |
|
|
lastAuthHashes: make(map[string]string), |
|
|
} |
|
|
w.dispatchCond = sync.NewCond(&w.dispatchMu) |
|
|
if store := sdkAuth.GetTokenStore(); store != nil { |
|
|
if persister, ok := store.(storePersister); ok { |
|
|
w.storePersister = persister |
|
|
log.Debug("persistence-capable token store detected; watcher will propagate persisted changes") |
|
|
} |
|
|
if provider, ok := store.(authDirProvider); ok { |
|
|
if fixed := strings.TrimSpace(provider.AuthDir()); fixed != "" { |
|
|
w.mirroredAuthDir = fixed |
|
|
log.Debugf("mirrored auth directory locked to %s", fixed) |
|
|
} |
|
|
} |
|
|
} |
|
|
return w, nil |
|
|
} |
|
|
|
|
|
|
|
|
func (w *Watcher) Start(ctx context.Context) error { |
|
|
return w.start(ctx) |
|
|
} |
|
|
|
|
|
|
|
|
func (w *Watcher) Stop() error { |
|
|
w.stopDispatch() |
|
|
w.stopConfigReloadTimer() |
|
|
return w.watcher.Close() |
|
|
} |
|
|
|
|
|
|
|
|
func (w *Watcher) SetConfig(cfg *config.Config) { |
|
|
w.clientsMutex.Lock() |
|
|
defer w.clientsMutex.Unlock() |
|
|
w.config = cfg |
|
|
w.oldConfigYaml, _ = yaml.Marshal(cfg) |
|
|
} |
|
|
|
|
|
|
|
|
func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) { |
|
|
w.setAuthUpdateQueue(queue) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (w *Watcher) DispatchRuntimeAuthUpdate(update AuthUpdate) bool { |
|
|
return w.dispatchRuntimeAuthUpdate(update) |
|
|
} |
|
|
|
|
|
|
|
|
func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth { |
|
|
w.clientsMutex.RLock() |
|
|
cfg := w.config |
|
|
w.clientsMutex.RUnlock() |
|
|
return snapshotCoreAuths(cfg, w.authDir) |
|
|
} |
|
|
|