baha-99 commited on
Commit
68d2bca
Β·
1 Parent(s): 6bd2c3d

feat: add concurrent Excel file processing with status tracking

Browse files

Implement multi-file Excel processing system with ThreadPoolExecutor,
detailed status tracking, and new /excelstatus command for monitoring progress

Files changed (1) hide show
  1. bot_telegram.py +101 -87
bot_telegram.py CHANGED
@@ -55,6 +55,11 @@ class TelegramBot:
55
  # Create a ThreadPoolExecutor for handling concurrent requests
56
  self.executor = ThreadPoolExecutor(max_workers=10)
57
 
 
 
 
 
 
58
  # def authenticate(self):
59
  # """Authenticate with the API and retrieve an access token."""
60
  # payload = {"username": self.username, "password": self.password}
@@ -191,8 +196,9 @@ class TelegramBot:
191
  return f"Connection error: {e}"
192
 
193
  async def handle_excel(self, update: Update, context: CallbackContext):
194
- """Handles Excel file uploads."""
195
  user_id = update.message.from_user.id
 
196
 
197
  if user_id not in AUTHENTICATED_USERS:
198
  await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
@@ -210,105 +216,113 @@ class TelegramBot:
210
  await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
211
  return
212
 
213
- # Send initial processing message
214
- processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
 
 
 
 
215
 
216
  try:
217
- # Run Excel processing in a separate thread
218
- result = await asyncio.get_event_loop().run_in_executor(
219
- self.executor,
220
- self._process_excel,
221
- update, context, document
222
  )
223
-
224
- if isinstance(result, tuple):
225
- file_content, error = result
226
- if error:
227
- await processing_msg.edit_text(f"❌ Error: {error}")
228
- else:
229
- await context.bot.send_document(
230
- chat_id=update.effective_chat.id,
231
- document=io.BytesIO(file_content),
232
- filename='rfp_responses.xlsx',
233
- caption="βœ… Here's your processed Excel file with answers!"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  )
235
- await processing_msg.delete()
 
 
 
 
 
 
 
 
 
 
 
 
236
  except Exception as e:
237
  logging.error(f"Error handling Excel file: {e}")
238
- await processing_msg.edit_text(f"❌ Error processing Excel file: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
239
 
240
- async def _process_excel(self, update, context, document):
241
- """Process Excel file in a separate thread"""
242
- try:
243
- file = await context.bot.get_file(document.file_id)
244
- file_bytes = await file.download_as_bytearray()
245
-
246
- headers = {
247
- "Authorization": f"Bearer {HF_TOKEN}",
248
- "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
249
- }
250
-
251
- async with aiohttp.ClientSession() as session:
252
- form_data = aiohttp.FormData()
253
- form_data.add_field('file',
254
- file_bytes,
255
- filename=document.file_name,
256
- content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
257
-
258
- async with session.post(self.excel_url, headers=headers, data=form_data) as response:
259
- if response.status == 200:
260
- return await response.read(), None
261
- else:
262
- error_text = await response.text()
263
- return None, error_text
264
 
265
- except Exception as e:
266
- return None, str(e)
267
-
268
- async def test_concurrent(self, update: Update, context: CallbackContext):
269
- """Test concurrent processing"""
270
- await update.message.reply_text("Sending 5 requests simultaneously...")
271
-
272
- tasks = []
273
- for i in range(5):
274
- msg = f"Test question {i+1}"
275
- tasks.append(self.chat_with_ai(update, context))
276
-
277
- await asyncio.gather(*tasks)
278
-
279
- async def load_test_command(self, update: Update, context: CallbackContext):
280
- """Command to test concurrent processing"""
281
- if not context.args or not context.args[0].isdigit():
282
- await update.message.reply_text("Usage: /loadtest <number_of_requests>")
283
- return
284
-
285
- num_requests = min(int(context.args[0]), 20) # Limit to 20 for safety
286
- await update.message.reply_text(f"Starting load test with {num_requests} concurrent requests...")
287
-
288
- start_time = time.time()
289
- tasks = []
290
- for i in range(num_requests):
291
- tasks.append(self.chat_with_ai(
292
- update,
293
- context,
294
- f"Test question {i+1}"
295
- ))
296
-
297
- await asyncio.gather(*tasks)
298
- duration = time.time() - start_time
299
-
300
- await update.message.reply_text(
301
- f"Load test completed!\n"
302
- f"Processed {num_requests} requests in {duration:.2f} seconds\n"
303
- f"Average time per request: {(duration/num_requests):.2f} seconds"
304
- )
305
 
306
  def setup_handlers(self):
307
  """Set up Telegram command and message handlers."""
308
  self.app.add_handler(CommandHandler("start", self.start_command))
309
- self.app.add_handler(CommandHandler("loadtest", self.load_test_command))
310
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
311
- self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel))
 
 
 
312
 
313
  async def run(self):
314
  """Start the bot and listen for messages."""
 
55
  # Create a ThreadPoolExecutor for handling concurrent requests
56
  self.executor = ThreadPoolExecutor(max_workers=10)
57
 
