|
|
import { supabase } from '../../db/supabase'; |
|
|
import { createLogger } from '../../utils/logger'; |
|
|
import { BotContext } from '../types/botTypes'; |
|
|
import { messageManager } from '../utils/messageManager'; |
|
|
import { initializeBot, stopBot } from '../botManager'; |
|
|
|
|
|
const logger = createLogger('BalanceUpdateService'); |
|
|
|
|
|
export class BalanceUpdateService { |
|
|
private static instance: BalanceUpdateService; |
|
|
private bot: any; |
|
|
private subscription: any; |
|
|
private botsSubscription: any; |
|
|
|
|
|
private constructor() { |
|
|
|
|
|
setTimeout(() => { |
|
|
|
|
|
|
|
|
}, 1000); |
|
|
} |
|
|
|
|
|
public static getInstance(): BalanceUpdateService { |
|
|
if (!BalanceUpdateService.instance) { |
|
|
BalanceUpdateService.instance = new BalanceUpdateService(); |
|
|
} |
|
|
return BalanceUpdateService.instance; |
|
|
} |
|
|
|
|
|
public setBot(bot: any) { |
|
|
this.bot = bot; |
|
|
logger.info('Bot instance set in BalanceUpdateService'); |
|
|
|
|
|
this.setupRealtimeSubscription(); |
|
|
} |
|
|
|
|
|
private async setupRealtimeSubscription() { |
|
|
try { |
|
|
logger.info('Starting realtime subscription setup...'); |
|
|
|
|
|
|
|
|
if (this.subscription) { |
|
|
await this.subscription.unsubscribe(); |
|
|
logger.info('Unsubscribed from previous subscription'); |
|
|
} |
|
|
|
|
|
|
|
|
const channel = supabase.channel('balance_changes', { |
|
|
config: { |
|
|
broadcast: { self: true } |
|
|
} |
|
|
}); |
|
|
|
|
|
|
|
|
channel |
|
|
.on( |
|
|
'postgres_changes', |
|
|
{ |
|
|
event: 'UPDATE', |
|
|
schema: 'public', |
|
|
table: 'users_bot_telegram' |
|
|
}, |
|
|
async (payload: any) => { |
|
|
logger.info('Received database change:', JSON.stringify(payload, null, 2)); |
|
|
|
|
|
|
|
|
if (payload.old.balance !== payload.new.balance) { |
|
|
logger.info('Balance change detected, processing update...'); |
|
|
try { |
|
|
await this.handleBalanceUpdate(payload); |
|
|
} catch (error) { |
|
|
logger.error('Error handling balance update:', error); |
|
|
} |
|
|
} else { |
|
|
logger.info('No balance change detected in update'); |
|
|
} |
|
|
} |
|
|
) |
|
|
.subscribe((status: string) => { |
|
|
logger.info('Subscription status changed:', status); |
|
|
if (status === 'SUBSCRIBED') { |
|
|
logger.info('Successfully subscribed to balance changes'); |
|
|
this.subscription = channel; |
|
|
} else if (status === 'CLOSED') { |
|
|
logger.error('Subscription closed unexpectedly'); |
|
|
setTimeout(() => this.setupRealtimeSubscription(), 5000); |
|
|
} else if (status === 'CHANNEL_ERROR') { |
|
|
logger.error('Channel error occurred'); |
|
|
setTimeout(() => this.setupRealtimeSubscription(), 5000); |
|
|
} |
|
|
}); |
|
|
|
|
|
|
|
|
supabase |
|
|
.channel('system') |
|
|
.on('system', { event: '*' }, (payload) => { |
|
|
logger.info('System event:', payload); |
|
|
}) |
|
|
.subscribe(); |
|
|
|
|
|
logger.info('Balance update subscription setup completed'); |
|
|
} catch (error) { |
|
|
logger.error('Error setting up realtime subscription:', error); |
|
|
setTimeout(() => this.setupRealtimeSubscription(), 5000); |
|
|
} |
|
|
} |
|
|
|
|
|
private async setupBotsSubscription() { |
|
|
try { |
|
|
logger.info('Starting bots subscription setup...'); |
|
|
|
|
|
|
|
|
if (this.botsSubscription) { |
|
|
await this.botsSubscription.unsubscribe(); |
|
|
logger.info('Unsubscribed from previous bots subscription'); |
|
|
} |
|
|
|
|
|
|
|
|
const channel = supabase.channel('bots_changes', { |
|
|
config: { |
|
|
broadcast: { self: true } |
|
|
} |
|
|
}); |
|
|
|
|
|
|
|
|
channel |
|
|
.on( |
|
|
'postgres_changes', |
|
|
{ |
|
|
event: '*', |
|
|
schema: 'public', |
|
|
table: 'bots' |
|
|
}, |
|
|
async (payload: any) => { |
|
|
logger.info('Received bots table change:', JSON.stringify(payload, null, 2)); |
|
|
|
|
|
try { |
|
|
const botId = payload.new?.id || payload.old?.id; |
|
|
if (!botId) { |
|
|
logger.error('No bot ID found in payload'); |
|
|
return; |
|
|
} |
|
|
|
|
|
|
|
|
await stopBot(botId); |
|
|
logger.info(`Stopped bot ${botId}`); |
|
|
|
|
|
|
|
|
if (payload.eventType === 'UPDATE' || payload.eventType === 'INSERT') { |
|
|
const { data: botData, error } = await supabase |
|
|
.from('bots') |
|
|
.select('*') |
|
|
.eq('id', botId) |
|
|
.single(); |
|
|
|
|
|
if (error) { |
|
|
logger.error(`Error fetching bot data: ${error.message}`); |
|
|
return; |
|
|
} |
|
|
|
|
|
if (botData) { |
|
|
const result = await initializeBot(botData.bot_token, botData); |
|
|
if (result.success) { |
|
|
logger.info(`Bot ${botId} restarted successfully`); |
|
|
} else { |
|
|
logger.error(`Failed to restart bot ${botId}: ${result.message}`); |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (error) { |
|
|
logger.error('Error handling bot update:', error); |
|
|
} |
|
|
} |
|
|
) |
|
|
.subscribe((status: string) => { |
|
|
logger.info('Bots subscription status changed:', status); |
|
|
if (status === 'SUBSCRIBED') { |
|
|
logger.info('Successfully subscribed to bots changes'); |
|
|
this.botsSubscription = channel; |
|
|
} else if (status === 'CLOSED') { |
|
|
logger.error('Bots subscription closed unexpectedly'); |
|
|
setTimeout(() => this.setupBotsSubscription(), 5000); |
|
|
} else if (status === 'CHANNEL_ERROR') { |
|
|
logger.error('Bots channel error occurred'); |
|
|
setTimeout(() => this.setupBotsSubscription(), 5000); |
|
|
} |
|
|
}); |
|
|
|
|
|
logger.info('Bots subscription setup completed'); |
|
|
} catch (error) { |
|
|
logger.error('Error setting up bots subscription:', error); |
|
|
setTimeout(() => this.setupBotsSubscription(), 5000); |
|
|
} |
|
|
} |
|
|
|
|
|
private async handleBalanceUpdate(payload: any) { |
|
|
logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2)); |
|
|
|
|
|
if (!this.bot) { |
|
|
logger.error('Bot instance not set in BalanceUpdateService'); |
|
|
return; |
|
|
} |
|
|
|
|
|
const { old: oldRecord, new: newRecord } = payload; |
|
|
|
|
|
if (!oldRecord || !newRecord) { |
|
|
logger.error('Invalid payload format:', payload); |
|
|
return; |
|
|
} |
|
|
|
|
|
const telegramId = newRecord.telegram_id; |
|
|
const oldBalance = Number(oldRecord.balance) || 0; |
|
|
const newBalance = Number(newRecord.balance) || 0; |
|
|
|
|
|
logger.info(`Balance change detected for user ${telegramId}: ${oldBalance} -> ${newBalance}`); |
|
|
|
|
|
|
|
|
if (oldBalance !== newBalance) { |
|
|
try { |
|
|
const balanceChange = newBalance - oldBalance; |
|
|
const changeType = balanceChange > 0 ? 'increase' : 'decrease'; |
|
|
|
|
|
const message = messageManager.getMessage('balance_update_notification') |
|
|
.replace('{old_balance}', oldBalance.toFixed(2)) |
|
|
.replace('{new_balance}', newBalance.toFixed(2)) |
|
|
.replace('{change_type}', changeType) |
|
|
.replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); |
|
|
|
|
|
logger.info(`Sending balance update message to user ${telegramId}:`, message); |
|
|
|
|
|
await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); |
|
|
logger.info(`Balance update notification sent to user ${telegramId}`); |
|
|
} catch (error) { |
|
|
logger.error(`Failed to send balance update notification to user ${telegramId}:`, error); |
|
|
} |
|
|
} else { |
|
|
logger.info(`No balance change detected for user ${telegramId}`); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async sendBalanceUpdateNotification(telegramId: number, oldBalance: number, newBalance: number) { |
|
|
try { |
|
|
const balanceChange = newBalance - oldBalance; |
|
|
const changeType = balanceChange > 0 ? 'increase' : 'decrease'; |
|
|
|
|
|
const message = messageManager.getMessage('balance_update_notification') |
|
|
.replace('{old_balance}', oldBalance.toFixed(2)) |
|
|
.replace('{new_balance}', newBalance.toFixed(2)) |
|
|
.replace('{change_type}', changeType) |
|
|
.replace('{change_amount}', Math.abs(balanceChange).toFixed(2)); |
|
|
|
|
|
await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' }); |
|
|
logger.info(`Manual balance update notification sent to user ${telegramId}`); |
|
|
} catch (error) { |
|
|
logger.error(`Failed to send manual balance update notification to user ${telegramId}:`, error); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public getSubscriptionStatus(): string { |
|
|
return this.subscription?.state || 'NOT_INITIALIZED'; |
|
|
} |
|
|
} |
|
|
|
|
|
|