baha-99 commited on
Commit
c7e129a
Β·
1 Parent(s): ff7576e

req.txt reverrted

Browse files
Files changed (2) hide show
  1. bot_telegram.py +242 -95
  2. requirements.txt +5 -5
bot_telegram.py CHANGED
@@ -5,6 +5,9 @@ from telegram import Update
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
 
 
 
8
 
9
  # Configure logging
10
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
@@ -24,6 +27,7 @@ SECRET_PASSWORD = BOT_SECRET_PASSWORD
24
  AUTHENTICATED_USERS = set()
25
  AWAITING_PASSWORD = set()
26
 
 
27
 
28
  class TelegramBot:
29
  """A Telegram bot with password-based authentication."""
@@ -41,6 +45,15 @@ class TelegramBot:
41
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
42
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
43
 
 
 
 
 
 
 
 
 
 
44
  # Start Telegram Bot
45
  self.app = Application.builder().token(self.bot_token).build()
46
  self.setup_handlers()
@@ -49,6 +62,14 @@ class TelegramBot:
49
  logging.info("Authenticating with API...")
50
  # self.authenticate()
51
 
 
 
 
 
 
 
 
 
52
  # def authenticate(self):
53
  # """Authenticate with the API and retrieve an access token."""
54
  # payload = {"username": self.username, "password": self.password}
@@ -76,16 +97,18 @@ class TelegramBot:
76
  "You can:\n"
77
  "1. Send me any question as text\n"
78
  "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
79
- "Note: Excel files must contain no more than 50 questions."
 
80
  )
81
  else:
82
  AWAITING_PASSWORD.add(user_id)
83
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
84
 
85
  async def handle_message(self, update: Update, context: CallbackContext):
86
- """Handles all incoming messages."""
87
  user_id = update.message.from_user.id
88
  user_message = update.message.text.strip()
 
89
 
90
  # If user is waiting to enter a password, validate it
91
  if user_id in AWAITING_PASSWORD:
@@ -94,9 +117,10 @@ class TelegramBot:
94
 
95
  # If user is authenticated, process AI request
96
  if user_id in AUTHENTICATED_USERS:
97
- await self.chat_with_ai(update, context)
 
98
  else:
99
- await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
100
 
101
  async def check_password(self, update: Update, context: CallbackContext):
102
  """Checks if the password is correct and authenticates the user."""
@@ -118,131 +142,254 @@ class TelegramBot:
118
  await update.message.reply_text("❌ Wrong password. Try again.")
119
 
120
  async def chat_with_ai(self, update: Update, context: CallbackContext):
121
- """Handles messages and sends them to the AI API."""
122
- user_id = update.message.from_user.id
123
-
124
- if user_id not in AUTHENTICATED_USERS:
125
- await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
126
- return
127
 
128
- # if not self.auth_token:
129
- # self.authenticate()
130
- #
131
- # if not self.auth_token:
132
- # await update.message.reply_text("Authentication failed. Please try again later.")
133
- # return
134
 
135
- user_message = update.message.text
 
 
 
 
 
136
 
137
- hf_authorization = "Bearer " + HF_TOKEN
138
- headers = {
139
- "Authorization": hf_authorization,
140
- "accept": "application/json"
141
- }
 
142
 
 
 
 
 
143
 
144
- json_payload = {"question": user_message}
145
- form_payload = {"question": user_message}
 
 
 
 
 
 
 
146
 
 
 
147
  try:
148
- logging.info(f"Sending payload as JSON: {json_payload}")
149
- response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload)
 
 
 
 
 
 
 
 
 
 
 
150
 
151
  if response.status_code == 422:
152
- logging.warning("JSON format rejected. Retrying with form-data...")
153
- response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
154
- data=form_payload)
 
 
155
 
156
  if response.status_code == 200:
