Spaces:
Paused
Paused
| import { supabase } from '../../db/supabase'; | |
| import { createLogger } from '../../utils/logger'; | |
| import { BotContext } from '../types/botTypes'; | |
| import { messageManager } from '../utils/messageManager'; | |
| import { initializeBot, stopBot } from '../botManager'; | |
| const logger = createLogger('BalanceUpdateService'); | |
| export class BalanceUpdateService { | |
| private static instance: BalanceUpdateService; | |
| private bot: any; | |
| private subscription: any; | |
| private botsSubscription: any; | |
| private constructor() { | |
| // Delay the initial subscription setup to ensure bot is ready | |
| setTimeout(() => { | |
| //this.setupRealtimeSubscription(); | |
| //this.setupBotsSubscription(); | |
| }, 1000); | |
| } | |
| public static getInstance(): BalanceUpdateService { | |
| if (!BalanceUpdateService.instance) { | |
| BalanceUpdateService.instance = new BalanceUpdateService(); | |
| } | |
| return BalanceUpdateService.instance; | |
| } | |
| public setBot(bot: any) { | |
| this.bot = bot; | |
| logger.info('Bot instance set in BalanceUpdateService'); | |
| // Setup subscription after bot is set | |
| this.setupRealtimeSubscription(); | |
| } | |
| private async setupRealtimeSubscription() { | |
| try { | |
| logger.info('Starting realtime subscription setup...'); | |
| // Unsubscribe from any existing subscription | |
| if (this.subscription) { | |
| await this.subscription.unsubscribe(); | |
| logger.info('Unsubscribed from previous subscription'); | |
| } | |
| // Create a new channel | |
| const channel = supabase.channel('balance_changes', { | |
| config: { | |
| broadcast: { self: true } | |
| } | |
| }); | |
| // Subscribe to changes | |
| channel | |
| .on( | |
| 'postgres_changes', | |
| { | |
| event: 'UPDATE', | |
| schema: 'public', | |
| table: 'users_bot_telegram' | |
| }, | |
| async (payload: any) => { | |
| logger.info('Received database change:', JSON.stringify(payload, null, 2)); | |
| // Check if balance was actually changed | |
| if (payload.old.balance !== payload.new.balance) { | |
| logger.info('Balance change detected, processing update...'); | |
| try { | |
| await this.handleBalanceUpdate(payload); | |
| } catch (error) { | |
| logger.error('Error handling balance update:', error); | |
| } | |
| } else { | |
| logger.info('No balance change detected in update'); | |
| } | |
| } | |
| ) | |
| .subscribe((status: string) => { | |
| logger.info('Subscription status changed:', status); | |
| if (status === 'SUBSCRIBED') { | |
| logger.info('Successfully subscribed to balance changes'); | |
| this.subscription = channel; | |
| } else if (status === 'CLOSED') { | |
| logger.error('Subscription closed unexpectedly'); | |
| setTimeout(() => this.setupRealtimeSubscription(), 5000); | |
| } else if (status === 'CHANNEL_ERROR') { | |
| logger.error('Channel error occurred'); | |
| setTimeout(() => this.setupRealtimeSubscription(), 5000); | |
| } | |
| }); | |
| // Add connection status listener | |
| supabase | |
| .channel('system') | |
| .on('system', { event: '*' }, (payload) => { | |
| logger.info('System event:', payload); | |
| }) | |
| .subscribe(); | |
| logger.info('Balance update subscription setup completed'); | |
| } catch (error) { | |
| logger.error('Error setting up realtime subscription:', error); | |
| setTimeout(() => this.setupRealtimeSubscription(), 5000); | |
| } | |
| } | |
| private async setupBotsSubscription() { | |
| try { | |
| logger.info('Starting bots subscription setup...'); | |
| // Unsubscribe from any existing subscription | |
| if (this.botsSubscription) { | |
| await this.botsSubscription.unsubscribe(); | |
| logger.info('Unsubscribed from previous bots subscription'); | |
| } | |
| // Create a new channel for bots | |
| const channel = supabase.channel('bots_changes', { | |
| config: { | |
| broadcast: { self: true } | |
| } | |
| }); | |
| // Subscribe to changes in bots table | |
| channel | |
| .on( | |
| 'postgres_changes', | |
| { | |
| event: '*', // Listen to all events (INSERT, UPDATE, DELETE) | |
| schema: 'public', | |
| table: 'bots' | |
| }, | |
| async (payload: any) => { | |
| logger.info('Received bots table change:', JSON.stringify(payload, null, 2)); | |
| try { | |
| const botId = payload.new?.id || payload.old?.id; | |
| if (!botId) { | |
| logger.error('No bot ID found in payload'); | |
| return; | |
| } | |
| // Stop the bot first | |
| await stopBot(botId); | |
| logger.info(`Stopped bot ${botId}`); | |
| // If it's an UPDATE or INSERT, restart the bot with new data | |
| if (payload.eventType === 'UPDATE' || payload.eventType === 'INSERT') { | |
| const { data: botData, error } = await supabase | |
| .from('bots') | |
| .select('*') | |
| .eq('id', botId) | |
| .single(); | |
| if (error) { | |
| logger.error(`Error fetching bot data: ${error.message}`); | |
| return; | |
| } | |
| if (botData) { | |
| const result = await initializeBot(botData.bot_token, botData); | |
| if (result.success) { | |
| logger.info(`Bot ${botId} restarted successfully`); | |
| } else { | |
| logger.error(`Failed to restart bot ${botId}: ${result.message}`); | |
| } | |
| } | |
| } | |
| } catch (error) { | |
| logger.error('Error handling bot update:', error); | |
| } | |
| } | |
| ) | |
| .subscribe((status: string) => { | |
| logger.info('Bots subscription status changed:', status); | |
| if (status === 'SUBSCRIBED') { | |
| logger.info('Successfully subscribed to bots changes'); | |
| this.botsSubscription = channel; | |
| } else if (status === 'CLOSED') { | |
| logger.error('Bots subscription closed unexpectedly'); | |
| setTimeout(() => this.setupBotsSubscription(), 5000); | |
| } else if (status === 'CHANNEL_ERROR') { | |
| logger.error('Bots channel error occurred'); | |
| setTimeout(() => this.setupBotsSubscription(), 5000); | |
| } | |
| }); | |
| logger.info('Bots subscription setup completed'); | |
| } catch (error) { | |
| logger.error('Error setting up bots subscription:', error); | |
| setTimeout(() => this.setupBotsSubscription(), 5000); | |
| } | |
| } | |
| private async handleBalanceUpdate(payload: any) { | |
| logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2)); | |
| if (!this.bot) { | |
| logger.error('Bot instance not set in BalanceUpdateService'); | |
| return; | |
| } | |
| const { old: oldRecord, new: newRecord } = payload; | |
| if (!oldRecord || !newRecord) { | |
| logger.error('Invalid payload format:', payload); | |
| return; | |
| } | |
| const telegramId = newRecord.telegram_id; | |
| const oldBalance = Number(oldRecord.balance) || 0; | |
| const newBalance = Number(newRecord.balance) || 0; | |
| logger.info(`Balance change detected for user ${telegramId}: ${oldBalance} -> ${newBalance}`); | |
| // Only send notification if balance actually changed | |
| if (oldBalance !== newBalance) { | |
| try { | |
| const balanceChange = newBalance - oldBalance; | |
| const changeType = balanceChange > 0 ? 'increase' : 'decrease'; | |
| const message = messageManager.getMessage('balance_update_notification') | |
| .replace('{old_balance}', oldBalance.toFixed(2)) | |
| .replace('{new_balance}', newBalance.toFixed(2)) | |
| .replace('{change_type}', changeType) | |
| .replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); | |
| logger.info(`Sending balance update message to user ${telegramId}:`, message); | |
| await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); | |
| logger.info(`Balance update notification sent to user ${telegramId}`); | |
| } catch (error) { | |
| logger.error(`Failed to send balance update notification to user ${telegramId}:`, error); | |
| } | |
| } else { | |
| logger.info(`No balance change detected for user ${telegramId}`); | |
| } | |
| } | |
| // Method to manually trigger a balance update notification | |
| public async sendBalanceUpdateNotification(telegramId: number, oldBalance: number, newBalance: number) { | |
| try { | |
| const balanceChange = newBalance - oldBalance; | |
| const changeType = balanceChange > 0 ? 'increase' : 'decrease'; | |
| const message = messageManager.getMessage('balance_update_notification') | |
| .replace('{old_balance}', oldBalance.toFixed(2)) | |
| .replace('{new_balance}', newBalance.toFixed(2)) | |
| .replace('{change_type}', changeType) | |
| .replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); | |
| await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); | |
| logger.info(`Manual balance update notification sent to user ${telegramId}`); | |
| } catch (error) { | |
| logger.error(`Failed to send manual balance update notification to user ${telegramId}:`, error); | |
| } | |
| } | |
| // Method to check subscription status | |
| public getSubscriptionStatus(): string { | |
| return this.subscription?.state || 'NOT_INITIALIZED'; | |
| } | |
| } | |
| // export const balanceUpdateService = BalanceUpdateService.getInstance(); |