baha-99 commited on
Commit
c564f32
Β·
1 Parent(s): 12bb9ee

feat: implement true concurrent processing

Browse files

Add separate thread pools and async tasks for both text and Excel processing,
enabling simultaneous handling of multiple requests without blocking

Files changed (1) hide show
  1. bot_telegram.py +121 -118
bot_telegram.py CHANGED
@@ -44,6 +44,14 @@ class TelegramBot:
44
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
45
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
46
 
 
 
 
 
 
 
 
 
47
  # Start Telegram Bot
48
  self.app = Application.builder().token(self.bot_token).build()
49
  self.setup_handlers()
@@ -94,9 +102,10 @@ class TelegramBot:
94
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
95
 
96
  async def handle_message(self, update: Update, context: CallbackContext):
97
- """Handles all incoming messages."""
98
  user_id = update.message.from_user.id
99
  user_message = update.message.text.strip()
 
100
 
101
  # If user is waiting to enter a password, validate it
102
  if user_id in AWAITING_PASSWORD:
@@ -105,7 +114,8 @@ class TelegramBot:
105
 
106
  # If user is authenticated, process AI request
107
  if user_id in AUTHENTICATED_USERS:
108
- await self.chat_with_ai(update, context)
 
109
  else:
110
  await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
111
 
@@ -129,38 +139,45 @@ class TelegramBot:
129
  await update.message.reply_text("❌ Wrong password. Try again.")
130
 
131
  async def chat_with_ai(self, update: Update, context: CallbackContext):
132
- """Handles messages and sends them to the AI API."""
133
- user_id = update.message.from_user.id
134
-
135
- if user_id not in AUTHENTICATED_USERS:
136
- await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
137
- return
138
-
139
- # if not self.auth_token:
140
- # self.authenticate()
141
- #
142
- # if not self.auth_token:
143
- # await update.message.reply_text("Authentication failed. Please try again later.")
144
- # return
145
-
146
  user_message = update.message.text
147
- # Send immediate acknowledgment
148
- processing_msg = await update.message.reply_text("Processing your request...")
149
 
150
  try:
151
- # Run the API request in a separate thread to avoid blocking
 
 
 
 
 
 
 
 
 
 
 
 
 
152
  response = await asyncio.get_event_loop().run_in_executor(
153
- self.executor,
154
  self._make_api_request,
155
  user_message
156
  )
157
-
158
- # Update the processing message with the response
159
- await processing_msg.edit_text(response)
 
 
160
 
161
  except Exception as e:
162
- logging.error(f"Error processing request: {e}")
163
- await processing_msg.edit_text(f"❌ Error: {str(e)}")
 
 
 
 
 
 
164
 
165
  def _make_api_request(self, user_message):
166
  """Make the actual API request in a separate thread"""
@@ -196,128 +213,114 @@ class TelegramBot:
196
  return f"Connection error: {e}"
197
 
198
  async def handle_excel(self, update: Update, context: CallbackContext):
199
- """Handles Excel file uploads asynchronously"""
200
- user_id = update.message.from_user.id
201
  message_id = update.message.message_id
 
 
 
202
 
203
- if user_id not in AUTHENTICATED_USERS:
204
- await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
205
- return
206
-
207
- # if not self.auth_token:
208
- # self.authenticate()
209
- #
210
- # if not self.auth_token:
211
- # await update.message.reply_text("Authentication failed. Please try again later.")
212
- # return
213
-
214
- document = update.message.document
215
- if not document.file_name.endswith(('.xls', '.xlsx')):
216
- await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
217
- return
218
-
219
- # Track this Excel file
220
- self.active_excel_files[message_id] = {
221
- 'filename': document.file_name,
222
- 'start_time': time.time(),
223
- 'status': 'starting'
224
- }
225
-
226
  try:
 
 
227
  # Send initial processing message
228
  processing_msg = await update.message.reply_text(
229
- f"πŸ“Š Starting Excel file processing...\n"
230
  f"File: {document.file_name}\n"
231
  f"Request ID: #{message_id}"
232
  )
233
 
234
- # Use semaphore to control concurrent Excel processing
235
- async with self.excel_semaphore:
236
- self.active_excel_files[message_id]['status'] = 'downloading'
237
- await processing_msg.edit_text(
238
- f"πŸ“₯ Downloading file...\n"
239
- f"File: {document.file_name}\n"
240
- f"Request ID: #{message_id}"
241
- )
242
 
243
- # Download file
244
- file = await context.bot.get_file(document.file_id)
245
- file_bytes = await file.download_as_bytearray()
246
 
247
- self.active_excel_files[message_id]['status'] = 'processing'
248
- await processing_msg.edit_text(
249
- f"βš™οΈ Processing Excel file...\n"
250
- f"File: {document.file_name}\n"
251
- f"Request ID: #{message_id}"
252
- )
 
253
 
254
- # Process Excel in thread pool
255
- result = await asyncio.get_event_loop().run_in_executor(
256
- self.excel_executor,
257
- self._process_excel_sync,
258
- file_bytes,
259
- document.file_name
260
- )
261
 
