tgf / internal /bot /workers.go
Mohammad Shahid
added user management
aa300d8
package bot
import (
"TelegramCloud/tgf/config"
"context"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/celestix/gotgproto"
"github.com/celestix/gotgproto/sessionMaker"
"github.com/glebarez/sqlite"
"github.com/gotd/td/tg"
"go.uber.org/zap"
)
// Worker is a single bot client registered in the BotWorkers pool.
type Worker struct {
	// ID is the pool-assigned identifier, taken from BotWorkers.starting at
	// registration time.
	ID int
	// Client is the gotgproto client this worker wraps.
	Client *gotgproto.Client
	// Self is the Telegram user record stored for this bot; String reads its
	// Username.
	Self *tg.User
	// log is the logger copied from the owning BotWorkers at registration.
	log *zap.Logger
}
// String implements fmt.Stringer and renders the worker as
// "{Worker (<ID>|@<username>)}".
//
// A nil receiver yields "{Worker <nil>}" and a worker whose Self has not been
// populated is rendered with an empty username, so calling String directly
// never dereferences a nil pointer.
func (w *Worker) String() string {
	if w == nil {
		return "{Worker <nil>}"
	}
	username := ""
	if w.Self != nil {
		username = w.Self.Username
	}
	return fmt.Sprintf("{Worker (%d|@%s)}", w.ID, username)
}
// BotWorkers is the pool of worker bots that GetNextWorker rotates through.
type BotWorkers struct {
	// Bots holds every worker registered so far, in registration order.
	Bots []*Worker
	// starting is a counter bumped before each registration; its value is
	// used as the new worker's ID.
	starting int
	// index is the position in Bots of the worker most recently returned by
	// GetNextWorker.
	index int
	// mut is locked by incStarting and GetNextWorker while they update
	// starting and index.
	mut sync.Mutex
	// log is the "Workers"-named logger installed by Init; nil until then.
	log *zap.Logger
}
// Workers is the package-wide worker pool. It starts with an empty, non-nil
// Bots slice and no logger; Init must install the logger before any method
// that logs is used.
var Workers = &BotWorkers{
	Bots: make([]*Worker, 0),
}
// Init installs the pool's logger: a child of log named "Workers".
func (w *BotWorkers) Init(log *zap.Logger) {
	named := log.Named("Workers")
	w.log = named
}
// AddDefaultClient registers an already-running client (the main bot) as a
// worker in the pool.
//
// The ID is allocated and the worker appended inside a single critical
// section on w.mut, so a concurrent Add cannot be handed the same ID and a
// concurrent append or GetNextWorker never observes a half-updated Bots
// slice. append handles a nil Bots slice, so no explicit allocation is needed.
//
// Init must have been called first; the logger is used unconditionally.
func (w *BotWorkers) AddDefaultClient(client *gotgproto.Client, self *tg.User) {
	w.mut.Lock()
	w.starting++
	w.Bots = append(w.Bots, &Worker{
		Client: client,
		ID:     w.starting,
		Self:   self,
		log:    w.log,
	})
	w.mut.Unlock()

	w.log.Sugar().Info("Default bot loaded")
}
// incStarting advances the ID counter by one while holding w.mut.
func (w *BotWorkers) incStarting() {
	w.mut.Lock()
	w.starting += 1
	w.mut.Unlock()
}
// Add starts a worker bot authenticated with token and, on success, registers
// it in the pool. It returns the error from startWorker unchanged; a failed
// start registers nothing (the reserved ID is simply skipped).
//
// Add is called from several goroutines at once by StartWorkers, so both
// pieces of shared state are touched only under w.mut:
//   - the ID is incremented and read in the same critical section, so two
//     concurrent callers can never receive the same ID;
//   - the append to w.Bots is serialized with other appends and with
//     GetNextWorker's reads.
//
// The mutex is NOT held while the client is being started, so slow logins do
// not block each other or GetNextWorker.
func (w *BotWorkers) Add(token string) (err error) {
	w.mut.Lock()
	w.starting++
	botID := w.starting
	w.mut.Unlock()

	client, err := startWorker(w.log, token, botID)
	if err != nil {
		return err
	}
	w.log.Sugar().Infof("Bot @%s loaded with ID %d", client.Self.Username, botID)

	w.mut.Lock()
	defer w.mut.Unlock()
	w.Bots = append(w.Bots, &Worker{
		Client: client,
		ID:     botID,
		Self:   client.Self,
		log:    w.log,
	})
	return nil
}
// GetNextWorker returns the next worker from the global pool in round-robin
// order, advancing the pool's cursor under its mutex.
//
// It returns nil when no worker has been registered. Callers must check for
// nil; previously an empty pool caused an integer-divide-by-zero panic in the
// modulo below.
func GetNextWorker() *Worker {
	Workers.mut.Lock()
	defer Workers.mut.Unlock()

	count := len(Workers.Bots)
	if count == 0 {
		return nil
	}

	Workers.index = (Workers.index + 1) % count
	worker := Workers.Bots[Workers.index]
	Workers.log.Sugar().Debugf("Using worker %d", worker.ID)
	return worker
}
// StartWorkers initialises the global pool's logger and starts one worker per
// token in config.ValueOf.MultiTokens, all in parallel.
//
// With no tokens configured it returns the (possibly empty) pool immediately.
// When session files are enabled, a "./sessions" directory is created first;
// failing to create it is the only case that returns an error. Individual
// worker failures and timeouts are logged and do not fail the call.
//
// Each start is given 30 seconds. A timed-out start is only abandoned, not
// cancelled: its Add call keeps running in the background and may still
// register the worker after StartWorkers has returned.
func StartWorkers(log *zap.Logger) (*BotWorkers, error) {
	Workers.Init(log)

	totalBots := len(config.ValueOf.MultiTokens)
	if totalBots == 0 {
		Workers.log.Sugar().Info("No worker bot tokens provided, skipping worker initialization")
		return Workers, nil
	}
	Workers.log.Sugar().Info("Starting")

	if config.ValueOf.UseSessionFile {
		Workers.log.Sugar().Info("Using session file for workers")
		// NOTE(review): startWorker opens its session databases under /tmp,
		// not in this directory — confirm whether ./sessions is still needed.
		sessionsDir := filepath.Join(".", "sessions")
		err := os.MkdirAll(sessionsDir, os.ModePerm)
		if err != nil {
			Workers.log.Error("Failed to create sessions directory", zap.Error(err))
			return nil, err
		}
	}

	var (
		pending sync.WaitGroup
		started int32
	)

	// launch starts the worker for one token and waits for either its result
	// or the timeout, whichever comes first.
	launch := func(idx int) {
		defer pending.Done()

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// Buffered so the starter goroutine can always deliver its result and
		// exit, even after we have stopped listening because of a timeout.
		result := make(chan error, 1)
		go func() {
			result <- Workers.Add(config.ValueOf.MultiTokens[idx])
		}()

		select {
		case <-ctx.Done():
			Workers.log.Error("Timed out starting worker", zap.Int("index", idx))
		case err := <-result:
			if err == nil {
				atomic.AddInt32(&started, 1)
				return
			}
			Workers.log.Error("Failed to start worker", zap.Int("index", idx), zap.Error(err))
		}
	}

	for i := 0; i < totalBots; i++ {
		pending.Add(1)
		go launch(i)
	}
	pending.Wait() // every launch has either finished or timed out

	Workers.log.Sugar().Infof("Successfully started %d/%d bots", started, totalBots)
	return Workers, nil
}
// startWorker creates a gotgproto bot client for botToken.
//
// l is the parent logger (a "Worker" child is derived from it), and index is
// used for logging and for naming the session file. The session is an SQLite
// file at /tmp/worker-<index>.session when config.ValueOf.UseSessionFile is
// set, otherwise a SimpleSession. The client is configured with the
// middlewares returned by GetFloodMiddleware.
//
// It returns the client, or nil and the error from gotgproto.NewClient.
func startWorker(l *zap.Logger, botToken string, index int) (*gotgproto.Client, error) {
	sugar := l.Named("Worker").Sugar()
	sugar.Infof("Starting worker with index - %d", index)

	var session sessionMaker.SessionConstructor
	if !config.ValueOf.UseSessionFile {
		session = sessionMaker.SimpleSession()
	} else {
		// NOTE(review): StartWorkers creates "./sessions", but the database is
		// opened under /tmp here — confirm which location is intended.
		dbPath := fmt.Sprintf("/tmp/worker-%d.session", index)
		session = sessionMaker.SqlSession(sqlite.Open(dbPath))
	}

	clientType := gotgproto.ClientTypeBot(botToken)
	opts := &gotgproto.ClientOpts{
		Session:          session,
		DisableCopyright: true,
		Middlewares:      GetFloodMiddleware(sugar.Desugar()),
	}

	client, err := gotgproto.NewClient(int(config.ValueOf.ApiID), config.ValueOf.ApiHash, clientType, opts)
	if err != nil {
		return nil, err
	}
	return client, nil
}