diff --git "a/app.py" "b/app.py" new file mode 100644--- /dev/null +++ "b/app.py" @@ -0,0 +1,3066 @@ +""" +Advanced Stream Bot — v3.2.0 +- Live view: status+log bubble auto-edits every 2s (logg.py pattern) +- Pause/Resume/Refresh/Close live view controls +- Force Reboot button + /reboot command — last-resort recovery +- Fixed: error field no longer shows raw log lines +- Fixed: error cleared on clean stop/abort +- Fixed: stats (uptime/frames/bytes) update correctly in live view +- Fixed: abort works instantly during reconnect sleep +- Fixed: watchdog does not mark error after intentional stop +- Fixed: stale edits never pile up in outbound queue (cap=1 per message) +- SSE /events/{chat_id} for real-time web dashboard +- Scheduler-triggered streams queue outbound notifications +""" + +import logging +import threading +import time +import datetime +import traceback +import fractions +import json +import os +import re +import asyncio +from urllib.parse import urlparse +from pathlib import Path +import io + +from fastapi import FastAPI, Request, HTTPException +from fastapi.responses import StreamingResponse +import queue +import av +from PIL import Image, ImageEnhance, UnidentifiedImageError +from apscheduler.schedulers.background import BackgroundScheduler + +# ────────────────────────────────────────────── +# VERSION & APP +# ────────────────────────────────────────────── +APP_VERSION = "3.2.0" + +app = FastAPI(title="Advanced Stream Bot", version=APP_VERSION) +scheduler = BackgroundScheduler(timezone="UTC") +# Per-job metadata store — APScheduler Job objects are frozen; +# we cannot attach arbitrary attrs to them. Store meta here instead. +_job_store: dict = {} # job_id -> {"output_url": ..., "input_url": ...} + + +# ────────────────────────────────────────────── +# LOGGING +# ────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(threadName)s %(module)s:%(lineno)d %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("stream_bot") + +live_log_lines_global = [] +MAX_GLOBAL_LOG_LINES = 200 + + +def append_global_live_log(line: str): + live_log_lines_global.append(line) + if len(live_log_lines_global) > MAX_GLOBAL_LOG_LINES: + live_log_lines_global.pop(0) + + +class GlobalListHandler(logging.Handler): + def emit(self, record): + append_global_live_log(self.format(record)) + + +_gh = GlobalListHandler() +_gh.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) +logger.addHandler(_gh) + +# ────────────────────────────────────────────── +# VALIDATION CONSTANTS +# ────────────────────────────────────────────── +SUPPORTED_VIDEO_CODECS = ["libx264", "h264_nvenc", "h264_qsv", "libx265", "hevc_nvenc", "hevc_qsv", "copy"] +SUPPORTED_AUDIO_CODECS = ["aac", "opus", "libmp3lame", "copy"] +LOGO_POSITIONS = ["top_left", "top_right", "bottom_left", "bottom_right", "center"] +FFMPEG_PRESETS = ["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"] +OUTPUT_FORMATS = ["flv", "mpegts", "mp4", "hls"] + +# Common resolution presets for the picker +RESOLUTION_PRESETS = [ + ("360p", "640x360"), + ("480p", "854x480"), + ("720p", "1280x720"), + ("1080p", "1920x1080"), + ("1440p", "2560x1440"), + ("4K", "3840x2160"), + ("Source","source"), +] + +QUALITY_PRESETS = { + "low": {"video_bitrate": "800k", "audio_bitrate": "96k", "ffmpeg_preset": "superfast", "resolution": "854x480", "fps": 30}, + "medium": {"video_bitrate": "1500k", "audio_bitrate": "128k", "ffmpeg_preset": "medium", "resolution": "1280x720", "fps": 30}, + "high": {"video_bitrate": "3000k", "audio_bitrate": "192k", "ffmpeg_preset": "fast", "resolution": "1920x1080","fps": 30}, + "ultra": {"video_bitrate": "6000k", "audio_bitrate": "256k", "ffmpeg_preset": "slow", "resolution": "1920x1080","fps": 60}, + "source": {}, +} + +# ────────────────────────────────────────────── +# DEFAULT SETTINGS — every parameter exposed +# ────────────────────────────────────────────── +DEFAULT_USER_SETTINGS = { + # Playlist / IO + "input_url_playlist": [], + "current_playlist_index": 0, + "output_url": "rtmp://a.rtmp.youtube.com/live2/YOUR_KEY", + "output_format": "flv", + + # Quality shortcut + "quality_preset": "medium", + + # Video + "video_codec": "libx264", + "resolution": "1280x720", + "fps": 30, + "gop_size": 60, + "video_bitrate": "1500k", + "ffmpeg_preset": "medium", + "video_pix_fmt": "yuv420p", + "video_thread_count": 0, # 0 = auto + + # Audio + "audio_codec": "aac", + "audio_bitrate": "128k", + "audio_sample_rate": 44100, # 0 = source + "audio_channels": 2, # 0 = source + + # Loop / playlist + "loop_count": 0, # 0=once, -1=infinite, N=N times + "stop_on_error_in_playlist": True, + "reconnect_on_stream_error": True, + "reconnect_delay_seconds": 5, + "max_reconnect_attempts": 3, + + # Connection / network + "open_timeout_seconds": 15, + "read_timeout_seconds": 30, + + # Logo + "logo_enabled": False, + "logo_data_bytes": None, + "logo_mime_type": None, + "logo_original_filename": None, + "logo_position": "top_right", + "logo_scale": 0.10, + "logo_opacity": 0.85, + "logo_margin_px": 10, + + # Conversation state + "current_step": None, + "current_step_index": 0, + "conversation_fields_list": [], + "settings_editing_field": None, + + # UX mode: "send" = always new message (default), "edit" = edit-in-place + "ux_mode": "send", +} + +DEFAULT_SESSION_RUNTIME_STATE = { + "streaming_state": "idle", # idle|starting|streaming|paused|stopping|stopped|completed|error|reconnecting + "stream_start_time": None, + "frames_encoded": 0, + "bytes_sent": 0, + "stream_thread_ref": None, + "watchdog_thread_ref": None, + "pyav_objects": { + "input_container": None, + "output_container": None, + "video_out_stream": None, + "audio_out_stream": None, + "logo_image_pil": None, + }, + "live_log_lines_user": [], + "error_notification_user": "", + "stop_gracefully_flag": False, + "current_loop_iteration": 0, + "reconnect_attempt": 0, + "last_frame_time": None, # for watchdog + "last_notified_state": None, # for state-change notifications + "status_message_id": None, # message to auto-update in real-time + "status_chat_id": None, + "active_output_url": None, # the URL actually being streamed to right now +} + +# ────────────────────────────────────────────── +# SESSION MANAGEMENT +# ────────────────────────────────────────────── +user_sessions: dict = {} +session_locks: dict = {} +# Store the main event loop for cross-thread calls +_main_event_loop: asyncio.AbstractEventLoop = None + +# ────────────────────────────────────────────── +# REAL-TIME EVENT QUEUE SYSTEM +# Each chat_id has a list of asyncio.Queue objects (one per SSE subscriber). +# When any state/log change happens, we push an event to all subscribers. +# ────────────────────────────────────────────── +_sse_subscribers: dict = {} # chat_id -> list[asyncio.Queue] +_sse_lock = threading.Lock() + +# Outbound message queue: background threads can enqueue messages here; +# they will be returned on the next /webhook response for that chat. +_outbound_message_queue: dict = {} # chat_id -> list[dict] +_outbound_queue_lock = threading.Lock() + + +def _get_sse_subscribers(chat_id: int) -> list: + with _sse_lock: + return list(_sse_subscribers.get(chat_id, [])) + + +def _register_sse_subscriber(chat_id: int, q: asyncio.Queue): + with _sse_lock: + _sse_subscribers.setdefault(chat_id, []).append(q) + + +def _unregister_sse_subscriber(chat_id: int, q: asyncio.Queue): + with _sse_lock: + lst = _sse_subscribers.get(chat_id, []) + try: + lst.remove(q) + except ValueError: + pass + + +def push_sse_event(chat_id: int, event: dict): + """Push a JSON event to all SSE subscribers for this chat_id (thread-safe).""" + subscribers = _get_sse_subscribers(chat_id) + if not subscribers: + return + loop = _main_event_loop + if not loop or not loop.is_running(): + return + payload = json.dumps(event) + for q in subscribers: + try: + loop.call_soon_threadsafe(q.put_nowait, payload) + except Exception: + pass + + +def enqueue_outbound_message(chat_id: int, msg: dict): + """Queue a message dict to be returned on the next webhook response for chat_id.""" + with _outbound_queue_lock: + _outbound_message_queue.setdefault(chat_id, []).append(msg) + + +def pop_outbound_messages(chat_id: int) -> list: + """Pop and return all queued outbound messages for this chat_id.""" + with _outbound_queue_lock: + msgs = _outbound_message_queue.pop(chat_id, []) + return msgs + + +def get_user_session(chat_id: int) -> dict: + if chat_id not in user_sessions: + session_locks[chat_id] = threading.Lock() + with session_locks[chat_id]: + session = {} + for k, v in DEFAULT_USER_SETTINGS.items(): + session[k] = list(v) if isinstance(v, list) else (dict(v) if isinstance(v, dict) else v) + for k, v in DEFAULT_SESSION_RUNTIME_STATE.items(): + session[k] = dict(v) if isinstance(v, dict) else (list(v) if isinstance(v, list) else v) + user_sessions[chat_id] = session + logger.info(f"[Chat {chat_id}] New session created (in-memory).") + return user_sessions[chat_id] + + +def reset_session_settings(chat_id: int): + """Restore all settings to defaults, keep runtime state.""" + session = get_user_session(chat_id) + lock = session_locks[chat_id] + with lock: + for k, v in DEFAULT_USER_SETTINGS.items(): + session[k] = list(v) if isinstance(v, list) else (dict(v) if isinstance(v, dict) else v) + append_user_live_log(chat_id, "Settings restored to defaults.") + + +def append_user_live_log(chat_id: int, line: str): + session = get_user_session(chat_id) + lock = session_locks.get(chat_id) + entry = f"{datetime.datetime.now().strftime('%H:%M:%S')} {line}" + if lock: + with lock: + session['live_log_lines_user'].append(entry) + if len(session['live_log_lines_user']) > 100: + session['live_log_lines_user'].pop(0) + else: + session['live_log_lines_user'].append(entry) + # Push real-time log event to SSE subscribers + push_sse_event(chat_id, {"type": "log", "line": entry}) + + +# ────────────────────────────────────────────── +# TELEGRAM API HELPERS +# (Webhook-only mode: no outbound Telegram API calls) +# ────────────────────────────────────────────── +def push_message(chat_id: int, text: str, reply_markup=None, parse_mode="HTML"): + """ + In webhook-only mode outbound calls are not possible. + Log the notification so it appears in /logs instead. + """ + import html + plain = html.unescape(re.sub(r'<[^>]+>', '', text)) + logger.info(f"[Chat {chat_id}] [push_message suppressed] {plain[:200]}") + append_user_live_log(chat_id, f"[notification] {plain[:200]}") + + +# ────────────────────────────────────────────── +# TELEGRAM RESPONSE BUILDERS +# ────────────────────────────────────────────── +def send_message(chat_id: int, text: str, parse_mode="HTML", reply_markup=None): + """Build a sendMessage response dict (for webhook response).""" + resp = {"method": "sendMessage", "chat_id": chat_id, "text": text, "parse_mode": parse_mode} + if reply_markup: + resp["reply_markup"] = reply_markup + return resp + + +def edit_message_text(chat_id: int, message_id: int, text: str, parse_mode="HTML", reply_markup=None): + resp = {"method": "editMessageText", "chat_id": chat_id, "message_id": message_id, + "text": text, "parse_mode": parse_mode} + if reply_markup: + resp["reply_markup"] = reply_markup + return resp + + +def answer_callback_query(cq_id: str, text: str = None, show_alert: bool = False): + resp = {"method": "answerCallbackQuery", "callback_query_id": cq_id} + if text: + resp["text"] = text + if show_alert: + resp["show_alert"] = True + return resp + + +# ────────────────────────────────────────────── +# LIVE VIEW SYSTEM (inspired by logg.py) +# ────────────────────────────────────────────── +# Pattern taken from logg.py: +# • Each chat has a "live_view" entry: message_id + last_sent_text + show flag +# • A background thread every UPDATE_INTERVAL seconds builds the fresh combined +# status+log text and queues an editMessageText for that message_id. +# • On next webhook response the edit is piggybacked — the bubble updates in place. +# • Pause/Resume toggle show_live_view, suppressing/resuming edits (same as logg.py). +# • Refresh forces an immediate rebuild+enqueue regardless of interval. +# • Auto-registers when user taps 📊 Status or 📋 Logs. +# • Auto-unregisters when stream reaches a terminal state. +# • SSE events are pushed alongside every edit for web dashboard clients. +# +# UPDATE_INTERVAL: how often the background thread re-edits the live bubble (seconds) +LIVE_VIEW_UPDATE_INTERVAL = 2 + +_live_views: dict = {} # chat_id -> {message_id, last_sent, show, paused_at, mode} +_live_view_lock = threading.Lock() + + +def lock_for(chat_id: int): + return session_locks.get(chat_id, threading.Lock()) + + +def _build_live_view_text(chat_id: int, mode: str = "status") -> str: + """ + Build the combined live-view message text. + mode="status" → status block + last 8 log lines + mode="logs" → last 30 log lines only (like logg.py /logs) + """ + session = get_user_session(chat_id) + state = session.get('streaming_state', 'idle') + + with _live_view_lock: + info = _live_views.get(chat_id, {}) + is_paused = not info.get("show", True) + + if mode == "logs": + logs = session.get('live_log_lines_user', []) + log_tail = "\n".join(logs[-30:]) if logs else "No logs yet." + pause_note = "\n\n⏸ Updates paused" if is_paused else "" + return ( + f"📋 Live Logs {'(paused)' if is_paused else '(updating…)'}\n" + f"
{esc(log_tail)}
" + f"{pause_note}" + ) + else: + status = compose_status_message(chat_id, include_config=False, include_logs=False) + logs = session.get('live_log_lines_user', []) + log_tail = "\n".join(logs[-8:]) if logs else "No logs yet." + pause_note = "\n\n⏸ Auto-updates paused — tap ▶️ Resume to continue" if is_paused else "" + + # Only show log tail while actively streaming + if state in ("streaming", "starting", "paused", "reconnecting"): + return ( + f"{status}\n\n" + f"📋 Recent Logs:\n
{esc(log_tail)}
" + f"{pause_note}" + ) + else: + return status + pause_note + + +def _get_live_view_keyboard(chat_id: int) -> dict: + """ + Keyboard for the live-view bubble: Pause/Resume/Refresh + stream controls. + Mirrors logg.py's create_log_keyboard() pattern. + """ + session = get_user_session(chat_id) + state = session.get('streaming_state', 'idle') + streaming = state in ("streaming", "paused", "starting", "reconnecting") + + with _live_view_lock: + info = _live_views.get(chat_id, {}) + is_paused_view = not info.get("show", True) + + rows = [] + + # Row 1: Live-view controls (always present when registered) + view_row = [] + if is_paused_view: + view_row.append({"text": "▶️ Resume Updates", "callback_data": "lv_resume"}) + else: + view_row.append({"text": "⏸ Pause Updates", "callback_data": "lv_pause"}) + view_row.append({"text": "🔄 Refresh", "callback_data": "lv_refresh"}) + view_row.append({"text": "❌ Close", "callback_data": "lv_close"}) + rows.append(view_row) + + # Row 2: Stream controls (context-aware, same as get_main_keyboard) + if streaming: + ctrl = [] + if state == "streaming": + ctrl.append({"text": "⏸ Pause Stream", "callback_data": "stream_pause"}) + elif state == "paused": + ctrl.append({"text": "▶️ Resume Stream", "callback_data": "stream_resume"}) + ctrl.append({"text": "⏹ Stop Stream", "callback_data": "stream_abort"}) + rows.append(ctrl) + if session.get('loop_count', 0) == -1: + rows.append([{"text": "⏳ Finish After This Loop", "callback_data": "stream_stop_graceful"}]) + else: + playlist_count = len(session.get('input_url_playlist', [])) + if playlist_count > 0 and session.get('output_url'): + rows.append([{"text": "🚀 Start Stream", "callback_data": "stream_start"}]) + + return {"inline_keyboard": rows} + + +def register_live_view(chat_id: int, message_id: int, mode: str = "status"): + """ + Register a message as the live-view bubble for this chat. + Replaces any existing registration (one live bubble per chat). + """ + with _live_view_lock: + _live_views[chat_id] = { + "message_id": message_id, + "last_sent": "", + "show": True, # True = update edits are enabled + "paused_at": None, + "mode": mode, # "status" or "logs" + } + with lock_for(chat_id): + get_user_session(chat_id)['status_message_id'] = message_id + logger.info(f"[Chat {chat_id}] Live view registered → msg {message_id} mode={mode}") + + +def unregister_live_view(chat_id: int): + with _live_view_lock: + _live_views.pop(chat_id, None) + logger.info(f"[Chat {chat_id}] Live view unregistered.") + + +def set_live_view_show(chat_id: int, show: bool): + """Toggle live-view updates on/off (Pause/Resume).""" + with _live_view_lock: + if chat_id in _live_views: + _live_views[chat_id]["show"] = show + _live_views[chat_id]["paused_at"] = None if show else time.strftime('%H:%M:%S') + + +def _do_live_view_edit(chat_id: int, force: bool = False): + """ + Build fresh text and enqueue an editMessageText for the live-view bubble. + Skips if paused (unless force=True) or if text hasn't changed. + IMPORTANT: Replaces any existing queued edit for this message (cap=1) + so stale edits never pile up in the outbound queue. + Also pushes SSE event (for web dashboard clients). + """ + with _live_view_lock: + info = _live_views.get(chat_id) + if not info: + return + show = info["show"] + msg_id = info["message_id"] + last_sent = info["last_sent"] + mode = info.get("mode", "status") + + if not show and not force: + return # Paused — skip edit (same as logg.py) + + new_text = _build_live_view_text(chat_id, mode) + kb = _get_live_view_keyboard(chat_id) + + # Only enqueue if content actually changed (avoids "message not modified" errors) + if new_text == last_sent and not force: + # Still push SSE even if Telegram text unchanged + _push_state_sse(chat_id) + return + + with _live_view_lock: + if chat_id in _live_views: + _live_views[chat_id]["last_sent"] = new_text + + edit_payload = edit_message_text(chat_id, msg_id, new_text, reply_markup=kb) + + # Replace (not append) any existing live-view edit in the queue so we never + # pile up stale edits that Telegram would reject or deliver out of order. + with _outbound_queue_lock: + existing = _outbound_message_queue.get(chat_id, []) + # Remove any previous edit targeting the same message_id + filtered = [m for m in existing + if not (m.get("method") == "editMessageText" + and m.get("message_id") == msg_id)] + filtered.append(edit_payload) + _outbound_message_queue[chat_id] = filtered + + # Also push SSE snapshot for web clients + _push_state_sse(chat_id) + + +def _live_view_updater_loop(): + """ + Background thread — fires every LIVE_VIEW_UPDATE_INTERVAL seconds. + For each registered live-view, calls _do_live_view_edit(). + On terminal state: does one forced final update then unregisters. + Also pushes SSE for active sessions without a live-view registered. + Inspired directly by logg.py's stream_command() update-interval pattern. + """ + while True: + time.sleep(LIVE_VIEW_UPDATE_INTERVAL) + try: + with _live_view_lock: + entries = list(_live_views.items()) + + for chat_id, info in entries: + try: + session = get_user_session(chat_id) + state = session.get('streaming_state', 'idle') + + _do_live_view_edit(chat_id) + + # Auto-unregister on terminal state after one forced final update + if state in ('idle', 'stopped', 'completed', 'error'): + _do_live_view_edit(chat_id, force=True) + unregister_live_view(chat_id) + # Push final SSE + _push_state_sse(chat_id) + + except Exception as e_inner: + logger.warning(f"Live view updater error for {chat_id}: {e_inner}") + + # Push SSE for active sessions without a live-view (scheduler streams, etc.) + try: + with _live_view_lock: + registered = set(_live_views.keys()) + for chat_id, session in list(user_sessions.items()): + state = session.get('streaming_state', 'idle') + if state in ('streaming', 'starting', 'reconnecting', 'paused') \ + and chat_id not in registered: + _push_state_sse(chat_id) + except Exception: + pass + + except Exception as e: + logger.error(f"Live view updater loop error: {e}") + + +_live_view_updater_thread = threading.Thread( + target=_live_view_updater_loop, name="LiveViewUpdater", daemon=True) +_live_view_updater_thread.start() + + +# Compatibility shim — old code that called register_status_message still works +def register_status_message(chat_id: int, message_id: int): + register_live_view(chat_id, message_id, mode="status") + + +def unregister_status_message(chat_id: int): + unregister_live_view(chat_id) + + +def pop_pending_status_edit(chat_id: int): + """ + Legacy shim kept for webhook handler compatibility. + Now a no-op — edits are enqueued directly by _do_live_view_edit(). + """ + return None + + +# ────────────────────────────────────────────── +# CONTEXT-AWARE KEYBOARDS +# ────────────────────────────────────────────── +def get_main_keyboard(session: dict): + """Returns inline keyboard appropriate for current session state.""" + state = session.get('streaming_state', 'idle') + streaming = state in ("streaming", "paused", "starting", "reconnecting") + playlist_count = len(session.get('input_url_playlist', [])) + + if streaming: + # Stream control buttons + btns = [] + row1 = [] + if state == "streaming": + row1.append({"text": "⏸ Pause", "callback_data": "stream_pause"}) + elif state == "paused": + row1.append({"text": "▶️ Resume", "callback_data": "stream_resume"}) + row1.append({"text": "⏹ Stop", "callback_data": "stream_abort"}) + btns.append(row1) + + btns.append([ + {"text": "📊 Status", "callback_data": "stream_status"}, + {"text": "📋 Logs", "callback_data": "show_user_logs"}, + ]) + if session.get('loop_count', 0) == -1: + btns.append([{"text": "⏳ Finish After This Loop", "callback_data": "stream_stop_graceful"}]) + btns.append([{"text": "❔ Help", "callback_data": "show_help"}]) + btns.append([{"text": "⚡ Force Reboot", "callback_data": "force_reboot"}]) + else: + # Idle mode — full navigation + btns = [] + # Row 1: Start stream + if playlist_count > 0 and session.get('output_url'): + btns.append([{"text": "🚀 Start Stream", "callback_data": "stream_start"}]) + else: + btns.append([{"text": "⚙️ Quick Setup (required)", "callback_data": "quick_setup"}]) + + # Row 2: Config + btns.append([ + {"text": f"📜 Playlist ({playlist_count})", "callback_data": "view_playlist"}, + {"text": "⚙️ Settings", "callback_data": "open_settings"}, + ]) + + # Row 3: Logo + Schedule + logo_icon = "🖼✅" if session.get('logo_enabled') else "🖼" + btns.append([ + {"text": logo_icon + " Logo", "callback_data": "cfg_logo"}, + {"text": "🕒 Schedule", "callback_data": "cfg_schedule"}, + ]) + + # Row 4: Utils + btns.append([ + {"text": "🔄 Reset to Defaults", "callback_data": "confirm_reset"}, + {"text": "❔ Help", "callback_data": "show_help"}, + ]) + # Row 5: Emergency + btns.append([ + {"text": "⚡ Force Reboot", "callback_data": "force_reboot"}, + ]) + + return {"inline_keyboard": btns} + + +def get_settings_keyboard(session: dict = None): + """Inline keyboard for settings navigation.""" + ux = (session or {}).get("ux_mode", "send") + ux_label = "💬 UX: New Message ✅" if ux == "send" else "✏️ UX: Edit In-Place ✅" + return {"inline_keyboard": [ + [{"text": "📡 Output URL", "callback_data": "set_output_url"}, + {"text": "🎬 Input Playlist", "callback_data": "view_playlist"}], + [{"text": "🎥 Video Codec", "callback_data": "set_video_codec"}, + {"text": "🔊 Audio Codec", "callback_data": "set_audio_codec"}], + [{"text": "📐 Resolution", "callback_data": "pick_resolution"}, + {"text": "🎞 FPS", "callback_data": "set_fps"}], + [{"text": "📶 Video Bitrate", "callback_data": "set_video_bitrate"}, + {"text": "🔉 Audio Bitrate", "callback_data": "set_audio_bitrate"}], + [{"text": "⚡ Preset", "callback_data": "set_ffmpeg_preset"}, + {"text": "🔁 Loop Count", "callback_data": "set_loop_count"}], + [{"text": "🔀 GOP Size", "callback_data": "set_gop_size"}, + {"text": "📦 Output Format", "callback_data": "set_output_format"}], + [{"text": "⏱ Reconnect Delay", "callback_data": "set_reconnect_delay"}, + {"text": "🔄 Auto-Reconnect", "callback_data": "toggle_reconnect"}], + [{"text": "🛑 Stop on Error", "callback_data": "toggle_stop_on_error"}, + {"text": "⏰ Open Timeout", "callback_data": "set_open_timeout"}], + [{"text": "🎨 Quality Preset", "callback_data": "pick_quality_preset"}], + [{"text": ux_label, "callback_data": "toggle_ux_mode"}], + [{"text": "✅ Done", "callback_data": "settings_done"}], + ]} + + +def get_quality_keyboard(): + rows = [[{"text": q.title(), "callback_data": f"apply_quality_{q}"} for q in list(QUALITY_PRESETS.keys())[:3]]] + rows.append([{"text": q.title(), "callback_data": f"apply_quality_{q}"} for q in list(QUALITY_PRESETS.keys())[3:]]) + rows.append([{"text": "↩ Back", "callback_data": "open_settings"}]) + return {"inline_keyboard": rows} + + +def get_codec_keyboard(codec_type: str): + codecs = SUPPORTED_VIDEO_CODECS if codec_type == "video" else SUPPORTED_AUDIO_CODECS + cb_prefix = "set_vcodec_" if codec_type == "video" else "set_acodec_" + rows = [] + row = [] + for c in codecs: + row.append({"text": c, "callback_data": cb_prefix + c}) + if len(row) == 2: + rows.append(row) + row = [] + if row: + rows.append(row) + rows.append([{"text": "↩ Back to Settings", "callback_data": "open_settings"}]) + return {"inline_keyboard": rows} + + +def get_preset_keyboard(): + rows = [] + row = [] + for p in FFMPEG_PRESETS: + row.append({"text": p, "callback_data": "set_preset_" + p}) + if len(row) == 3: + rows.append(row) + row = [] + if row: + rows.append(row) + rows.append([{"text": "↩ Back", "callback_data": "open_settings"}]) + return {"inline_keyboard": rows} + + +def get_resolution_keyboard(): + """Inline picker for common resolutions.""" + rows = [] + row = [] + for label, val in RESOLUTION_PRESETS: + row.append({"text": label, "callback_data": "set_res_" + val}) + if len(row) == 3: + rows.append(row) + row = [] + if row: + rows.append(row) + rows.append([{"text": "✏️ Custom…", "callback_data": "set_res_custom"}]) + rows.append([{"text": "↩ Back", "callback_data": "open_settings"}]) + return {"inline_keyboard": rows} + + +def get_logo_pos_keyboard(): + positions = ["top_left", "top_right", "bottom_left", "bottom_right", "center"] + rows = [[{"text": p.replace("_", " ").title(), "callback_data": f"set_logo_pos_{p}"}] for p in positions] + rows.append([{"text": "↩ Back", "callback_data": "cfg_logo"}]) + return {"inline_keyboard": rows} + + +def get_reset_confirm_keyboard(): + return {"inline_keyboard": [ + [{"text": "✅ Yes, Reset Everything", "callback_data": "do_reset"}, + {"text": "❌ Cancel", "callback_data": "show_help"}], + ]} + + +# ────────────────────────────────────────────── +# VALIDATION HELPERS +# ────────────────────────────────────────────── +def validate_url(url: str) -> bool: + try: + r = urlparse(url) + return all([r.scheme, r.netloc]) and r.scheme in ['http', 'https', 'rtmp', 'rtsp', 'udp', 'srt', 'file'] + except Exception: + return False + + +def validate_resolution(res: str) -> bool: + if res.lower() == "source": + return True + return bool(re.match(r"^\d{3,4}x\d{3,4}$", res)) + + +def parse_bitrate(bitrate_str: str) -> int: + s = str(bitrate_str).lower() + try: + if s.endswith('m'): + return int(float(s[:-1]) * 1_000_000) + if s.endswith('k'): + return int(float(s[:-1]) * 1_000) + return int(s) + except ValueError: + return 1_500_000 + + +def esc(text: str) -> str: + """HTML-escape a string.""" + return str(text).replace("&", "&").replace("<", "<").replace(">", ">") + + +# ────────────────────────────────────────────── +# STATUS & DISPLAY +# ────────────────────────────────────────────── +STATE_EMOJI = { + "idle": "💤", "starting": "🔄", "streaming": "🔴", + "paused": "⏸", "stopping": "🛑", "stopped": "⏹", + "completed": "✅", "error": "❌", "reconnecting": "🔁", +} + + +def get_uptime(start_time_obj) -> str: + if not start_time_obj: + return "—" + if start_time_obj.tzinfo is None: + start_time_obj = start_time_obj.replace(tzinfo=datetime.timezone.utc) + delta = datetime.datetime.now(datetime.timezone.utc) - start_time_obj + return str(delta).split('.')[0] + + +def format_settings_display(session: dict) -> str: + logo_status = "Disabled" + if session.get('logo_enabled') and session.get('logo_data_bytes'): + fname = session.get('logo_original_filename', 'Unknown') + logo_status = f"✅ {esc(fname)}" + + loop = session.get('loop_count', 0) + loop_disp = "∞ Infinite" if loop == -1 else ("Once" if loop == 0 else f"{loop}×") + + auto_rc = "✅" if session.get('reconnect_on_stream_error') else "❌" + stop_err = "✅" if session.get('stop_on_error_in_playlist') else "❌" + + lines = [ + f"📡 Output URL: {esc(session.get('output_url', '—'))}", + f"📜 Playlist: {len(session.get('input_url_playlist', []))} item(s)", + "", + f"🎨 Quality Preset: {esc(session.get('quality_preset', '—'))}", + f"🎥 Video: {esc(session.get('video_codec'))} | {esc(session.get('resolution'))} | {session.get('fps')}fps | {esc(session.get('video_bitrate'))}", + f"🔊 Audio: {esc(session.get('audio_codec'))} | {esc(session.get('audio_bitrate'))}", + f"⚡ Preset: {esc(session.get('ffmpeg_preset'))} 🔀 GOP: {session.get('gop_size')}", + f"📦 Format: {esc(session.get('output_format', 'flv'))}", + "", + f"🔁 Loop: {loop_disp}", + f"🔄 Auto-Reconnect: {auto_rc} (delay: {session.get('reconnect_delay_seconds')}s, max: {session.get('max_reconnect_attempts')})", + f"🛑 Stop on Error: {stop_err}", + f"⏱ Timeouts: open {session.get('open_timeout_seconds')}s read {session.get('read_timeout_seconds')}s", + f"🖼 Logo: {logo_status}", + ] + if session.get('logo_enabled') and session.get('logo_data_bytes'): + lines.append( + f" Position: {session.get('logo_position')} " + f"Scale: {session.get('logo_scale')} " + f"Opacity: {session.get('logo_opacity')}" + ) + return "\n".join(lines) + + +def compose_status_message(chat_id: int, include_config: bool = False, include_logs: bool = True) -> str: + session = get_user_session(chat_id) + state = session.get('streaming_state', 'idle') + icon = STATE_EMOJI.get(state, "❓") + + lines = [ + f"🤖 Stream Bot v{APP_VERSION}", + f"{icon} State: {state}", + ] + + if session.get('error_notification_user'): + lines.append(f"⚠️ Last Error: {esc(session['error_notification_user'])}") + + if state in ("streaming", "paused", "starting", "stopping", "reconnecting"): + playlist = session.get('input_url_playlist', []) + idx = session.get('current_playlist_index', 0) + cur_url = playlist[idx] if playlist and 0 <= idx < len(playlist) else "N/A" + loop = session.get('loop_count', 0) + loop_total = "∞" if loop == -1 else str(max(loop, 1)) + reconnect_att = session.get('reconnect_attempt', 0) + + lines += [ + "", + f"⏱ Uptime: {get_uptime(session.get('stream_start_time'))}", + f"🎞 Frames: {session.get('frames_encoded', 0):,}", + f"📤 Sent: {session.get('bytes_sent', 0) / (1024*1024):.2f} MB", + f"🎬 Input: {esc(cur_url)} ({idx+1}/{len(playlist)})", + f"🔁 Loop: {session.get('current_loop_iteration', 0)+1}/{loop_total}", + ] + if reconnect_att > 0: + lines.append(f"🔄 Reconnect attempts: {reconnect_att}") + + if include_config or state in ("idle", "stopped", "completed", "error"): + lines += ["", "⚙️ Configuration:", format_settings_display(session)] + + if include_logs and state in ("streaming", "paused", "starting", "reconnecting"): + user_logs = session.get('live_log_lines_user', []) + last_logs = "\n".join(user_logs[-6:]) if user_logs else "No logs yet." + lines += ["", "📋 Recent Logs:", f"
{esc(last_logs)}
"] + + return "\n".join(lines) + + +# ────────────────────────────────────────────── +# STREAM STATE CHANGE NOTIFICATION +# ────────────────────────────────────────────── +def notify_state_change(chat_id: int, new_state: str, extra_msg: str = ""): + """Push a message to user when stream state changes.""" + session = get_user_session(chat_id) + icon = STATE_EMOJI.get(new_state, "❓") + text = f"{icon} Stream state changed: {new_state}" + if extra_msg: + text += f"\n{extra_msg}" + text += "\n\n" + compose_status_message(chat_id) + kb = get_main_keyboard(session) + # Webhook-only: queue outbound message to be returned on next webhook response + push_message(chat_id, text, reply_markup=kb) + enqueue_outbound_message(chat_id, send_message(chat_id, text, reply_markup=kb)) + # Push SSE state event with full status snapshot + _push_state_sse(chat_id) + + +def _push_state_sse(chat_id: int): + """Build and push a full real-time state snapshot to SSE subscribers.""" + try: + session = get_user_session(chat_id) + state = session.get('streaming_state', 'idle') + uptime = 0 + if session.get('stream_start_time'): + try: + uptime = int((datetime.datetime.now(datetime.timezone.utc) - session['stream_start_time']).total_seconds()) + except Exception: + pass + # Build keyboard state for client-side button rendering + kb = get_main_keyboard(session) + push_sse_event(chat_id, { + "type": "state", + "state": state, + "icon": STATE_EMOJI.get(state, "❓"), + "frames_encoded": session.get('frames_encoded', 0), + "bytes_sent": session.get('bytes_sent', 0), + "uptime_seconds": uptime, + "uptime_str": get_uptime(session.get('stream_start_time')), + "reconnect_attempt": session.get('reconnect_attempt', 0), + "error": session.get('error_notification_user', ''), + "active_output_url": session.get('active_output_url', ''), + "playlist_index": session.get('current_playlist_index', 0), + "playlist_count": len(session.get('input_url_playlist', [])), + "loop_iteration": session.get('current_loop_iteration', 0), + "status_text": compose_status_message(chat_id, include_config=False), + "keyboard": kb, + "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(), + }) + except Exception as e: + logger.warning(f"_push_state_sse error for {chat_id}: {e}") + + +def update_streaming_state(chat_id: int, new_state: str, lock, session: dict, extra_msg: str = ""): + """Thread-safe state update + push notification if state changed.""" + old_state = session.get('streaming_state') + session['streaming_state'] = new_state + should_notify = (old_state != new_state) + + if should_notify: + # Notify outside the lock + threading.Thread( + target=notify_state_change, + args=(chat_id, new_state, extra_msg), + daemon=True + ).start() + else: + # Still push SSE stats even if state didn't change + threading.Thread(target=_push_state_sse, args=(chat_id,), daemon=True).start() + + +# ────────────────────────────────────────────── +# WATCHDOG THREAD +# ────────────────────────────────────────────── +def watchdog_thread_target(chat_id: int): + """Monitors the stream thread for freezes and notifies user.""" + session = get_user_session(chat_id) + lock = session_locks[chat_id] + STALL_THRESHOLD = 30 # seconds without a new frame = stall + + logger.info(f"[Chat {chat_id}] Watchdog started.") + while True: + time.sleep(5) + with lock: + state = session.get('streaming_state') + # Exit watchdog if stream is in any terminal or stopping state + if state not in ("streaming", "paused", "reconnecting", "starting"): + break + + last_frame = session.get('last_frame_time') + thread_ref = session.get('stream_thread_ref') + + # Check if stream thread died unexpectedly + if thread_ref and not thread_ref.is_alive(): + with lock: + cur_state = session.get('streaming_state') + # Only mark error if not already in a clean terminal/stopping state + if cur_state in ("streaming", "paused", "reconnecting", "starting"): + logger.error(f"[Chat {chat_id}] Watchdog: stream thread died unexpectedly!") + append_user_live_log(chat_id, "⚠️ Stream thread died unexpectedly. Marking as error.") + with lock: + session['streaming_state'] = "error" + session['error_notification_user'] = "Stream thread terminated unexpectedly." + notify_state_change(chat_id, "error", "The stream thread crashed. Use /stream to restart.") + break + + # Check for frame stall (only while actively streaming) + if state == "streaming" and last_frame: + stall_seconds = (datetime.datetime.now() - last_frame).total_seconds() + if stall_seconds > STALL_THRESHOLD: + append_user_live_log(chat_id, f"⚠️ Watchdog: No frames for {stall_seconds:.0f}s — possible freeze!") + enqueue_outbound_message(chat_id, send_message( + chat_id, + f"⚠️ Stream may be frozen!\nNo frames for {stall_seconds:.0f}s.\n" + f"Use /abort to stop or wait for auto-reconnect.", + reply_markup=get_main_keyboard(session) + )) + + logger.info(f"[Chat {chat_id}] Watchdog exiting.") + + +# ────────────────────────────────────────────── +# CORE STREAM ENGINE +# ────────────────────────────────────────────── +def stream_engine_thread_target(chat_id: int): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + def _cleanup_pyav(): + append_user_live_log(chat_id, "Cleaning up PyAV resources…") + with lock: + objs = session.get('pyav_objects', {}) + for key in ('output_container', 'input_container'): + container = objs.get(key) + if container: + try: + container.close() + except Exception as e: + append_user_live_log(chat_id, f"Error closing {key}: {e}") + objs[key] = None + objs['video_out_stream'] = None + objs['audio_out_stream'] = None + objs['logo_image_pil'] = None + + active_output_container = None + + try: + with lock: + session['streaming_state'] = "starting" + session['error_notification_user'] = "" + session['stream_start_time'] = datetime.datetime.now(datetime.timezone.utc) + session['frames_encoded'] = 0 + session['bytes_sent'] = 0 + session['current_loop_iteration'] = 0 + session['current_playlist_index'] = 0 + session['stop_gracefully_flag'] = False + session['reconnect_attempt'] = 0 + session['last_frame_time'] = None + session['pyav_objects'] = { + "input_container": None, "output_container": None, + "video_out_stream": None, "audio_out_stream": None, + "logo_image_pil": None, + } + + append_user_live_log(chat_id, f"Stream engine started → {session['output_url']}") + + # Notify state change + notify_state_change(chat_id, "starting") + + # --- Open output container --- + # Always read fresh from session so any /set output_url change takes effect. + with lock: + output_url = session['output_url'] + output_format = session.get('output_format', 'flv') + open_timeout = session.get('open_timeout_seconds', 15) + # Also store the active_output_url in session for status display + with lock: + session['active_output_url'] = output_url + append_user_live_log(chat_id, f"Opening output: {output_url} [{output_format}]") + try: + active_output_container = av.open( + output_url, mode='w', format=output_format, + timeout=open_timeout + ) + with lock: + session['pyav_objects']['output_container'] = active_output_container + append_user_live_log(chat_id, f"Output container opened ({output_format}).") + except Exception as e: + raise Exception(f"Failed to open output stream: {e}") + + # --- Load logo --- + logo_pil_original = None + if session.get('logo_enabled') and session.get('logo_data_bytes'): + try: + logo_pil_original = Image.open(io.BytesIO(session['logo_data_bytes'])).convert("RGBA") + opacity = session.get('logo_opacity', 1.0) + if opacity < 1.0: + alpha = logo_pil_original.split()[-1] + alpha = ImageEnhance.Brightness(alpha).enhance(opacity) + logo_pil_original.putalpha(alpha) + with lock: + session['pyav_objects']['logo_image_pil'] = logo_pil_original + append_user_live_log(chat_id, f"Logo loaded: {session.get('logo_original_filename', 'N/A')}") + except Exception as e: + append_user_live_log(chat_id, f"Logo load error: {e}. Skipping logo.") + logo_pil_original = None + + # --- Main loop --- + total_loops = session.get('loop_count', 0) + playlist = session.get('input_url_playlist', []) + + if not playlist: + raise Exception("Playlist is empty. Add URLs with /playlist add ") + + active_video_out_stream = None + active_audio_out_stream = None + output_configured = False + current_logo_resized = None + logo_pos_x, logo_pos_y = 0, 0 + + while True: + with lock: + if session.get('stop_gracefully_flag') or session['streaming_state'] == "stopping": + break + current_loop_iter = session['current_loop_iteration'] + # Re-read total_loops and playlist each iteration so live changes are picked up + total_loops = session.get('loop_count', 0) + playlist = list(session.get('input_url_playlist', [])) + + if total_loops != -1 and current_loop_iter >= max(total_loops, 1): + append_user_live_log(chat_id, f"Completed all {max(total_loops, 1)} loop(s).") + break + + append_user_live_log(chat_id, f"Loop iteration {current_loop_iter + 1}") + with lock: + session['current_playlist_index'] = 0 + + while True: + with lock: + cur_idx = session['current_playlist_index'] + should_stop = session.get('stop_gracefully_flag') or session['streaming_state'] == "stopping" + playlist_done = cur_idx >= len(playlist) + cur_playlist = list(session['input_url_playlist']) + + if should_stop or playlist_done: + break + + current_input_url = cur_playlist[cur_idx] + append_user_live_log(chat_id, f"Input [{cur_idx+1}/{len(cur_playlist)}]: {current_input_url}") + + _input_container = None + reconnect_on_error = session.get('reconnect_on_stream_error', True) + reconnect_delay = session.get('reconnect_delay_seconds', 5) + max_reconnects = session.get('max_reconnect_attempts', 3) + read_timeout = session.get('read_timeout_seconds', 30) + + try: + _input_container = av.open(current_input_url, timeout=open_timeout) + with lock: + session['pyav_objects']['input_container'] = _input_container + session['reconnect_attempt'] = 0 + + in_v_streams = _input_container.streams.video + in_a_streams = _input_container.streams.audio + if not in_v_streams: + raise Exception("No video stream found in input.") + + in_v_s = in_v_streams[0] + target_w, target_h = in_v_s.width, in_v_s.height + if session.get('resolution', 'source') != 'source': + try: + target_w, target_h = map(int, session['resolution'].split('x')) + except Exception: + pass + + # Configure output streams (once) + if not output_configured: + fps = session.get('fps', 30) + active_video_out_stream = active_output_container.add_stream( + session.get('video_codec', 'libx264'), rate=fps + ) + active_video_out_stream.width = target_w + active_video_out_stream.height = target_h + active_video_out_stream.pix_fmt = session.get('video_pix_fmt', 'yuv420p') + active_video_out_stream.codec_context.gop_size = session.get('gop_size', fps * 2) + active_video_out_stream.codec_context.bit_rate = parse_bitrate(session.get('video_bitrate', '1500k')) + tc = session.get('video_thread_count', 0) + if tc > 0: + active_video_out_stream.codec_context.thread_count = tc + active_video_out_stream.codec_context.thread_type = "AUTO" + preset = session.get('ffmpeg_preset', 'medium') + if session.get('video_codec', 'libx264') not in ('copy',): + active_video_out_stream.codec_context.options = {'preset': preset} + + with lock: + session['pyav_objects']['video_out_stream'] = active_video_out_stream + + if in_a_streams: + in_a_s = in_a_streams[0] + a_rate = session.get('audio_sample_rate') or in_a_s.rate + active_audio_out_stream = active_output_container.add_stream( + session.get('audio_codec', 'aac'), rate=a_rate + ) + active_audio_out_stream.codec_context.bit_rate = parse_bitrate(session.get('audio_bitrate', '128k')) + with lock: + session['pyav_objects']['audio_out_stream'] = active_audio_out_stream + + # Logo scaling + if logo_pil_original: + logo_scale = session.get('logo_scale', 0.10) + margin = session.get('logo_margin_px', 10) + logo_h = int(target_h * logo_scale) + logo_w = int(logo_h * (logo_pil_original.width / logo_pil_original.height)) + logo_w = min(logo_w, target_w) + logo_h = min(logo_h, target_h) + if logo_w > 0 and logo_h > 0: + current_logo_resized = logo_pil_original.resize((logo_w, logo_h), Image.Resampling.LANCZOS) + pos = session.get('logo_position', 'top_right') + if pos == 'top_right': + logo_pos_x, logo_pos_y = target_w - logo_w - margin, margin + elif pos == 'top_left': + logo_pos_x, logo_pos_y = margin, margin + elif pos == 'bottom_left': + logo_pos_x, logo_pos_y = margin, target_h - logo_h - margin + elif pos == 'bottom_right': + logo_pos_x, logo_pos_y = target_w - logo_w - margin, target_h - logo_h - margin + elif pos == 'center': + logo_pos_x, logo_pos_y = (target_w - logo_w) // 2, (target_h - logo_h) // 2 + + output_configured = True + append_user_live_log(chat_id, f"Output streams configured: {target_w}x{target_h}@{fps}fps") + + with lock: + if session['streaming_state'] != "paused": + old_st = session['streaming_state'] + session['streaming_state'] = "streaming" + if old_st != "streaming": + threading.Thread(target=notify_state_change, args=(chat_id, "streaming"), daemon=True).start() + # Schedule a delayed live-view refresh so the status bubble + # reflects real stats (frames/bytes/uptime) instead of zeros. + def _delayed_stats_refresh(cid=chat_id): + time.sleep(4) + _push_state_sse(cid) + _do_live_view_edit(cid, force=True) + threading.Thread(target=_delayed_stats_refresh, daemon=True).start() + + # --- Packet loop --- + last_read_time = time.time() + for packet in _input_container.demux(video=0, audio=0 if in_a_streams else -1): + with lock: + if session['streaming_state'] == "stopping": + break + while session['streaming_state'] == "paused": + lock.release() + time.sleep(0.2) + lock.acquire() + if session['streaming_state'] == "stopping": + break + + # Read timeout check + if time.time() - last_read_time > read_timeout: + raise Exception(f"Read timeout after {read_timeout}s of no packets.") + last_read_time = time.time() + + if packet.dts is None: + continue + + if packet.stream.type == 'video' and packet.stream.index == in_v_s.index: + for frame in packet.decode(): + if not frame.width or not frame.height: + continue + if frame.width != target_w or frame.height != target_h: + frame = frame.reformat(target_w, target_h, format='yuv420p') + + if current_logo_resized: + try: + pil_f = frame.to_image() + ax = max(0, min(logo_pos_x, pil_f.width - current_logo_resized.width)) + ay = max(0, min(logo_pos_y, pil_f.height - current_logo_resized.height)) + pil_f.paste(current_logo_resized, (ax, ay), current_logo_resized) + new_pts = frame.pts + frame = av.VideoFrame.from_image(pil_f) + frame.pts = new_pts + except Exception as e_logo: + pass + + for out_pkt in active_video_out_stream.encode(frame): + active_output_container.mux(out_pkt) + with lock: + session['frames_encoded'] += 1 + session['bytes_sent'] += out_pkt.size + session['last_frame_time'] = datetime.datetime.now() + # Push live stats to SSE subscribers: every frame for first 300, then every 100 + _fc = session['frames_encoded'] + if _fc < 300 or _fc % 100 == 0: + threading.Thread(target=_push_state_sse, args=(chat_id,), daemon=True).start() + + elif active_audio_out_stream and packet.stream.type == 'audio' and in_a_streams and packet.stream.index == in_a_streams[0].index: + for frame in packet.decode(): + if frame.pts is None: + continue + for out_pkt in active_audio_out_stream.encode(frame): + active_output_container.mux(out_pkt) + with lock: + session['bytes_sent'] += out_pkt.size + + append_user_live_log(chat_id, f"Finished input: {current_input_url}") + + except Exception as e_input: + tb = traceback.format_exc() + err_msg = str(e_input) + # Strip any log-format prefix that might have leaked in + if "] " in err_msg and "[" in err_msg[:40]: + err_msg = err_msg.split("] ", 1)[-1] if "] " in err_msg else err_msg + append_user_live_log(chat_id, f"Error on input {current_input_url}: {err_msg}") + with lock: + session['error_notification_user'] = err_msg[:300] + + with lock: + should_stop_now = session['streaming_state'] == "stopping" + reconnect_on = session.get('reconnect_on_stream_error', True) + cur_attempt = session.get('reconnect_attempt', 0) + max_att = session.get('max_reconnect_attempts', 3) + stop_on_err = session.get('stop_on_error_in_playlist', True) + + if should_stop_now: + break + + if reconnect_on and cur_attempt < max_att: + with lock: + session['reconnect_attempt'] = cur_attempt + 1 + old_st = session['streaming_state'] + if session['streaming_state'] not in ('stopping',): + session['streaming_state'] = "reconnecting" + append_user_live_log(chat_id, f"Reconnecting in {reconnect_delay}s (attempt {cur_attempt+1}/{max_att})…") + if old_st != "reconnecting": + notify_state_change(chat_id, "reconnecting", f"Attempt {cur_attempt+1}/{max_att}. Error: {esc(err_msg)}") + # Close bad input + if _input_container: + try: _input_container.close() + except Exception: pass + # Sleep in small increments so abort flag is checked promptly + for _ in range(int(reconnect_delay * 5)): + with lock: + if session['streaming_state'] == 'stopping': + break + time.sleep(0.2) + with lock: + if session['streaming_state'] == 'stopping': + break + # Retry same URL (re-enter inner loop iteration) + continue + elif stop_on_err: + append_user_live_log(chat_id, "Stopping due to error (stop_on_error=True).") + with lock: + session['streaming_state'] = "stopping" + break + else: + append_user_live_log(chat_id, "Skipping to next playlist item.") + + finally: + if _input_container: + try: + _input_container.close() + except Exception: + pass + with lock: + if session['pyav_objects'].get('input_container') is _input_container: + session['pyav_objects']['input_container'] = None + + with lock: + if session['streaming_state'] == "stopping": + break + session['current_playlist_index'] += 1 + + with lock: + if session['streaming_state'] == "stopping": + break + session['current_loop_iteration'] += 1 + + append_user_live_log(chat_id, "All items/loops processed or stopped.") + + except Exception as e_fatal: + tb = traceback.format_exc() + append_user_live_log(chat_id, f"Fatal error: {e_fatal}") + logger.error(f"[Chat {chat_id}] Fatal stream error: {e_fatal}\n{tb}") + with lock: + session['error_notification_user'] = str(e_fatal) + session['streaming_state'] = "error" + notify_state_change(chat_id, "error", f"Fatal error: {esc(str(e_fatal))}") + + finally: + append_user_live_log(chat_id, "Finalizing stream…") + + # Flush encoders + _final_output = None + _final_vid = None + _final_aud = None + with lock: + objs = session.get('pyav_objects', {}) + _final_output = objs.get('output_container') + _final_vid = objs.get('video_out_stream') + _final_aud = objs.get('audio_out_stream') + + if _final_output: + append_user_live_log(chat_id, "Flushing encoders…") + for stream in [_final_vid, _final_aud]: + if stream: + try: + for pkt in stream.encode(): + try: + _final_output.mux(pkt) + except Exception: + pass + except Exception as fe: + append_user_live_log(chat_id, f"Flush error: {fe}") + + _cleanup_pyav() + + with lock: + cur = session['streaming_state'] + if cur == "stopping": + final_state = "stopped" + session['error_notification_user'] = "" # clear error on clean stop + elif cur == "error": + final_state = "error" + elif session.get('error_notification_user'): + final_state = "error" + else: + final_state = "completed" + session['error_notification_user'] = "" + session['streaming_state'] = final_state + session['stream_thread_ref'] = None + session['watchdog_thread_ref'] = None + session['stop_gracefully_flag'] = False + + append_user_live_log(chat_id, f"Stream ended. Final state: {final_state}.") + notify_state_change(chat_id, final_state, + "Stream completed successfully! ✅" if final_state == "completed" else + "Stream was stopped." if final_state == "stopped" else + f"Error: {esc(session.get('error_notification_user', ''))}") + + +# ────────────────────────────────────────────── +# STREAM CONTROL HANDLERS +# ────────────────────────────────────────────── +async def start_stream_handler(chat_id: int, message_id_to_edit: int = None): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + def _reply(text, kb): + if message_id_to_edit: + return edit_message_text(chat_id, message_id_to_edit, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + with lock: + state = session['streaming_state'] + if state in ("streaming", "paused", "starting", "reconnecting"): + return _reply( + f"⚠️ Stream already active (state: {state}).\n" + f"Use /abort to stop it first.", + get_main_keyboard(session)) + if not session.get('input_url_playlist'): + return _reply( + "📜 Playlist is empty.\nAdd at least one URL:\n" + "/playlist add <url>", + get_main_keyboard(session)) + if not session.get('output_url') or session['output_url'] == DEFAULT_USER_SETTINGS['output_url']: + return _reply( + "📡 Output URL not configured.\n" + "Set it with: /set output_url rtmp://your-url/key", + get_main_keyboard(session)) + session['streaming_state'] = "starting" + session['error_notification_user'] = "" + + t_stream = threading.Thread(target=stream_engine_thread_target, args=(chat_id,), + name=f"Stream-{chat_id}", daemon=True) + t_watch = threading.Thread(target=watchdog_thread_target, args=(chat_id,), + name=f"Watchdog-{chat_id}", daemon=True) + with lock: + session['stream_thread_ref'] = t_stream + session['watchdog_thread_ref'] = t_watch + + t_stream.start() + await asyncio.sleep(0.3) + t_watch.start() + + return _reply( + "🔄 Stream starting…\n\n" + "I'll notify you when it's live.\n\n" + "Commands while streaming:\n" + " /pause — pause the stream\n" + " /resume — resume\n" + " /abort — stop the stream\n" + " /status — current status", + get_main_keyboard(session)) + + +async def pause_stream_handler(chat_id: int, message_id: int = None): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + with lock: + if session['streaming_state'] == "streaming": + session['streaming_state'] = "paused" + ok = True + else: + ok = False + st = session['streaming_state'] + + def _reply(text, kb): + if message_id: + return edit_message_text(chat_id, message_id, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + if ok: + append_user_live_log(chat_id, "Paused by user.") + threading.Thread(target=notify_state_change, args=(chat_id, "paused"), daemon=True).start() + return _reply("⏸ Stream paused.\nUse /resume to continue or /abort to stop.", + get_main_keyboard(session)) + else: + return _reply(f"ℹ️ Cannot pause — state is {st}.", get_main_keyboard(session)) + + +async def resume_stream_handler(chat_id: int, message_id: int = None): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + with lock: + if session['streaming_state'] == "paused": + session['streaming_state'] = "streaming" + ok = True + else: + ok = False + st = session['streaming_state'] + + def _reply(text, kb): + if message_id: + return edit_message_text(chat_id, message_id, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + if ok: + append_user_live_log(chat_id, "Resumed by user.") + threading.Thread(target=notify_state_change, args=(chat_id, "streaming"), daemon=True).start() + return _reply("▶️ Stream resumed.", get_main_keyboard(session)) + else: + return _reply(f"ℹ️ Cannot resume — state is {st}.", get_main_keyboard(session)) + + +async def force_reboot_handler(chat_id: int, message_id: int = None): + """ + Force Reboot — last-resort recovery tool. + Kills any running stream thread, wipes ALL runtime state back to defaults + while preserving user settings (URLs, codec, etc.), clears all queues, + unregisters the live view, and returns the bot to a clean idle state. + """ + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + append_user_live_log(chat_id, "⚡ Force Reboot initiated by user.") + + # 1. Signal any running thread to stop + with lock: + session['streaming_state'] = "stopping" + session['stop_gracefully_flag'] = True + thread_ref = session.get('stream_thread_ref') + + # 2. Wait briefly for thread to die + if thread_ref and thread_ref.is_alive(): + thread_ref.join(timeout=5.0) + + # 3. Hard-close any open PyAV containers + try: + with lock: + objs = session.get('pyav_objects', {}) + for key in ('output_container', 'input_container'): + c = objs.get(key) + if c: + try: c.close() + except Exception: pass + objs[key] = None + objs['video_out_stream'] = None + objs['audio_out_stream'] = None + objs['logo_image_pil'] = None + except Exception as e: + logger.warning(f"[Reboot] PyAV close error: {e}") + + # 4. Reset ALL runtime state — keep user settings intact + with lock: + for k, v in DEFAULT_SESSION_RUNTIME_STATE.items(): + session[k] = dict(v) if isinstance(v, dict) else (list(v) if isinstance(v, list) else v) + + # 5. Clear queues and live view + unregister_live_view(chat_id) + with _outbound_queue_lock: + _outbound_message_queue.pop(chat_id, None) + + _push_state_sse(chat_id) + append_user_live_log(chat_id, "Force Reboot complete — bot is in clean idle state.") + + reply_text = ( + "⚡ Force Reboot Complete!\n\n" + "All stream threads killed, state wiped to clean idle.\n" + "Your settings (URLs, codec, etc.) are preserved.\n\n" + + compose_status_message(chat_id, include_config=False) + ) + + def _reply(text, kb): + if message_id: + return edit_message_text(chat_id, message_id, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + return _reply(reply_text, get_main_keyboard(session)) + + +async def abort_stream_handler(chat_id: int, message_id: int = None): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + thread_ref = None + + with lock: + state = session['streaming_state'] + if state in ("streaming", "paused", "starting", "reconnecting", "stopping"): + session['streaming_state'] = "stopping" + session['stop_gracefully_flag'] = True + thread_ref = session.get('stream_thread_ref') + aborted = True + else: + aborted = False + st = state + + def _reply(text, kb): + if message_id: + return edit_message_text(chat_id, message_id, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + if not aborted: + return _reply(f"ℹ️ No active stream (state: {st}).", get_main_keyboard(session)) + + append_user_live_log(chat_id, "Abort requested by user.") + if thread_ref and thread_ref.is_alive(): + thread_ref.join(timeout=10.0) + if thread_ref.is_alive(): + append_user_live_log(chat_id, "Warning: thread did not stop cleanly in 10s — forcing.") + + with lock: + # Force stopped state regardless of what thread set it to + if session['streaming_state'] in ("stopping", "reconnecting", "starting", "streaming", "paused"): + session['streaming_state'] = "stopped" + session['stream_thread_ref'] = None + session['watchdog_thread_ref'] = None + session['stop_gracefully_flag'] = False + session['error_notification_user'] = "" # clear error on clean stop + + _push_state_sse(chat_id) + return _reply("⏹ Stream stopped.\n\n" + compose_status_message(chat_id), + get_main_keyboard(session)) + + +# ────────────────────────────────────────────── +# PLAYLIST HANDLER +# ────────────────────────────────────────────── +async def handle_playlist_command(chat_id: int, text: str): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + parts = text.split(maxsplit=2) + action = parts[1].lower() if len(parts) > 1 else "show" + + with lock: + playlist = session.setdefault('input_url_playlist', []) + + if action == "add": + if len(parts) < 3: + msg = "⚠️ Usage: /playlist add <url>" + else: + url = parts[2].strip() + if validate_url(url): + playlist.append(url) + msg = f"✅ Added to playlist.\n{esc(url)}\nTotal: {len(playlist)}" + else: + msg = f"❌ Invalid URL.\nMust start with http/https/rtmp/rtsp/udp/srt/file.\n{esc(url)}" + + elif action == "remove": + if len(parts) < 3: + msg = "⚠️ Usage: /playlist remove <index|last>" + else: + idx_str = parts[2].strip() + removed = None + if idx_str.lower() == "last" and playlist: + removed = playlist.pop() + elif idx_str.isdigit(): + i = int(idx_str) - 1 + if 0 <= i < len(playlist): + removed = playlist.pop(i) + if removed: + msg = f"🗑 Removed: {esc(removed[:80])}\nRemaining: {len(playlist)}" + else: + msg = "❌ Invalid index or empty playlist." + + elif action == "clear": + count = len(playlist) + session['input_url_playlist'] = [] + msg = f"🗑 Playlist cleared — {count} item(s) removed." + + elif action == "show": + if playlist: + items = "\n".join(f" {i+1}. {esc(url)}" for i, url in enumerate(playlist)) + msg = f"📜 Playlist ({len(playlist)} items):\n{items}" + else: + msg = ("📜 Playlist is empty.\n" + "Add a URL:\n/playlist add <url>") + + else: + msg = ("📜 Playlist Commands:\n" + " /playlist add <url>\n" + " /playlist remove <index|last>\n" + " /playlist clear\n" + " /playlist show") + + return send_message(chat_id, msg, reply_markup=get_main_keyboard(session)) + + +# ────────────────────────────────────────────── +# GENERIC SET COMMAND /set +# ────────────────────────────────────────────── +SETTABLE_FIELDS = { + "output_url": (validate_url, "RTMP/HTTP output URL"), + "video_codec": (lambda x: x in SUPPORTED_VIDEO_CODECS, f"Video codec. Options: {', '.join(SUPPORTED_VIDEO_CODECS)}"), + "audio_codec": (lambda x: x in SUPPORTED_AUDIO_CODECS, f"Audio codec. Options: {', '.join(SUPPORTED_AUDIO_CODECS)}"), + "resolution": (validate_resolution, "e.g. 1280x720 or source"), + "fps": (lambda x: x.isdigit() and 1 <= int(x) <= 120, "FPS (1–120)"), + "video_bitrate": (lambda x: True, "e.g. 1500k or 3M"), + "audio_bitrate": (lambda x: True, "e.g. 128k"), + "ffmpeg_preset": (lambda x: x in FFMPEG_PRESETS, f"Options: {', '.join(FFMPEG_PRESETS)}"), + "gop_size": (lambda x: x.isdigit(), "Integer, e.g. 60"), + "loop_count": (lambda x: x.lstrip('-').isdigit(), "0=once, -1=infinite, N=N times"), + "output_format": (lambda x: x in OUTPUT_FORMATS, f"Options: {', '.join(OUTPUT_FORMATS)}"), + "reconnect_delay_seconds": (lambda x: x.isdigit(), "Seconds between reconnect attempts"), + "max_reconnect_attempts": (lambda x: x.isdigit(), "Max number of reconnect attempts"), + "open_timeout_seconds": (lambda x: x.isdigit(), "Connection open timeout in seconds"), + "read_timeout_seconds": (lambda x: x.isdigit(), "Packet read timeout in seconds"), + "video_pix_fmt": (lambda x: True, "Pixel format, e.g. yuv420p"), + "video_thread_count": (lambda x: x.isdigit(), "Thread count (0=auto)"), + "audio_sample_rate": (lambda x: x.isdigit(), "Sample rate (0=source), e.g. 44100"), + "audio_channels": (lambda x: x.isdigit(), "Channels (0=source), e.g. 2"), + "logo_position": (lambda x: x in LOGO_POSITIONS, f"Options: {', '.join(LOGO_POSITIONS)}"), + "logo_scale": (lambda x: True, "0.0–1.0, e.g. 0.1"), + "logo_opacity": (lambda x: True, "0.0–1.0, e.g. 0.8"), + "logo_margin_px": (lambda x: x.isdigit(), "Margin in pixels"), + "logo_enabled": (lambda x: x.lower() in ("on","off","true","false","1","0"), "on/off"), + "stop_on_error_in_playlist":(lambda x: x.lower() in ("on","off","true","false","1","0"), "on/off"), + "reconnect_on_stream_error":(lambda x: x.lower() in ("on","off","true","false","1","0"), "on/off"), + "quality_preset": (lambda x: x in QUALITY_PRESETS, f"Options: {', '.join(QUALITY_PRESETS.keys())}"), +} + +INT_FIELDS = {"fps","gop_size","loop_count","reconnect_delay_seconds","max_reconnect_attempts", + "open_timeout_seconds","read_timeout_seconds","video_thread_count","audio_sample_rate","audio_channels","logo_margin_px"} +FLOAT_FIELDS = {"logo_scale","logo_opacity"} +BOOL_FIELDS = {"logo_enabled","stop_on_error_in_playlist","reconnect_on_stream_error"} + + +def parse_field_value(field: str, raw: str): + if field in BOOL_FIELDS: + return raw.lower() in ("on", "true", "1") + if field in INT_FIELDS: + return int(raw) + if field in FLOAT_FIELDS: + return float(raw) + return raw + + +async def handle_set_command(chat_id: int, text: str): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + parts = text.split(maxsplit=2) + + if len(parts) == 1: + # /set with no args — show all settable fields + lines = ["⚙️ All configurable fields:\n", + "Usage: /set <field> <value>\n"] + for field, (_, desc) in SETTABLE_FIELDS.items(): + cur = session.get(field) + lines.append(f" {field} = {esc(str(cur))}\n ↳ {esc(desc)}") + return send_message(chat_id, "\n".join(lines), reply_markup=get_main_keyboard(session)) + + field = parts[1].lower() + if field not in SETTABLE_FIELDS: + close = [f for f in SETTABLE_FIELDS if field in f] + hint = f"\nDid you mean: {', '.join(close)}?" if close else "" + return send_message(chat_id, + f"❌ Unknown field {esc(field)}.{hint}\n" + f"Use /set to see all fields.", + reply_markup=get_main_keyboard(session)) + + if len(parts) < 3: + validator, desc = SETTABLE_FIELDS[field] + cur = session.get(field) + return send_message(chat_id, + f"📝 Field: {field}\n" + f"Current value: {esc(str(cur))}\n" + f"Expected: {esc(desc)}\n\n" + f"Usage: /set {field} <value>", + reply_markup=get_main_keyboard(session)) + + raw_value = parts[2].strip() + validator, desc = SETTABLE_FIELDS[field] + + if not validator(raw_value): + return send_message(chat_id, + f"❌ Invalid value for {field}.\n" + f"Expected: {esc(desc)}\n" + f"Got: {esc(raw_value)}", + reply_markup=get_main_keyboard(session)) + + with lock: + parsed = parse_field_value(field, raw_value) + session[field] = parsed + + # Apply quality preset side effects + if field == "quality_preset" and raw_value in QUALITY_PRESETS: + for k, v in QUALITY_PRESETS[raw_value].items(): + session[k] = v + + append_user_live_log(chat_id, f"Set {field} = {parsed}") + return send_message(chat_id, + f"✅ {field} set to {esc(str(parsed))}", + reply_markup=get_main_keyboard(session)) + + +# ────────────────────────────────────────────── +# LOGO HANDLER +# ────────────────────────────────────────────── +async def handle_logo_upload(chat_id: int, message: dict): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + file_id = None + filename = None + mime_type = "image/png" + + if message.get("document"): + doc = message["document"] + file_id = doc.get("file_id") + filename = doc.get("file_name", f"logo_{chat_id}.png") + mime_type = doc.get("mime_type", "image/png") + elif message.get("photo"): + # Take the largest photo + photos = message["photo"] + file_id = photos[-1]["file_id"] + filename = f"photo_{chat_id}.jpg" + mime_type = "image/jpeg" + + if not file_id: + return send_message(chat_id, "❌ Could not get file from message.", reply_markup=get_main_keyboard(session)) + + # Webhook-only mode: Telegram sends only file_id, not the actual bytes. + # Downloading requires an outbound getFile API call which is blocked on HuggingFace Spaces. + with lock: + session['current_step'] = None + return send_message(chat_id, + "⚠️ Logo upload is unavailable in webhook-only mode.\n\n" + "This deployment blocks outbound requests, so the bot cannot download " + "files from Telegram's servers.\n\n" + "To use a logo, host the image publicly and set it via a URL workaround, " + "or run the bot in an environment that allows outbound connections.", + reply_markup=get_main_keyboard(session)) + + +# ────────────────────────────────────────────── +# SCHEDULE HANDLER +# ────────────────────────────────────────────── +# Scheduling flow (conversation steps): +# sched_when → user picks "timer" or "datetime" +# sched_timer → user types "in X minutes/hours" +# sched_datetime → user types YYYY-MM-DD HH:MM:SS +# sched_name → user types a friendly job name +# sched_rtmp → user types the RTMP output URL for this job +# sched_input → user types the input URL for this job +# Then job is registered with its own output+input URLs (independent of session). + +def _list_schedules(chat_id: int) -> str: + """Build schedule list text.""" + jobs = scheduler.get_jobs() + user_jobs = [j for j in jobs if j.id.startswith(f"stream_{chat_id}_")] + lines = ["🕒 Scheduled Streams (in-memory, lost on restart)\n"] + if user_jobs: + for j in user_jobs: + rt = j.next_run_time.strftime("%Y-%m-%d %H:%M:%S UTC") if j.next_run_time else "N/A" + meta = _job_store.get(j.id, {}) + rtmp = meta.get("output_url", "?") + inp = meta.get("input_url", "?") + lines.append( + f" • {esc(j.name)}\n" + f" ⏰ {rt}\n" + f" 📡 {esc(rtmp[:60])}\n" + f" 🎬 {esc(inp[:60])}\n" + f" Cancel: /schedule cancel {esc(j.id)}" + ) + else: + lines.append(" No scheduled streams.") + lines += [ + "", + "➕ Add a schedule: use 🕒 Schedule button or /schedule new", + "❌ Cancel: /schedule cancel <job_id>", + ] + return "\n".join(lines) + + +def get_schedule_when_keyboard(): + return {"inline_keyboard": [ + [{"text": "⏱ Timer (in X minutes)", "callback_data": "sched_pick_timer"}, + {"text": "📅 Date & Time (UTC)", "callback_data": "sched_pick_datetime"}], + [{"text": "❌ Cancel", "callback_data": "sched_cancel_setup"}], + ]} + + +def get_schedule_menu_keyboard(session: dict = None): + """Keyboard shown on the schedule list view.""" + return {"inline_keyboard": [ + [{"text": "➕ New Schedule", "callback_data": "sched_new"}], + [{"text": "🔙 Back", "callback_data": "settings_done"}], + ]} + + +async def handle_schedule_command(chat_id: int, text: str): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + parts = text.split(maxsplit=2) + sub = parts[1].lower() if len(parts) > 1 else "" + + # List / entry point + if not sub or sub == "list": + return send_message(chat_id, _list_schedules(chat_id), + reply_markup=get_main_keyboard(session)) + + # Start new schedule conversation + if sub == "new": + with lock: + session["current_step"] = "sched_when" + session["_sched_draft"] = {} + return send_message(chat_id, + "🕒 New Scheduled Stream\n\nWhen should it start?", + reply_markup=get_schedule_when_keyboard()) + + # Cancel a job + if sub == "cancel": + job_id = parts[2].strip() if len(parts) > 2 else None + if not job_id: + return send_message(chat_id, + "Usage: /schedule cancel <job_id>", + reply_markup=get_main_keyboard(session)) + try: + scheduler.remove_job(job_id) + _job_store.pop(job_id, None) + return send_message(chat_id, + f"✅ Cancelled: {esc(job_id)}", + reply_markup=get_main_keyboard(session)) + except Exception: + return send_message(chat_id, + f"❌ Job not found: {esc(job_id)}", + reply_markup=get_main_keyboard(session)) + + return send_message(chat_id, + "⚠️ Unknown sub-command.\nTry /schedule, /schedule new, or /schedule cancel <id>", + reply_markup=get_main_keyboard(session)) + + +async def handle_schedule_conversation(chat_id: int, text: str): + """Handle text input during schedule setup conversation steps.""" + session = get_user_session(chat_id) + lock = session_locks[chat_id] + step = session.get("current_step", "") + draft = session.setdefault("_sched_draft", {}) + + if step == "sched_timer": + # Parse "30", "30m", "2h", "1h30m", "90 minutes" etc. + raw = text.strip().lower().replace(" ", "") + minutes = 0 + import re as _re + hm = _re.match(r"^(?:(\d+)h)?(?:(\d+)m?)?$", raw) + plain = _re.match(r"^(\d+)$", raw) + if hm and (hm.group(1) or hm.group(2)): + minutes = int(hm.group(1) or 0) * 60 + int(hm.group(2) or 0) + elif plain: + minutes = int(plain.group(1)) + if minutes <= 0: + return send_message(chat_id, + "❌ Could not parse duration. Try 30, 2h, 1h30m\nOr /cancel.") + trigger_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=minutes) + draft["trigger_time"] = trigger_time + with lock: + session["current_step"] = "sched_name" + return send_message(chat_id, + f"✅ Timer set: {minutes} minute(s) from now\n" + f" → {trigger_time.strftime('%Y-%m-%d %H:%M:%S UTC')}\n\n" + f"Give this schedule a name (or type skip):") + + elif step == "sched_datetime": + raw = text.strip() + try: + # Accept both "YYYY-MM-DD HH:MM:SS" and "YYYY-MM-DD HH:MM" + fmt = "%Y-%m-%d %H:%M:%S" if len(raw) > 16 else "%Y-%m-%d %H:%M" + trigger_time = datetime.datetime.strptime(raw, fmt).replace(tzinfo=datetime.timezone.utc) + except ValueError: + return send_message(chat_id, + "❌ Invalid format. Use YYYY-MM-DD HH:MM:SS (UTC)\nOr /cancel.") + if trigger_time <= datetime.datetime.now(datetime.timezone.utc): + return send_message(chat_id, + "❌ That time is in the past. Enter a future time (UTC):\nOr /cancel.") + draft["trigger_time"] = trigger_time + with lock: + session["current_step"] = "sched_name" + return send_message(chat_id, + f"✅ Time set: {trigger_time.strftime('%Y-%m-%d %H:%M:%S UTC')}\n\n" + f"Give this schedule a name (or type skip):") + + elif step == "sched_name": + name = text.strip() + draft["name"] = f"Scheduled Stream {chat_id}" if name.lower() == "skip" or not name else name + with lock: + session["current_step"] = "sched_rtmp" + return send_message(chat_id, + f"📡 Step: RTMP Output URL\n\n" + f"Enter the RTMP URL to stream to:\n" + f"rtmp://a.rtmp.youtube.com/live2/YOUR_KEY\n\n" + f"Or type use to use your current output URL ({esc(session.get('output_url', 'not set'))})\n" + f"Or /cancel.") + + elif step == "sched_rtmp": + raw = text.strip() + if raw.lower() == "use": + url = session.get("output_url", "") + else: + url = raw + if not validate_url(url): + return send_message(chat_id, + "❌ Invalid URL. Must start with rtmp/http/https/rtsp/...\nTry again or /cancel.") + draft["output_url"] = url + with lock: + session["current_step"] = "sched_input" + return send_message(chat_id, + f"🎬 Step: Input URL\n\n" + f"Enter the stream/video input URL:\n" + f"https://example.com/video.mp4\n\n" + f"Or type use to use your current playlist first item ({esc((session.get('input_url_playlist') or ['not set'])[0])})\n" + f"Or /cancel.") + + elif step == "sched_input": + raw = text.strip() + if raw.lower() == "use": + pl = session.get("input_url_playlist", []) + url = pl[0] if pl else "" + else: + url = raw + if not validate_url(url): + return send_message(chat_id, + "❌ Invalid URL. Must start with http/https/rtmp/rtsp/...\nTry again or /cancel.") + draft["input_url"] = url + + # All info collected — register the job + trigger_time = draft["trigger_time"] + job_name = draft["name"] + output_url = draft["output_url"] + input_url = draft["input_url"] + + job_id = f"stream_{chat_id}_{int(trigger_time.timestamp())}" + try: + scheduler.add_job( + _scheduled_stream_runner, + "date", + run_date=trigger_time, + args=[chat_id, output_url, input_url], + id=job_id, + name=job_name, + replace_existing=True, + ) + # Store metadata in our own dict (APScheduler Jobs are frozen) + _job_store[job_id] = {"output_url": output_url, "input_url": input_url} + except Exception as e: + return send_message(chat_id, f"❌ Failed to register job: {esc(str(e))}", + reply_markup=get_main_keyboard(session)) + + mins_away = int((trigger_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds() / 60) + with lock: + session["current_step"] = None + session["_sched_draft"] = {} + + return send_message(chat_id, + f"✅ Stream Scheduled!\n\n" + f" 📛 Name: {esc(job_name)}\n" + f" ⏰ Time: {trigger_time.strftime('%Y-%m-%d %H:%M:%S UTC')} (~{mins_away}m from now)\n" + f" 📡 Output: {esc(output_url[:60])}\n" + f" 🎬 Input: {esc(input_url[:60])}\n" + f" 🔑 ID: {job_id}\n\n" + f"View all: /schedule\nCancel: /schedule cancel {job_id}", + reply_markup=get_main_keyboard(session)) + + # Fallback — unknown step + with lock: + session["current_step"] = None + return send_message(chat_id, + "⚠️ Schedule setup cancelled (unknown step).", + reply_markup=get_main_keyboard(session)) + + +def _scheduled_stream_runner(chat_id: int, output_url: str = None, input_url: str = None): + """Run by APScheduler at the scheduled time. Overrides session URLs if provided.""" + logger.info(f"[Scheduler] Firing scheduled stream for chat {chat_id}") + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + # Override session URLs with the per-job ones + if output_url: + with lock: + session["output_url"] = output_url + session["active_output_url"] = output_url + if input_url: + with lock: + session["input_url_playlist"] = [input_url] + + # Notify user via queued outbound message (returned on next webhook) + msg_text = ( + f"⏰ Scheduled stream is starting!\n\n" + f"📡 Output: {output_url or session.get('output_url', '—')}\n" + f"🎬 Input: {input_url or (session.get('input_url_playlist') or ['—'])[0]}\n\n" + f"Use /abort to stop or /status for live status." + ) + enqueue_outbound_message(chat_id, send_message(chat_id, msg_text, reply_markup=get_main_keyboard(session))) + # Push immediate SSE notification + push_sse_event(chat_id, { + "type": "notification", + "subtype": "scheduled_start", + "text": msg_text, + "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(), + }) + + loop = _main_event_loop + if loop and loop.is_running(): + asyncio.run_coroutine_threadsafe(start_stream_handler(chat_id), loop) + else: + asyncio.run(start_stream_handler(chat_id)) + + +# ────────────────────────────────────────────── +# HELP TEXT +# ────────────────────────────────────────────── +def get_help_text() -> str: + return ( + f"🤖 Advanced Stream Bot v{APP_VERSION}\n" + "In-memory: settings lost on restart\n\n" + "━━ Core Commands ━━\n" + " /start — Home screen & status\n" + " /stream — Start streaming\n" + " /pause — Pause stream\n" + " /resume — Resume stream\n" + " /abort — Stop stream\n" + " /status — Detailed status\n" + " /reboot — ⚡ Force Reboot (last resort if bot stops responding)\n\n" + "━━ Configuration ━━\n" + " /set — List all settings\n" + " /set <field> <value> — Set a value\n" + " /settings — Interactive settings menu\n" + " /reset — Restore all defaults\n\n" + "━━ Playlist ━━\n" + " /playlist show\n" + " /playlist add <url>\n" + " /playlist remove <index|last>\n" + " /playlist clear\n\n" + "━━ Logo ━━\n" + " /set_logo — Upload logo (PNG/JPG)\n" + " /set logo_enabled on|off\n" + " /set logo_position top_right|...\n" + " /set logo_scale 0.1\n" + " /set logo_opacity 0.8\n\n" + "━━ Schedule ━━\n" + " /schedule — List & manage schedules\n" + " /schedule new — Create a new schedule (conversation)\n" + " /schedule cancel <id> — Cancel a scheduled stream\n\n" + "📅 Scheduling supports:\n" + " ⏱ Timer: in X minutes/hours\n" + " 📅 Date & time: YYYY-MM-DD HH:MM:SS (UTC)\n" + " Each schedule has its own RTMP output + input URL\n\n" + "━━ Diagnostics ━━\n" + " /logs — Recent user logs\n" + " /globallogs — System logs\n\n" + "━━ Emergency ━━\n" + " /reboot — Kills all threads, resets state to idle.\n" + " Use if the bot stops responding or gets stuck.\n" + " Your URL/settings are preserved.\n\n" + "⚠️ Data is in-memory only — lost on restart." + ) + + +# ────────────────────────────────────────────── +# MAIN UPDATE HANDLER +# ────────────────────────────────────────────── +async def handle_telegram_update(update: dict): + try: + return await _handle_update_inner(update) + except Exception as e: + logger.error(f"Unhandled update error: {e}", exc_info=True) + return {"status": "ok"} + + +async def _handle_update_inner(update: dict): + # ── Message ── + if "message" in update: + msg = update["message"] + chat_id = msg["chat"]["id"] + session = get_user_session(chat_id) + lock = session_locks[chat_id] + text = msg.get("text", "") + + # File upload + if msg.get("photo") or msg.get("document"): + if session.get("current_step") == "awaiting_logo": + return await handle_logo_upload(chat_id, msg) + else: + return send_message(chat_id, + "📎 Got a file — but I wasn't expecting one.\n" + "Use /set_logo first if you want to upload a logo.", + reply_markup=get_main_keyboard(session)) + + if not text: + return {"status": "ok"} + + command = text.split()[0].lower() if text.startswith('/') else None + + # ── Cancel during conversation ── + if text.strip().lower() == "/cancel": + with lock: + session['current_step'] = None + session['current_step_index'] = 0 + session['conversation_fields_list'] = [] + session['settings_editing_field'] = None + return send_message(chat_id, "✅ Cancelled.", reply_markup=get_main_keyboard(session)) + + # ── In a conversation step ── + if session.get('current_step') and not command: + return await handle_conversation_input(chat_id, text) + + # ── Commands ── + if command: + logger.info(f"[Chat {chat_id}] Command: {text[:80]}") + + if command in ("/start", "/menu", "/home"): + with lock: + session['current_step'] = None + return send_message(chat_id, compose_status_message(chat_id, include_config=True), + reply_markup=get_main_keyboard(session)) + + elif command == "/help": + return send_message(chat_id, get_help_text(), reply_markup=get_main_keyboard(session)) + + elif command == "/stream": + return await start_stream_handler(chat_id) + + elif command == "/pause": + return await pause_stream_handler(chat_id) + + elif command == "/resume": + return await resume_stream_handler(chat_id) + + elif command == "/abort": + return await abort_stream_handler(chat_id) + + elif command in ("/reboot", "/forcereboot"): + return await force_reboot_handler(chat_id) + + elif command == "/status": + return send_message(chat_id, compose_status_message(chat_id, include_config=True), + reply_markup=get_main_keyboard(session)) + + elif command == "/settings": + return send_message(chat_id, + "⚙️ Settings\n\n" + + format_settings_display(session) + "\n\n" + "To change a setting:\n" + "/set <field> <value>\n\n" + "Or use the buttons below to navigate interactively.", + reply_markup=get_settings_keyboard(session)) + + elif command == "/set": + return await handle_set_command(chat_id, text) + + elif command == "/reset": + return send_message(chat_id, + "⚠️ Reset all settings to defaults?\nThis cannot be undone.", + reply_markup=get_reset_confirm_keyboard()) + + elif command == "/playlist": + return await handle_playlist_command(chat_id, text) + + elif command == "/set_logo": + with lock: + session['current_step'] = "awaiting_logo" + return send_message(chat_id, + "🖼 Upload Logo\n\n" + "Send a PNG or JPG image now.\n" + "Max 5 MB. /cancel to abort.", + reply_markup=get_main_keyboard(session)) + + elif command == "/schedule": + return await handle_schedule_command(chat_id, text) + + elif command == "/logs": + # Send a snapshot. User can tap 📋 Logs button for the live-updating bubble. + logs = session.get('live_log_lines_user', []) + log_tail = "\n".join(logs[-30:]) if logs else "No logs yet." + return send_message(chat_id, + f"📋 Stream Logs (last 30):\n
{esc(log_tail)}
\n\n" + f"💡 Tap the 📋 Logs button below for a live-updating view.", + reply_markup=get_main_keyboard(session)) + + elif command == "/globallogs": + last = "\n".join(live_log_lines_global[-20:]) + return send_message(chat_id, + "📜 Global Bot Logs (last 20):\n
" + esc(last) + "
", + reply_markup=get_main_keyboard(session)) + + else: + # Unknown command — if in conversation, pass through + if session.get('current_step'): + return await handle_conversation_input(chat_id, text) + return send_message(chat_id, + f"❓ Unknown command: {esc(text.split()[0])}\nUse /help for available commands.", + reply_markup=get_main_keyboard(session)) + + else: + # Plain text — conversation input or hint + if session.get('current_step'): + return await handle_conversation_input(chat_id, text) + return send_message(chat_id, + "💬 Type a command or tap a button below.\n/help — see all commands.", + reply_markup=get_main_keyboard(session)) + + # ── Callback Query ── + elif "callback_query" in update: + cq = update["callback_query"] + chat_id = cq["message"]["chat"]["id"] + message_id = cq["message"]["message_id"] + data = cq["data"] + session = get_user_session(chat_id) + lock = session_locks[chat_id] + + logger.info(f"[Chat {chat_id}] Callback: {data}") + ack = answer_callback_query(cq["id"]) + + # ── UX mode helpers ────────────────────────────────────────────────── + # ux_mode "send" → always sendMessage (new bubble, default) + # ux_mode "edit" → editMessageText in-place (replaces the button msg) + ux = session.get("ux_mode", "send") + + def reply(text, kb=None): + """Send or edit depending on ux_mode.""" + kb = kb or get_main_keyboard(session) + if ux == "edit": + return edit_message_text(chat_id, message_id, text, reply_markup=kb) + return send_message(chat_id, text, reply_markup=kb) + + def edit(text, kb=None): + """Always edit in-place (used for sub-menus that must stay in same msg).""" + return edit_message_text(chat_id, message_id, text, + reply_markup=kb or get_main_keyboard(session)) + + # ───────────────────────────────────────────────────────────────────── + # Stream controls + if data == "stream_start": + mid = message_id if ux == "edit" else None + return [ack, await start_stream_handler(chat_id, message_id_to_edit=mid)] + elif data == "stream_pause": + mid = message_id if ux == "edit" else None + return [ack, await pause_stream_handler(chat_id, message_id=mid)] + elif data == "stream_resume": + mid = message_id if ux == "edit" else None + return [ack, await resume_stream_handler(chat_id, message_id=mid)] + elif data == "stream_abort": + mid = message_id if ux == "edit" else None + return [ack, await abort_stream_handler(chat_id, message_id=mid)] + + elif data == "force_reboot": + return [answer_callback_query(cq["id"], "⚡ Force rebooting…"), + await force_reboot_handler(chat_id, message_id=message_id)] + + elif data == "stream_status": + # Register this message as the live-view bubble (status+log mode) + # Background thread will edit it every 2s — exactly like logg.py + register_live_view(chat_id, message_id, mode="status") + live_text = _build_live_view_text(chat_id, "status") + live_kb = _get_live_view_keyboard(chat_id) + return [ack, edit(live_text, live_kb)] + + # ── Live-view controls (Pause / Resume / Refresh / Close) ──────────── + # Directly mirrors logg.py's handle_callback() for stop_logs/resume_logs/refresh_logs + elif data == "lv_pause": + set_live_view_show(chat_id, False) + # Force one immediate edit to show the "paused" state + live_text = _build_live_view_text(chat_id, _live_views.get(chat_id, {}).get("mode", "status")) + live_kb = _get_live_view_keyboard(chat_id) + with _live_view_lock: + if chat_id in _live_views: + _live_views[chat_id]["last_sent"] = live_text + return [answer_callback_query(cq["id"], "⏸ Updates paused"), + edit(live_text, live_kb)] + + elif data == "lv_resume": + set_live_view_show(chat_id, True) + _do_live_view_edit(chat_id, force=True) + live_text = _build_live_view_text(chat_id, _live_views.get(chat_id, {}).get("mode", "status")) + live_kb = _get_live_view_keyboard(chat_id) + return [answer_callback_query(cq["id"], "▶️ Updates resumed"), + edit(live_text, live_kb)] + + elif data == "lv_refresh": + # Force immediate rebuild — same as logg.py's refresh_logs + _do_live_view_edit(chat_id, force=True) + live_text = _build_live_view_text(chat_id, _live_views.get(chat_id, {}).get("mode", "status")) + live_kb = _get_live_view_keyboard(chat_id) + with _live_view_lock: + if chat_id in _live_views: + _live_views[chat_id]["last_sent"] = live_text + return [answer_callback_query(cq["id"], "🔄 Refreshed"), + edit(live_text, live_kb)] + + elif data == "lv_close": + unregister_live_view(chat_id) + return [answer_callback_query(cq["id"], "Live view closed"), + edit(compose_status_message(chat_id, include_config=False), + get_main_keyboard(session))] + + elif data == "stream_stop_graceful": + with lock: + session['stop_gracefully_flag'] = True + append_user_live_log(chat_id, "Graceful stop requested.") + return [ack, edit("⏳ Will stop after current loop finishes.\n\n" + + compose_status_message(chat_id))] + + # Settings navigation + elif data == "open_settings": + return [ack, edit( + "⚙️ Settings\n\n" + format_settings_display(session) + "\n\n" + "Tap a parameter to change it, or use /set <field> <value>", + get_settings_keyboard(session))] + + elif data == "toggle_ux_mode": + with lock: + current_ux = session.get("ux_mode", "send") + session["ux_mode"] = "edit" if current_ux == "send" else "send" + new_ux = session["ux_mode"] + label = "New Message (sendMessage)" if new_ux == "send" else "Edit In-Place (editMessageText)" + return [ack, edit( + f"✅ UX Mode switched to: {label}\n\n" + f"{'💬 Each button press sends a new message.' if new_ux == 'send' else '✏️ Button presses edit the existing message in-place.'}\n\n" + + format_settings_display(session), + get_settings_keyboard(session))] + + elif data == "settings_done": + return [ack, reply(compose_status_message(chat_id, True))] + + elif data == "pick_quality_preset": + return [ack, edit("🎨 Choose Quality Preset:", get_quality_keyboard())] + + elif data.startswith("apply_quality_"): + q = data.replace("apply_quality_", "") + if q in QUALITY_PRESETS: + with lock: + session['quality_preset'] = q + for k, v in QUALITY_PRESETS[q].items(): + session[k] = v + return [ack, edit( + f"✅ Quality preset {q} applied.\n\n" + format_settings_display(session), + get_settings_keyboard(session))] + + elif data == "set_video_codec": + return [ack, edit("🎥 Choose Video Codec:", get_codec_keyboard("video"))] + + elif data.startswith("set_vcodec_"): + codec = data.replace("set_vcodec_", "") + with lock: session['video_codec'] = codec + return [ack, edit(f"✅ Video codec: {codec}", get_settings_keyboard(session))] + + elif data == "set_audio_codec": + return [ack, edit("🔊 Choose Audio Codec:", get_codec_keyboard("audio"))] + + elif data.startswith("set_acodec_"): + codec = data.replace("set_acodec_", "") + with lock: session['audio_codec'] = codec + return [ack, edit(f"✅ Audio codec: {codec}", get_settings_keyboard(session))] + + elif data == "set_ffmpeg_preset": + return [ack, edit("⚡ Choose FFmpeg Preset:", get_preset_keyboard())] + + elif data.startswith("set_preset_"): + preset = data.replace("set_preset_", "") + with lock: session['ffmpeg_preset'] = preset + return [ack, edit(f"✅ Preset: {preset}", get_settings_keyboard(session))] + + # Resolution picker + elif data == "pick_resolution": + return [ack, edit("📐 Choose Resolution:", get_resolution_keyboard())] + + elif data.startswith("set_res_"): + val = data.replace("set_res_", "") + if val == "custom": + with lock: + session["current_step"] = "editing_field" + session["settings_editing_field"] = "resolution" + return [ack, reply( + "📐 Custom Resolution\n" + "Current: " + esc(str(session.get("resolution"))) + "\n" + "Format: WIDTHxHEIGHT e.g. 1280x720 or source\n\n" + "Type the value now, or /cancel.")] + else: + with lock: + session["resolution"] = val + label = next((l for l, v in RESOLUTION_PRESETS if v == val), val) + return [ack, edit( + f"✅ Resolution set to {label} ({val})", + get_settings_keyboard(session))] + + # Inline-triggered field edits (ask user to type) + elif data in ("set_output_url", "set_fps", "set_video_bitrate", + "set_audio_bitrate", "set_loop_count", "set_gop_size", + "set_output_format", "set_reconnect_delay", "set_open_timeout"): + field_map = { + "set_output_url": "output_url", + "set_fps": "fps", + "set_video_bitrate": "video_bitrate", + "set_audio_bitrate": "audio_bitrate", + "set_loop_count": "loop_count", + "set_gop_size": "gop_size", + "set_output_format": "output_format", + "set_reconnect_delay": "reconnect_delay_seconds", + "set_open_timeout": "open_timeout_seconds", + } + field = field_map[data] + _, desc = SETTABLE_FIELDS[field] + cur = session.get(field) + with lock: + session['current_step'] = "editing_field" + session['settings_editing_field'] = field + return [ack, reply( + f"📝 Set {field}\n" + f"Current: {esc(str(cur))}\n" + f"Expected: {esc(desc)}\n\n" + f"Type the new value now, or /cancel to abort.")] + + elif data in ("toggle_reconnect", "toggle_stop_on_error"): + field_map = { + "toggle_reconnect": "reconnect_on_stream_error", + "toggle_stop_on_error": "stop_on_error_in_playlist", + } + field = field_map[data] + with lock: + session[field] = not session.get(field, True) + new_val = session[field] + return [ack, edit( + f"✅ {field}{'on' if new_val else 'off'}", + get_settings_keyboard(session))] + + # Playlist view + elif data == "view_playlist": + pl = session.get('input_url_playlist', []) + if pl: + items = "\n".join(f" {i+1}. {esc(url)}" for i, url in enumerate(pl)) + msg = (f"📜 Playlist ({len(pl)} items):\n{items}\n\n" + "Manage:\n" + " /playlist add <url>\n" + " /playlist remove <index|last>\n" + " /playlist clear") + else: + msg = ("📜 Playlist is empty.\n" + "/playlist add <url>") + return [ack, reply(msg)] + + # Logo + elif data == "cfg_logo": + fname = session.get('logo_original_filename', 'N/A') + enabled = session.get('logo_enabled', False) + has_logo = bool(session.get('logo_data_bytes')) + logo_btns = [] + if has_logo: + logo_btns.append([{"text": ("✅ Enabled — Disable" if enabled else "❌ Disabled — Enable"), + "callback_data": "toggle_logo"}]) + logo_btns.append([{"text": "📍 Change Position", "callback_data": "change_logo_pos"}]) + logo_btns.append([{"text": "🔄 Replace Logo", "callback_data": "upload_new_logo"}]) + else: + logo_btns.append([{"text": "📤 Upload Logo", "callback_data": "upload_new_logo"}]) + logo_btns.append([{"text": "↩ Back", "callback_data": "open_settings"}]) + + lines = ["🖼 Logo Configuration\n"] + if has_logo: + lines.append(f"File: {esc(fname)}") + lines.append(f"Status: {'✅ Enabled' if enabled else '❌ Disabled'}") + lines.append(f"Position: {session.get('logo_position')}") + lines.append(f"Scale: {session.get('logo_scale')}") + lines.append(f"Opacity: {session.get('logo_opacity')}\n") + lines.append("Change scale/opacity: /set logo_scale 0.15") + else: + lines.append("No logo uploaded.") + return [ack, edit("\n".join(lines), {"inline_keyboard": logo_btns})] + + elif data == "toggle_logo": + with lock: + session['logo_enabled'] = not session.get('logo_enabled', False) + v = session['logo_enabled'] + return [ack, edit(f"🖼 Logo {'enabled ✅' if v else 'disabled ❌'}.")] + + elif data == "change_logo_pos": + return [ack, edit("📍 Choose logo position:", get_logo_pos_keyboard())] + + elif data.startswith("set_logo_pos_"): + pos = data.replace("set_logo_pos_", "") + with lock: session['logo_position'] = pos + return [ack, edit(f"✅ Logo position: {pos}")] + + elif data == "upload_new_logo": + with lock: session['current_step'] = "awaiting_logo" + return [ack, reply("🖼 Send a PNG or JPG image now. /cancel to abort.")] + + # Schedule + elif data == "cfg_schedule": + return [ack, reply(_list_schedules(chat_id), + get_schedule_menu_keyboard(session))] + + elif data == "sched_new": + with lock: + session["current_step"] = "sched_when" + session["_sched_draft"] = {} + return [ack, reply( + "🕒 New Scheduled Stream\n\nWhen should it start?", + get_schedule_when_keyboard())] + + elif data == "sched_pick_timer": + with lock: + session["current_step"] = "sched_timer" + return [ack, reply( + "\u23f1 Timer Setup\n\n" + "How long from now? Type a duration:\n" + " 30 \u2192 30 minutes\n" + " 2h \u2192 2 hours\n" + " 1h30m \u2192 1h 30min\n\n" + "Or /cancel.")] + + elif data == "sched_pick_datetime": + with lock: + session["current_step"] = "sched_datetime" + return [ack, reply( + "\U0001f4c5 Date & Time (UTC)\n\n" + "Enter the start time:\n" + "YYYY-MM-DD HH:MM:SS\n" + "Example: 2025-12-31 23:55:00\n\n" + "Or /cancel.")] + + elif data == "sched_cancel_setup": + with lock: + session["current_step"] = None + session["_sched_draft"] = {} + return [ack, reply("❌ Schedule setup cancelled.", + get_main_keyboard(session))] + + # Quick setup + elif data == "quick_setup": + with lock: + session['current_step'] = "quick_output_url" + return [ack, reply( + "⚙️ Quick Setup\n\n" + "Step 1/2: Enter your RTMP Output URL:\n" + "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY\n\n" + "Or /cancel to abort.")] + + # Reset + elif data == "confirm_reset": + return [ack, edit("⚠️ Reset all settings to defaults?", + get_reset_confirm_keyboard())] + + elif data == "do_reset": + reset_session_settings(chat_id) + return [ack, reply("🔄 Settings restored to defaults.\n\n" + + compose_status_message(chat_id, True))] + + # Logs — open as live-view bubble (logs mode), same concept as logg.py /logs + elif data == "show_user_logs": + register_live_view(chat_id, message_id, mode="logs") + live_text = _build_live_view_text(chat_id, "logs") + live_kb = _get_live_view_keyboard(chat_id) + with _live_view_lock: + if chat_id in _live_views: + _live_views[chat_id]["last_sent"] = live_text + return [ack, edit(live_text, live_kb)] + + elif data == "show_help": + return [ack, reply(get_help_text())] + + return [ack, answer_callback_query(cq["id"], "Unknown action", show_alert=True)] + + return {"status": "ok"} + + +# ────────────────────────────────────────────── +# CONVERSATION HANDLER (inline field edits / quick setup) +# ────────────────────────────────────────────── +async def handle_conversation_input(chat_id: int, text: str): + session = get_user_session(chat_id) + lock = session_locks[chat_id] + step = session.get('current_step') + + if step == "editing_field": + field = session.get('settings_editing_field') + if not field or field not in SETTABLE_FIELDS: + with lock: + session['current_step'] = None + return send_message(chat_id, "⚠️ State error — cancelled.", reply_markup=get_main_keyboard(session)) + + validator, desc = SETTABLE_FIELDS[field] + raw = text.strip() + if not validator(raw): + return send_message(chat_id, + f"❌ Invalid value for {field}.\nExpected: {esc(desc)}\nTry again or /cancel.") + + with lock: + session[field] = parse_field_value(field, raw) + if field == "quality_preset" and raw in QUALITY_PRESETS: + for k, v in QUALITY_PRESETS[raw].items(): + session[k] = v + session['current_step'] = None + session['settings_editing_field'] = None + + append_user_live_log(chat_id, f"Set {field} = {raw}") + return send_message(chat_id, + f"✅ {field} set to {esc(raw)}\n\n" + format_settings_display(session), + reply_markup=get_settings_keyboard(session)) + + elif step == "quick_output_url": + url = text.strip() + if not validate_url(url): + return send_message(chat_id, + "❌ Invalid URL. Must start with rtmp/http/https/rtsp/...\nTry again or /cancel.") + with lock: + session['output_url'] = url + session['current_step'] = "quick_input_url" + return send_message(chat_id, + f"✅ Output URL set.\n\n" + f"Step 2/2: Enter your first Input URL (stream/video URL):\n" + f"Or /cancel to abort.") + + elif step == "quick_input_url": + url = text.strip() + if not validate_url(url): + return send_message(chat_id, + "❌ Invalid URL. Must start with http/https/rtmp/rtsp/...\nTry again or /cancel.") + with lock: + session['input_url_playlist'] = [url] + session['current_step'] = None + return send_message(chat_id, + f"✅ Quick Setup Complete!\n\n" + f"Input URL added to playlist.\n\n" + f"You're ready to stream! Use /stream to start.\n\n" + + compose_status_message(chat_id, True), + reply_markup=get_main_keyboard(session)) + + elif step == "awaiting_logo": + return send_message(chat_id, + "🖼 Please send an image file (PNG/JPG), not text. /cancel to abort.") + + elif step in ("sched_when", "sched_timer", "sched_datetime", + "sched_name", "sched_rtmp", "sched_input"): + return await handle_schedule_conversation(chat_id, text) + + else: + with lock: + session['current_step'] = None + return send_message(chat_id, + "💬 No active setup. Use a command or button below.", + reply_markup=get_main_keyboard(session)) + + +# ────────────────────────────────────────────── +# FASTAPI ENDPOINTS +# ────────────────────────────────────────────── +@app.on_event("startup") +async def startup_event(): + global _main_event_loop + _main_event_loop = asyncio.get_event_loop() + if not scheduler.running: + scheduler.start() + logger.info("APScheduler started (MemoryJobStore).") + logger.info(f"Stream Bot v{APP_VERSION} started.") + logger.warning("All data is in-memory — lost on restart.") + + +@app.on_event("shutdown") +async def shutdown_event(): + if scheduler.running: + scheduler.shutdown() + logger.info("Stream Bot shutdown.") + + +@app.post("/webhook") +async def telegram_webhook_endpoint(request: Request): + try: + update = await request.json() + + # Identify chat_id early to drain any pending real-time status edit + chat_id = None + try: + if "message" in update: + chat_id = update["message"]["chat"]["id"] + elif "callback_query" in update: + chat_id = update["callback_query"]["message"]["chat"]["id"] + except Exception: + pass + + response_data = await handle_telegram_update(update) + + # Collect the primary response + primary = None + if isinstance(response_data, list): + items = [i for i in response_data if i and isinstance(i, dict) and i.get("method")] + for priority in ("sendMessage", "editMessageText"): + for item in items: + if item.get("method") == priority: + primary = item + break + if primary: + break + if not primary: + for item in items: + if item.get("method") == "answerCallbackQuery": + primary = item + break + elif isinstance(response_data, dict): + primary = response_data + + # Drain any background-queued outbound messages (live-view edits, scheduler + # notifications, state-change alerts) and return the first one as the response. + # Telegram only accepts one response per webhook call, so extras are re-queued. + if chat_id: + queued_msgs = pop_outbound_messages(chat_id) + if queued_msgs: + if primary is None or primary.get("method") == "answerCallbackQuery": + primary = queued_msgs[0] + # Re-queue remaining for future webhook calls + for m in queued_msgs[1:]: + enqueue_outbound_message(chat_id, m) + else: + # Already have a real primary — re-queue all queued for next call + for m in queued_msgs: + enqueue_outbound_message(chat_id, m) + + return primary or {"status": "ok"} + + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON") + except Exception as e: + logger.error(f"Webhook error: {e}", exc_info=True) + return {"status": "ok"} + + +@app.get("/events/{chat_id}") +async def sse_events_endpoint(chat_id: int, request: Request): + """ + Server-Sent Events endpoint for real-time stream status. + Connect from any HTTP client: + GET /events/{chat_id} + Accept: text/event-stream + Events are JSON objects with a "type" field: + type=state — full status snapshot (state, frames, bytes, uptime, keyboard, etc.) + type=log — single log line + type=notification — background event (e.g. scheduled start) + """ + if chat_id not in user_sessions: + # Auto-create session so clients can connect before first message + get_user_session(chat_id) + + q: asyncio.Queue = asyncio.Queue(maxsize=200) + _register_sse_subscriber(chat_id, q) + + async def event_generator(): + try: + # Send immediate snapshot on connect + _push_state_sse(chat_id) + # Send recent logs as a batch + session = get_user_session(chat_id) + recent_logs = session.get('live_log_lines_user', [])[-20:] + if recent_logs: + payload = json.dumps({"type": "log_batch", "lines": recent_logs}) + yield f"data: {payload}\n\n" + + while True: + # Check client disconnected + if await request.is_disconnected(): + break + try: + # Wait up to 3s for an event, then send a heartbeat + payload = await asyncio.wait_for(q.get(), timeout=3.0) + yield f"data: {payload}\n\n" + except asyncio.TimeoutError: + # Heartbeat to keep connection alive + yield f": heartbeat\n\n" + except asyncio.CancelledError: + pass + finally: + _unregister_sse_subscriber(chat_id, q) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +@app.get("/stream-log/{chat_id}") +async def sse_log_endpoint(chat_id: int, request: Request): + """ + Dedicated SSE endpoint for live log streaming only. + Lighter than /events — only delivers log lines. + """ + if chat_id not in user_sessions: + get_user_session(chat_id) + + q: asyncio.Queue = asyncio.Queue(maxsize=200) + _register_sse_subscriber(chat_id, q) + + async def log_generator(): + try: + # Send recent log history first + session = get_user_session(chat_id) + recent_logs = session.get('live_log_lines_user', [])[-50:] + for line in recent_logs: + payload = json.dumps({"type": "log", "line": line}) + yield f"data: {payload}\n\n" + + while True: + if await request.is_disconnected(): + break + try: + payload = await asyncio.wait_for(q.get(), timeout=5.0) + # Only forward log events + try: + ev = json.loads(payload) + if ev.get("type") in ("log", "log_batch"): + yield f"data: {payload}\n\n" + except Exception: + pass + except asyncio.TimeoutError: + yield f": heartbeat\n\n" + except asyncio.CancelledError: + pass + finally: + _unregister_sse_subscriber(chat_id, q) + + return StreamingResponse( + log_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +@app.post("/notify/{chat_id}") +async def internal_notify_endpoint(chat_id: int, request: Request): + """ + Internal endpoint: POST a JSON event to push to SSE subscribers. + Body: {"type": "...", ...} (any JSON) + Also queues a sendMessage to be returned on the next webhook response. + """ + try: + body = await request.json() + except Exception: + body = {} + body.setdefault("ts", datetime.datetime.now(datetime.timezone.utc).isoformat()) + push_sse_event(chat_id, body) + # If there's a "text" field, queue it as an outbound message too + if "text" in body and chat_id in user_sessions: + session = get_user_session(chat_id) + enqueue_outbound_message(chat_id, send_message(chat_id, body["text"], + reply_markup=get_main_keyboard(session))) + return {"queued": True} + + +@app.get("/sessions") +async def list_sessions(): + """List all active sessions and their current state.""" + result = {} + for chat_id, session in user_sessions.items(): + result[str(chat_id)] = { + "state": session.get("streaming_state", "idle"), + "frames_encoded": session.get("frames_encoded", 0), + "bytes_sent": session.get("bytes_sent", 0), + "sse_subscribers": len(_sse_subscribers.get(chat_id, [])), + "queued_messages": len(_outbound_message_queue.get(chat_id, [])), + } + return result + + +@app.get("/") +async def root(): + return { + "bot": f"Advanced Stream Bot v{APP_VERSION}", + "status": "running", + "endpoints": { + "webhook": "POST /webhook", + "status": "GET /status/{chat_id}", + "sessions": "GET /sessions", + "sse_events": "GET /events/{chat_id} (text/event-stream)", + "sse_logs": "GET /stream-log/{chat_id} (text/event-stream)", + "notify": "POST /notify/{chat_id}", + "health": "GET /health", + }, + "sessions": len(user_sessions), + "sse_connections": sum(len(v) for v in _sse_subscribers.values()), + "scheduler_jobs": len(scheduler.get_jobs()), + } + + +@app.get("/health") +async def health(): + return { + "status": "ok", + "version": APP_VERSION, + "sessions": len(user_sessions), + "scheduler_jobs": len(scheduler.get_jobs()), + "sse_connections": sum(len(v) for v in _sse_subscribers.values()), + "queued_messages": sum(len(v) for v in _outbound_message_queue.values()), + } + + +@app.get("/status/{chat_id}") +async def get_status_endpoint(chat_id: int): + """HTTP polling — returns current stream state as JSON for external dashboards.""" + if chat_id not in user_sessions: + raise HTTPException(status_code=404, detail="No session for this chat_id") + session = get_user_session(chat_id) + uptime = 0 + if session.get("stream_start_time"): + try: + uptime = int((datetime.datetime.now(datetime.timezone.utc) - session["stream_start_time"]).total_seconds()) + except Exception: + pass + return { + "chat_id": chat_id, + "state": session.get("streaming_state", "idle"), + "state_icon": STATE_EMOJI.get(session.get("streaming_state", "idle"), "❓"), + "frames_encoded": session.get("frames_encoded", 0), + "bytes_sent": session.get("bytes_sent", 0), + "uptime_seconds": uptime, + "uptime_str": get_uptime(session.get("stream_start_time")), + "reconnect_attempt": session.get("reconnect_attempt", 0), + "error": session.get("error_notification_user", ""), + "active_output_url": session.get("active_output_url", ""), + "playlist_index": session.get("current_playlist_index", 0), + "playlist_count": len(session.get("input_url_playlist", [])), + "sse_subscribers": len(_sse_subscribers.get(chat_id, [])), + "queued_messages": len(_outbound_message_queue.get(chat_id, [])), + "keyboard": get_main_keyboard(session), + "status_text": compose_status_message(chat_id, include_config=False), + "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(), + } + + +# ────────────────────────────────────────────── +# MAIN +# ────────────────────────────────────────────── +if __name__ == "__main__": + import uvicorn + logger.info(f"Starting Advanced Stream Bot v{APP_VERSION}...") + uvicorn.run("stream_bot:app", host="0.0.0.0", port=8000, reload=False) \ No newline at end of file