// bot-me/src/bots/services/BalanceUpdateService.ts
// Mohammed Foud
// all
// 0d830e5
import { supabase } from '../../db/supabase';
import { createLogger } from '../../utils/logger';
import { BotContext } from '../types/botTypes';
import { messageManager } from '../utils/messageManager';
import { initializeBot, stopBot } from '../botManager';
// Module-level logger created with the label 'BalanceUpdateService'; used by every method of the class below.
const logger = createLogger('BalanceUpdateService');
/**
 * Singleton that listens to Supabase realtime `postgres_changes` events and
 * reacts to them:
 *
 * - UPDATEs on `public.users_bot_telegram` whose `balance` differs between the
 *   old and new row produce a Telegram message to the affected user.
 * - (Currently disabled) changes on `public.bots` stop the affected bot and,
 *   for INSERT/UPDATE, re-initialise it from the freshly fetched row.
 *
 * The balance subscription is started by {@link setBot}; nothing is subscribed
 * until a bot instance has been provided.
 */
export class BalanceUpdateService {
  private static instance: BalanceUpdateService;

  /** Delay before a failed or dropped subscription is set up again. */
  private static readonly RECONNECT_DELAY_MS = 5000;

  /** Bot used to deliver messages; only `bot.telegram.sendMessage` is used here. */
  private bot: any;
  /** Current channel for `users_bot_telegram` changes, if any. */
  private subscription: any;
  /** Current channel for `bots` changes, if any. */
  private botsSubscription: any;
  /** Channel that logs realtime `system` events; created at most once. */
  private systemSubscription: any;
  /** Pending retry timers, kept so that at most one retry per subscription is queued. */
  private balanceReconnectTimer: ReturnType<typeof setTimeout> | undefined;
  private botsReconnectTimer: ReturnType<typeof setTimeout> | undefined;

  private constructor() {
    // Intentionally empty: the balance subscription is started from setBot(),
    // and the bots-table subscription (setupBotsSubscription) is not enabled.
  }

  /** Returns the process-wide instance, creating it on first use. */
  public static getInstance(): BalanceUpdateService {
    if (!BalanceUpdateService.instance) {
      BalanceUpdateService.instance = new BalanceUpdateService();
    }
    return BalanceUpdateService.instance;
  }

  /**
   * Stores the bot used for sending notifications and (re)starts the balance
   * subscription.
   *
   * @param bot Object exposing `telegram.sendMessage(chatId, text, extra)`.
   */
  public setBot(bot: any) {
    this.bot = bot;
    logger.info('Bot instance set in BalanceUpdateService');
    // Setup is fire-and-forget; it handles and logs its own errors.
    void this.setupRealtimeSubscription();
  }

  /**
   * Queues a single retry of the balance subscription setup. Further calls
   * while a retry is pending are ignored so retries cannot pile up.
   */
  private scheduleBalanceReconnect(): void {
    if (this.balanceReconnectTimer !== undefined) {
      return;
    }
    this.balanceReconnectTimer = setTimeout(() => {
      this.balanceReconnectTimer = undefined;
      void this.setupRealtimeSubscription();
    }, BalanceUpdateService.RECONNECT_DELAY_MS);
  }

  /** Same as {@link scheduleBalanceReconnect}, for the bots-table subscription. */
  private scheduleBotsReconnect(): void {
    if (this.botsReconnectTimer !== undefined) {
      return;
    }
    this.botsReconnectTimer = setTimeout(() => {
      this.botsReconnectTimer = undefined;
      void this.setupBotsSubscription();
    }, BalanceUpdateService.RECONNECT_DELAY_MS);
  }