157
- bot_reply = response.json().get("answer", "I didn't understand that.")
158
- elif response.status_code == 401:
159
- logging.warning("Authorization expired. Re-authenticating...")
160
- # self.authenticate()
161
- await self.chat_with_ai(update, context)
162
- return
163
  else:
164
- logging.error(f"Error: {response.status_code}")
165
- bot_reply = f"Error: {response.status_code}"
166
 
167
  except Exception as e:
168
- logging.error(f"Connection error: {e}")
169
- bot_reply = f"Connection error: {e}"
170
-
171
- await update.message.reply_text(bot_reply)
172
 
173
  async def handle_excel(self, update: Update, context: CallbackContext):
174
- """Handles Excel file uploads."""
175
- user_id = update.message.from_user.id
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
176
 
177
- if user_id not in AUTHENTICATED_USERS:
178
- await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
179
- return
 
 
 
 
 
 
 
 
180
 
181
- # if not self.auth_token:
182
- # self.authenticate()
183
- #
184
- # if not self.auth_token:
185
- # await update.message.reply_text("Authentication failed. Please try again later.")
186
- # return
 
 
 
 
 
187
 
188
- document = update.message.document
189
- if not document.file_name.endswith(('.xls', '.xlsx')):
190
- await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
191
- return
 
 
 
192
 
193
- try:
194
- # Send initial processing message
195
- processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
 
 
 
 
196
 
197
- # Get file from Telegram
198
- file = await context.bot.get_file(document.file_id)
199
- file_bytes = await file.download_as_bytearray()
 
 
 
 
 
200
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
201
  headers = {
202
  "Authorization": f"Bearer {HF_TOKEN}",
203
  "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
204
  }
205
 
206
- async with aiohttp.ClientSession() as session:
207
- # Create form data with the file
208
- form_data = aiohttp.FormData()
209
- form_data.add_field('file',
210
- file_bytes,
211
- filename=document.file_name,
212
- content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
213
-
214
- # Send file to API
215
- async with session.post(self.excel_url, headers=headers, data=form_data) as response:
216
- if response.status == 200:
217
- # Get the Excel file content directly
218
- file_content = await response.read()
219
-
220
- # Send the Excel file back to Telegram
221
- await context.bot.send_document(
222
- chat_id=update.effective_chat.id,
223
- document=io.BytesIO(file_content),
224
- filename='rfp_responses.xlsx',
225
- caption="βœ… Here's your processed Excel file with answers!"
226
- )
227
- await processing_msg.delete()
228
-
229
- elif response.status == 401:
230
- logging.warning("Authorization expired. Re-authenticating...")
231
- # self.authenticate()
232
- await self.handle_excel(update, context)
233
- else:
234
- error_text = await response.text()
235
- await processing_msg.edit_text(f"❌ Error processing Excel file: {error_text}")
236
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
237
  except Exception as e:
238
- logging.error(f"Error handling Excel file: {e}")
239
- await update.message.reply_text(f"❌ Error processing Excel file: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
 
241
  def setup_handlers(self):
242
  """Set up Telegram command and message handlers."""
243
  self.app.add_handler(CommandHandler("start", self.start_command))
 
244
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
245
- self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel))
 
 
 
246
 
247
  async def run(self):
248
  """Start the bot and listen for messages."""
 
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
8
+ import asyncio
9
+ from concurrent.futures import ThreadPoolExecutor
10
+ import time
11
 
12
  # Configure logging
13
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
27
  AUTHENTICATED_USERS = set()
28
  AWAITING_PASSWORD = set()
29
 
30
+ logging.info("Bot starting... Version 1.0.1") # Add version number
31
 
32
  class TelegramBot:
33
  """A Telegram bot with password-based authentication."""
 
45
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
46
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
47
 
48
+ # Executors (keep only one instance of each)
49
+ self.text_executor = ThreadPoolExecutor(max_workers=25, thread_name_prefix="text_worker")
50
+ self.excel_executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="excel_worker")
51
+ self.excel_semaphore = asyncio.Semaphore(10)
52
+
53
+ # Track active processes
54
+ self.active_requests = {}
55
+ self.active_excel_files = {}
56
+
57
  # Start Telegram Bot