58
+ # Increase Excel workers for more concurrent processing
59
+ self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
60
+ self.excel_semaphore = asyncio.Semaphore(10) # Allow 10 concurrent Excel processes
61
+ self.active_excel_files = {}
62
+
63
  # def authenticate(self):
64
  # """Authenticate with the API and retrieve an access token."""
65
  # payload = {"username": self.username, "password": self.password}
 
196
  return f"Connection error: {e}"
197
 
198
  async def handle_excel(self, update: Update, context: CallbackContext):
199
+ """Handles Excel file uploads asynchronously"""
200
  user_id = update.message.from_user.id
201
+ message_id = update.message.message_id
202
 
203
  if user_id not in AUTHENTICATED_USERS:
204
  await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
 
216
  await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
217
  return
218
 
219
+ # Track this Excel file
220
+ self.active_excel_files[message_id] = {
221
+ 'filename': document.file_name,
222
+ 'start_time': time.time(),
223
+ 'status': 'starting'
224
+ }
225
 
226
  try:
227
+ # Send initial processing message
228
+ processing_msg = await update.message.reply_text(
229
+ f"πŸ“Š Starting Excel file processing...\n"
230
+ f"File: {document.file_name}\n"
231
+ f"Request ID: #{message_id}"
232
  )
233
+
234
+ # Use semaphore to control concurrent Excel processing
235
+ async with self.excel_semaphore:
236
+ self.active_excel_files[message_id]['status'] = 'downloading'
237
+ await processing_msg.edit_text(
238
+ f"πŸ“₯ Downloading file...\n"
239
+ f"File: {document.file_name}\n"
240
+ f"Request ID: #{message_id}"
241
+ )
242
+
243
+ # Download file
244
+ file = await context.bot.get_file(document.file_id)
245
+ file_bytes = await file.download_as_bytearray()
246
+
247
+ self.active_excel_files[message_id]['status'] = 'processing'
248
+ await processing_msg.edit_text(
249
+ f"βš™οΈ Processing Excel file...\n"
250
+ f"File: {document.file_name}\n"
251
+ f"Request ID: #{message_id}"
252
+ )
253
+
254
+ # Process Excel in thread pool
255
+ result = await asyncio.get_event_loop().run_in_executor(
256
+ self.excel_executor,
257
+ self._process_excel_sync,
258
+ file_bytes,
259
+ document.file_name
260
+ )
261
+
262
+ if result is None:
263
+ self.active_excel_files[message_id]['status'] = 'failed'
264
+ await processing_msg.edit_text(
265
+ f"❌ Error processing Excel file\n"
266
+ f"File: {document.file_name}\n"
267
+ f"Request ID: #{message_id}"
268
  )
269
+ return
270
+
271
+ # Send processed file
272
+ self.active_excel_files[message_id]['status'] = 'sending'
273
+ await context.bot.send_document(
274
+ chat_id=update.effective_chat.id,
275
+ document=io.BytesIO(result),
276
+ filename=f'processed_{document.file_name}',
277
+ caption=f"βœ… Excel processing completed!\nRequest ID: #{message_id}"
278
+ )
279
+ await processing_msg.delete()
280
+ self.active_excel_files[message_id]['status'] = 'completed'
281
+
282
  except Exception as e:
283
  logging.error(f"Error handling Excel file: {e}")
284
+ self.active_excel_files[message_id]['status'] = 'error'
285
+ self.active_excel_files[message_id]['error'] = str(e)
286
+ await processing_msg.edit_text(
287
+ f"❌ Error processing Excel file\n"
288
+ f"File: {document.file_name}\n"
289
+ f"Request ID: #{message_id}\n"
290
+ f"Error: {str(e)}"
291
+ )
292
+ finally:
293
+ # Calculate processing time
294
+ end_time = time.time()
295
+ processing_time = end_time - self.active_excel_files[message_id]['start_time']
296
+ self.active_excel_files[message_id]['processing_time'] = processing_time
297
+
298
+ async def excel_status_command(self, update: Update, context: CallbackContext):
299
+ """Show status of Excel file processing"""
300
+ if not self.active_excel_files:
301
+ await update.message.reply_text("No active Excel file processing")
302
+ return
303
 
304
+ status_message = "πŸ“Š Excel Processing Status:\n\n"
305
+ for msg_id, info in self.active_excel_files.items():
306
+ processing_time = time.time() - info['start_time']
307
+ status_message += (
308
+ f"Request #{msg_id}:\n"
309
+ f"β”œβ”€ File: {info['filename']}\n"
310
+ f"β”œβ”€ Status: {info['status']}\n"
311
+ f"β”œβ”€ Time: {processing_time:.1f}s\n"
312
+ f"└─ {info.get('error', '')}\n\n"
313
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
 
315
+ await update.message.reply_text(status_message)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
316
 
317
  def setup_handlers(self):
318
  """Set up Telegram command and message handlers."""
319
  self.app.add_handler(CommandHandler("start", self.start_command))
320
+ self.app.add_handler(CommandHandler("excelstatus", self.excel_status_command))
321
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
322
+ self.app.add_handler(MessageHandler(
323
+ filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
324
+ self.handle_excel
325
+ ))
326
 
327
  async def run(self):
328
  """Start the bot and listen for messages."""