baha-99 commited on
Commit
766a55c
Β·
1 Parent(s): eec9416

add: Telegram bot with cocurrent processing

Browse files
Files changed (1) hide show
  1. bot_telegram.py +71 -45
bot_telegram.py CHANGED
@@ -5,6 +5,8 @@ from telegram import Update
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
 
 
8
 
9
  # Configure logging
10
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
@@ -49,6 +51,9 @@ class TelegramBot:
49
  logging.info("Authenticating with API...")
50
  # self.authenticate()
51
 
 
 
 
52
  # def authenticate(self):
53
  # """Authenticate with the API and retrieve an access token."""
54
  # payload = {"username": self.username, "password": self.password}
@@ -133,42 +138,56 @@ class TelegramBot:
133
  # return
134
 
135
  user_message = update.message.text
 
 
 
 
 
 
 
 
 
 
 
 
 
136
 
 
 
 
 
 
 
137
  hf_authorization = "Bearer " + HF_TOKEN
138
  headers = {
139
  "Authorization": hf_authorization,
140
  "accept": "application/json"
141
  }
142
 
143
-
144
  json_payload = {"question": user_message}
145
  form_payload = {"question": user_message}
146
 
147
  try:
148
- logging.info(f"Sending payload as JSON: {json_payload}")
149
- response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload)
 
 
 
150
 
151
  if response.status_code == 422:
152
- logging.warning("JSON format rejected. Retrying with form-data...")
153
- response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
154
- data=form_payload)
 
 
155
 
156
  if response.status_code == 200:
157
- bot_reply = response.json().get("answer", "I didn't understand that.")
158
- elif response.status_code == 401:
159
- logging.warning("Authorization expired. Re-authenticating...")
160
- # self.authenticate()
161
- await self.chat_with_ai(update, context)
162
- return
163
  else:
164
- logging.error(f"Error: {response.status_code}")
165
- bot_reply = f"Error: {response.status_code}"
166
 
167
  except Exception as e:
168
- logging.error(f"Connection error: {e}")
169
- bot_reply = f"Connection error: {e}"
170
-
171
- await update.message.reply_text(bot_reply)
172
 
173
  async def handle_excel(self, update: Update, context: CallbackContext):
174
  """Handles Excel file uploads."""
@@ -190,11 +209,36 @@ class TelegramBot:
190
  await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
191
  return
192
 
 
 
 
193
  try:
194
- # Send initial processing message
195
- processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
 
197
- # Get file from Telegram
 
 
198
  file = await context.bot.get_file(document.file_id)
199
  file_bytes = await file.download_as_bytearray()
200
 
@@ -204,39 +248,21 @@ class TelegramBot:
204
  }
205
 
206
  async with aiohttp.ClientSession() as session:
207
- # Create form data with the file
208
  form_data = aiohttp.FormData()