58
  self.app = Application.builder().token(self.bot_token).build()
59
  self.setup_handlers()
 
62
  logging.info("Authenticating with API...")
63
  # self.authenticate()
64
 
65
+ # Create a ThreadPoolExecutor for handling concurrent requests
66
+ self.executor = ThreadPoolExecutor(max_workers=10)
67
+
68
+ # Increase Excel workers for more concurrent processing
69
+ self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
70
+ self.excel_semaphore = asyncio.Semaphore(10) # Allow 10 concurrent Excel processes
71
+ self.active_excel_files = {}
72
+
73
  # def authenticate(self):
74
  # """Authenticate with the API and retrieve an access token."""
75
  # payload = {"username": self.username, "password": self.password}
 
97
  "You can:\n"
98
  "1. Send me any question as text\n"
99
  "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
100
+ "Note: Excel files must contain no more than 50 questions.\n\n"
101
+ "type '/status' to check the status of the request"
102
  )
103
  else:
104
  AWAITING_PASSWORD.add(user_id)
105
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
106
 
107
  async def handle_message(self, update: Update, context: CallbackContext):
108
+ """Handles all incoming messages concurrently"""
109
  user_id = update.message.from_user.id
110
  user_message = update.message.text.strip()
111
+ message_id = update.message.message_id
112
 
113
  # If user is waiting to enter a password, validate it
114
  if user_id in AWAITING_PASSWORD:
 
117
 
118
  # If user is authenticated, process AI request
119
  if user_id in AUTHENTICATED_USERS:
120
+ # Create task for processing
121
+ asyncio.create_task(self.chat_with_ai(update, context))
122
  else:
123
+ await update.message.reply_text("❌ You are not authenticated. Please type /start to authenticate and then enter the password.")
124
 
125
  async def check_password(self, update: Update, context: CallbackContext):
126
  """Checks if the password is correct and authenticates the user."""
 
142
  await update.message.reply_text("❌ Wrong password. Try again.")
143
 
144
  async def chat_with_ai(self, update: Update, context: CallbackContext):
145
+ """Process text messages asynchronously"""
146
+ message_id = update.message.message_id
147
+ user_message = update.message.text
 
 
 
148
 
149
+ try:
150
+ # Send immediate acknowledgment
151
+ processing_msg = await update.message.reply_text(
152
+ f"πŸ€” Processing your request...\n"
153
+ f"Request ID: #{message_id}"
154
+ )
155
 
156
+ # Track this request
157
+ self.active_requests[message_id] = {
158
+ 'type': 'text',
159
+ 'status': 'processing',
160
+ 'start_time': time.time()
161
+ }
162
 
163
+ # Process in thread pool
164
+ response = await asyncio.get_event_loop().run_in_executor(
165
+ self.text_executor,
166
+ self._make_api_request,
167
+ user_message
168
+ )
169
 
170
+ # Update with response
171
+ await processing_msg.edit_text(
172
+ f"βœ… Response for #{message_id}:\n{response}"
173
+ )
174
 
175
+ except Exception as e:
176
+ logging.error(f"Error processing text request: {e}")
177
+ await processing_msg.edit_text(
178
+ f"❌ Error processing request #{message_id}: {str(e)}"
179
+ )
180
+ finally:
181
+ if message_id in self.active_requests:
182
+ self.active_requests[message_id]['status'] = 'completed'
183
+ self.active_requests[message_id]['end_time'] = time.time()
184
 
185
+ def _make_api_request(self, user_message):
186
+ """Make API request"""
187
  try:
188
+ headers = {
189
+ "Authorization": f"Bearer {HF_TOKEN}",
190
+ "accept": "application/json"
191
+ }
192
+
193
+ json_payload = {"question": user_message}
194
+ form_payload = {"question": user_message}
195
+
196
+ response = requests.post(
197
+ self.ai_url,
198
+ headers={**headers, "Content-Type": "application/json"},
199
+ json=json_payload
200
+ )
201
 
