package bot

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/celestix/gotgproto"
	"github.com/celestix/gotgproto/sessionMaker"
	"github.com/glebarez/sqlite"
	"go.uber.org/zap"

	"TelegramCloud/tgf/config"
	"TelegramCloud/tgf/internal/db"
)

const (
	// defaultMaxInstances is the maximum number of concurrently cached bot
	// instances before the least recently used one is evicted.
	defaultMaxInstances = 100

	// defaultIdleTimeout is how long an instance may go unused before the
	// cleanup routine stops it.
	defaultIdleTimeout = 30 * time.Minute

	// instanceCleanupInterval is how often the cleanup routine scans for
	// idle instances.
	instanceCleanupInterval = 10 * time.Minute

	// instanceCreateTimeout bounds how long createBotInstance waits for a
	// new client to come up.
	instanceCreateTimeout = 30 * time.Second

	// instanceMaxErrors is the number of recorded errors after which an
	// instance is stopped and dropped.
	instanceMaxErrors = 3
)

// BotInstance represents an active bot instance.
type BotInstance struct {
	Client     *gotgproto.Client
	UserID     int64
	BotID      int64
	Username   string
	LastUsed   time.Time
	ErrorCount int
}

// InstanceManager manages multiple bot instances, keyed by the owning user.
// All access to the instance map is serialized through mutex.
type InstanceManager struct {
	instances     map[int64]*BotInstance // userID -> BotInstance
	mutex         sync.RWMutex
	log           *zap.Logger
	userBotMgr    *db.UserBotManager
	cleanupTicker *time.Ticker
	maxInstances  int
	idleTimeout   time.Duration

	stopCleanup chan struct{} // closed by Shutdown to end the cleanup goroutine
	stopOnce    sync.Once     // makes closing stopCleanup safe on repeated Shutdown calls
}

// NewInstanceManager creates a new bot instance manager and starts its
// background cleanup routine. Call Shutdown to stop that routine and all
// cached bot clients.
func NewInstanceManager(userBotMgr *db.UserBotManager, log *zap.Logger) *InstanceManager {
	im := &InstanceManager{
		instances:    make(map[int64]*BotInstance),
		log:          log.Named("InstanceManager"),
		userBotMgr:   userBotMgr,
		maxInstances: defaultMaxInstances,
		idleTimeout:  defaultIdleTimeout,
	}

	im.startCleanupRoutine()

	return im
}

// GetBotForUser returns the bot client for the user, creating it if needed.
//
// An existing instance is reused and its LastUsed timestamp refreshed.
// Otherwise the user's bot token is fetched, a new client is created and
// cached, and usage is recorded through the UserBotManager. If the cache is
// full, the least recently used instance is evicted, but only once the new
// client has been created successfully, so a failed lookup or creation never
// costs another user their working instance.
//
// NOTE(review): the manager mutex is held for the whole call, including client
// creation (up to instanceCreateTimeout), so concurrent callers are serialized.
// NOTE(review): the results of RecordBotError and UpdateBotUsage are not
// inspected here; their signatures are defined outside this file.
func (im *InstanceManager) GetBotForUser(ctx context.Context, userID int64) (*gotgproto.Client, error) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	// Fast path: reuse an already running instance.
	if instance, exists := im.instances[userID]; exists {
		instance.LastUsed = time.Now()
		im.log.Debug("Reusing existing bot instance", zap.Int64("userID", userID))
		return instance.Client, nil
	}

	botToken, err := im.userBotMgr.GetDecryptedBotToken(ctx, userID)
	if err != nil {
		return nil, fmt.Errorf("failed to get user bot token: %w", err)
	}

	instance, err := im.createBotInstance(userID, botToken)
	if err != nil {
		// Persist the failure so it is visible outside this process.
		im.userBotMgr.RecordBotError(ctx, userID, err.Error())
		return nil, fmt.Errorf("failed to create bot instance: %w", err)
	}

	// Make room only now that the replacement is known to work.
	if len(im.instances) >= im.maxInstances {
		im.removeOldestInstance()
	}

	im.instances[userID] = instance
	im.userBotMgr.UpdateBotUsage(ctx, userID)

	im.log.Info("Created new bot instance",
		zap.Int64("userID", userID),
		zap.String("botUsername", instance.Username),
		zap.Int("totalInstances", len(im.instances)))

	return instance.Client, nil
}

// createBotInstance creates a new bot client for the user, backed by a
// per-user SQLite session file, and waits at most instanceCreateTimeout for
// it to come up.
//
// The client is built in a separate goroutine so the wait can be bounded. If
// the timeout fires first, that goroutine notices the cancelled context when
// it finishes, stops the late client, and exits; neither the goroutine nor
// the client is leaked.
func (im *InstanceManager) createBotInstance(userID int64, botToken string) (*BotInstance, error) {
	// Unique session path for this user's bot.
	sessionPath := fmt.Sprintf("/tmp/userbot_%d.session", userID)

	ctx, cancel := context.WithTimeout(context.Background(), instanceCreateTimeout)
	defer cancel()

	type creationResult struct {
		client *gotgproto.Client
		err    error
	}
	resultChan := make(chan creationResult)

	go func() {
		client, err := gotgproto.NewClient(
			int(config.ValueOf.ApiID),
			config.ValueOf.ApiHash,
			gotgproto.ClientTypeBot(botToken),
			&gotgproto.ClientOpts{
				Session: sessionMaker.SqlSession(
					sqlite.Open(sessionPath),
				),
				DisableCopyright: true,
			},
		)

		// The channel is unbuffered, so the send succeeds only if the caller
		// is still waiting. Otherwise the context is done and the client
		// belongs to nobody, so it is stopped here.
		select {
		case resultChan <- creationResult{client: client, err: err}:
		case <-ctx.Done():
			if err == nil && client != nil {
				client.Stop()
			}
		}
	}()

	select {
	case <-ctx.Done():
		return nil, fmt.Errorf("bot creation timeout")
	case result := <-resultChan:
		if result.err != nil {
			return nil, result.err
		}
		return &BotInstance{
			Client:     result.client,
			UserID:     userID,
			BotID:      result.client.Self.ID,
			Username:   result.client.Self.Username,
			LastUsed:   time.Now(),
			ErrorCount: 0,
		}, nil
	}
}

