Spaces:
Paused
Paused
| import telebot | |
| import subprocess | |
| import threading | |
| import time | |
| import re | |
| import html | |
| from collections import deque | |
| import os | |
| import re | |
| import signal | |
| import psutil | |
| from fastapi import FastAPI | |
# Telegram bot client. The token is taken from the BOT_TOKEN environment
# variable; indexing os.environ directly means a missing variable raises
# KeyError while this module is being imported.
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(token=TOKEN)
def parse_chat_ids(raw):
    """Extract every integer chat ID found in ``raw``.

    Args:
        raw: Free-form text (any separators) containing the IDs; it is
            converted with ``str()`` before scanning.

    Returns:
        A list of ints in order of appearance. Empty when no digits are found.
    """
    # Telegram group/supergroup chat IDs are negative, so a leading '-' must
    # be kept. The look-behind stops a '-' that directly follows a digit
    # (e.g. "123-456" used as a separator) from being read as a sign.
    return [int(token) for token in re.findall(r"(?<!\d)-?\d+", str(raw))]


# Chat IDs allowed to use the bot, read from the CHAT_IDS environment variable.
AUTHORIZED_USERS = parse_chat_ids(os.getenv("CHAT_IDS", ""))
# --- Limits ---------------------------------------------------------------
# Upper bound, in characters, used when splitting logs into message chunks.
MESSAGE_LIMIT = 4000
# Number of log lines retained per chat.
MAX_LOG_HISTORY = 1000
# Number of executed commands retained per chat.
MAX_COMMAND_HISTORY = 20

# --- Per-chat runtime state (all keyed by chat_id) --------------------------
# Bookkeeping for the command currently running in a chat.
active_processes = {}
# Retained log lines for each chat.
logs_history = {}
# Retained command lines for each chat.
command_history = {}
def is_authorized(chat_id):
    """Return True when ``chat_id`` is listed in ``AUTHORIZED_USERS``."""
    allowed = AUTHORIZED_USERS
    return chat_id in allowed
def initialize_logs_history(chat_id):
    """Ensure ``chat_id`` has a log deque, bounded to MAX_LOG_HISTORY lines."""
    if chat_id in logs_history:
        return
    logs_history[chat_id] = deque(maxlen=MAX_LOG_HISTORY)
def add_to_logs_history(chat_id, log_line):
    """Append ``log_line`` to the chat's log history, creating it on demand."""
    initialize_logs_history(chat_id)
    history = logs_history[chat_id]
    history.append(log_line)
def initialize_command_history(chat_id):
    """Ensure ``chat_id`` has a command deque, bounded to MAX_COMMAND_HISTORY."""
    if chat_id in command_history:
        return
    command_history[chat_id] = deque(maxlen=MAX_COMMAND_HISTORY)
def add_to_command_history(chat_id, command):
    """Record ``command`` in the chat's command history, creating it on demand."""
    initialize_command_history(chat_id)
    entries = command_history[chat_id]
    entries.append(command)
def get_command_history(chat_id):
    """Return the chat's recorded commands as a new list (empty if none)."""
    try:
        entries = command_history[chat_id]
    except KeyError:
        return []
    return list(entries)
def get_latest_logs(chat_id):
    """Return the chat's retained log lines joined by newlines ('' if none)."""
    lines = logs_history.get(chat_id)
    if lines is None:
        return ""
    return "\n".join(lines)
def sanitize_log(text):
    """Return ``text`` with HTML special characters escaped via html.escape."""
    escaped = html.escape(text)
    return escaped
def split_logs(logs, latest_only=False, limit=None):
    """Split ``logs`` into newline-terminated chunks of at most ``limit`` chars.

    Lines are kept whole where possible. A single line that cannot fit in one
    chunk is hard-wrapped into pieces so that no chunk ever exceeds ``limit``,
    and no empty chunk is ever produced.

    Args:
        logs: Text to split; lines are separated by ``'\\n'``.
        latest_only: When True, return only the last chunk.
        limit: Maximum chunk length in characters, including the trailing
            newline of each line. Defaults to ``MESSAGE_LIMIT``.

    Returns:
        A list of chunk strings; ``[]`` for empty input.

    Raises:
        ValueError: If ``limit`` is smaller than 2 (one character + newline).
    """
    if limit is None:
        limit = MESSAGE_LIMIT
    if limit < 2:
        raise ValueError("limit must be at least 2")
    if not logs:
        return []

    # Every piece gets a '\n' appended, so a piece may hold limit - 1 chars.
    piece_width = limit - 1
    chunks = []
    current_chunk = ""
    for line in logs.split('\n'):
        # An empty line still contributes its newline, hence the `or [""]`.
        pieces = [
            line[start:start + piece_width]
            for start in range(0, len(line), piece_width)
        ] or [""]
        for piece in pieces:
            # Only flush a non-empty chunk; flushing an empty one is what
            # used to emit a blank leading chunk for an over-long first line.
            if current_chunk and len(current_chunk) + len(piece) + 1 > limit:
                chunks.append(current_chunk)
                current_chunk = ""
            current_chunk += piece + '\n'
    if current_chunk:
        chunks.append(current_chunk)

    if latest_only and chunks:
        return [chunks[-1]]
    return chunks