209
  form_data.add_field('file',
210
- file_bytes,
211
- filename=document.file_name,
212
- content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
213
 
214
- # Send file to API
215
  async with session.post(self.excel_url, headers=headers, data=form_data) as response:
216
  if response.status == 200:
217
- # Get the Excel file content directly
218
- file_content = await response.read()
219
-
220
- # Send the Excel file back to Telegram
221
- await context.bot.send_document(
222
- chat_id=update.effective_chat.id,
223
- document=io.BytesIO(file_content),
224
- filename='rfp_responses.xlsx',
225
- caption="βœ… Here's your processed Excel file with answers!"
226
- )
227
- await processing_msg.delete()
228
-
229
- elif response.status == 401:
230
- logging.warning("Authorization expired. Re-authenticating...")
231
- # self.authenticate()
232
- await self.handle_excel(update, context)
233
  else:
234
  error_text = await response.text()
235
- await processing_msg.edit_text(f"❌ Error processing Excel file: {error_text}")
236
 
237
  except Exception as e:
238
- logging.error(f"Error handling Excel file: {e}")
239
- await update.message.reply_text(f"❌ Error processing Excel file: {str(e)}")
240
 
241
  def setup_handlers(self):
242
  """Set up Telegram command and message handlers."""
 
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
8
+ import asyncio
9
+ from concurrent.futures import ThreadPoolExecutor
10
 
11
  # Configure logging
12
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
51
  logging.info("Authenticating with API...")
52
  # self.authenticate()
53
 
54
+ # Create a ThreadPoolExecutor for handling concurrent requests
55
+ self.executor = ThreadPoolExecutor(max_workers=10)
56
+
57
  # def authenticate(self):
58
  # """Authenticate with the API and retrieve an access token."""
59
  # payload = {"username": self.username, "password": self.password}
 
138
  # return
139
 
140
  user_message = update.message.text
141
+ # Send immediate acknowledgment
142
+ processing_msg = await update.message.reply_text("Processing your request...")
143
+
144
+ try:
145
+ # Run the API request in a separate thread to avoid blocking
146
+ response = await asyncio.get_event_loop().run_in_executor(
147
+ self.executor,
148
+ self._make_api_request,
149
+ user_message
150
+ )
151
+
152
+ # Update the processing message with the response
153
+ await processing_msg.edit_text(response)
154
 
155
+ except Exception as e:
156
+ logging.error(f"Error processing request: {e}")
157
+ await processing_msg.edit_text(f"❌ Error: {str(e)}")
158
+
159
+ def _make_api_request(self, user_message):
160
+ """Make the actual API request in a separate thread"""
161
  hf_authorization = "Bearer " + HF_TOKEN
162
  headers = {
163
  "Authorization": hf_authorization,
164
  "accept": "application/json"
165
  }
166
 
 
167
  json_payload = {"question": user_message}
168
  form_payload = {"question": user_message}
169
 
170
  try:
171
+ response = requests.post(
172
+ self.ai_url,
173
+ headers={**headers, "Content-Type": "application/json"},
174
+ json=json_payload
175
+ )
176
 
177
  if response.status_code == 422:
178
+ response = requests.post(
179
+ self.ai_url,
180
+ headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
181
+ data=form_payload
182
+ )
183
 
184
  if response.status_code == 200:
185
+ return response.json().get("answer", "I didn't understand that.")
 
 
 
 
 
186
  else:
187
+ return f"Error: {response.status_code}"
 
188
 
189
  except Exception as e:
190
+ return f"Connection error: {e}"
 
 
 
191
 
192
  async def handle_excel(self, update: Update, context: CallbackContext):
193
  """Handles Excel file uploads."""
 
209
  await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
210
  return
211
 
212
+ # Send initial processing message
213
+ processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
214
+
215
  try:
216
+ # Run Excel processing in a separate thread
217
+ result = await asyncio.get_event_loop().run_in_executor(
218
+ self.executor,
219
+ self._process_excel,
220
+ update, context, document
221
+ )
222
+
223
+ if isinstance(result, tuple):
224
+ file_content, error = result
225
+ if error:
226
+ await processing_msg.edit_text(f"❌ Error: {error}")
227
+ else:
228
+ await context.bot.send_document(
229
+ chat_id=update.effective_chat.id,
230
+ document=io.BytesIO(file_content),
231
+ filename='rfp_responses.xlsx',
232
+ caption="βœ… Here's your processed Excel file with answers!"
233
+ )
234
+ await processing_msg.delete()
235
+ except Exception as e:
236
+ logging.error(f"Error handling Excel file: {e}")
237
+ await processing_msg.edit_text(f"❌ Error processing Excel file: {str(e)}")
238
 
239
+ async def _process_excel(self, update, context, document):
240
+ """Process Excel file in a separate thread"""
241
+ try:
242
  file = await context.bot.get_file(document.file_id)
243
  file_bytes = await file.download_as_bytearray()
244
 
 
248
  }
249
 
250
  async with aiohttp.ClientSession() as session:
 
251
  form_data = aiohttp.FormData()
252
  form_data.add_field('file',
253
+ file_bytes,
254
+ filename=document.file_name,
255
+ content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
256
 
 
257
  async with session.post(self.excel_url, headers=headers, data=form_data) as response:
258
  if response.status == 200:
259
+ return await response.read(), None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
260
  else:
261
  error_text = await response.text()
262
+ return None, error_text
263
 
264
  except Exception as e:
265
+ return None, str(e)
 
266
 
267
  def setup_handlers(self):
268
  """Set up Telegram command and message handlers."""