262
- if result is None:
263
- self.active_excel_files[message_id]['status'] = 'failed'
264
- await processing_msg.edit_text(
265
- f"❌ Error processing Excel file\n"
266
- f"File: {document.file_name}\n"
267
- f"Request ID: #{message_id}"
268
- )
269
- return
270
-
271
- # Send processed file
272
- self.active_excel_files[message_id]['status'] = 'sending'
273
- await context.bot.send_document(
274
- chat_id=update.effective_chat.id,
275
- document=io.BytesIO(result),
276
- filename=f'processed_{document.file_name}',
277
- caption=f"βœ… Excel processing completed!\nRequest ID: #{message_id}"
278
- )
279
- await processing_msg.delete()
280
- self.active_excel_files[message_id]['status'] = 'completed'
281
 
282
  except Exception as e:
283
  logging.error(f"Error handling Excel file: {e}")
284
- self.active_excel_files[message_id]['status'] = 'error'
285
- self.active_excel_files[message_id]['error'] = str(e)
286
- await processing_msg.edit_text(
287
  f"❌ Error processing Excel file\n"
288
  f"File: {document.file_name}\n"
289
  f"Request ID: #{message_id}\n"
290
  f"Error: {str(e)}"
291
  )
292
  finally:
293
- # Calculate processing time
294
- end_time = time.time()
295
- processing_time = end_time - self.active_excel_files[message_id]['start_time']
296
- self.active_excel_files[message_id]['processing_time'] = processing_time
297
-
298
- async def excel_status_command(self, update: Update, context: CallbackContext):
299
- """Show status of Excel file processing"""
300
- if not self.active_excel_files:
301
- await update.message.reply_text("No active Excel file processing")
302
- return
 
 
 
 
 
 
 
 
 
303
 
304
- status_message = "πŸ“Š Excel Processing Status:\n\n"
305
- for msg_id, info in self.active_excel_files.items():
306
- processing_time = time.time() - info['start_time']
307
- status_message += (
308
- f"Request #{msg_id}:\n"
309
- f"β”œβ”€ File: {info['filename']}\n"
310
- f"β”œβ”€ Status: {info['status']}\n"
311
- f"β”œβ”€ Time: {processing_time:.1f}s\n"
312
- f"└─ {info.get('error', '')}\n\n"
313
- )
 
 
 
 
 
314
 
315
  await update.message.reply_text(status_message)
316
 
317
  def setup_handlers(self):
318
  """Set up Telegram command and message handlers."""
319
  self.app.add_handler(CommandHandler("start", self.start_command))
