baha-99 commited on
Commit
ff7576e
Β·
1 Parent(s): ec49547

reverted bcs of broken code

Browse files
Files changed (1) hide show
  1. bot_telegram.py +95 -242
bot_telegram.py CHANGED
@@ -5,9 +5,6 @@ from telegram import Update
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
8
- import asyncio
9
- from concurrent.futures import ThreadPoolExecutor
10
- import time
11
 
12
  # Configure logging
13
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
@@ -27,7 +24,6 @@ SECRET_PASSWORD = BOT_SECRET_PASSWORD
27
  AUTHENTICATED_USERS = set()
28
  AWAITING_PASSWORD = set()
29
 
30
- logging.info("Bot starting... Version 1.0.1") # Add version number
31
 
32
  class TelegramBot:
33
  """A Telegram bot with password-based authentication."""
@@ -45,15 +41,6 @@ class TelegramBot:
45
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
46
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
47
 
48
- # Executors (keep only one instance of each)
49
- self.text_executor = ThreadPoolExecutor(max_workers=25, thread_name_prefix="text_worker")
50
- self.excel_executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="excel_worker")
51
- self.excel_semaphore = asyncio.Semaphore(10)
52
-
53
- # Track active processes
54
- self.active_requests = {}
55
- self.active_excel_files = {}
56
-
57
  # Start Telegram Bot
58
  self.app = Application.builder().token(self.bot_token).build()
59
  self.setup_handlers()
@@ -62,14 +49,6 @@ class TelegramBot:
62
  logging.info("Authenticating with API...")
63
  # self.authenticate()
64
 
65
- # Create a ThreadPoolExecutor for handling concurrent requests
66
- self.executor = ThreadPoolExecutor(max_workers=10)
67
-
68
- # Increase Excel workers for more concurrent processing
69
- self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
70
- self.excel_semaphore = asyncio.Semaphore(10) # Allow 10 concurrent Excel processes
71
- self.active_excel_files = {}
72
-
73
  # def authenticate(self):
74
  # """Authenticate with the API and retrieve an access token."""
75
  # payload = {"username": self.username, "password": self.password}
@@ -97,18 +76,16 @@ class TelegramBot:
97
  "You can:\n"
98
  "1. Send me any question as text\n"
99
  "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
100
- "Note: Excel files must contain no more than 50 questions.\n\n"
101
- "type '/status' to check the status of the request"
102
  )
103
  else:
104
  AWAITING_PASSWORD.add(user_id)
105
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
106
 
107
  async def handle_message(self, update: Update, context: CallbackContext):
108
- """Handles all incoming messages concurrently"""
109
  user_id = update.message.from_user.id
110
  user_message = update.message.text.strip()
111
- message_id = update.message.message_id
112
 
113
  # If user is waiting to enter a password, validate it
114
  if user_id in AWAITING_PASSWORD:
@@ -117,10 +94,9 @@ class TelegramBot:
117
 
118
  # If user is authenticated, process AI request
119
  if user_id in AUTHENTICATED_USERS:
120
- # Create task for processing
121
- asyncio.create_task(self.chat_with_ai(update, context))
122
  else:
123
- await update.message.reply_text("❌ You are not authenticated. Please type /start to authenticate and then enter the password.")
124
 
125
  async def check_password(self, update: Update, context: CallbackContext):
126
  """Checks if the password is correct and authenticates the user."""
@@ -142,254 +118,131 @@ class TelegramBot:
142
  await update.message.reply_text("❌ Wrong password. Try again.")
143
 
144
  async def chat_with_ai(self, update: Update, context: CallbackContext):
145
- """Process text messages asynchronously"""
146
- message_id = update.message.message_id
147
- user_message = update.message.text
148
-
149
- try:
150
- # Send immediate acknowledgment
151
- processing_msg = await update.message.reply_text(
152
- f"πŸ€” Processing your request...\n"
153
- f"Request ID: #{message_id}"
154
- )
155
 
156
- # Track this request
157
- self.active_requests[message_id] = {
158
- 'type': 'text',
159
- 'status': 'processing',
160
- 'start_time': time.time()
161
- }
162
 
163
- # Process in thread pool
164
- response = await asyncio.get_event_loop().run_in_executor(
165
- self.text_executor,
166
- self._make_api_request,
167
- user_message
168
- )
169
 
170
- # Update with response
171
- await processing_msg.edit_text(
172
- f"βœ… Response for #{message_id}:\n{response}"
173
- )
174
 
175
- except Exception as e:
176
- logging.error(f"Error processing text request: {e}")
177
- await processing_msg.edit_text(
178
- f"❌ Error processing request #{message_id}: {str(e)}"
179
- )
180
- finally:
181
- if message_id in self.active_requests:
182
- self.active_requests[message_id]['status'] = 'completed'
183
- self.active_requests[message_id]['end_time'] = time.time()
184
 
