import { supabase } from '../../db/supabase'; import { createLogger } from '../../utils/logger'; import { BotContext } from '../types/botTypes'; import { messageManager } from '../utils/messageManager'; import { initializeBot, stopBot } from '../botManager'; const logger = createLogger('BalanceUpdateService'); export class BalanceUpdateService { private static instance: BalanceUpdateService; private bot: any; private subscription: any; private botsSubscription: any; private constructor() { // Delay the initial subscription setup to ensure bot is ready setTimeout(() => { //this.setupRealtimeSubscription(); //this.setupBotsSubscription(); }, 1000); } public static getInstance(): BalanceUpdateService { if (!BalanceUpdateService.instance) { BalanceUpdateService.instance = new BalanceUpdateService(); } return BalanceUpdateService.instance; } public setBot(bot: any) { this.bot = bot; logger.info('Bot instance set in BalanceUpdateService'); // Setup subscription after bot is set this.setupRealtimeSubscription(); } private async setupRealtimeSubscription() { try { logger.info('Starting realtime subscription setup...'); // Unsubscribe from any existing subscription if (this.subscription) { await this.subscription.unsubscribe(); logger.info('Unsubscribed from previous subscription'); } // Create a new channel const channel = supabase.channel('balance_changes', { config: { broadcast: { self: true } } }); // Subscribe to changes channel .on( 'postgres_changes', { event: 'UPDATE', schema: 'public', table: 'users_bot_telegram' }, async (payload: any) => { logger.info('Received database change:', JSON.stringify(payload, null, 2)); // Check if balance was actually changed if (payload.old.balance !== payload.new.balance) { logger.info('Balance change detected, processing update...'); try { await this.handleBalanceUpdate(payload); } catch (error) { logger.error('Error handling balance update:', error); } } else { logger.info('No balance change detected in update'); } } ) .subscribe((status: string) => { logger.info('Subscription status changed:', status); if (status === 'SUBSCRIBED') { logger.info('Successfully subscribed to balance changes'); this.subscription = channel; } else if (status === 'CLOSED') { logger.error('Subscription closed unexpectedly'); setTimeout(() => this.setupRealtimeSubscription(), 5000); } else if (status === 'CHANNEL_ERROR') { logger.error('Channel error occurred'); setTimeout(() => this.setupRealtimeSubscription(), 5000); } }); // Add connection status listener supabase .channel('system') .on('system', { event: '*' }, (payload) => { logger.info('System event:', payload); }) .subscribe(); logger.info('Balance update subscription setup completed'); } catch (error) { logger.error('Error setting up realtime subscription:', error); setTimeout(() => this.setupRealtimeSubscription(), 5000); } } private async setupBotsSubscription() { try { logger.info('Starting bots subscription setup...'); // Unsubscribe from any existing subscription if (this.botsSubscription) { await this.botsSubscription.unsubscribe(); logger.info('Unsubscribed from previous bots subscription'); } // Create a new channel for bots const channel = supabase.channel('bots_changes', { config: { broadcast: { self: true } } }); // Subscribe to changes in bots table channel .on( 'postgres_changes', { event: '*', // Listen to all events (INSERT, UPDATE, DELETE) schema: 'public', table: 'bots' }, async (payload: any) => { logger.info('Received bots table change:', JSON.stringify(payload, null, 2)); try { const botId = payload.new?.id || payload.old?.id; if (!botId) { logger.error('No bot ID found in payload'); return; } // Stop the bot first await stopBot(botId); logger.info(`Stopped bot ${botId}`); // If it's an UPDATE or INSERT, restart the bot with new data if (payload.eventType === 'UPDATE' || payload.eventType === 'INSERT') { const { data: botData, error } = await supabase .from('bots') .select('*') .eq('id', botId) .single(); if (error) { logger.error(`Error fetching bot data: ${error.message}`); return; } if (botData) { const result = await initializeBot(botData.bot_token, botData); if (result.success) { logger.info(`Bot ${botId} restarted successfully`); } else { logger.error(`Failed to restart bot ${botId}: ${result.message}`); } } } } catch (error) { logger.error('Error handling bot update:', error); } } ) .subscribe((status: string) => { logger.info('Bots subscription status changed:', status); if (status === 'SUBSCRIBED') { logger.info('Successfully subscribed to bots changes'); this.botsSubscription = channel; } else if (status === 'CLOSED') { logger.error('Bots subscription closed unexpectedly'); setTimeout(() => this.setupBotsSubscription(), 5000); } else if (status === 'CHANNEL_ERROR') { logger.error('Bots channel error occurred'); setTimeout(() => this.setupBotsSubscription(), 5000); } }); logger.info('Bots subscription setup completed'); } catch (error) { logger.error('Error setting up bots subscription:', error); setTimeout(() => this.setupBotsSubscription(), 5000); } } private async handleBalanceUpdate(payload: any) { logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2)); if (!this.bot) { logger.error('Bot instance not set in BalanceUpdateService'); return; } const { old: oldRecord, new: newRecord } = payload; if (!oldRecord || !newRecord) { logger.error('Invalid payload format:', payload); return; } const telegramId = newRecord.telegram_id; const oldBalance = Number(oldRecord.balance) || 0; const newBalance = Number(newRecord.balance) || 0; logger.info(`Balance change detected for user ${telegramId}: ${oldBalance} -> ${newBalance}`); // Only send notification if balance actually changed if (oldBalance !== newBalance) { try { const balanceChange = newBalance - oldBalance; const changeType = balanceChange > 0 ? 'increase' : 'decrease'; const message = messageManager.getMessage('balance_update_notification') .replace('{old_balance}', oldBalance.toFixed(2)) .replace('{new_balance}', newBalance.toFixed(2)) .replace('{change_type}', changeType) .replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); logger.info(`Sending balance update message to user ${telegramId}:`, message); await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); logger.info(`Balance update notification sent to user ${telegramId}`); } catch (error) { logger.error(`Failed to send balance update notification to user ${telegramId}:`, error); } } else { logger.info(`No balance change detected for user ${telegramId}`); } } // Method to manually trigger a balance update notification public async sendBalanceUpdateNotification(telegramId: number, oldBalance: number, newBalance: number) { try { const balanceChange = newBalance - oldBalance; const changeType = balanceChange > 0 ? 'increase' : 'decrease'; const message = messageManager.getMessage('balance_update_notification') .replace('{old_balance}', oldBalance.toFixed(2)) .replace('{new_balance}', newBalance.toFixed(2)) .replace('{change_type}', changeType) .replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); logger.info(`Manual balance update notification sent to user ${telegramId}`); } catch (error) { logger.error(`Failed to send manual balance update notification to user ${telegramId}:`, error); } } // Method to check subscription status public getSubscriptionStatus(): string { return this.subscription?.state || 'NOT_INITIALIZED'; } } // export const balanceUpdateService = BalanceUpdateService.getInstance();