tgf / internal /bot /instance_manager.go
Mohammad Shahid
.
f592c10
package bot
import (
"TelegramCloud/tgf/config"
"TelegramCloud/tgf/internal/db"
"context"
"fmt"
"sync"
"time"
"github.com/celestix/gotgproto"
"github.com/celestix/gotgproto/sessionMaker"
"github.com/glebarez/sqlite"
"go.uber.org/zap"
)
// BotInstance is one live bot client together with the bookkeeping the
// InstanceManager keeps about it.
type BotInstance struct {
	// Client is the running gotgproto client for this bot.
	Client *gotgproto.Client

	// UserID is the user the bot belongs to; it is also the key under which
	// the instance is stored in InstanceManager.instances.
	UserID int64

	// BotID is copied from Client.Self.ID when the instance is created.
	BotID int64

	// Username is copied from Client.Self.Username when the instance is created.
	Username string

	// LastUsed is set at creation and refreshed every time GetBotForUser
	// hands the instance out again; idle cleanup and eviction compare it.
	LastUsed time.Time

	// ErrorCount is incremented by RecordError; the instance is dropped once
	// it reaches 3.
	ErrorCount int
}
// InstanceManager keeps at most one live bot client per user, creating them
// on demand and discarding them when they go idle, fail repeatedly, or the
// pool is full.
type InstanceManager struct {
	// instances maps a user ID to that user's bot instance.
	instances map[int64]*BotInstance

	// mutex guards instances and the mutable fields of the stored instances.
	mutex sync.RWMutex

	// log is the logger used for every message emitted by the manager.
	log *zap.Logger

	// userBotMgr supplies the decrypted bot tokens and receives the usage and
	// error records.
	userBotMgr *db.UserBotManager

	// cleanupTicker drives the periodic idle sweep; it is created by
	// startCleanupRoutine and stopped by Shutdown.
	cleanupTicker *time.Ticker

	// maxInstances is the pool size at which GetBotForUser starts evicting.
	maxInstances int

	// idleTimeout is how long an instance may go unused before the idle sweep
	// removes it.
	idleTimeout time.Duration
}
// NewInstanceManager builds an InstanceManager backed by userBotMgr, with a
// logger derived from log under the name "InstanceManager", and starts the
// background idle-cleanup routine before returning.
func NewInstanceManager(userBotMgr *db.UserBotManager, log *zap.Logger) *InstanceManager {
	const (
		// Upper bound on concurrently held bot instances.
		defaultMaxInstances = 100
		// Instances unused for longer than this are closed by the sweep.
		defaultIdleTimeout = 30 * time.Minute
	)

	manager := new(InstanceManager)
	manager.instances = make(map[int64]*BotInstance)
	manager.log = log.Named("InstanceManager")
	manager.userBotMgr = userBotMgr
	manager.maxInstances = defaultMaxInstances
	manager.idleTimeout = defaultIdleTimeout

	manager.startCleanupRoutine()
	return manager
}
// GetBotForUser returns the bot client for userID, creating and caching a new
// instance when none is cached yet.
//
// An existing instance has its LastUsed refreshed and is returned directly.
// Otherwise the user's bot token is fetched from userBotMgr and a new client
// is created. When the pool is already at maxInstances the oldest instance is
// evicted, but only after the new one was created successfully, so a failed
// token lookup or login never costs another user a working instance.
//
// It returns an error when the token cannot be obtained or the client cannot
// be created; creation failures are also reported through
// userBotMgr.RecordBotError.
//
// NOTE(review): the write lock is held for the whole call, including the
// token lookup and the client creation, so concurrent callers (even ones
// whose instance is already cached) wait behind a slow login.
func (im *InstanceManager) GetBotForUser(ctx context.Context, userID int64) (*gotgproto.Client, error) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	// Fast path: reuse the cached instance.
	if instance, exists := im.instances[userID]; exists {
		instance.LastUsed = time.Now()
		im.log.Debug("Reusing existing bot instance", zap.Int64("userID", userID))
		return instance.Client, nil
	}

	botToken, err := im.userBotMgr.GetDecryptedBotToken(ctx, userID)
	if err != nil {
		return nil, fmt.Errorf("failed to get user bot token: %w", err)
	}

	instance, err := im.createBotInstance(userID, botToken)
	if err != nil {
		// Persist the failure so it is visible outside this process.
		im.userBotMgr.RecordBotError(ctx, userID, err.Error())
		return nil, fmt.Errorf("failed to create bot instance: %w", err)
	}

	// Make room only now that there is definitely something to store. The new
	// instance is not in the map yet, so it cannot be picked as the victim.
	if len(im.instances) >= im.maxInstances {
		im.removeOldestInstance()
	}

	im.instances[userID] = instance
	im.userBotMgr.UpdateBotUsage(ctx, userID)

	im.log.Info("Created new bot instance",
		zap.Int64("userID", userID),
		zap.String("botUsername", instance.Username),
		zap.Int("totalInstances", len(im.instances)))
	return instance.Client, nil
}
// createBotInstance logs in a bot with botToken and wraps the resulting client
// in a BotInstance owned by userID.
//
// The session is stored in a per-user SQLite file under /tmp. The call to
// gotgproto.NewClient is given no context, so it runs in its own goroutine and
// this function stops waiting for it after 30 seconds.
//
// It returns the error from gotgproto.NewClient, or a "bot creation timeout"
// error when the 30 seconds elapse first. After a timeout the login may still
// complete in the background; such a late client is stopped rather than left
// running unreferenced.
func (im *InstanceManager) createBotInstance(userID int64, botToken string) (*BotInstance, error) {
	type creationResult struct {
		client *gotgproto.Client
		err    error
	}

	// Each user's bot gets its own session file.
	sessionPath := fmt.Sprintf("/tmp/userbot_%d.session", userID)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Capacity 1 lets the worker deliver its result and exit even when the
	// caller has already given up; an unbuffered channel would block it forever.
	resultChan := make(chan creationResult, 1)
	go func() {
		client, err := gotgproto.NewClient(
			int(config.ValueOf.ApiID),
			config.ValueOf.ApiHash,
			gotgproto.ClientTypeBot(botToken),
			&gotgproto.ClientOpts{
				Session: sessionMaker.SqlSession(
					sqlite.Open(sessionPath),
				),
				DisableCopyright: true,
			},
		)
		resultChan <- creationResult{client: client, err: err}
	}()

	select {
	case <-ctx.Done():
		// Nobody will use a client that arrives after the deadline; wait for
		// the worker to finish and shut the client down if it succeeded.
		go func() {
			if late := <-resultChan; late.err == nil && late.client != nil {
				late.client.Stop()
			}
		}()
		return nil, fmt.Errorf("bot creation timeout")
	case result := <-resultChan:
		if result.err != nil {
			return nil, result.err
		}
		return &BotInstance{
			Client:     result.client,
			UserID:     userID,
			BotID:      result.client.Self.ID,
			Username:   result.client.Self.Username,
			LastUsed:   time.Now(),
			ErrorCount: 0,
		}, nil
	}
}
// RemoveBotInstance stops and forgets the bot instance cached for userID.
// It does nothing when the user has no cached instance.
func (im *InstanceManager) RemoveBotInstance(userID int64) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	cached, found := im.instances[userID]
	if !found {
		return
	}

	// Shut the client down before dropping the only reference to it.
	cached.Client.Stop()
	delete(im.instances, userID)

	im.log.Info("Removed bot instance",
		zap.Int64("userID", userID),
		zap.String("botUsername", cached.Username))
}
// RecordError notes that the bot of userID hit err.
//
// If the user has a cached instance its ErrorCount is incremented, and once
// the count reaches 3 the client is stopped and the instance removed. The
// error is reported through userBotMgr.RecordBotError whether or not an
// instance is cached.
//
// A nil err is ignored: there is nothing to record, and the error text could
// not be read from it.
//
// NOTE(review): the database call happens while the write lock is held, as in
// the original code; confirm whether it can safely move outside the lock.
func (im *InstanceManager) RecordError(ctx context.Context, userID int64, err error) {
	if err == nil {
		return
	}

	im.mutex.Lock()
	defer im.mutex.Unlock()

	if instance, exists := im.instances[userID]; exists {
		instance.ErrorCount++

		// Three strikes: drop the instance so the next request builds a fresh one.
		if instance.ErrorCount >= 3 {
			im.log.Warn("Removing bot instance due to errors",
				zap.Int64("userID", userID),
				zap.Int("errorCount", instance.ErrorCount),
				zap.Error(err))
			instance.Client.Stop()
			delete(im.instances, userID)
		}
	}

	im.userBotMgr.RecordBotError(ctx, userID, err.Error())
}
// removeOldestInstance stops and removes the instance with the earliest
// LastUsed. It does nothing when no instances are cached.
//
// The victim is tracked explicitly instead of through a zero user ID and a
// time.Now() baseline, so an eviction always happens when the map is
// non-empty, whatever the keys and timestamps are.
//
// The function takes no lock itself; the caller must hold im.mutex for
// writing.
func (im *InstanceManager) removeOldestInstance() {
	var (
		oldestUserID int64
		oldest       *BotInstance
	)
	for userID, instance := range im.instances {
		if oldest == nil || instance.LastUsed.Before(oldest.LastUsed) {
			oldestUserID, oldest = userID, instance
		}
	}
	if oldest == nil {
		return
	}

	oldest.Client.Stop()
	delete(im.instances, oldestUserID)
	im.log.Info("Removed oldest bot instance to make room",
		zap.Int64("userID", oldestUserID))
}
// startCleanupRoutine creates the cleanup ticker and launches the goroutine
// that sweeps idle instances on every tick (every 10 minutes).
//
// NOTE(review): time.Ticker.Stop does not close the ticker's channel, so the
// goroutine started here stays blocked on the channel after the ticker is
// stopped instead of returning.
func (im *InstanceManager) startCleanupRoutine() {
	const sweepInterval = 10 * time.Minute

	im.cleanupTicker = time.NewTicker(sweepInterval)

	go func() {
		for range im.cleanupTicker.C {
			im.cleanupIdleInstances()
		}
	}()
}
// cleanupIdleInstances stops and removes every instance whose LastUsed is
// more than idleTimeout in the past, logging each removal and, if anything was
// removed, a summary line.
func (im *InstanceManager) cleanupIdleInstances() {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	sweepStart := time.Now()
	removed := 0

	// Deleting the current key while ranging over a map is permitted in Go,
	// so the sweep needs no intermediate list of victims.
	for uid, inst := range im.instances {
		idleFor := sweepStart.Sub(inst.LastUsed)
		if idleFor <= im.idleTimeout {
			continue
		}

		inst.Client.Stop()
		delete(im.instances, uid)
		im.log.Info("Cleaned up idle bot instance",
			zap.Int64("userID", uid),
			zap.Duration("idleTime", idleFor))
		removed++
	}

	if removed == 0 {
		return
	}
	im.log.Info("Cleanup completed",
		zap.Int("removedInstances", removed),
		zap.Int("activeInstances", len(im.instances)))
}
// GetStats reports a snapshot of the pool: the number of cached instances
// ("active_instances"), the configured limit ("max_instances") and the idle
// timeout rendered as a string ("idle_timeout").
func (im *InstanceManager) GetStats() map[string]interface{} {
	im.mutex.RLock()
	defer im.mutex.RUnlock()

	stats := make(map[string]interface{}, 3)
	stats["active_instances"] = len(im.instances)
	stats["max_instances"] = im.maxInstances
	stats["idle_timeout"] = im.idleTimeout.String()
	return stats
}
// Shutdown stops the cleanup ticker (if one was created), stops every cached
// bot client and replaces the instance map with an empty one.
func (im *InstanceManager) Shutdown() {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	// No further idle sweeps are triggered once the ticker is stopped.
	if ticker := im.cleanupTicker; ticker != nil {
		ticker.Stop()
	}

	for uid, inst := range im.instances {
		inst.Client.Stop()
		im.log.Info("Shutdown bot instance", zap.Int64("userID", uid))
	}

	// Start over with a fresh map rather than deleting entry by entry.
	im.instances = make(map[int64]*BotInstance)

	im.log.Info("Instance manager shutdown complete")
}