185
- def _make_api_request(self, user_message):
186
- """Make API request"""
187
- try:
188
- headers = {
189
- "Authorization": f"Bearer {HF_TOKEN}",
190
- "accept": "application/json"
191
- }
192
 
193
- json_payload = {"question": user_message}
194
- form_payload = {"question": user_message}
195
 
196
- response = requests.post(
197
- self.ai_url,
198
- headers={**headers, "Content-Type": "application/json"},
199
- json=json_payload
200
- )
201
 
202
  if response.status_code == 422:
203
- response = requests.post(
204
- self.ai_url,
205
- headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
206
- data=form_payload
207
- )
208
 
209
  if response.status_code == 200:
210
- return response.json().get("answer", "I didn't understand that.")
 
 
 
 
 
211
  else:
212
- return f"Error: {response.status_code}"
 
213
 
214
  except Exception as e:
215
- return f"Connection error: {e}"
 
216
 
217
- async def handle_excel(self, update: Update, context: CallbackContext):
218
- """Handle Excel files concurrently"""
219
- message_id = update.message.message_id
220
-
221
- # Create task for processing
222
- asyncio.create_task(self._process_excel_file(update, context, message_id))
223
-
224
- async def _process_excel_file(self, update, context, message_id):
225
- """Process Excel file with progress updates"""
226
- try:
227
- document = update.message.document
228
-
229
- # Send initial processing message
230
- processing_msg = await update.message.reply_text(
231
- f"πŸ“Š Starting Excel processing...\n"
232
- f"File: {document.file_name}\n"
233
- f"Request ID: #{message_id}"
234
- )
235
 
236
- # Step 1: Download file - ADD THIS PART
237
- try:
238
- file = await context.bot.get_file(document.file_id)
239
- file_bytes = await file.download_as_bytearray() # Define file_bytes here
240
- except Exception as e:
241
- logging.error(f"Error downloading file: {e}")
242
- await processing_msg.edit_text(
243
- f"❌ Error downloading file: {document.file_name}\n"
244
- f"Please try again or contact support."
245
- )
246
- return
247
 
248
- # Step 2: Update status to processing
249
- self.active_excel_files[message_id] = {
250
- 'filename': document.file_name,
251
- 'status': 'processing',
252
- 'start_time': time.time()
253
- }
254
- await processing_msg.edit_text(
255
- f"βš™οΈ Processing Excel file...\n"
256
- f"File: {document.file_name}\n"
257
- f"Request ID: #{message_id}"
258
- )
259
 
260
- # Step 3: Process in thread pool
261
- result = await asyncio.get_event_loop().run_in_executor(
262
- self.excel_executor,
263
- self._process_excel_sync,
264
- file_bytes, # Now file_bytes is defined
265
- document.file_name
266
- )
267
 
268
- if result is None:
269
- await processing_msg.edit_text(
270
- f"❌ Failed to process file\n"
271
- f"File: {document.file_name}\n"
272
- f"Please check the file format and try again."
273
- )
274
- return
275
 
276
- # Send processed file
277
- await context.bot.send_document(
278
- chat_id=update.effective_chat.id,
279
- document=io.BytesIO(result),
280
- filename=f'processed_{document.file_name}',
281
- caption=f"βœ… Excel processing completed!\nRequest ID: #{message_id}"
282
- )
283
- await processing_msg.delete()
284
 
285
- except Exception as e:
286
- logging.error(f"Error processing file: {e}")
287
- await processing_msg.edit_text(
288
- f"❌ Error processing file\n"
289
- f"File: {document.file_name}\n"
290
- f"Error: {str(e)}"
291
- )
292
- finally:
293
- if message_id in self.active_excel_files:
294
- self.active_excel_files[message_id]['status'] = 'completed'
295
- self.active_excel_files[message_id]['end_time'] = time.time()
296
 
297
- def _process_excel_sync(self, file_bytes, filename):
298
- """Synchronous function to process Excel file"""
299
- try:
300
  headers = {
301
  "Authorization": f"Bearer {HF_TOKEN}",
302
  "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
303
  }
304
 
305
- files = {
306
- 'file': (
307
- filename,
308
- file_bytes,
309
- 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
310
- )
311
- }
312
-
313
- response = requests.post(
314
- self.excel_url,
315
- headers=headers,
316
- files=files
317
- )
318
-
319
- if response.status_code == 200:
320
- return response.content
321
- else:
322
- logging.error(f"Excel API Error: {response.status_code} - {response.text}")
323
- return None
324
-
325
- except Exception as e:
326
- logging.error(f"Excel processing error: {e}")
327
- return None
 
 
 
 
 
 
 
328
 
