Spaces:
Paused
Paused
| import telebot | |
| import subprocess | |
| import threading | |
| import time | |
| import re | |
| import html | |
| from collections import deque | |
| import os | |
| import re | |
| import signal | |
| import psutil | |
| from fastapi import FastAPI, status | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.responses import HTMLResponse | |
# The bot token is read from the BOT_TOKEN environment variable; a missing
# variable raises KeyError at import time.
TOKEN = os.environ["BOT_TOKEN"]
bot = telebot.TeleBot(TOKEN)

# Chat IDs allowed to use the bot, parsed from the CHAT_IDS environment
# variable; any non-numeric text separates the IDs.  A "-" is kept as a sign
# unless it directly follows a digit (where it is treated as a separator), so
# negative chat IDs (group and channel chats) are not silently converted into
# positive user IDs.
AUTHORIZED_USERS = list(
    map(int, re.findall(r"(?<!\d)-?\d+", str(os.getenv("CHAT_IDS", ""))))
)

# Running commands, keyed by chat ID (one entry per chat).
active_processes = {}

# Log line history, keyed by chat ID.
logs_history = {}

# Maximum number of characters placed in a single log chunk.
MESSAGE_LIMIT = 4000

# Maximum number of log lines to keep in history.
MAX_LOG_HISTORY = 1000

# Maximum number of command history entries.
MAX_COMMAND_HISTORY = 20

# Executed command history, keyed by chat ID.
command_history = {}
def is_authorized(chat_id):
    """Return True when ``chat_id`` is listed in AUTHORIZED_USERS."""
    allowed = chat_id in AUTHORIZED_USERS
    return allowed
def initialize_logs_history(chat_id):
    """Ensure ``logs_history`` has a bounded deque for ``chat_id``.

    An existing deque is left untouched; a new one keeps at most
    MAX_LOG_HISTORY lines.
    """
    logs_history.setdefault(chat_id, deque(maxlen=MAX_LOG_HISTORY))
def add_to_logs_history(chat_id, log_line):
    """Append ``log_line`` to the chat's log history, creating it if needed."""
    initialize_logs_history(chat_id)
    history = logs_history[chat_id]
    history.append(log_line)
def initialize_command_history(chat_id):
    """Ensure ``command_history`` has a bounded deque for ``chat_id``.

    An existing deque is left untouched; a new one keeps at most
    MAX_COMMAND_HISTORY entries.
    """
    command_history.setdefault(chat_id, deque(maxlen=MAX_COMMAND_HISTORY))
def add_to_command_history(chat_id, command):
    """Record ``command`` in the chat's command history, creating it if needed."""
    initialize_command_history(chat_id)
    entries = command_history[chat_id]
    entries.append(command)
def get_command_history(chat_id):
    """Return a list copy of the chat's command history (empty if none)."""
    return list(command_history.get(chat_id, ()))
def get_latest_logs(chat_id):
    """Return the chat's stored log lines joined by newlines ("" if none)."""
    stored_lines = logs_history.get(chat_id, ())
    return "\n".join(stored_lines)
def sanitize_log(text):
    """Escape ``text`` so it can be embedded in an HTML-formatted message."""
    escaped = html.escape(text)
    return escaped
def split_logs(logs, latest_only=False, limit=None):
    """Split ``logs`` into newline-terminated chunks of at most ``limit`` chars.

    Lines are kept whole whenever they fit.  A single line that is too long
    for one chunk is hard-wrapped into pieces of ``limit - 1`` characters so
    that no chunk ever exceeds the limit, and no empty chunk is produced in
    front of it.

    Args:
        logs: The log text; lines are separated by "\\n".
        latest_only: When True, return a list holding only the last chunk.
        limit: Maximum chunk length in characters.  Defaults to MESSAGE_LIMIT.

    Returns:
        A list of chunk strings, each ending with "\\n".

    Raises:
        ValueError: If ``limit`` is smaller than 2 (a chunk needs room for at
            least one character plus its newline).
    """
    if limit is None:
        limit = MESSAGE_LIMIT
    if limit < 2:
        raise ValueError("limit must be at least 2")

    chunks = []
    current_chunk = ""
    for line in logs.split('\n'):
        # Hard-wrap a line that cannot fit in any chunk, even an empty one.
        while len(line) + 1 > limit:
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
            chunks.append(line[:limit - 1] + '\n')
            line = line[limit - 1:]

        if len(current_chunk) + len(line) + 1 > limit:
            # current_chunk is non-empty here: the line alone fits the limit.
            chunks.append(current_chunk)
            current_chunk = line + '\n'
        else:
            current_chunk += line + '\n'

    if current_chunk:
        chunks.append(current_chunk)

    if latest_only and chunks:
        return [chunks[-1]]
    return chunks
