Mohammed Foud commited on
Commit
ac0600a
·
1 Parent(s): 65b0658
src/bots/services/BalanceUpdateService.ts CHANGED
@@ -2,6 +2,7 @@ import { supabase } from '../../db/supabase';
2
  import { createLogger } from '../../utils/logger';
3
  import { BotContext } from '../types/botTypes';
4
  import { messageManager } from '../utils/messageManager';
 
5
 
6
  const logger = createLogger('BalanceUpdateService');
7
 
@@ -9,10 +10,14 @@ export class BalanceUpdateService {
9
  private static instance: BalanceUpdateService;
10
  private bot: any;
11
  private subscription: any;
 
12
 
13
  private constructor() {
14
  // Delay the initial subscription setup to ensure bot is ready
15
- setTimeout(() => this.setupRealtimeSubscription(), 1000);
 
 
 
16
  }
17
 
18
  public static getInstance(): BalanceUpdateService {
@@ -100,6 +105,94 @@ export class BalanceUpdateService {
100
  }
101
  }
102
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
  private async handleBalanceUpdate(payload: any) {
104
  logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2));
105
 
 
2
  import { createLogger } from '../../utils/logger';
3
  import { BotContext } from '../types/botTypes';
4
  import { messageManager } from '../utils/messageManager';
5
+ import { initializeBot, stopBot } from '../botManager';
6
 
7
  const logger = createLogger('BalanceUpdateService');
8
 
 
10
  private static instance: BalanceUpdateService;
11
  private bot: any;
12
  private subscription: any;
13
+ private botsSubscription: any;
14
 
15
  private constructor() {
16
  // Delay the initial subscription setup to ensure bot is ready
17
+ setTimeout(() => {
18
+ this.setupRealtimeSubscription();
19
+ this.setupBotsSubscription();
20
+ }, 1000);
21
  }
22
 
23
  public static getInstance(): BalanceUpdateService {
 
105
  }
106
  }
107
 
108
+ private async setupBotsSubscription() {
109
+ try {
110
+ logger.info('Starting bots subscription setup...');
111
+
112
+ // Unsubscribe from any existing subscription
113
+ if (this.botsSubscription) {
114
+ await this.botsSubscription.unsubscribe();
115
+ logger.info('Unsubscribed from previous bots subscription');
116
+ }
117
+
118
+ // Create a new channel for bots
119
+ const channel = supabase.channel('bots_changes', {
120
+ config: {
121
+ broadcast: { self: true }
122
+ }
123
+ });
124
+
125
+ // Subscribe to changes in bots table
126
+ channel
127
+ .on(
128
+ 'postgres_changes',
129
+ {
130
+ event: '*', // Listen to all events (INSERT, UPDATE, DELETE)
131
+ schema: 'public',
132
+ table: 'bots'
133
+ },
134
+ async (payload: any) => {
135
+ logger.info('Received bots table change:', JSON.stringify(payload, null, 2));
136
+
137
+ try {
138
+ const botId = payload.new?.id || payload.old?.id;
139
+ if (!botId) {
140
+ logger.error('No bot ID found in payload');
141
+ return;
142
+ }
143
+
144
+ // Stop the bot first
145
+ await stopBot(botId);
146
+ logger.info(`Stopped bot ${botId}`);
147
+
148
+ // If it's an UPDATE or INSERT, restart the bot with new data
149
+ if (payload.eventType === 'UPDATE' || payload.eventType === 'INSERT') {
150
+ const { data: botData, error } = await supabase
151
+ .from('bots')
152
+ .select('*')
153
+ .eq('id', botId)
154
+ .single();
155
+
156
+ if (error) {
157
+ logger.error(`Error fetching bot data: ${error.message}`);
158
+ return;
159
+ }
160
+
161
+ if (botData) {
162
+ const result = await initializeBot(botData.bot_token, botData);
163
+ if (result.success) {
164
+ logger.info(`Bot ${botId} restarted successfully`);
165
+ } else {
166
+ logger.error(`Failed to restart bot ${botId}: ${result.message}`);
167
+ }
168
+ }
169
+ }
170
+ } catch (error) {
171
+ logger.error('Error handling bot update:', error);
172
+ }
173
+ }
174
+ )
175
+ .subscribe((status: string) => {
176
+ logger.info('Bots subscription status changed:', status);
177
+ if (status === 'SUBSCRIBED') {
178
+ logger.info('Successfully subscribed to bots changes');
179
+ this.botsSubscription = channel;
180
+ } else if (status === 'CLOSED') {
181
+ logger.error('Bots subscription closed unexpectedly');
182
+ setTimeout(() => this.setupBotsSubscription(), 5000);
183
+ } else if (status === 'CHANNEL_ERROR') {
184
+ logger.error('Bots channel error occurred');
185
+ setTimeout(() => this.setupBotsSubscription(), 5000);
186
+ }
187
+ });
188
+
189
+ logger.info('Bots subscription setup completed');
190
+ } catch (error) {
191
+ logger.error('Error setting up bots subscription:', error);
192
+ setTimeout(() => this.setupBotsSubscription(), 5000);
193
+ }
194
+ }
195
+
196
  private async handleBalanceUpdate(payload: any) {
197
  logger.info('Processing balance update payload:', JSON.stringify(payload, null, 2));
198