329
- async def _update_progress(self, message, message_id, filename):
330
- """Update progress message periodically"""
331
- try:
332
- while message_id in self.active_excel_files:
333
- elapsed_time = time.time() - self.active_excel_files[message_id]['start_time']
334
- hours = int(elapsed_time // 3600)
335
- minutes = int((elapsed_time % 3600) // 60)
336
-
337
- await message.edit_text(
338
- f"βš™οΈ Processing Excel file...\n"
339
- f"File: {filename}\n"
340
- f"Request ID: #{message_id}\n"
341
- f"Time elapsed: {hours}h {minutes}m\n"
342
- f"Status: {self.active_excel_files[message_id]['status']}"
343
- )
344
-
345
- # Update every 5 minutes
346
- await asyncio.sleep(300)
347
  except Exception as e:
348
- logging.error(f"Error updating progress: {e}")
349
-
350
- async def status_command(self, update: Update, context: CallbackContext):
351
- """Show status of all active processes"""
352
- status_message = "Current Status:\n\n"
353
-
354
- # Text requests status
355
- if self.active_requests:
356
- status_message += "πŸ“ Text Requests:\n"
357
- for msg_id, info in self.active_requests.items():
358
- current_time = time.time()
359
- processing_time = current_time - info['start_time']
360
- status_message += (
361
- f"Request #{msg_id}:\n"
362
- f"β”œβ”€ Status: {info['status']}\n"
363
- f"└─ Time: {processing_time:.1f}s\n\n"
364
- )
365
-
366
- # Excel files status
367
- if self.active_excel_files:
368
- status_message += "πŸ“Š Excel Files:\n"
369
- for msg_id, info in self.active_excel_files.items():
370
- current_time = time.time()
371
- processing_time = current_time - info['start_time']
372
- status_message += (
373
- f"File #{msg_id}:\n"
374
- f"β”œβ”€ Name: {info['filename']}\n"
375
- f"β”œβ”€ Status: {info['status']}\n"
376
- f"└─ Time: {processing_time:.1f}s\n\n"
377
- )
378
-
379
- if not self.active_requests and not self.active_excel_files:
380
- status_message += "No active processes"
381
-
382
- await update.message.reply_text(status_message)
383
 
384
  def setup_handlers(self):
385
  """Set up Telegram command and message handlers."""
386
  self.app.add_handler(CommandHandler("start", self.start_command))
387
- self.app.add_handler(CommandHandler("status", self.status_command))
388
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
389
- self.app.add_handler(MessageHandler(
390
- filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
391
- self.handle_excel
392
- ))
393
 
394
  async def run(self):
395
  """Start the bot and listen for messages."""
 
5
  from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
6
  import aiohttp
7
  import io
 
 
 
8
 
9
  # Configure logging
10
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
24
  AUTHENTICATED_USERS = set()
25
  AWAITING_PASSWORD = set()
26
 
 
27
 
28
  class TelegramBot:
29
  """A Telegram bot with password-based authentication."""
 
41
  self.ai_url = f"{self.base_url}/api/v1/questions/text"
42
  self.excel_url = f"{self.base_url}/api/v1/questions/excel"
43
 
 
 
 
 
 
 
 
 
 
44
  # Start Telegram Bot
45
  self.app = Application.builder().token(self.bot_token).build()
46
  self.setup_handlers()
 
49
  logging.info("Authenticating with API...")
50
  # self.authenticate()
51
 
 
 
 
 
 
 
 
 
52
  # def authenticate(self):
53
  # """Authenticate with the API and retrieve an access token."""
54
  # payload = {"username": self.username, "password": self.password}
 
76
  "You can:\n"
77
  "1. Send me any question as text\n"
78
  "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
79
+ "Note: Excel files must contain no more than 50 questions."
 
80
  )
81
  else:
82
  AWAITING_PASSWORD.add(user_id)
83
  await update.message.reply_text("πŸ”‘ Please enter the secret password to access the bot.")
84
 
85
  async def handle_message(self, update: Update, context: CallbackContext):
86
+ """Handles all incoming messages."""
87
  user_id = update.message.from_user.id
88
  user_message = update.message.text.strip()
 
89
 
90
  # If user is waiting to enter a password, validate it
91
  if user_id in AWAITING_PASSWORD:
 
94
 
95
  # If user is authenticated, process AI request
96
  if user_id in AUTHENTICATED_USERS:
97
+ await self.chat_with_ai(update, context)
 
98
  else:
99
+ await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
100
 
101
  async def check_password(self, update: Update, context: CallbackContext):
102
  """Checks if the password is correct and authenticates the user."""
 
118
  await update.message.reply_text("❌ Wrong password. Try again.")
119
 
120
  async def chat_with_ai(self, update: Update, context: CallbackContext):
121
+ """Handles messages and sends them to the AI API."""
122
+ user_id = update.message.from_user.id
 
 
 
 
 
 
 
 
123
 
124
+ if user_id not in AUTHENTICATED_USERS:
125
+ await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
126
+ return
 
 
 
127
 
128
+ # if not self.auth_token:
129
+ # self.authenticate()
130
+ #
131
+ # if not self.auth_token:
132
+ # await update.message.reply_text("Authentication failed. Please try again later.")
133
+ # return
134
 
135
+ user_message = update.message.text
 
 
 
136
 
137
+ hf_authorization = "Bearer " + HF_TOKEN
138
+ headers = {
139
+ "Authorization": hf_authorization,
140
+ "accept": "application/json"
141
+ }
 
 
 
 
142
 
 
 
 
 
 
 
 
143
 
144
+ json_payload = {"question": user_message}
145
+ form_payload = {"question": user_message}
146
 
147
+ try:
148
+ logging.info(f"Sending payload as JSON: {json_payload}")
149
+ response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload)
 
 
150
 