def create_log_keyboard(is_live=True):
    """Build the inline keyboard attached to log messages.

    Live logs get Pause / Resume / Refresh / Clear buttons; otherwise only a
    single button that clears the log history is shown.
    """
    if is_live:
        button_specs = [
            ("βΈοΈ Pause", "stop_logs"),
            ("βΆοΈ Resume", "resume_logs"),
            ("π Refresh", "refresh_logs"),
            ("β Clear", "clear_logs"),
        ]
    else:
        button_specs = [("β Clear log history", "clear_logs")]

    markup = telebot.types.InlineKeyboardMarkup(row_width=2)
    markup.add(
        *(
            telebot.types.InlineKeyboardButton(label, callback_data=action)
            for label, action in button_specs
        )
    )
    return markup
def stream_command(chat_id, command):
    """Run ``command`` in a shell and stream its output to the chat.

    The process is registered in ``active_processes`` under ``chat_id``.  Each
    output line is stored in the log history and in the entry's buffer, and
    ``send_log_update`` is called at most every UPDATE_INTERVAL seconds while
    log display is enabled.  When the process ends, the remaining buffer is
    flushed and the exit code is reported.

    Args:
        chat_id: Chat that issued the command and receives the output.
        command: Command line, executed through the shell.
    """
    initialize_logs_history(chat_id)
    initialize_command_history(chat_id)
    add_to_command_history(chat_id, command)

    # NOTE(security): the text is executed verbatim through the shell.  This is
    # the purpose of the bot, so it must only ever be reachable by chats that
    # passed the authorization check.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        shell=True,
        text=True,
        bufsize=1,  # line-buffered
    )
    state = {
        'process': process,
        'last_message_id': None,
        'buffer': "",
        'last_update': time.time(),
        'show_logs': True,
        'paused_at': None
    }
    active_processes[chat_id] = state
    UPDATE_INTERVAL = 2

    def still_registered():
        # False once another code path removed or replaced this command's
        # entry (e.g. the stop handler deleting it).
        return active_processes.get(chat_id) is state

    try:
        while still_registered():
            line = process.stdout.readline()
            if not line:
                # EOF on the pipe: wait for the exit status instead of
                # spinning on poll() until the process has been reaped.
                process.wait()
                break
            if not still_registered():
                break
            add_to_logs_history(chat_id, line.strip())
            state['buffer'] += line
            current_time = time.time()
            if (state['show_logs'] and
                    current_time - state['last_update'] >= UPDATE_INTERVAL):
                send_log_update(chat_id)
                state['last_update'] = current_time
    except Exception as e:
        # The text is sent with HTML parse mode, so it has to be escaped.
        bot.send_message(
            chat_id,
            f"β Error: {sanitize_log(str(e))}",
            parse_mode='HTML'
        )
    finally:
        process.stdout.close()
        if still_registered():
            try:
                if process.poll() is None:
                    # The loop was aborted by an error while the command was
                    # still running; stop the shell so it does not keep
                    # running without a registry entry.
                    process.terminate()
                else:
                    if state['buffer'] and state['show_logs']:
                        send_log_update(chat_id)
                    bot.send_message(
                        chat_id,
                        f"β Command completed with exit code: {process.returncode}",
                        parse_mode='HTML'
                    )
            finally:
                # Only remove the entry that belongs to this command.
                if still_registered():
                    active_processes.pop(chat_id, None)
