import asyncio
import hmac
import io
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext


logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Runtime configuration, read once from the environment at import time.
# os.getenv returns None for a variable that is not set.
BOT_SECRET_PASSWORD = os.getenv("BOT_SECRET_PASSWORD")
BOT_TOKEN = os.getenv("BOT_TOKEN")
BASE_URL = os.getenv("BASE_URL")
HF_TOKEN = os.getenv("HF_TOKEN")

# A missing value would otherwise only surface later as "Bearer None" headers
# or "None/api/..." URLs, so report it as soon as the module is loaded.
for _var in ("BOT_SECRET_PASSWORD", "BOT_TOKEN", "BASE_URL", "HF_TOKEN"):
    if not os.getenv(_var):
        logging.warning("Environment variable %s is not set", _var)
del _var

# Password a user must send to be added to AUTHENTICATED_USERS.
SECRET_PASSWORD = BOT_SECRET_PASSWORD

# In-memory authentication state, keyed by Telegram user id.
AUTHENTICATED_USERS = set()
AWAITING_PASSWORD = set()

logging.info("Bot starting... Version 1.0.1")


class TelegramBot:
    """A Telegram bot with password-based authentication.

    A user who has sent the secret password can send a text question, which
    is posted to ``{base_url}/api/v1/questions/text``, or an Excel document,
    which is posted to ``{base_url}/api/v1/questions/excel``. The blocking
    ``requests`` calls run in thread pools so they do not stall the event
    loop, and every request is recorded so ``/status`` can report on it.
    """

    # NOTE(review): the emoji and box-drawing characters in the user-facing
    # strings of this class were restored from mis-decoded bytes; the exact
    # glyphs are a best guess and should be confirmed against the original.

    # NOTE(review): registries are keyed by message_id, which Telegram only
    # guarantees to be unique inside one chat - confirm whether collisions
    # between chats matter for this deployment.

    # Upper bound on the number of entries kept in each tracking registry.
    _MAX_TRACKED = 20

    # Seconds between two refreshes of an Excel progress message.
    _PROGRESS_INTERVAL = 300

    # Usage instructions shared by the /start reply and the login confirmation.
    _USAGE_HELP = (
        "You can:\n"
        "1. Send me any question as text\n"
        "2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n"
        "Note: Excel files must contain no more than 50 questions."
    )

    _NOT_AUTHENTICATED = (
        "❌ You are not authenticated. Please type /start to authenticate and then enter the password."
    )

    def __init__(self, bot_token, base_url):
        """Create the Telegram application, worker pools and request registries.

        Args:
            bot_token: Telegram bot token passed to ``Application.builder()``.
            base_url: Prefix of the backend API; the endpoint URLs are built
                by appending fixed paths to it.
        """
        self.bot_token = bot_token
        self.base_url = base_url

        self.login_url = f"{self.base_url}/api/v1/auth/login"
        self.ai_url = f"{self.base_url}/api/v1/questions/text"
        self.excel_url = f"{self.base_url}/api/v1/questions/excel"

        # Each attribute is created exactly once. The pool sizes are the ones
        # that were effective before (the later of the duplicated assignments).
        self.text_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="text_worker")
        self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker")
        self.excel_semaphore = asyncio.Semaphore(10)
        # Not used by any method of this class; kept so external code that
        # reads ``bot.executor`` keeps working.
        self.executor = ThreadPoolExecutor(max_workers=10)

        # message_id -> {'status', 'start_time', ...} for /status reporting.
        self.active_requests = {}
        self.active_excel_files = {}

        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks = set()

        self.app = Application.builder().token(self.bot_token).build()
        self.setup_handlers()

        # Only a log line: no request to login_url is made here.
        logging.info("Authenticating with API...")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _spawn(self, coro):
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    async def _notify_failure(message, text):
        """Best-effort edit of ``message`` to ``text``; never raises.

        ``message`` is None when the failure happened before the progress
        message could be sent, in which case there is nothing to edit.
        """
        if message is None:
            return
        try:
            await message.edit_text(text)
        except Exception:
            logging.exception("Could not deliver the error notice to the user")

    @staticmethod
    def _elapsed_seconds(info, now=None):
        """Return how long a tracked request ran, in seconds.

        Uses the recorded ``end_time`` when the entry has one, so a finished
        request stops accumulating time; otherwise measures up to ``now``
        (default: the current time).
        """
        if now is None:
            now = time.time()
        return info.get('end_time', now) - info['start_time']

    @classmethod
    def _trim_finished(cls, registry):
        """Drop the oldest completed entries so ``registry`` stays within _MAX_TRACKED."""
        overflow = len(registry) - cls._MAX_TRACKED
        if overflow <= 0:
            return
        # Dicts iterate in insertion order, so the first matches are the oldest.
        finished = [key for key, info in registry.items() if info.get('status') == 'completed']
        for key in finished[:overflow]:
            del registry[key]

    @classmethod
    def _mark_completed(cls, registry, key):
        """Stamp the entry for ``key`` (if any) as completed, then bound the registry."""
        entry = registry.get(key)
        if entry is not None:
            entry['status'] = 'completed'
            entry['end_time'] = time.time()
        cls._trim_finished(registry)

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def start_command(self, update: Update, context: CallbackContext):
        """Handle /start: greet an authenticated user or ask for the password."""
        user_id = update.message.from_user.id

        if user_id in AUTHENTICATED_USERS:
            await update.message.reply_text(
                "✅ You are already authenticated!\n\n"
                f"{self._USAGE_HELP}\n\n"
                "type '/status' to check the status of the request"
            )
        else:
            # The next text message from this user is treated as a password attempt.
            AWAITING_PASSWORD.add(user_id)
            await update.message.reply_text("🔒 Please enter the secret password to access the bot.")

    async def handle_message(self, update: Update, context: CallbackContext):
        """Route a text message: password attempt, AI question, or rejection."""
        user_id = update.message.from_user.id

        if user_id in AWAITING_PASSWORD:
            await self.check_password(update, context)
            return

        if user_id in AUTHENTICATED_USERS:
            # Run in the background so this handler returns immediately.
            self._spawn(self.chat_with_ai(update, context))
        else:
            await update.message.reply_text(self._NOT_AUTHENTICATED)

    async def check_password(self, update: Update, context: CallbackContext):
        """Compare the message text with SECRET_PASSWORD and authenticate on a match."""
        user_id = update.message.from_user.id
        candidate = update.message.text.strip()

        # Constant-time comparison on bytes (compare_digest rejects non-ASCII
        # str). An unset or empty password never matches.
        password_ok = bool(SECRET_PASSWORD) and hmac.compare_digest(
            candidate.encode("utf-8"), SECRET_PASSWORD.encode("utf-8")
        )

        if password_ok:
            AUTHENTICATED_USERS.add(user_id)
            AWAITING_PASSWORD.discard(user_id)
            logging.info(f"User {user_id} authenticated successfully.")
            await update.message.reply_text(
                "✅ Authentication successful!\n\n"
                f"{self._USAGE_HELP}"
            )
        else:
            await update.message.reply_text("❌ Wrong password. Try again.")

    # ------------------------------------------------------------------
    # Text questions
    # ------------------------------------------------------------------

    async def chat_with_ai(self, update: Update, context: CallbackContext):
        """Send the message text to the text endpoint and post the answer back.

        A progress message is sent first and later edited to hold either the
        answer or an error notice. The request is recorded in
        ``active_requests`` while it runs and marked completed afterwards.
        """
        message_id = update.message.message_id
        user_message = update.message.text
        # Bound before the try so the except handler can always refer to it.
        processing_msg = None

        try:
            processing_msg = await update.message.reply_text(
                f"🤖 Processing your request...\n"
                f"Request ID: #{message_id}"
            )

            self.active_requests[message_id] = {
                'type': 'text',
                'status': 'processing',
                'start_time': time.time()
            }

            # The HTTP call blocks, so it runs on the text worker pool.
            response = await asyncio.get_running_loop().run_in_executor(
                self.text_executor,
                self._make_api_request,
                user_message
            )

            await processing_msg.edit_text(
                f"✅ Response for #{message_id}:\n{response}"
            )

        except Exception as e:
            logging.error(f"Error processing text request: {e}")
            await self._notify_failure(
                processing_msg,
                f"❌ Error processing request #{message_id}: {str(e)}"
            )
        finally:
            self._mark_completed(self.active_requests, message_id)

    def _make_api_request(self, user_message):
        """POST the question to the text endpoint and return a string for the user.

        The question is sent as JSON first; on HTTP 422 it is re-sent as form
        data. Returns the ``answer`` field of a 200 response (or a fallback
        sentence when the field is absent), ``"Error: <status>"`` for any
        other status, or ``"Connection error: <exc>"`` if anything raises.
        Never raises.
        """
        # NOTE(review): no timeout is passed to requests.post - confirm
        # whether unbounded waits are intended for this backend.
        try:
            headers = {
                "Authorization": f"Bearer {HF_TOKEN}",
                "accept": "application/json"
            }
            payload = {"question": user_message}

            response = requests.post(
                self.ai_url,
                headers={**headers, "Content-Type": "application/json"},
                json=payload
            )

            if response.status_code == 422:
                response = requests.post(
                    self.ai_url,
                    headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
                    data=payload
                )

            if response.status_code == 200:
                return response.json().get("answer", "I didn't understand that.")
            return f"Error: {response.status_code}"

        except Exception as e:
            return f"Connection error: {e}"

    # ------------------------------------------------------------------
    # Excel documents
    # ------------------------------------------------------------------

    async def handle_excel(self, update: Update, context: CallbackContext):
        """Start background processing of an uploaded Excel document.

        Only users in AUTHENTICATED_USERS may submit files, matching the
        check applied to text messages.
        """
        if update.message.from_user.id not in AUTHENTICATED_USERS:
            await update.message.reply_text(self._NOT_AUTHENTICATED)
            return

        message_id = update.message.message_id
        self._spawn(self._process_excel_file(update, context, message_id))

    async def _process_excel_file(self, update, context, message_id):
        """Download the document, send it to the Excel endpoint, return the result.

        The user sees a progress message that is edited as the work advances
        and deleted once the processed file has been sent. The file is
        recorded in ``active_excel_files`` after a successful download and
        marked completed when this coroutine finishes.
        """
        # Bound before the try so the except handler can always refer to them.
        processing_msg = None
        file_name = "unknown"

        try:
            document = update.message.document
            file_name = document.file_name

            processing_msg = await update.message.reply_text(
                f"📊 Starting Excel processing...\n"
                f"File: {file_name}\n"
                f"Request ID: #{message_id}"
            )

            try:
                file = await context.bot.get_file(document.file_id)
                file_bytes = await file.download_as_bytearray()
            except Exception as e:
                logging.error(f"Error downloading file: {e}")
                await processing_msg.edit_text(
                    f"❌ Error downloading file: {file_name}\n"
                    f"Please try again or contact support."
                )
                return

            self.active_excel_files[message_id] = {
                'filename': file_name,
                'status': 'processing',
                'start_time': time.time()
            }
            await processing_msg.edit_text(
                f"⚙️ Processing Excel file...\n"
                f"File: {file_name}\n"
                f"Request ID: #{message_id}"
            )

            # The upload blocks, so it runs on the Excel worker pool.
            result = await asyncio.get_running_loop().run_in_executor(
                self.excel_executor,
                self._process_excel_sync,
                file_bytes,
                file_name
            )

            if result is None:
                await processing_msg.edit_text(
                    f"❌ Failed to process file\n"
                    f"File: {file_name}\n"
                    f"Please check the file format and try again."
                )
                return

            await context.bot.send_document(
                chat_id=update.effective_chat.id,
                document=io.BytesIO(result),
                filename=f'processed_{file_name}',
                caption=f"✅ Excel processing completed!\nRequest ID: #{message_id}"
            )
            await processing_msg.delete()

        except Exception as e:
            logging.error(f"Error processing file: {e}")
            await self._notify_failure(
                processing_msg,
                f"❌ Error processing file\n"
                f"File: {file_name}\n"
                f"Error: {str(e)}"
            )
        finally:
            self._mark_completed(self.active_excel_files, message_id)

    def _process_excel_sync(self, file_bytes, filename):
        """Upload the workbook to the Excel endpoint (blocking).

        Returns the response body as bytes on HTTP 200. Returns None, after
        logging the reason, for any other status or if anything raises.
        """
        # NOTE(review): no timeout is passed to requests.post - confirm
        # whether unbounded waits are intended for this backend.
        xlsx_mime = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        try:
            headers = {
                "Authorization": f"Bearer {HF_TOKEN}",
                "accept": xlsx_mime
            }
            files = {'file': (filename, file_bytes, xlsx_mime)}

            response = requests.post(
                self.excel_url,
                headers=headers,
                files=files
            )

            if response.status_code == 200:
                return response.content

            logging.error(f"Excel API Error: {response.status_code} - {response.text}")
            return None

        except Exception as e:
            logging.error(f"Excel processing error: {e}")
            return None

    async def _update_progress(self, message, message_id, filename):
        """Refresh ``message`` with the elapsed time while the file is processing.

        The loop ends as soon as the registry entry is missing or its status
        is no longer ``'processing'``; entries stay in the registry after
        completion, so key membership alone cannot be the loop condition.
        Nothing in this class calls this coroutine.
        """
        try:
            while True:
                entry = self.active_excel_files.get(message_id)
                if entry is None or entry.get('status') != 'processing':
                    break

                elapsed_time = time.time() - entry['start_time']
                hours = int(elapsed_time // 3600)
                minutes = int((elapsed_time % 3600) // 60)

                await message.edit_text(
                    f"⚙️ Processing Excel file...\n"
                    f"File: {filename}\n"
                    f"Request ID: #{message_id}\n"
                    f"Time elapsed: {hours}h {minutes}m\n"
                    f"Status: {entry['status']}"
                )

                await asyncio.sleep(self._PROGRESS_INTERVAL)
        except Exception as e:
            logging.error(f"Error updating progress: {e}")

    # ------------------------------------------------------------------
    # Status reporting
    # ------------------------------------------------------------------

    async def status_command(self, update: Update, context: CallbackContext):
        """Handle /status: list every tracked text request and Excel file.

        Restricted to authenticated users because the report contains the
        request ids and file names of all tracked requests.
        """
        if update.message.from_user.id not in AUTHENTICATED_USERS:
            await update.message.reply_text(self._NOT_AUTHENTICATED)
            return

        now = time.time()
        parts = ["Current Status:\n\n"]

        if self.active_requests:
            parts.append("📝 Text Requests:\n")
            for msg_id, info in self.active_requests.items():
                processing_time = self._elapsed_seconds(info, now)
                parts.append(
                    f"Request #{msg_id}:\n"
                    f"├─ Status: {info['status']}\n"
                    f"└─ Time: {processing_time:.1f}s\n\n"
                )

        if self.active_excel_files:
            parts.append("📊 Excel Files:\n")
            for msg_id, info in self.active_excel_files.items():
                processing_time = self._elapsed_seconds(info, now)
                parts.append(
                    f"File #{msg_id}:\n"
                    f"├─ Name: {info['filename']}\n"
                    f"├─ Status: {info['status']}\n"
                    f"└─ Time: {processing_time:.1f}s\n\n"
                )

        if not self.active_requests and not self.active_excel_files:
            parts.append("No active processes")

        await update.message.reply_text("".join(parts))

    # ------------------------------------------------------------------
    # Application lifecycle
    # ------------------------------------------------------------------

    def setup_handlers(self):
        """Register the command, text and Excel-document handlers on the application."""
        excel_documents = filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls")

        self.app.add_handler(CommandHandler("start", self.start_command))
        self.app.add_handler(CommandHandler("status", self.status_command))
        self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        self.app.add_handler(MessageHandler(excel_documents, self.handle_excel))

    async def run(self):
        """Initialize and start the application, then begin polling for updates."""
        logging.info("Starting Telegram bot...")
        await self.app.initialize()
        await self.app.start()
        await self.app.updater.start_polling()

    async def bot_stop(self):
        """Stop polling (when an updater exists), then stop and shut down the application."""
        logging.info("Stopping Telegram bot...")
        if self.app.updater:
            await self.app.updater.stop()
        await self.app.stop()
        await self.app.shutdown()


def init_bot():
    """Build a TelegramBot from the module-level BOT_TOKEN and BASE_URL settings."""
    return TelegramBot(base_url=BASE_URL, bot_token=BOT_TOKEN)