202
  if response.status_code == 422:
203
+ response = requests.post(
204
+ self.ai_url,
205
+ headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
206
+ data=form_payload
207
+ )
208
 
209
  if response.status_code == 200:
210
+ return response.json().get("answer", "I didn't understand that.")
 
 
 
 
 
211
  else:
212
+ return f"Error: {response.status_code}"
 
213
 
214
  except Exception as e:
215
+ return f"Connection error: {e}"
 
 
 
216
 
217
  async def handle_excel(self, update: Update, context: CallbackContext):
218
+ """Handle Excel files concurrently"""
219
+ message_id = update.message.message_id
220
+
221
+ # Create task for processing
222
+ asyncio.create_task(self._process_excel_file(update, context, message_id))
223
+
224
+ async def _process_excel_file(self, update, context, message_id):
225
+ """Process Excel file with progress updates"""
226
+ try:
227
+ document = update.message.document
228
+
229
+ # Send initial processing message
230
+ processing_msg = await update.message.reply_text(
231
+ f"πŸ“Š Starting Excel processing...\n"
232
+ f"File: {document.file_name}\n"
233
+ f"Request ID: #{message_id}"
234
+ )
235
 
236
+ # Step 1: Download file - ADD THIS PART
237
+ try:
238
+ file = await context.bot.get_file(document.file_id)
239
+ file_bytes = await file.download_as_bytearray() # Define file_bytes here
240
+ except Exception as e:
241
+ logging.error(f"Error downloading file: {e}")
242
+ await processing_msg.edit_text(
243
+ f"❌ Error downloading file: {document.file_name}\n"
244
+ f"Please try again or contact support."
245
+ )
246
+ return
247
 
248
+ # Step 2: Update status to processing
249
+ self.active_excel_files[message_id] = {
250
+ 'filename': document.file_name,
251
+ 'status': 'processing',
252
+ 'start_time': time.time()
253
+ }
254
+ await processing_msg.edit_text(
255
+ f"βš™οΈ Processing Excel file...\n"
256
+ f"File: {document.file_name}\n"
257
+ f"Request ID: #{message_id}"
258
+ )
259
 
260
+ # Step 3: Process in thread pool
261
+ result = await asyncio.get_event_loop().run_in_executor(
262
+ self.excel_executor,
263
+ self._process_excel_sync,
264
+ file_bytes, # Now file_bytes is defined
265
+ document.file_name
266
+ )
267
 
268
+ if result is None:
269
+ await processing_msg.edit_text(
270
+ f"❌ Failed to process file\n"
271
+ f"File: {document.file_name}\n"
272
+ f"Please check the file format and try again."
273
+ )
274
+ return
275
 
276
+ # Send processed file
277
+ await context.bot.send_document(
278
+ chat_id=update.effective_chat.id,
279
+ document=io.BytesIO(result),
280
+ filename=f'processed_{document.file_name}',
281
+ caption=f"βœ… Excel processing completed!\nRequest ID: #{message_id}"
282
+ )
283
+ await processing_msg.delete()
284
 
285
+ except Exception as e:
286
+ logging.error(f"Error processing file: {e}")
287
+ await processing_msg.edit_text(
288
+ f"❌ Error processing file\n"
289
+ f"File: {document.file_name}\n"
290
+ f"Error: {str(e)}"
291
+ )
292
+ finally:
293
+ if message_id in self.active_excel_files:
294
+ self.active_excel_files[message_id]['status'] = 'completed'
295
+ self.active_excel_files[message_id]['end_time'] = time.time()
296
+
297
+ def _process_excel_sync(self, file_bytes, filename):
298
+ """Synchronous function to process Excel file"""
299
+ try:
300
  headers = {
301
  "Authorization": f"Bearer {HF_TOKEN}",
302
  "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
303
  }