def send_log_update(chat_id):
    """Send or update the log message for the chat's active command.

    While the process is running, the latest chunk of the buffer is shown in a
    single message that is edited in place (or sent first, when no message
    exists yet).  Once the process has finished, the whole buffer is sent as
    numbered "final" messages and the buffer is cleared.

    Args:
        chat_id: Chat whose entry in ``active_processes`` is rendered.

    Raises:
        KeyError: If the chat has no entry in ``active_processes``.
    """
    process_info = active_processes[chat_id]
    log_content = process_info['buffer']
    # Sample the process state exactly once.  Polling again later could see
    # the process finish in between and clear the buffer without the final
    # log ever having been sent.
    finished = process_info['process'].poll() is not None
    chunks = split_logs(log_content)

    if finished:
        for i, chunk in enumerate(chunks, 1):
            formatted_chunk = (
                f"π Final Log Part {i}/{len(chunks)}\n"
                f"<pre>{sanitize_log(chunk)}</pre>"
            )
            bot.send_message(chat_id, formatted_chunk, parse_mode='HTML')
        process_info['buffer'] = ""
        return

    total_pages = len(chunks)
    # Only the newest chunk is displayed while the command is running.
    current_chunk = chunks[-1] if chunks else ""
    paused = not process_info['show_logs']
    status_text = "π Live Log (paused)" if paused else "π Live Log (updating...)"
    header = f"{status_text} (Page {total_pages}/{total_pages})\n"
    formatted_message = f"{header}\n<pre>{sanitize_log(current_chunk)}</pre>"
    if paused:
        formatted_message += "\n<pre>Logs are paused. Click Resume to continue showing logs.</pre>"
        # The timestamp makes the content differ slightly between paused edits.
        formatted_message += f"\n\n<pre>Last paused at: {time.strftime('%H:%M:%S')}</pre>"

    try:
        if process_info['last_message_id']:
            bot.edit_message_text(
                formatted_message,
                chat_id=chat_id,
                message_id=process_info['last_message_id'],
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
        else:
            # No live message yet: send one and remember it for later edits.
            msg = bot.send_message(
                chat_id,
                formatted_message,
                parse_mode='HTML',
                reply_markup=create_log_keyboard()
            )
            process_info['last_message_id'] = msg.message_id
    except telebot.apihelper.ApiException as e:
        # An edit with unchanged content is rejected with "message is not
        # modified"; that case is harmless and skipped silently.
        if "message is not modified" not in str(e):
            bot.send_message(chat_id, f"Error updating message: {str(e)}")
def handle_callback(call):
    """Handle presses on the inline log-control buttons.

    Supported ``call.data`` values: "stop_logs", "resume_logs",
    "refresh_logs" (these need an active process) and "clear_logs" (works
    with or without one).  Every callback query is answered.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.

    Args:
        call: The callback query object delivered by the bot library.
    """
    chat_id = call.message.chat.id
    if not is_authorized(chat_id):
        bot.answer_callback_query(call.id, "You are not authorized to use this bot.")
        return

    action = call.data
    # Look the entry up once; the streaming thread may delete it at any time.
    process_info = active_processes.get(chat_id)

    if action == "clear_logs":
        if chat_id in logs_history:
            logs_history[chat_id].clear()
        if process_info is not None:
            process_info['buffer'] = ""
        bot.answer_callback_query(call.id, "Logs cleared")
        return

    if process_info is None:
        bot.answer_callback_query(call.id, "No active process")
        return

    if action == "stop_logs":
        process_info['show_logs'] = False
        bot.answer_callback_query(call.id, "Logs paused")
    elif action == "resume_logs":
        process_info['show_logs'] = True
        bot.answer_callback_query(call.id, "Logs resumed")
    elif action == "refresh_logs":
        bot.answer_callback_query(call.id, "Logs refreshed")
    else:
        # Unknown action: still answer the query so it is not left pending.
        bot.answer_callback_query(call.id)
        return

    try:
        send_log_update(chat_id)
    except KeyError:
        # The command finished and its entry was removed between the lookup
        # above and this update; there is nothing left to refresh.
        pass
def show_logs(message):
    """Reply with the chat's logs.

    With an active command, only the newest chunk of the live buffer is sent
    and that message becomes the one updated later.  Without one, the stored
    history is sent in numbered parts.  If there is nothing to show, the user
    is told so.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    initialize_logs_history(chat_id)

    # Live buffer when a command is running, stored history otherwise.
    is_live = chat_id in active_processes
    if is_live:
        logs_content = active_processes[chat_id]['buffer']
    else:
        logs_content = get_latest_logs(chat_id)

    if not logs_content:
        bot.reply_to(message, "No logs available.")
        return

    if is_live:
        latest = split_logs(logs_content, latest_only=True)
        if latest:
            live_text = (
                f"π Live Logs (Current Live Logs)\n"
                f"<pre>{sanitize_log(latest[0])}</pre>"
            )
            sent = bot.send_message(
                chat_id,
                live_text,
                parse_mode='HTML',
                reply_markup=create_log_keyboard(is_live)
            )
            # Later live updates edit this message.
            active_processes[chat_id]['last_message_id'] = sent.message_id
        return

    parts = split_logs(logs_content)
    total = len(parts)
    for number, part in enumerate(parts, 1):
        history_text = (
            f"π Historical Logs (Part {number}/{total})\n"
            f"<pre>{sanitize_log(part)}</pre>"
        )
        bot.send_message(
            chat_id,
            history_text,
            parse_mode='HTML',
            reply_markup=create_log_keyboard(is_live)
        )
def stop_command(message):
    """Stop the chat's running command and remove it from the registry.

    The process and all of its descendants are asked to terminate; anything
    still alive after three seconds is killed.  If the process tree cannot be
    inspected through psutil, the process itself is terminated directly.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    process_info = active_processes.get(chat_id)
    if process_info is None:
        bot.reply_to(message, "No active command to stop.")
        return

    process = process_info['process']
    try:
        # Disable log updates first so nothing more is pushed to the chat.
        process_info['show_logs'] = False
        process_info['buffer'] = ""

        try:
            parent = psutil.Process(process.pid)
            # Children first, then the parent.
            family = parent.children(recursive=True) + [parent]
            for proc in family:
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    pass  # already gone
            _, survivors = psutil.wait_procs(family, timeout=3)
            for proc in survivors:
                try:
                    proc.kill()
                except psutil.NoSuchProcess:
                    pass  # exited after the wait
        except psutil.Error:
            # The process tree could not be inspected or signalled; fall back
            # to terminating the process object directly.
            try:
                process.terminate()
            except OSError:
                pass  # process already exited

        bot.reply_to(message, "π Command stopped successfully.")
    except Exception as e:
        bot.reply_to(message, f"β Error stopping command: {str(e)}")
    finally:
        # One final attempt to kill the process if it is still running.
        try:
            if process.poll() is None:
                process.kill()
        except OSError:
            pass  # process already exited
        # Remove the entry only if it is still the one that was stopped; the
        # streaming thread may already have removed it.
        if active_processes.get(chat_id) is process_info:
            active_processes.pop(chat_id, None)
def show_command_history(message):
    """Reply with the numbered list of commands recorded for the chat.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.
    """
    chat_id = message.chat.id
    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    commands = get_command_history(chat_id)
    if not commands:
        bot.reply_to(message, "No command history available.")
        return

    # The reply uses HTML parse mode, so each command must be escaped;
    # otherwise a command containing "<" or "&" makes the markup invalid.
    formatted_commands = "\n".join(
        f"{i}. {sanitize_log(cmd)}" for i, cmd in enumerate(commands, 1)
    )
    bot.reply_to(
        message,
        f"π Command History (Last {len(commands)} commands):\n<pre>{formatted_commands}</pre>",
        parse_mode='HTML'
    )
def send_welcome(message):
    """Reply with a short usage summary.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.
    """
    welcome_lines = [
        "π Welcome! Send me any command to execute and monitor logs.",
        "Use /stop to stop the current command.",
        "Use /cmds to see the last 20 executed commands.",
    ]
    bot.reply_to(message, "\n".join(welcome_lines))
def execute_command(message):
    """Run the message text as a command in a background thread.

    Unauthorized chats are rejected, and so is a new command while the chat
    already has an entry in ``active_processes``.

    NOTE(review): no handler registration is visible in this view - confirm
    how this function is hooked up to the bot.
    """
    chat_id = message.chat.id

    if not is_authorized(chat_id):
        bot.reply_to(message, "β You are not authorized to use this bot.")
        return

    already_running = chat_id in active_processes
    if already_running:
        bot.reply_to(
            message,
            "β οΈ A command is already running. Use /stop to stop it first."
        )
        return

    command = message.text
    bot.reply_to(message, f"βΆοΈ Executing command: {command}")

    # stream_command blocks until the command ends, so it gets its own thread.
    threading.Thread(target=stream_command, args=(chat_id, command)).start()
# # Start the bot
# try:
#     bot.polling(none_stop=True)
# except Exception as e:
#     print(f"Bot polling error: {e}")
# ─────────────────────────────────────────────────── FastAPI ───
# app = FastAPI()
# @app.get("/")
# def root():  # health-check hits this; must return 200 quickly
#     # return {"status": "ok"}
#     return RedirectResponse(
#         url="https://t.me/python3463_bot",
#         status_code=status.HTTP_302_FOUND  # 302 is fine too
#     )
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| import requests, os, time, pathlib | |
# ──────────────────────────────
# Config
# ──────────────────────────────
# The bot token is the module-level TOKEN, read from BOT_TOKEN.
BOT_USERNAME = os.getenv("BOT_USERNAME", "live_logger_bot")
AVATAR_TTL = int(os.getenv("AVATAR_TTL", "3600"))  # seconds to keep avatar fresh
DATA_DIR = pathlib.Path("/tmp")  # change if you prefer
AVATAR_PATH = DATA_DIR / "bot_avatar.jpg"
AVATAR_META = {"ts": 0}  # keeps timestamp of last download

# Guards against an empty value; the variable that is actually read is
# BOT_TOKEN, so the message names that one.
if not TOKEN:
    raise RuntimeError("BOT_TOKEN environment variable not set")

app = FastAPI(title="Telegram Bot Preview")
# ──────────────────────────────
# Helper: refresh avatar if needed
# ──────────────────────────────
def refresh_avatar(force: bool = False) -> None:
    """Download the bot's avatar if it is missing or stale.

    The image is saved to AVATAR_PATH and AVATAR_META['ts'] is set to the
    time of the refresh.  Any failure is printed and the previous file, if
    any, is kept.

    Args:
        force: Refresh even if the cached file is still within AVATAR_TTL.
    """
    now = time.time()
    if not force and AVATAR_PATH.exists() and now - AVATAR_META["ts"] < AVATAR_TTL:
        return  # still fresh

    api_base = f"https://api.telegram.org/bot{TOKEN}"
    try:
        # 1. get bot id
        me = requests.get(f"{api_base}/getMe", timeout=5).json()
        user_id = me["result"]["id"]

        # 2. get latest profile photo
        photos = requests.get(
            f"{api_base}/getUserProfilePhotos",
            params={"user_id": user_id, "limit": 1},
            timeout=5,
        ).json()
        if photos["result"]["total_count"] == 0:
            return  # bot has no avatar
        file_id = photos["result"]["photos"][0][-1]["file_id"]  # biggest size

        # 3. resolve file path
        file_obj = requests.get(
            f"{api_base}/getFile",
            params={"file_id": file_id},
            timeout=5,
        ).json()
        file_path = file_obj["result"]["file_path"]

        # 4. download the actual image; an HTTP error must not be stored as
        #    if it were the picture.
        file_url = f"https://api.telegram.org/file/bot{TOKEN}/{file_path}"
        download = requests.get(file_url, timeout=10)
        download.raise_for_status()

        DATA_DIR.mkdir(parents=True, exist_ok=True)
        # Write to a temporary file and rename it, so a concurrent reader
        # never sees a half-written image.
        tmp_path = AVATAR_PATH.with_name(AVATAR_PATH.name + ".tmp")
        tmp_path.write_bytes(download.content)
        tmp_path.replace(AVATAR_PATH)

        AVATAR_META["ts"] = now
        print("Avatar refreshed")
    except Exception as exc:
        # If something goes wrong, keep the old one.  The request URLs embed
        # the bot token, so it is redacted before the error is printed.
        print("Avatar refresh failed:", str(exc).replace(TOKEN, "<token>"))
# ──────────────────────────────
# Helper: bot description (fetched on every call)
# ──────────────────────────────
def get_bot_description() -> str:
    """Return the bot's short description, or "" if it cannot be fetched.

    The getMyShortDescription method reports the text in the
    ``short_description`` field; ``description`` is read as a fallback.
    Network errors, invalid JSON and error responses are printed and yield
    an empty string so that the caller can still render its page.
    """
    try:
        resp = requests.get(
            f"https://api.telegram.org/bot{TOKEN}/getMyShortDescription",
            timeout=5,
        ).json()
        result = resp["result"]
        return result.get("short_description") or result.get("description", "")
    except (requests.RequestException, ValueError, KeyError) as exc:
        # The request URL embeds the bot token, so redact it before printing.
        print("Bot description fetch failed:", str(exc).replace(TOKEN, "<token>"))
        return ""
# ──────────────────────────────
# Startup: fetch avatar once
# ──────────────────────────────
def on_startup():
    """Force an avatar download, ignoring any cached copy.

    NOTE(review): no startup-hook registration is visible in this view -
    confirm how this function is attached to the app.
    """
    refresh_avatar(True)
# ──────────────────────────────
# Route: preview card
# ──────────────────────────────
def preview():
    """Render an HTML card with the bot's avatar, name and description.

    The username and description are HTML-escaped before being placed in the
    page.

    NOTE(review): no route registration is visible in this view - confirm how
    this function is attached to the app.

    Returns:
        An HTMLResponse containing the page.
    """
    refresh_avatar()  # refresh if stale
    avatar_src = "/avatar.jpg" if AVATAR_PATH.exists() else "https://telegram.org/img/t_logo.png"
    # Escape everything that comes from the environment or the Telegram API.
    # The page is kept in ``page`` rather than ``html`` so the ``html`` module
    # stays usable inside this function.
    username = html.escape(BOT_USERNAME)
    description = html.escape(get_bot_description())
    page = f"""
<html>
<head>
<title>{username}</title>
<style>
body {{
  font-family: sans-serif; display: flex; justify-content: center;
  padding: 40px; background: #f7f7f7;
}}
.card {{
  max-width: 420px; background: #fff; padding: 24px; text-align: center;
  border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,.1);
}}
.avatar {{
  width: 120px; height: 120px; border-radius: 50%; object-fit: cover;
}}
.btn {{
  display: inline-block; margin-top: 16px; padding: 12px 24px;
  background: #2AABEE; color: #fff; border-radius: 8px;
  text-decoration: none; font-weight: bold;
}}
</style>
</head>
<body>
<div class="card">
<img src="{avatar_src}" alt="avatar" class="avatar">
<h2>@{username}</h2>
<p>{description}</p>
<a class="btn" href="https://t.me/{username}" target="_blank">Start&nbsp;Bot</a>
</div>
</body>
</html>
"""
    return HTMLResponse(page)
# ──────────────────────────────
# Route: serve avatar file
# ──────────────────────────────
def avatar():
    """Serve the cached avatar image, refreshing it first if it is stale.

    NOTE(review): no route registration is visible in this view - confirm how
    this function is attached to the app.

    Raises:
        HTTPException: 404 when no avatar file exists after the refresh.
    """
    refresh_avatar()  # make sure it's fresh
    if AVATAR_PATH.exists():
        return FileResponse(AVATAR_PATH, media_type="image/jpeg")
    raise HTTPException(status_code=404, detail="Avatar not found")
def startup():
    """Start the bot's polling loop in a daemon thread.

    NOTE(review): no startup-hook registration is visible in this view -
    confirm how this function is attached to the app.
    """
    # Launch the bot *after* Uvicorn has started
    poller = threading.Thread(target=bot.infinity_polling, daemon=True)
    poller.start()
if __name__ == "__main__":
    # Serve the app on all interfaces, port 7860, when run as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run("app:app", **server_options)