def create_log_keyboard(is_live=True):
    """Build the inline keyboard attached to log messages.

    With ``is_live`` true the keyboard carries pause / resume / refresh /
    clear buttons; otherwise it carries a single clear-history button.
    Buttons are laid out two per row.
    """
    # NOTE(review): the leading glyphs in these labels look like mis-decoded
    # emoji; confirm the intended characters against the original file.
    make_button = telebot.types.InlineKeyboardButton
    if is_live:
        buttons = [
            make_button("βΈοΈ Pause", callback_data="stop_logs"),
            make_button("βΆοΈ Resume", callback_data="resume_logs"),
            make_button("π Refresh", callback_data="refresh_logs"),
            make_button("β Clear", callback_data="clear_logs"),
        ]
    else:
        buttons = [
            make_button("β Clear log history", callback_data="clear_logs"),
        ]

    keyboard = telebot.types.InlineKeyboardMarkup(row_width=2)
    keyboard.add(*buttons)
    return keyboard
def stream_command(chat_id, command):
    """Run ``command`` in a shell and stream its combined output to the chat.

    The command is recorded in the chat's command history, registered in
    ``active_processes`` and read line by line. Each line is added to the log
    history and to the live buffer; while logs are shown, the live message is
    refreshed at most once every ``update_interval`` seconds. When the output
    ends, remaining output is flushed, the exit code is reported and the
    registry entry is removed.

    Intended to run in its own thread (it blocks until the command's output
    stream ends).

    Args:
        chat_id: Chat that issued the command and receives the output.
        command: Shell command line to execute.
    """
    initialize_logs_history(chat_id)
    add_to_command_history(chat_id, command)

    # NOTE(security): the text is handed to the shell unmodified. That is the
    # purpose of this bot, so access control (is_authorized) is the only
    # safeguard; never call this with input from an unauthorized chat.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        shell=True,
        text=True,
        bufsize=1,  # line-buffered so output arrives as it is produced
    )

    state = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None,
    }
    active_processes[chat_id] = state
    update_interval = 2  # seconds between live-message refreshes

    def still_registered():
        # False once another caller removed or replaced this command's entry;
        # from then on this thread must not touch the registry or the chat.
        return active_processes.get(chat_id) is state

    try:
        for line in process.stdout:
            if not still_registered():
                break
            add_to_logs_history(chat_id, line.strip())
            state['buffer'] += line
            now = time.time()
            if state['show_logs'] and now - state['last_update'] >= update_interval:
                send_log_update(chat_id)
                state['last_update'] = now
        else:
            # Output ended on its own: reap the process so returncode is set.
            process.wait()
    except Exception as e:
        # Errors raised after the entry was removed are a side effect of the
        # removal, not something worth reporting to the user.
        if still_registered():
            # The message is sent as HTML, so the exception text is escaped.
            bot.send_message(
                chat_id,
                f"β Error: {html.escape(str(e))}",
                parse_mode='HTML'
            )
    finally:
        try:
            if process.poll() is not None:
                process.stdout.close()
            if still_registered():
                if state['buffer'] and state['show_logs']:
                    send_log_update(chat_id)
                if process.poll() is not None:
                    bot.send_message(
                        chat_id,
                        f"β Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
        finally:
            # Always unregister our own entry, even if a send above failed,
            # but never an entry that belongs to a newer command.
            if still_registered():
                active_processes.pop(chat_id, None)
def send_log_update(chat_id):
    """Publish the chat's buffered output.

    While the process is running, the newest chunk of the buffer is shown in
    a single "live" message that is edited in place (or created on first
    use). Once the process has exited, every chunk is sent as its own
    "final" message and the buffer is emptied.

    Raises:
        KeyError: If ``chat_id`` has no entry in ``active_processes``.
    """
    process_info = active_processes[chat_id]
    log_content = process_info['buffer']

    if process_info['process'].poll() is not None:
        # Process finished: send the whole buffer, then reset it.
        parts = split_logs(log_content)
        part_total = len(parts)
        for part_no, part in enumerate(parts, 1):
            final_text = (
                f"π Final Log Part {part_no}/{part_total}\n"
                f"<pre>{sanitize_log(part)}</pre>"
            )
            bot.send_message(chat_id, final_text, parse_mode='HTML')
        process_info['buffer'] = ""
        return

    # Process still running: only the most recent chunk is displayed.
    parts = split_logs(log_content)
    page_total = len(parts)
    latest_part = parts[-1] if parts else ""

    if process_info['show_logs']:
        status = "π Live Log (updating...)"
    else:
        status = "π Live Log (paused)"
    header = f"{status} (Page {page_total}/{page_total})\n"
    formatted_message = f"{header}\n<pre>{sanitize_log(latest_part)}</pre>"

    if not process_info['show_logs']:
        formatted_message += "\n<pre>Logs are paused. Click Resume to continue showing logs.</pre>"
        # The timestamp makes the text differ between edits while paused.
        formatted_message += f"\n\n<pre>Last paused at: {time.strftime('%H:%M:%S')}</pre>"

    try:
        existing_id = process_info['last_message_id']
        if existing_id:
            bot.edit_message_text(
                formatted_message,
                chat_id=chat_id,
                message_id=existing_id,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
        else:
            sent = bot.send_message(
                chat_id,
                formatted_message,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
            process_info['last_message_id'] = sent.message_id
    except telebot.apihelper.ApiException as e:
        # An edit with unchanged text is rejected by the API; that case is
        # harmless and ignored. Anything else is reported to the chat.
        if "message is not modified" not in str(e):
            bot.send_message(chat_id, f"Error updating message: {str(e)}")
def handle_callback(call):
    """Handle a press on one of the log-control inline buttons.

    Supported ``call.data`` values: ``stop_logs`` (pause live updates),
    ``resume_logs``, ``refresh_logs`` and ``clear_logs`` (empty the chat's
    log history and live buffer; works without a running command).

    Every callback query is answered, including unrecognised ones, because
    the client keeps showing a progress indicator until it is.

    NOTE(review): no handler-registration decorator is visible in this view;
    confirm how this function is attached to the bot.
    """
    chat_id = call.message.chat.id
    action = call.data

    # Same access rule as the command handlers.
    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "You are not authorized to use this bot.")
        return

    if action == "clear_logs":
        history = logs_history.get(chat_id)
        if history is not None:
            history.clear()
        state = active_processes.get(chat_id)
        if state is not None:
            state['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    # Single lookup: the entry can be removed by the streaming thread at any
    # time, so a separate membership test followed by indexing could fail.
    state = active_processes.get(chat_id)
    if state is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if action == "stop_logs":
        state['show_logs'] = False
        reply = "Logs paused"
    elif action == "resume_logs":
        state['show_logs'] = True
        reply = "Logs resumed"
    elif action == "refresh_logs":
        reply = "Logs refreshed"
    else:
        # Unknown payload: acknowledge silently so the client stops waiting.
        bot.answer_callback_query(call.id)
        return

    bot.answer_callback_query(call.id, reply)
    try:
        send_log_update(chat_id)
    except KeyError:
        # The command finished and was unregistered between the lookup above
        # and this refresh; there is nothing left to update.
        pass
def show_logs(message):
    """Reply with logs for the chat that sent ``message``.

    With a running command that has buffered output, only the newest chunk is
    sent, with the live keyboard, and that message becomes the one later live
    updates edit. Otherwise the retained history is sent in numbered parts,
    or "No logs available." when there is nothing to show.

    NOTE(review): no handler-registration decorator is visible in this view;
    confirm which command triggers this function.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    initialize_logs_history(chat_id)

    # Live buffer when a command is running, retained history otherwise.
    is_live = chat_id in active_processes
    if is_live:
        logs_content = active_processes[chat_id]['buffer']
    else:
        logs_content = get_latest_logs(chat_id)

    if is_live and logs_content:
        latest = split_logs(logs_content, latest_only=True)
        if not latest:
            return
        live_text = f"π Live Logs (Current Live Logs)\n<pre>{sanitize_log(latest[0])}</pre>"
        sent = bot.send_message(
            chat_id,
            live_text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
        # Subsequent live updates edit this message.
        active_processes[chat_id]['last_message_id'] = sent.message_id
        return

    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    # Chunked so that each message stays within the length limit.
    parts = split_logs(logs_content)
    part_total = len(parts)
    for part_no, part in enumerate(parts, 1):
        history_text = f"π Historical Logs (Part {part_no}/{part_total})\n<pre>{sanitize_log(part)}</pre>"
        bot.send_message(
            chat_id,
            history_text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
def stop_command(message):
    """Stop the command running for the chat that sent ``message``.

    Live updates are disabled and the buffer discarded, then the process and
    all of its descendants are asked to terminate. Anything still alive after
    a three-second grace period is killed. The chat's entry is removed from
    ``active_processes`` in every case.

    NOTE(review): no handler-registration decorator is visible in this view;
    confirm which command triggers this function.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    state = active_processes.get(chat_id)
    if state is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = state['process']
    try:
        # Silence the streaming side first so nothing more is published.
        state['show_logs'] = False
        state['buffer'] = ""

        try:
            # The command runs through a shell, so the real work is usually
            # done by child processes; walk the whole tree, children first.
            parent = psutil.Process(process.pid)
            targets = parent.children(recursive=True) + [parent]
            for proc in targets:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    # Already gone; keep going with the rest of the tree.
                    continue
            _, survivors = psutil.wait_procs(targets, timeout=3)
            for proc in survivors:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    continue
        except (psutil.Error, OSError):
            # Tree walk failed (e.g. process vanished, access denied):
            # fall back to terminating the direct child only.
            try:
                process.terminate()
            except OSError:
                pass

        bot.reply_to(message, "π Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"β Error stopping command: {str(e)}")
    finally:
        # Last resort if the direct child is somehow still running.
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass
        # Remove only our own entry; the streaming thread may already have
        # removed it, or a newer command may have taken its place.
        if active_processes.get(chat_id) is state:
            active_processes.pop(chat_id, None)
def show_command_history(message):
    """Reply with the numbered list of commands recorded for the chat.

    NOTE(review): no handler-registration decorator is visible in this view;
    the welcome text suggests this serves /cmds — confirm.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    # The reply is sent as HTML, and shell commands routinely contain '<',
    # '>' and '&'; each entry is escaped so such commands display correctly
    # instead of breaking the message markup.
    formatted_commands = "\n".join(
        f"{index}. {sanitize_log(cmd)}" for index, cmd in enumerate(commands, 1)
    )
    bot.reply_to(
        message,
        f"π Command History (Last {len(commands)} commands):\n<pre>{formatted_commands}</pre>",
        parse_mode='HTML'
    )
def send_welcome(message):
    """Reply to ``message`` with the usage summary.

    NOTE(review): no handler-registration decorator is visible in this view;
    confirm which command(s) trigger this function.
    """
    welcome_text = (
        "π Welcome! Send me any command to execute and monitor logs.\n"
        "Use /stop to stop the current command.\n"
        "Use /cmds to see the last 20 executed commands."
    )
    bot.reply_to(message, welcome_text)
def execute_command(message):
    """Start the text of ``message`` as a shell command for its chat.

    Refuses unauthorized chats and chats that already have a running
    command. Otherwise acknowledges the command and hands it to
    ``stream_command`` on a new thread so this handler returns immediately.

    NOTE(review): no handler-registration decorator is visible in this view;
    confirm which messages are routed here.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    if chat_id in active_processes:
        busy_text = "β οΈ A command is already running. Use /stop to stop it first."
        bot.reply_to(message, busy_text)
        return

    command = message.text
    bot.reply_to(message, f"βΆοΈ Executing command: {command}")

    worker = threading.Thread(target=stream_command, args=(chat_id, command))
    worker.start()
# The bot is not started at import time; the previous direct call is kept
# here for reference only:
#     try:
#         bot.polling(none_stop=True)
#     except Exception as e:
#         print(f"Bot polling error: {e}")

# ---------------------------------------------------------------- FastAPI ---
# Alternative wiring, unused:
#     from telegram_preview import router as telegram_preview_router
import telegram_preview

# ASGI application object; the __main__ block serves it through uvicorn.
app = FastAPI()
# Lets the telegram_preview module attach whatever it provides to the app.
telegram_preview.include_in_app(app)
# Simple health-check endpoint.
# NOTE(review): no route decorator is visible in this view; presumably this
# function is registered on `app` — confirm the path.
def health():
    """Return a constant payload indicating the service is up."""
    payload = {"status": "ok"}
    return payload
# OPTIONAL: if you really want a startup hook, keep it complete:
# @app.on_event("startup")
# async def startup():
#     print("Application started")
# NOTE(review): no decorator is visible on the function below in this view;
# presumably it is registered as the app's startup hook — confirm.
def startup():
    """Start Telegram polling on a background daemon thread.

    Meant to run after Uvicorn has started, so that polling does not block
    the web server. Being a daemon, the thread does not keep the process
    alive on shutdown.
    """
    poller = threading.Thread(target=bot.infinity_polling, daemon=True)
    poller.start()
# Script entry point: serve the FastAPI app with Uvicorn on all interfaces,
# port 7860. The "app:app" import string makes Uvicorn import a module named
# `app` — assumes this file is saved as app.py; confirm.
if __name__ == "__main__":
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run("app:app", **server_options)