  /**
   * Replaces any existing balance channel with a fresh subscription to UPDATE
   * events on `public.users_bot_telegram`. Failures and unexpected
   * CLOSED / CHANNEL_ERROR statuses trigger a delayed retry.
   */
  private async setupRealtimeSubscription(): Promise<void> {
    try {
      logger.info('Starting realtime subscription setup...');

      // Unsubscribe from any existing subscription. The field is cleared
      // first so that the CLOSED status emitted by this deliberate unsubscribe
      // is recognised as stale below instead of scheduling yet another setup.
      const previous = this.subscription;
      if (previous) {
        this.subscription = undefined;
        await previous.unsubscribe();
        logger.info('Unsubscribed from previous subscription');
      }

      // Create a new channel and remember it immediately, so that a channel
      // which never reaches SUBSCRIBED is still cleaned up by the next setup.
      const channel = supabase.channel('balance_changes', {
        config: {
          broadcast: { self: true }
        }
      });
      this.subscription = channel;

      // Subscribe to changes
      channel
        .on(
          'postgres_changes',
          {
            event: 'UPDATE',
            schema: 'public',
            table: 'users_bot_telegram'
          },
          async (payload: any) => {
            logger.info('Received database change:', JSON.stringify(payload, null, 2));
            // NOTE(review): Supabase only includes previous column values in
            // `payload.old` when the table uses REPLICA IDENTITY FULL — confirm
            // for users_bot_telegram; otherwise old.balance is undefined here.
            if (payload?.old?.balance !== payload?.new?.balance) {
              logger.info('Balance change detected, processing update...');
              try {
                await this.handleBalanceUpdate(payload);
              } catch (error) {
                logger.error('Error handling balance update:', error);
              }
            } else {
              logger.info('No balance change detected in update');
            }
          }
        )
        .subscribe((status: string) => {
          logger.info('Subscription status changed:', status);
          if (this.subscription !== channel) {
            // This channel was deliberately replaced; its status is irrelevant.
            return;
          }
          if (status === 'SUBSCRIBED') {
            logger.info('Successfully subscribed to balance changes');
          } else if (status === 'CLOSED') {
            logger.error('Subscription closed unexpectedly');
            this.scheduleBalanceReconnect();
          } else if (status === 'CHANNEL_ERROR') {
            logger.error('Channel error occurred');
            this.scheduleBalanceReconnect();
          }
        });

      // Add connection status listener (once; re-running setup must not
      // create or re-subscribe another 'system' channel).
      if (!this.systemSubscription) {
        this.systemSubscription = supabase
          .channel('system')
          .on('system', { event: '*' }, (payload) => {
            logger.info('System event:', payload);
          })
          .subscribe();
      }

      logger.info('Balance update subscription setup completed');
    } catch (error) {
      logger.error('Error setting up realtime subscription:', error);
      this.scheduleBalanceReconnect();
    }
  }

  /**
   * Replaces any existing bots channel with a fresh subscription to all
   * events on `public.bots`. For every event the affected bot is stopped;
   * for INSERT/UPDATE it is then re-initialised from the current row.
   */
  private async setupBotsSubscription(): Promise<void> {
    try {
      logger.info('Starting bots subscription setup...');

      // Unsubscribe from any existing subscription (see
      // setupRealtimeSubscription for why the field is cleared first).
      const previous = this.botsSubscription;
      if (previous) {
        this.botsSubscription = undefined;
        await previous.unsubscribe();
        logger.info('Unsubscribed from previous bots subscription');
      }

      // Create a new channel for bots
      const channel = supabase.channel('bots_changes', {
        config: {
          broadcast: { self: true }
        }
      });
      this.botsSubscription = channel;

      // Subscribe to changes in bots table
      channel
        .on(
          'postgres_changes',
          {
            event: '*', // Listen to all events (INSERT, UPDATE, DELETE)
            schema: 'public',
            table: 'bots'
          },
          async (payload: any) => {
            logger.info('Received bots table change:', JSON.stringify(payload, null, 2));
            try {
              // DELETE events carry the id on `old`, the others on `new`.
              const botId = payload?.new?.id || payload?.old?.id;
              if (!botId) {
                logger.error('No bot ID found in payload');
                return;
              }

              // Stop the bot first
              await stopBot(botId);
              logger.info(`Stopped bot ${botId}`);

              // If it's an UPDATE or INSERT, restart the bot with new data
              if (payload.eventType === 'UPDATE' || payload.eventType === 'INSERT') {
                const { data: botData, error } = await supabase
                  .from('bots')
                  .select('*')
                  .eq('id', botId)
                  .single();
                if (error) {
                  logger.error(`Error fetching bot data: ${error.message}`);
                  return;
                }
                if (botData) {
                  const result = await initializeBot(botData.bot_token, botData);
                  if (result.success) {
                    logger.info(`Bot ${botId} restarted successfully`);
                  } else {
                    logger.error(`Failed to restart bot ${botId}: ${result.message}`);
                  }
                }
              }
            } catch (error) {
              logger.error('Error handling bot update:', error);
            }
          }
        )
        .subscribe((status: string) => {
          logger.info('Bots subscription status changed:', status);
          if (this.botsSubscription !== channel) {
            // This channel was deliberately replaced; its status is irrelevant.
            return;
          }
          if (status === 'SUBSCRIBED') {
            logger.info('Successfully subscribed to bots changes');
          } else if (status === 'CLOSED') {
            logger.error('Bots subscription closed unexpectedly');
            this.scheduleBotsReconnect();
          } else if (status === 'CHANNEL_ERROR') {
            logger.error('Bots channel error occurred');
            this.scheduleBotsReconnect();
          }
        });

      logger.info('Bots subscription setup completed');
    } catch (error) {
      logger.error('Error setting up bots subscription:', error);
      this.scheduleBotsReconnect();
    }
  }

