"""Password-protected Telegram bot that relays questions to a backend API.

Users authenticate with a shared secret, then send either a text question or an
Excel workbook.  Both are forwarded to the API rooted at ``BASE_URL`` using the
``HF_TOKEN`` bearer token, and the answer is sent back in the chat.
"""

import asyncio
import hmac
import io
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables from Hugging Face Secrets
BOT_SECRET_PASSWORD = os.getenv("BOT_SECRET_PASSWORD")
BOT_TOKEN = os.getenv("BOT_TOKEN")
BASE_URL = os.getenv("BASE_URL")
HF_TOKEN = os.getenv("HF_TOKEN")

# Set the secret password for authentication
SECRET_PASSWORD = BOT_SECRET_PASSWORD

# Telegram user ids that have authenticated / that are being asked for the password
AUTHENTICATED_USERS = set()
AWAITING_PASSWORD = set()

logging.info("Bot starting... Version 1.0.1")  # Add version number


class TelegramBot:
    """A Telegram bot with password-based authentication."""

    # (connect, read) timeout handed to ``requests``.  Only the connect phase is
    # bounded: the backend may legitimately take a long time to answer, but an
    # unreachable host must not pin a worker thread forever.
    REQUEST_TIMEOUT = (10, None)

    # How long a finished request stays visible in /status before being dropped,
    # so the tracking dictionaries do not grow without bound.
    FINISHED_RETENTION_SECONDS = 3600

    _USAGE_TEXT = (
        "You can:\n"
        "1. Send me any question as text\n"
        "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
        "Note: Excel files must contain no more than 50 questions."
    )
    _NOT_AUTHENTICATED_TEXT = (
        "❌ You are not authenticated. Please type /start to authenticate and then enter the password."
    )

    def __init__(self, bot_token, base_url):
        """Build the Telegram application and register its handlers.

        Args:
            bot_token: Telegram Bot API token.
            base_url: Root URL of the backend API (no trailing slash).
        """
        self.bot_token = bot_token
        self.base_url = base_url

        # API endpoints.
        # NOTE: username/password login against ``login_url`` is currently
        # disabled; requests authenticate with the HF_TOKEN bearer token.
        self.login_url = f"{self.base_url}/api/v1/auth/login"
        self.ai_url = f"{self.base_url}/api/v1/questions/text"
        self.excel_url = f"{self.base_url}/api/v1/questions/excel"

        # One executor per workload, each created exactly once.
        self.text_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="text_worker")
        self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
        # Generic pool kept for backward compatibility; nothing in this class submits to it.
        self.executor = ThreadPoolExecutor(max_workers=10)
        # Kept for backward compatibility; concurrency is bounded by excel_executor.
        self.excel_semaphore = asyncio.Semaphore(10)

        # Request tracking, keyed by the Telegram message id that triggered the work.
        self.active_requests = {}
        self.active_excel_files = {}

        # Strong references to fire-and-forget tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._background_tasks = set()

        # Start Telegram Bot
        self.app = Application.builder().token(self.bot_token).build()
        self.setup_handlers()

    # ------------------------------------------------------------------ helpers

    def _spawn(self, coro):
        """Schedule ``coro`` as a background task and keep it alive until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    def _password_matches(candidate):
        """Return True when ``candidate`` equals the configured secret.

        Uses a constant-time comparison; an unset or empty secret never matches.
        """
        if not SECRET_PASSWORD:
            return False
        return hmac.compare_digest(candidate.encode("utf-8"), SECRET_PASSWORD.encode("utf-8"))

    @staticmethod
    async def _report(status_msg, origin_msg, text):
        """Show ``text`` by editing ``status_msg``, or reply to ``origin_msg`` if none exists.

        Delivery failures are logged instead of raised so a background task
        never dies while trying to report an earlier error.
        """
        try:
            if status_msg is not None:
                await status_msg.edit_text(text)
            else:
                await origin_msg.reply_text(text)
        except Exception:
            logging.exception("Could not deliver status text to the user")

    def _prune_finished(self, registry):
        """Drop entries of ``registry`` that finished longer ago than the retention window."""
        cutoff = time.time() - self.FINISHED_RETENTION_SECONDS
        # Entries without 'end_time' are still running and are always kept.
        stale = [key for key, info in registry.items() if info.get('end_time', cutoff) < cutoff]
        for key in stale:
            del registry[key]

    @staticmethod
    def _finish(registry, message_id, outcome):
        """Record the final ``outcome`` and end time for ``message_id``, if tracked."""
        entry = registry.get(message_id)
        if entry is not None:
            entry['status'] = outcome
            entry['end_time'] = time.time()

    @staticmethod
    def _elapsed(info):
        """Seconds a request has run; frozen at its end time once it has finished."""
        return info.get('end_time', time.time()) - info['start_time']

    # ----------------------------------------------------------------- handlers

    async def start_command(self, update: Update, context: CallbackContext):
        """Handle /start: greet authenticated users, otherwise ask for the password."""
        message = update.message
        if message is None or message.from_user is None:
            return  # e.g. an edited message; nothing to answer
        user_id = message.from_user.id

        if user_id in AUTHENTICATED_USERS:
            await message.reply_text(
                "✅ You are already authenticated!\n\n"
                f"{self._USAGE_TEXT}\n\n"
                "type '/status' to check the status of the request"
            )
        else:
            AWAITING_PASSWORD.add(user_id)
            await message.reply_text("🔑 Please enter the secret password to access the bot.")

    async def handle_message(self, update: Update, context: CallbackContext):
        """Route a text message to password validation or to the AI backend."""
        message = update.message
        if message is None or message.from_user is None or message.text is None:
            return  # edited messages / channel posts carry no new user text
        user_id = message.from_user.id

        # If user is waiting to enter a password, validate it
        if user_id in AWAITING_PASSWORD:
            await self.check_password(update, context)
            return

        # If user is authenticated, process the request without blocking other updates
        if user_id in AUTHENTICATED_USERS:
            self._spawn(self.chat_with_ai(update, context))
        else:
            await message.reply_text(self._NOT_AUTHENTICATED_TEXT)

    async def check_password(self, update: Update, context: CallbackContext):
        """Authenticate the sender if their (whitespace-stripped) message is the password."""
        message = update.message
        user_id = message.from_user.id
        user_message = message.text.strip()

        if self._password_matches(user_message):
            AUTHENTICATED_USERS.add(user_id)
            AWAITING_PASSWORD.discard(user_id)
            logging.info("User %s authenticated successfully.", user_id)
            await message.reply_text(f"✅ Authentication successful!\n\n{self._USAGE_TEXT}")
        else:
            await message.reply_text("❌ Wrong password. Try again.")

    async def chat_with_ai(self, update: Update, context: CallbackContext):
        """Send the message text to the text endpoint and post the answer back.

        The blocking HTTP call runs in ``text_executor``.  Progress is recorded
        in ``active_requests`` under the triggering message id.
        """
        message = update.message
        message_id = message.message_id
        user_message = message.text
        processing_msg = None  # stays None if the acknowledgment itself fails
        outcome = 'failed'

        try:
            # Send immediate acknowledgment
            processing_msg = await message.reply_text(
                f"🤔 Processing your request...\n"
                f"Request ID: #{message_id}"
            )

            # Track this request
            self._prune_finished(self.active_requests)
            self.active_requests[message_id] = {
                'type': 'text',
                'status': 'processing',
                'start_time': time.time()
            }

            # Process in thread pool
            response = await asyncio.get_running_loop().run_in_executor(
                self.text_executor,
                self._make_api_request,
                user_message
            )

            # Update with response
            await processing_msg.edit_text(
                f"✅ Response for #{message_id}:\n{response}"
            )
            outcome = 'completed'

        except Exception as e:
            logging.exception("Error processing text request #%s", message_id)
            await self._report(
                processing_msg,
                message,
                f"❌ Error processing request #{message_id}: {str(e)}"
            )
        finally:
            self._finish(self.active_requests, message_id, outcome)

    def _make_api_request(self, user_message):
        """POST ``user_message`` to the text endpoint and return a display string.

        A JSON body is tried first; on HTTP 422 the request is retried as
        form-encoded data.  Never raises: failures come back as an
        ``"Error: ..."`` or ``"Connection error: ..."`` string.
        """
        try:
            headers = {
                "Authorization": f"Bearer {HF_TOKEN}",
                "accept": "application/json"
            }
            payload = {"question": user_message}

            response = requests.post(
                self.ai_url,
                headers={**headers, "Content-Type": "application/json"},
                json=payload,
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 422:
                response = requests.post(
                    self.ai_url,
                    headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
                    data=payload,
                    timeout=self.REQUEST_TIMEOUT
                )

            if response.status_code == 200:
                return response.json().get("answer", "I didn't understand that.")
            return f"Error: {response.status_code}"

        except Exception as e:
            # Thread-pool boundary: log the traceback, hand the caller a readable string.
            logging.exception("Text API request failed")
            return f"Connection error: {e}"

    async def handle_excel(self, update: Update, context: CallbackContext):
        """Start background processing of an uploaded Excel file for authenticated users."""
        message = update.message
        if message is None or message.from_user is None or message.document is None:
            return

        # Uploads reach the backend with the bot's token, so they need the same
        # authentication gate as text questions.
        if message.from_user.id not in AUTHENTICATED_USERS:
            await message.reply_text(self._NOT_AUTHENTICATED_TEXT)
            return

        self._spawn(self._process_excel_file(update, context, message.message_id))

    async def _process_excel_file(self, update, context, message_id):
        """Download the uploaded workbook, run it through the API and send back the result.

        Progress is recorded in ``active_excel_files`` under ``message_id`` once
        the download has succeeded.
        """
        message = update.message
        document = message.document
        processing_msg = None  # stays None if the first reply itself fails
        outcome = 'failed'

        try:
            # Send initial processing message
            processing_msg = await message.reply_text(
                f"📊 Starting Excel processing...\n"
                f"File: {document.file_name}\n"
                f"Request ID: #{message_id}"
            )

            # Step 1: Download file
            try:
                file = await context.bot.get_file(document.file_id)
                file_bytes = await file.download_as_bytearray()
            except Exception:
                logging.exception("Error downloading file %s", document.file_name)
                await processing_msg.edit_text(
                    f"❌ Error downloading file: {document.file_name}\n"
                    f"Please try again or contact support."
                )
                return

            # Step 2: Update status to processing
            self._prune_finished(self.active_excel_files)
            self.active_excel_files[message_id] = {
                'filename': document.file_name,
                'status': 'processing',
                'start_time': time.time()
            }

            await processing_msg.edit_text(
                f"⚙️ Processing Excel file...\n"
                f"File: {document.file_name}\n"
                f"Request ID: #{message_id}"
            )

            # Step 3: Process in thread pool
            result = await asyncio.get_running_loop().run_in_executor(
                self.excel_executor,
                self._process_excel_sync,
                file_bytes,
                document.file_name
            )

            if result is None:
                await processing_msg.edit_text(
                    f"❌ Failed to process file\n"
                    f"File: {document.file_name}\n"
                    f"Please check the file format and try again."
                )
                return

            # Send processed file
            await context.bot.send_document(
                chat_id=update.effective_chat.id,
                document=io.BytesIO(result),
                filename=f'processed_{document.file_name}',
                caption=f"✅ Excel processing completed!\nRequest ID: #{message_id}"
            )
            outcome = 'completed'

            # The result is already delivered; failing to remove the progress
            # message must not be reported to the user as a processing error.
            try:
                await processing_msg.delete()
            except Exception:
                logging.warning("Could not delete progress message for #%s", message_id, exc_info=True)

        except Exception as e:
            logging.exception("Error processing file for request #%s", message_id)
            await self._report(
                processing_msg,
                message,
                f"❌ Error processing file\n"
                f"File: {document.file_name}\n"
                f"Error: {str(e)}"
            )
        finally:
            self._finish(self.active_excel_files, message_id, outcome)

    def _process_excel_sync(self, file_bytes, filename):
        """Upload the workbook to the Excel endpoint (blocking).

        Returns:
            The response body bytes on HTTP 200, otherwise ``None``.
        """
        xlsx_mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        try:
            headers = {
                "Authorization": f"Bearer {HF_TOKEN}",
                "accept": xlsx_mime
            }
            files = {'file': (filename, file_bytes, xlsx_mime)}

            response = requests.post(
                self.excel_url,
                headers=headers,
                files=files,
                timeout=self.REQUEST_TIMEOUT
            )

            if response.status_code == 200:
                return response.content

            logging.error("Excel API Error: %s - %s", response.status_code, response.text)
            return None

        except Exception:
            logging.exception("Excel processing error")
            return None

    async def _update_progress(self, message, message_id, filename):
        """Refresh ``message`` every 5 minutes while the file is still being processed."""
        try:
            while True:
                entry = self.active_excel_files.get(message_id)
                # Stop once the job is finished or no longer tracked; otherwise
                # this loop would keep editing the message forever.
                if entry is None or entry['status'] != 'processing':
                    break

                elapsed_time = time.time() - entry['start_time']
                hours = int(elapsed_time // 3600)
                minutes = int((elapsed_time % 3600) // 60)

                await message.edit_text(
                    f"⚙️ Processing Excel file...\n"
                    f"File: {filename}\n"
                    f"Request ID: #{message_id}\n"
                    f"Time elapsed: {hours}h {minutes}m\n"
                    f"Status: {entry['status']}"
                )

                # Update every 5 minutes
                await asyncio.sleep(300)
        except Exception:
            logging.exception("Error updating progress")

    async def status_command(self, update: Update, context: CallbackContext):
        """Handle /status: list tracked text requests and Excel files for authenticated users."""
        message = update.message
        if message is None or message.from_user is None:
            return
        # The listing includes other users' file names, so keep it behind the password.
        if message.from_user.id not in AUTHENTICATED_USERS:
            await message.reply_text(self._NOT_AUTHENTICATED_TEXT)
            return

        parts = ["Current Status:\n\n"]

        # Text requests status
        if self.active_requests:
            parts.append("📝 Text Requests:\n")
            for msg_id, info in self.active_requests.items():
                parts.append(
                    f"Request #{msg_id}:\n"
                    f"├─ Status: {info['status']}\n"
                    f"└─ Time: {self._elapsed(info):.1f}s\n\n"
                )

        # Excel files status
        if self.active_excel_files:
            parts.append("📊 Excel Files:\n")
            for msg_id, info in self.active_excel_files.items():
                parts.append(
                    f"File #{msg_id}:\n"
                    f"├─ Name: {info['filename']}\n"
                    f"├─ Status: {info['status']}\n"
                    f"└─ Time: {self._elapsed(info):.1f}s\n\n"
                )

        if not self.active_requests and not self.active_excel_files:
            parts.append("No active processes")

        await message.reply_text("".join(parts))

    def setup_handlers(self):
        """Set up Telegram command and message handlers."""
        self.app.add_handler(CommandHandler("start", self.start_command))
        self.app.add_handler(CommandHandler("status", self.status_command))
        self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        self.app.add_handler(MessageHandler(
            filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
            self.handle_excel
        ))

    async def run(self):
        """Start the bot and listen for messages."""
        logging.info("Starting Telegram bot...")
        await self.app.initialize()
        await self.app.start()
        await self.app.updater.start_polling()

    async def bot_stop(self):
        """Stop the bot."""
        logging.info("Stopping Telegram bot...")
        if self.app.updater:
            await self.app.updater.stop()
        await self.app.stop()
        await self.app.shutdown()


def init_bot():
    """Create a ``TelegramBot`` configured from the BOT_TOKEN and BASE_URL environment variables."""
    return TelegramBot(bot_token=BOT_TOKEN, base_url=BASE_URL)