// evictLocked stops the user's bot client and removes it from the cache.
// It returns the removed instance, or nil and false if none was cached.
// The caller must hold im.mutex for writing.
func (im *InstanceManager) evictLocked(userID int64) (*BotInstance, bool) {
	instance, exists := im.instances[userID]
	if !exists {
		return nil, false
	}
	instance.Client.Stop()
	delete(im.instances, userID)
	return instance, true
}

// RemoveBotInstance stops and removes the bot instance for a user.
// It is a no-op if the user has no cached instance.
func (im *InstanceManager) RemoveBotInstance(userID int64) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	if instance, removed := im.evictLocked(userID); removed {
		im.log.Info("Removed bot instance",
			zap.Int64("userID", userID),
			zap.String("botUsername", instance.Username))
	}
}

// RecordError records an error for a user's bot instance.
//
// The in-memory error count is incremented, and the instance is stopped and
// removed once it reaches instanceMaxErrors. The error is also recorded
// through the UserBotManager whether or not an instance is cached. A nil err
// is ignored.
func (im *InstanceManager) RecordError(ctx context.Context, userID int64, err error) {
	if err == nil {
		// Nothing to record; err.Error() below would panic.
		return
	}

	im.mutex.Lock()
	defer im.mutex.Unlock()

	if instance, exists := im.instances[userID]; exists {
		instance.ErrorCount++

		if instance.ErrorCount >= instanceMaxErrors {
			im.log.Warn("Removing bot instance due to errors",
				zap.Int64("userID", userID),
				zap.Int("errorCount", instance.ErrorCount),
				zap.Error(err))
			im.evictLocked(userID)
		}
	}

	im.userBotMgr.RecordBotError(ctx, userID, err.Error())
}

// removeOldestInstance stops and removes the instance with the earliest
// LastUsed timestamp. It does nothing when the cache is empty.
// The caller must hold im.mutex for writing.
func (im *InstanceManager) removeOldestInstance() {
	var (
		oldestUserID int64
		oldestTime   time.Time
		found        bool
	)

	// An explicit found flag is used instead of a sentinel user ID so that
	// every cached instance, whatever its ID, is a valid candidate.
	for userID, instance := range im.instances {
		if !found || instance.LastUsed.Before(oldestTime) {
			oldestUserID = userID
			oldestTime = instance.LastUsed
			found = true
		}
	}

	if !found {
		return
	}

	if _, removed := im.evictLocked(oldestUserID); removed {
		im.log.Info("Removed oldest bot instance to make room",
			zap.Int64("userID", oldestUserID))
	}
}

// startCleanupRoutine starts the background goroutine that removes idle
// instances every instanceCleanupInterval. The goroutine exits when Shutdown
// closes the stop channel; stopping the ticker alone would not end it,
// because Ticker.Stop does not close the ticker's channel.
func (im *InstanceManager) startCleanupRoutine() {
	ticker := time.NewTicker(instanceCleanupInterval)
	stop := make(chan struct{})

	im.cleanupTicker = ticker
	im.stopCleanup = stop

	go func() {
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				im.cleanupIdleInstances()
			}
		}
	}()
}

// cleanupIdleInstances stops and removes every instance whose LastUsed
// timestamp is older than the idle timeout.
func (im *InstanceManager) cleanupIdleInstances() {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	now := time.Now()

	// Collect first, then evict, so the map is not modified while ranging.
	var toRemove []int64
	for userID, instance := range im.instances {
		if now.Sub(instance.LastUsed) > im.idleTimeout {
			toRemove = append(toRemove, userID)
		}
	}

	for _, userID := range toRemove {
		if instance, removed := im.evictLocked(userID); removed {
			im.log.Info("Cleaned up idle bot instance",
				zap.Int64("userID", userID),
				zap.Duration("idleTime", now.Sub(instance.LastUsed)))
		}
	}

	if len(toRemove) > 0 {
		im.log.Info("Cleanup completed",
			zap.Int("removedInstances", len(toRemove)),
			zap.Int("activeInstances", len(im.instances)))
	}
}

// GetStats returns statistics about active instances: the number of cached
// instances ("active_instances"), the configured cap ("max_instances"), and
// the idle timeout formatted as a string ("idle_timeout").
func (im *InstanceManager) GetStats() map[string]interface{} {
	im.mutex.RLock()
	defer im.mutex.RUnlock()

	return map[string]interface{}{
		"active_instances": len(im.instances),
		"max_instances":    im.maxInstances,
		"idle_timeout":     im.idleTimeout.String(),
	}
}

// Shutdown stops the cleanup routine and every cached bot client, then
// empties the cache. It is safe to call more than once.
func (im *InstanceManager) Shutdown() {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	if im.cleanupTicker != nil {
		im.cleanupTicker.Stop()
	}
	if im.stopCleanup != nil {
		// Closing the channel is what lets the cleanup goroutine exit.
		im.stopOnce.Do(func() { close(im.stopCleanup) })
	}

	for userID, instance := range im.instances {
		instance.Client.Stop()
		im.log.Info("Shutdown bot instance", zap.Int64("userID", userID))
	}

	im.instances = make(map[int64]*BotInstance)
	im.log.Info("Instance manager shutdown complete")
}