151
  if response.status_code == 422:
152
+ logging.warning("JSON format rejected. Retrying with form-data...")
153
+ response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
154
+ data=form_payload)
 
 
155
 
156
  if response.status_code == 200:
157
+ bot_reply = response.json().get("answer", "I didn't understand that.")
158
+ elif response.status_code == 401:
159
+ logging.warning("Authorization expired. Re-authenticating...")
160
+ # self.authenticate()
161
+ await self.chat_with_ai(update, context)
162
+ return
163
  else:
164
+ logging.error(f"Error: {response.status_code}")
165
+ bot_reply = f"Error: {response.status_code}"
166
 
167
  except Exception as e:
168
+ logging.error(f"Connection error: {e}")
169
+ bot_reply = f"Connection error: {e}"
170
 
171
+ await update.message.reply_text(bot_reply)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
172
 
173
+ async def handle_excel(self, update: Update, context: CallbackContext):
174
+ """Handles Excel file uploads."""
175
+ user_id = update.message.from_user.id
 
 
 
 
 
 
 
 
176
 
177
+ if user_id not in AUTHENTICATED_USERS:
178
+ await update.message.reply_text("❌ You are not authenticated. Please enter the password first.")
179
+ return
 
 
 
 
 
 
 
 
180
 
181
+ # if not self.auth_token:
182
+ # self.authenticate()
183
+ #
184
+ # if not self.auth_token:
185
+ # await update.message.reply_text("Authentication failed. Please try again later.")
186
+ # return
 
187
 
188
+ document = update.message.document
189
+ if not document.file_name.endswith(('.xls', '.xlsx')):
190
+ await update.message.reply_text("Please send only Excel files (.xls or .xlsx)")
191
+ return
 
 
 
192
 
193
+ try:
194
+ # Send initial processing message
195
+ processing_msg = await update.message.reply_text("πŸ“Š Processing your Excel file... This may take a few minutes.")
 
 
 
 
 
196
 
197
+ # Get file from Telegram
198
+ file = await context.bot.get_file(document.file_id)
199
+ file_bytes = await file.download_as_bytearray()
 
 
 
 
 
 
 
 
200
 
 
 
 
201
  headers = {
202
  "Authorization": f"Bearer {HF_TOKEN}",
203
  "accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
204
  }
205
 
206
+ async with aiohttp.ClientSession() as session:
207
+ # Create form data with the file
208
+ form_data = aiohttp.FormData()
209
+ form_data.add_field('file',
210
+ file_bytes,
211
+ filename=document.file_name,
212
+ content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
213
+
214
+ # Send file to API
215
+ async with session.post(self.excel_url, headers=headers, data=form_data) as response:
216
+ if response.status == 200:
217
+ # Get the Excel file content directly
218
+ file_content = await response.read()
219
+
220
+ # Send the Excel file back to Telegram
221
+ await context.bot.send_document(
222
+ chat_id=update.effective_chat.id,
223
+ document=io.BytesIO(file_content),
224
+ filename='rfp_responses.xlsx',
225
+ caption="βœ… Here's your processed Excel file with answers!"
226
+ )
227
+ await processing_msg.delete()
228
+
229
+ elif response.status == 401:
230
+ logging.warning("Authorization expired. Re-authenticating...")
231
+ # self.authenticate()
232
+ await self.handle_excel(update, context)
233
+ else:
234
+ error_text = await response.text()
235
+ await processing_msg.edit_text(f"❌ Error processing Excel file: {error_text}")
236
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
237
  except Exception as e:
238
+ logging.error(f"Error handling Excel file: {e}")
239
+ await update.message.reply_text(f"❌ Error processing Excel file: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
 
241
  def setup_handlers(self):
242
  """Set up Telegram command and message handlers."""
243
  self.app.add_handler(CommandHandler("start", self.start_command))
 
244
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
245
+ self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel))
 
 
 
246
 
247
  async def run(self):
248
  """Start the bot and listen for messages."""