320
- self.app.add_handler(CommandHandler("excelstatus", self.excel_status_command))
321
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
322
  self.app.add_handler(MessageHandler(
323
  filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
 
44
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
45
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
46
 
47
+ # Separate executors for text and Excel
48
+ self.text_executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="text_worker")
49
+ self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
50
+
51
+ # Track active processes
52
+ self.active_requests = {}
53
+ self.active_excel_files = {}
54
+
55
  # Start Telegram Bot
56
  self.app = Application.builder().token(self.bot_token).build()
57
  self.setup_handlers()
 
102
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
103
 
104
  async def handle_message(self, update: Update, context: CallbackContext):
105
+ """Handles all incoming messages concurrently"""
106
  user_id = update.message.from_user.id
107
  user_message = update.message.text.strip()
108
+ message_id = update.message.message_id
109
 
110
  # If user is waiting to enter a password, validate it
111
  if user_id in AWAITING_PASSWORD:
 
114
 
115
  # If user is authenticated, process AI request
116
  if user_id in AUTHENTICATED_USERS:
117
+ # Create task for processing
118
+ asyncio.create_task(self.chat_with_ai(update, context))
119
  else:
120
  await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
121
 
 
139
  await update.message.reply_text("❌ Wrong password. Try again.")
140
 
141
  async def chat_with_ai(self, update: Update, context: CallbackContext):
142
+ """Process text messages asynchronously"""
143
+ message_id = update.message.message_id
 
 
 
 
 
 
 
 
 
 
 
 
144
  user_message = update.message.text
 
 
145
 
146
  try:
147
+ # Send immediate acknowledgment
148
+ processing_msg = await update.message.reply_text(
149
+ f"πŸ€” Processing your request...\n"
150
+ f"Request ID: #{message_id}"
151
+ )
152
+
153
+ # Track this request
154
+ self.active_requests[message_id] = {
155
+ 'type': 'text',
156
+ 'status': 'processing',
157
+ 'start_time': time.time()
158
+ }
159
+
160
+ # Process in thread pool
161
  response = await asyncio.get_event_loop().run_in_executor(
162
+ self.text_executor,
163
  self._make_api_request,
164
  user_message
165
  )
166
+
167
+ # Update with response
168
+ await processing_msg.edit_text(
169
+ f"βœ… Response for #{message_id}:\n{response}"
170
+ )
171
 
172
  except Exception as e:
173
+ logging.error(f"Error processing text request: {e}")
174
+ await processing_msg.edit_text(
175
+ f"❌ Error processing request #{message_id}: {str(e)}"
176
+ )
177
+ finally:
178
+ if message_id in self.active_requests:
179
+ self.active_requests[message_id]['status'] = 'completed'
180
+ self.active_requests[message_id]['end_time'] = time.time()
181
 
182
  def _make_api_request(self, user_message):
183
  """Make the actual API request in a separate thread"""
 
213
  return f"Connection error: {e}"
214
 
215
  async def handle_excel(self, update: Update, context: CallbackContext):
216
+ """Handle Excel files concurrently"""
 
217
  message_id = update.message.message_id
218
+
219
+ # Create task for processing
220
+ asyncio.create_task(self._process_excel_file(update, context, message_id))
221
 
222
+ async def _process_excel_file(self, update, context, message_id):
223
+ """Process Excel file asynchronously"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  try:
225
+ document = update.message.document
226
+
227
  # Send initial processing message
228
  processing_msg = await update.message.reply_text(
229
+ f"πŸ“Š Processing Excel file...\n"
230
  f"File: {document.file_name}\n"
231
  f"Request ID: #{message_id}"
232
  )
233
 
234
+ # Track this Excel file
235
+ self.active_excel_files[message_id] = {
236
+ 'filename': document.file_name,
237
+ 'status': 'downloading',
238
+ 'start_time': time.time()
239
+ }
 
 
240
 
241
+ # Download file
242
+ file = await context.bot.get_file(document.file_id)
243
+ file_bytes = await file.download_as_bytearray()
244
 
245
+ # Update status to processing
246
+ self.active_excel_files[message_id]['status'] = 'processing'
247
+ await processing_msg.edit_text(
248
+ f"βš™οΈ Processing Excel file...\n"
249
+ f"File: {document.file_name}\n"
250
+ f"Request ID: #{message_id}"
251
+ )
252
 
253
+ # Process in thread pool
254
+ result = await asyncio.get_event_loop().run_in_executor(
255
+ self.excel_executor,
256
+ self._process_excel_sync,
257
+ file_bytes,
258
+ document.file_name
259
+ )
260
 
261
+ if result is None:
262
+ raise Exception("Failed to process Excel file")
263
+
264
+ # Send processed file
265
+ await context.bot.send_document(
266
+ chat_id=update.effective_chat.id,
267
+ document=io.BytesIO(result),
268
+ filename=f'processed_{document.file_name}',
269
+ caption=f"βœ… Excel processing completed!\nRequest ID: #{message_id}"
270
+ )
271
+ await processing_msg.delete()
 
 
 
 
 
 
 
 
272
 
273
  except Exception as e:
274
  logging.error(f"Error handling Excel file: {e}")
275
+ await update.message.reply_text(
 
 
276
  f"❌ Error processing Excel file\n"
277
  f"File: {document.file_name}\n"
278
  f"Request ID: #{message_id}\n"
279
  f"Error: {str(e)}"
280
  )
281
  finally:
282
+ if message_id in self.active_excel_files:
283
+ self.active_excel_files[message_id]['status'] = 'completed'
284
+ self.active_excel_files[message_id]['end_time'] = time.time()
285
+
286
+ async def status_command(self, update: Update, context: CallbackContext):
287
+ """Show status of all active processes"""
288
+ status_message = "πŸ€– Current Status:\n\n"
289
+
290
+ # Text requests status
291
+ if self.active_requests:
292
+ status_message += "πŸ“ Text Requests:\n"
293
+ for msg_id, info in self.active_requests.items():
294
+ current_time = time.time()
295
+ processing_time = current_time - info['start_time']
296
+ status_message += (
297
+ f"Request #{msg_id}:\n"
298
+ f"β”œβ”€ Status: {info['status']}\n"
299
+ f"└─ Time: {processing_time:.1f}s\n\n"
300
+ )
301
 
302
+ # Excel files status
303
+ if self.active_excel_files:
304
+ status_message += "πŸ“Š Excel Files:\n"
305
+ for msg_id, info in self.active_excel_files.items():
306
+ current_time = time.time()
307
+ processing_time = current_time - info['start_time']
308
+ status_message += (
309
+ f"File #{msg_id}:\n"
310
+ f"β”œβ”€ Name: {info['filename']}\n"
311
+ f"β”œβ”€ Status: {info['status']}\n"
312
+ f"└─ Time: {processing_time:.1f}s\n\n"
313
+ )
314
+
315
+ if not self.active_requests and not self.active_excel_files:
316
+ status_message += "No active processes"
317
 
318
  await update.message.reply_text(status_message)
319
 
320
  def setup_handlers(self):
321
  """Set up Telegram command and message handlers."""
322
  self.app.add_handler(CommandHandler("start", self.start_command))
323
+ self.app.add_handler(CommandHandler("status", self.status_command))
324
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
325
  self.app.add_handler(MessageHandler(
326
  filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),