304
 
305
+ files = {
306
+ 'file': (
307
+ filename,
308
+ file_bytes,
309
+ 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
310
+ )
311
+ }
312
+
313
+ response = requests.post(
314
+ self.excel_url,
315
+ headers=headers,
316
+ files=files
317
+ )
318
+
319
+ if response.status_code == 200:
320
+ return response.content
321
+ else:
322
+ logging.error(f"Excel API Error: {response.status_code} - {response.text}")
323
+ return None
324
+
325
+ except Exception as e:
326
+ logging.error(f"Excel processing error: {e}")
327
+ return None
 
 
 
 
 
 
 
328
 
329
+ async def _update_progress(self, message, message_id, filename):
330
+ """Update progress message periodically"""
331
+ try:
332
+ while message_id in self.active_excel_files:
333
+ elapsed_time = time.time() - self.active_excel_files[message_id]['start_time']
334
+ hours = int(elapsed_time // 3600)
335
+ minutes = int((elapsed_time % 3600) // 60)
336
+
337
+ await message.edit_text(
338
+ f"βš™οΈ Processing Excel file...\n"
339
+ f"File: {filename}\n"
340
+ f"Request ID: #{message_id}\n"
341
+ f"Time elapsed: {hours}h {minutes}m\n"
342
+ f"Status: {self.active_excel_files[message_id]['status']}"
343
+ )
344
+
345
+ # Update every 5 minutes
346
+ await asyncio.sleep(300)
347
  except Exception as e:
348
+ logging.error(f"Error updating progress: {e}")
349
+
350
+ async def status_command(self, update: Update, context: CallbackContext):
351
+ """Show status of all active processes"""
352
+ status_message = "Current Status:\n\n"
353
+
354
+ # Text requests status
355
+ if self.active_requests:
356
+ status_message += "πŸ“ Text Requests:\n"
357
+ for msg_id, info in self.active_requests.items():
358
+ current_time = time.time()
359
+ processing_time = current_time - info['start_time']
360
+ status_message += (
361
+ f"Request #{msg_id}:\n"
362
+ f"β”œβ”€ Status: {info['status']}\n"
363
+ f"└─ Time: {processing_time:.1f}s\n\n"
364
+ )
365
+
366
+ # Excel files status
367
+ if self.active_excel_files:
368
+ status_message += "πŸ“Š Excel Files:\n"
369
+ for msg_id, info in self.active_excel_files.items():
370
+ current_time = time.time()
371
+ processing_time = current_time - info['start_time']
372
+ status_message += (
373
+ f"File #{msg_id}:\n"
374
+ f"β”œβ”€ Name: {info['filename']}\n"
375
+ f"β”œβ”€ Status: {info['status']}\n"
376
+ f"└─ Time: {processing_time:.1f}s\n\n"
377
+ )
378
+
379
+ if not self.active_requests and not self.active_excel_files:
380
+ status_message += "No active processes"
381
+
382
+ await update.message.reply_text(status_message)
383
 
384
  def setup_handlers(self):
385
  """Set up Telegram command and message handlers."""
386
  self.app.add_handler(CommandHandler("start", self.start_command))
387
+ self.app.add_handler(CommandHandler("status", self.status_command))
388
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
389
+ self.app.add_handler(MessageHandler(
390
+ filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
391
+ self.handle_excel
392
+ ))
393
 
394
  async def run(self):
395
  """Start the bot and listen for messages."""
requirements.txt CHANGED
@@ -1,5 +1,5 @@
1
- python-telegram-bot
2
- requests
3
- aiohttp
4
- fastapi
5
- uvicorn[standard]
 
1
+ python-telegram-bot==20.7
2
+ requests==2.31.0
3
+ aiohttp==3.9.1
4
+ fastapi==0.105.0
5
+ uvicorn[standard]==0.24.0.post1