  /**
   * Fills the `balance_update_notification` template. Every occurrence of
   * each placeholder is substituted, so a template may repeat a placeholder.
   * A zero difference is reported as 'decrease'.
   */
  private buildBalanceMessage(oldBalance: number, newBalance: number): string {
    const balanceChange = newBalance - oldBalance;
    const changeType = balanceChange > 0 ? 'increase' : 'decrease';
    const substitutions: Array<[string, string]> = [
      ['{old_balance}', oldBalance.toFixed(2)],
      ['{new_balance}', newBalance.toFixed(2)],
      ['{change_type}', changeType],
      ['{change_amount}', Math.abs(balanceChange).toFixed(2)]
    ];

    let message = messageManager.getMessage('balance_update_notification');
    for (const [placeholder, value] of substitutions) {
      // split/join replaces all occurrences and treats `value` literally.
      message = message.split(placeholder).join(value);
    }
    return message;
  }

  /**
   * Handles one realtime UPDATE payload: compares the numeric old and new
   * balances and, if they differ, messages the user identified by
   * `new.telegram_id`. Non-numeric balances are treated as 0. Send failures
   * are logged, not thrown.
   */
  private async handleBalanceUpdate(payload: any): Promise<void> {
    logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2));
    if (!this.bot) {
      logger.error('Bot instance not set in BalanceUpdateService');
      return;
    }

    const oldRecord = payload?.old;
    const newRecord = payload?.new;
    if (!oldRecord || !newRecord) {
      logger.error('Invalid payload format:', payload);
      return;
    }

    const telegramId = newRecord.telegram_id;
    const oldBalance = Number(oldRecord.balance) || 0;
    const newBalance = Number(newRecord.balance) || 0;
    logger.info(`Balance change detected for user ${telegramId}: ${oldBalance} -> ${newBalance}`);

    // Only send notification if balance actually changed
    if (oldBalance === newBalance) {
      logger.info(`No balance change detected for user ${telegramId}`);
      return;
    }

    try {
      const message = this.buildBalanceMessage(oldBalance, newBalance);
      logger.info(`Sending balance update message to user ${telegramId}:`, message);
      await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' });
      logger.info(`Balance update notification sent to user ${telegramId}`);
    } catch (error) {
      logger.error(`Failed to send balance update notification to user ${telegramId}:`, error);
    }
  }

  /**
   * Manually sends a balance update notification, independent of any
   * database event. Failures (including a missing bot) are logged, not thrown.
   *
   * @param telegramId Chat/user id passed to `sendMessage`.
   * @param oldBalance Balance before the change.
   * @param newBalance Balance after the change.
   */
  public async sendBalanceUpdateNotification(telegramId: number, oldBalance: number, newBalance: number) {
    if (!this.bot) {
      logger.error('Bot instance not set in BalanceUpdateService');
      return;
    }
    try {
      const message = this.buildBalanceMessage(oldBalance, newBalance);
      await this.bot.telegram.sendMessage(telegramId, message, { parse_mode: 'HTML' });
      logger.info(`Manual balance update notification sent to user ${telegramId}`);
    } catch (error) {
      logger.error(`Failed to send manual balance update notification to user ${telegramId}:`, error);
    }
  }

  /**
   * Returns the `state` of the current balance channel, or 'NOT_INITIALIZED'
   * when no channel exists. The channel is tracked from creation, so states
   * prior to a successful join are reported as well.
   */
  public getSubscriptionStatus(): string {
    return this.subscription?.state ?? 'NOT_INITIALIZED';
  }
}
// export const balanceUpdateService = BalanceUpdateService.getInstance();