import logging
import threading
import time
import datetime
import traceback
import fractions
import json
import os
import re
from urllib.parse import urlparse
from pathlib import Path  # Still useful for parsing, just not disk ops
import io  # For handling bytes as files

from fastapi import FastAPI, Request, HTTPException, UploadFile, File, Form
import av
from PIL import Image, ImageEnhance, UnidentifiedImageError  # Pillow is still needed for image manipulation
from apscheduler.schedulers.background import BackgroundScheduler
# SQLAlchemyJobStore is removed as we can't write to disk. APScheduler will use MemoryJobStore.

# --- Configuration & Global Constants ---
APP_VERSION = "2.0.1-inmemory"  # Version reflects in-memory change

# --- FastAPI & Scheduler Initialization ---
app = FastAPI(title="Advanced Stream Bot (In-Memory Edition)", version=APP_VERSION)
# APScheduler for scheduled streams - will use default MemoryJobStore
scheduler = BackgroundScheduler(timezone="UTC")

# --- Global Data Stores (Managed per user where applicable) ---
# user_sessions: chat_id -> session_data_dict
# All data is now in-memory.
user_sessions = {}
# session_locks: chat_id -> threading.RLock() for thread-safe session updates.
# The lock is re-entrant because helpers such as append_user_live_log() take it
# themselves and may be called by code that is already holding it.
session_locks = {}
# Serialises creation of new entries in user_sessions / session_locks.
_session_registry_lock = threading.Lock()

# Global list for general bot logs (rolling)
live_log_lines_global = []
MAX_GLOBAL_LOG_LINES = 100
# Maximum number of per-user log lines kept in a session.
MAX_USER_LOG_LINES = 50

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(threadName)s %(module)s:%(lineno)d %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("stream_bot_inmemory")


def append_global_live_log(line: str):
    """Append *line* to the rolling global log, keeping the newest MAX_GLOBAL_LOG_LINES entries."""
    live_log_lines_global.append(line)
    if len(live_log_lines_global) > MAX_GLOBAL_LOG_LINES:
        live_log_lines_global.pop(0)


class GlobalListHandler(logging.Handler):
    """Logging handler that mirrors formatted records into live_log_lines_global."""

    def emit(self, record):
        log_entry = self.format(record)
        append_global_live_log(log_entry)


global_list_handler = GlobalListHandler()
global_list_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(global_list_handler)

# --- Per-User Session Management (In-Memory) ---
DEFAULT_USER_SETTINGS = {
    "input_url_playlist": [],
    "current_playlist_index": 0,
    "output_url": "rtmp://a.rtmp.youtube.com/live2/",
    "quality_settings": "medium",
    "video_codec": "libx264",
    "audio_codec": "aac",
    "resolution": "source",
    "fps": 30,
    "gop_size": 60,
    "video_bitrate": "1500k",
    "audio_bitrate": "128k",
    "loop_count": 0,
    "stop_on_error_in_playlist": True,
    "ffmpeg_preset": "medium",
    "logo_enabled": False,
    "logo_data_bytes": None,  # Store logo as bytes
    "logo_mime_type": None,  # e.g., 'image/png'
    "logo_original_filename": None,  # For display
    "logo_position": "top_right",
    "logo_scale": 0.1,
    "logo_opacity": 0.8,
    "advanced_mode": False,
    "current_step": None,
    "conversation_fields_list": [],
}

DEFAULT_SESSION_RUNTIME_STATE = {
    "streaming_state": "idle",
    "stream_start_time": None,
    "frames_encoded": 0,
    "bytes_sent": 0,
    "stream_thread_ref": None,
    "pyav_objects": {
        "input_container": None,
        "output_container": None,
        "video_out_stream": None,
        "audio_out_stream": None,
        "logo_image_pil": None,
    },
    "live_log_lines_user": [],
    "error_notification_user": "",
    "stop_gracefully_flag": False,
    "current_loop_iteration": 0,
}


def _clone_default(value):
    """Return a copy of a default value, recursing into dicts and lists.

    Scalars (and any other object) are returned as-is; only the container
    types used in the default tables above are duplicated.
    """
    if isinstance(value, dict):
        return {key: _clone_default(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_clone_default(item) for item in value]
    return value


def get_user_session(chat_id: int) -> dict:
    """Return the in-memory session for *chat_id*, creating it on first use.

    A new session is populated from DEFAULT_USER_SETTINGS and
    DEFAULT_SESSION_RUNTIME_STATE. Nested lists *and dicts* are copied so that
    no mutable default (notably ``pyav_objects``) is shared between chats.
    The session and its lock are published only once fully built, so a
    concurrent caller never observes a half-initialised session.
    """
    session = user_sessions.get(chat_id)
    if session is not None:
        return session

    created = False
    with _session_registry_lock:
        session = user_sessions.get(chat_id)
        if session is None:
            session = {}
            for defaults in (DEFAULT_USER_SETTINGS, DEFAULT_SESSION_RUNTIME_STATE):
                for key, value in defaults.items():
                    session[key] = _clone_default(value)
            # Publish the lock first: anyone who can see the session can also
            # look up its lock.
            session_locks[chat_id] = threading.RLock()
            user_sessions[chat_id] = session
            created = True
    if created:
        logger.info(f"[Chat {chat_id}] New IN-MEMORY session created. Settings are NOT persistent.")
    return session


# save_user_settings_to_file, load_user_settings_from_file, load_all_sessions_on_startup are REMOVED.
# Settings are only saved in memory for the current runtime.

# --- User-specific Live Logging ---
def append_user_live_log(chat_id: int, line: str):
    """Append a timestamped *line* to the chat's rolling log (newest MAX_USER_LOG_LINES kept)."""
    session = get_user_session(chat_id)  # Ensures session exists
    entry = f"{datetime.datetime.now().strftime('%H:%M:%S')} {line}"

    def _store():
        user_lines = session['live_log_lines_user']
        user_lines.append(entry)
        if len(user_lines) > MAX_USER_LOG_LINES:
            user_lines.pop(0)

    lock = session_locks.get(chat_id)
    if lock is None:
        # Fallback if lock somehow not initialized (should not happen after get_user_session).
        logger.warning(f"[Chat {chat_id}] Lock not found for appending user log. This is unexpected.")
        _store()
        return
    with lock:
        _store()


# --- Input Validation ---
SUPPORTED_VIDEO_CODECS = ["libx264", "h264_nvenc", "h264_qsv", "libx265", "hevc_nvenc", "hevc_qsv", "copy"]
SUPPORTED_AUDIO_CODECS = ["aac", "opus", "libmp3lame", "copy"]
LOGO_POSITIONS = ["top_left", "top_right", "bottom_left", "bottom_right", "center"]
QUALITY_SETTINGS_MAP = {
    "low": {"video_bitrate": "800k", "audio_bitrate": "96k", "ffmpeg_preset": "superfast", "resolution": "854x480"},
    "medium": {"video_bitrate": "1500k", "audio_bitrate": "128k", "ffmpeg_preset": "medium", "resolution": "1280x720"},
    "high": {"video_bitrate": "3000k", "audio_bitrate": "192k", "ffmpeg_preset": "fast", "resolution": "1920x1080"},
    "source": {}
}
_ALLOWED_URL_SCHEMES = ('http', 'https', 'rtmp', 'rtsp', 'udp', 'srt', 'file')
# Bitrate (bits per second) used when a bitrate string cannot be parsed.
_FALLBACK_BITRATE = 1500000


def validate_url(url: str) -> bool:
    """Return True when *url* has a scheme from the allowed list and a network location."""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc]) and result.scheme in _ALLOWED_URL_SCHEMES
    except (ValueError, TypeError, AttributeError):
        # urlparse rejects malformed netlocs with ValueError and non-string
        # input with TypeError/AttributeError; all of those mean "not a URL".
        return False


def validate_resolution(res: str) -> bool:
    """Return True for "source" (any case) or a ``<3-4 digits>x<3-4 digits>`` string."""
    if res.lower() == "source":
        return True
    # fullmatch: unlike a "$"-anchored match it does not accept a trailing newline.
    return bool(re.fullmatch(r"\d{3,4}x\d{3,4}", res))


def parse_bitrate(bitrate_str: str) -> int:
    """Convert a bitrate such as "1500k", "2.5M" or "800000" to bits per second.

    Any value that cannot be parsed yields the fallback of 1500000 instead of
    raising, including malformed suffixed values such as "abck".
    """
    text = str(bitrate_str).strip().lower()  # Ensure it's a string
    multipliers = {'k': 1000, 'm': 1000000}
    try:
        if text and text[-1] in multipliers:
            return int(float(text[:-1]) * multipliers[text[-1]])
        return int(text)
    except (ValueError, OverflowError):
        return _FALLBACK_BITRATE


def format_settings_for_display(session: dict) -> str:
    """Render the user-visible settings of *session* as Markdown bullet lines."""
    logo_active = bool(session.get('logo_enabled') and session.get('logo_data_bytes'))
    logo_status = "Disabled"
    if logo_active:
        logo_status = f"Enabled ({session.get('logo_original_filename', 'Unknown name')})"

    display_data = {
        "Output URL": session.get('output_url'),
        "Quality": session.get('quality_settings'),
        "Video Codec": session.get('video_codec'),
        "Audio Codec": session.get('audio_codec'),
        "Resolution": session.get('resolution'),
        "FPS": session.get('fps'),
        "Video Bitrate": session.get('video_bitrate'),
        "Audio Bitrate": session.get('audio_bitrate'),
        "FFmpeg Preset": session.get('ffmpeg_preset'),
        "Loop Count": "Infinite" if session.get('loop_count') == -1 else session.get('loop_count'),
        "Playlist Length": len(session.get('input_url_playlist', [])),
        "Logo": logo_status,
    }
    if logo_active:
        display_data["Logo Position"] = session.get('logo_position')
        display_data["Logo Scale"] = session.get('logo_scale')
        display_data["Logo Opacity"] = session.get('logo_opacity')
    # Entries whose value is None are omitted from the listing.
    return "\n".join([f"⢠*{k}*: `{v}`" for k, v in display_data.items() if v is not None])


# --- UI Helpers (send_message, edit_message_text, answer_callback_query, get_main_keyboard, get_uptime, compose_status_message) ---
# These functions remain largely the same as they don't interact with disk.
# Minor change in compose_status_message for logo display from bytes.
def send_message(chat_id: int, text: str, parse_mode="Markdown", reply_markup=None):
    """Build a ``sendMessage`` payload dict; ``reply_markup`` is included only when truthy."""
    logger.info(f"[Chat {chat_id}] Sending message: {text[:100]}...")
    response = {
        "method": "sendMessage",
        "chat_id": chat_id,
        "text": text,
        "parse_mode": parse_mode
    }
    if reply_markup:
        response["reply_markup"] = reply_markup
    return response


def edit_message_text(chat_id: int, message_id: int, text: str, parse_mode="Markdown", reply_markup=None):
    """Build an ``editMessageText`` payload dict; ``reply_markup`` is included only when truthy."""
    logger.info(f"[Chat {chat_id}] Editing message {message_id}: {text[:100]}...")
    response = {
        "method": "editMessageText",
        "chat_id": chat_id,
        "message_id": message_id,
        "text": text,
        "parse_mode": parse_mode
    }
    if reply_markup:
        response["reply_markup"] = reply_markup
    return response


def answer_callback_query(callback_query_id: str, text: str = None, show_alert: bool = False):
    """Build an ``answerCallbackQuery`` payload dict; optional fields are added only when set."""
    response = {
        "method": "answerCallbackQuery",
        "callback_query_id": callback_query_id,
    }
    if text:
        response["text"] = text
    if show_alert:
        response["show_alert"] = True
    return response


def get_main_keyboard(session: dict):
    """Return the inline keyboard matching the session's current streaming state."""
    state = session.get('streaming_state')
    streaming = state in ["streaming", "paused", "starting"]
    buttons = []
    if not streaming:
        buttons.append([{"text": "š Start/Configure Stream", "callback_data": "cfg_start"}])
        buttons.append([
            {"text": "āļø Settings", "callback_data": "cfg_advanced"},
            {"text": "š Playlist (" + str(len(session.get('input_url_playlist', []))) + ")", "callback_data": "cfg_playlist"}
        ])
        buttons.append([
            {"text": "š¼ļø Logo", "callback_data": "cfg_logo"},
            {"text": "š Schedule", "callback_data": "cfg_schedule"}
        ])
    else:
        if state == "streaming":
            buttons.append([{"text": "āø Pause", "callback_data": "stream_pause"}])
        elif state == "paused":
            buttons.append([{"text": "ā¶ļø Resume", "callback_data": "stream_resume"}])
        buttons.append([
            {"text": "ā¹ Abort Stream", "callback_data": "stream_abort"},
            {"text": "š Status Update", "callback_data": "stream_status"}
        ])
        # The graceful-stop button is only offered for infinite loops (loop_count == -1).
        if session.get('loop_count', 0) == -1:
            buttons.append([{"text": "ā³ Stop Loop Gracefully", "callback_data": "stream_stop_loop_graceful"}])
    buttons.append([{"text": "š Global Logs", "callback_data": "show_global_logs"}, {"text": "ā Help", "callback_data": "show_help"}])
    return {"inline_keyboard": buttons}


def get_uptime(start_time_obj):
    """Return elapsed time since *start_time_obj* as ``H:MM:SS`` text, or "0s" when unset."""
    if not start_time_obj:
        return "0s"
    # Ensure start_time_obj is offset-aware before subtracting from an aware now().
    if start_time_obj.tzinfo is None:
        start_time_obj = start_time_obj.replace(tzinfo=datetime.timezone.utc)  # Assume UTC if naive
    uptime_delta = datetime.datetime.now(datetime.timezone.utc) - start_time_obj
    return str(uptime_delta).split('.')[0]  # drop fractional seconds


def compose_status_message(chat_id: int, include_config: bool = False) -> str:
    """Compose the status text for *chat_id*.

    Runtime counters are shown while a stream is starting, running, paused or
    stopping. The configuration is appended when *include_config* is true or
    the state is neither "streaming" nor "paused". The last ten user log lines
    are appended while starting, streaming or paused.
    """
    session = get_user_session(chat_id)
    state = session['streaming_state']
    status_lines = [
        f"š¤ *Stream Bot Status for Chat ID {chat_id} (In-Memory Mode)*",
        f"_Settings, logos, and schedules are NOT persistent across bot restarts._",
        f"State: `{state}`",
    ]
    if session.get('error_notification_user'):
        status_lines.append(f"ā ļø Error: `{session['error_notification_user']}`")

    if state in ["streaming", "paused", "starting", "stopping"]:
        current_input_url_display = "N/A"
        playlist = session.get('input_url_playlist', [])
        playlist_idx = session.get('current_playlist_index', 0)
        if playlist and 0 <= playlist_idx < len(playlist):
            current_input_url_display = playlist[playlist_idx]
        status_lines.extend([
            f"Uptime: `{get_uptime(session['stream_start_time'])}`",
            f"Frames Encoded: `{session['frames_encoded']}`",
            f"Bytes Sent: `{session['bytes_sent'] / (1024*1024):.2f} MB`",
            f"Current Input: `{current_input_url_display}` (Item {playlist_idx+1}/{len(playlist)})",
            f"Loop Iteration: {session.get('current_loop_iteration',0)+1}/{'Infinite' if session.get('loop_count',0) == -1 else session.get('loop_count',0) or 1}"
        ])

    if include_config or state not in ["streaming", "paused"]:
        status_lines.append("\nš *Current Configuration:*")
        status_lines.append(format_settings_for_display(session))

    if state in ["streaming", "paused", "starting"]:
        status_lines.append("\nš *User Live Logs (last 10):*")
        user_logs_preview = "\n".join(session['live_log_lines_user'][-10:])
        status_lines.append(f"{user_logs_preview if user_logs_preview else 'No user logs yet.'}")
    return "\n".join(status_lines)
# --- Core Streaming Logic ---
# stream_engine_thread_target needs to load logo from session['logo_data_bytes']
def stream_engine_thread_target(chat_id: int):
    """Run one complete streaming job for *chat_id*; intended as a thread target.

    Opens the session's output URL as an FLV container, then plays the
    session's playlist item by item, re-encoding video (with optional logo
    overlay) and audio into the output. ``loop_count`` controls repetition:
    ``-1`` loops until stopped, any other value plays the playlist that many
    times, with ``0`` treated as a single pass.

    The session's ``streaming_state`` moves through "starting" -> "streaming"
    (or "paused") and ends as "completed", "stopped" or "error". All errors
    are recorded in the session and the user log; nothing is raised to the
    caller.

    Locking note: append_user_live_log() acquires the session lock itself, so
    it is never called here while the lock is held.
    """
    session = get_user_session(chat_id)
    lock = session_locks[chat_id]

    active_output_container = None
    active_video_out_stream = None
    active_audio_out_stream = None

    def _stop_signalled(include_graceful: bool) -> bool:
        """Return True when an abort (or, optionally, a graceful stop) has been requested."""
        with lock:
            if session['streaming_state'] == "stopping":
                return True
            return bool(include_graceful and session['stop_gracefully_flag'])

    def _wait_while_paused() -> bool:
        """Block while the stream is paused; return True if it should stop instead of resuming."""
        while True:
            with lock:
                state = session['streaming_state']
            if state == "stopping":
                return True
            if state != "paused":
                return False
            time.sleep(0.2)

    def _prepare_logo_overlay(logo_source, frame_width, frame_height):
        """Scale the logo for the output frame size and choose its paste position.

        Returns ``(resized_logo, x, y)``; ``resized_logo`` is None when scaling
        would produce an empty image.
        """
        logo_aspect_ratio = logo_source.width / logo_source.height
        logo_h = int(frame_height * session.get('logo_scale', 0.1))
        logo_w = int(logo_h * logo_aspect_ratio)
        logo_w = min(logo_w, frame_width)  # cap width
        logo_h = min(logo_h, frame_height)  # cap height
        if logo_w <= 0 or logo_h <= 0:
            append_user_live_log(chat_id, "Logo resulted in 0 dimension after scaling for current video. Logo disabled for this item.")
            return None, 0, 0

        resized = logo_source.resize((logo_w, logo_h), Image.Resampling.LANCZOS)
        append_user_live_log(chat_id, f"Logo resized to {logo_w}x{logo_h} for current video item.")

        margin = 10
        right = frame_width - logo_w - margin
        bottom = frame_height - logo_h - margin
        positions = {
            "top_right": (right, margin),
            "top_left": (margin, margin),
            "bottom_left": (margin, bottom),
            "bottom_right": (right, bottom),
            "center": ((frame_width - logo_w) // 2, (frame_height - logo_h) // 2),
        }
        # Unknown positions fall back to top_right.
        pos_x, pos_y = positions.get(session['logo_position'], positions["top_right"])
        append_user_live_log(chat_id, f"Logo position set to ({pos_x}, {pos_y}) for current video item.")
        return resized, pos_x, pos_y

    def _cleanup_pyav_resources():
        """Close any containers still referenced by the session and clear all PyAV references."""
        append_user_live_log(chat_id, "Cleaning up PyAV resources...")
        # Detach the references under the lock, then close outside it so that
        # neither the close calls nor the error logging run while it is held.
        with lock:
            pyav_objs = session.get('pyav_objects', {})
            to_close = [
                ("output_container", pyav_objs.get('output_container')),
                ("input_container", pyav_objs.get('input_container')),
            ]
            pyav_objs['output_container'] = None
            pyav_objs['video_out_stream'] = None
            pyav_objs['audio_out_stream'] = None
            pyav_objs['input_container'] = None
            pyav_objs['logo_image_pil'] = None  # Clear processed PIL image
        for label, container in to_close:
            if not container:
                continue
            try:
                container.close()
            except Exception as e_c:
                append_user_live_log(chat_id, f"Error closing {label}: {e_c}")
        append_user_live_log(chat_id, "PyAV resources cleanup attempt complete.")

    try:
        with lock:
            session['streaming_state'] = "starting"
            session['error_notification_user'] = ""
            session['stream_start_time'] = datetime.datetime.now(datetime.timezone.utc)
            session['frames_encoded'] = 0
            session['bytes_sent'] = 0
            session['current_loop_iteration'] = 0
            session['current_playlist_index'] = 0
            session['stop_gracefully_flag'] = False
            # Reset pyav_objects for this run, ensures clean state
            session['pyav_objects'] = {
                "input_container": None, "output_container": None,
                "video_out_stream": None, "audio_out_stream": None,
                "logo_image_pil": None
            }
        append_user_live_log(chat_id, f"Stream engine started. Output: {session['output_url']}")

        try:
            active_output_container = av.open(session['output_url'], mode='w', format='flv', timeout=10)
            with lock:
                session['pyav_objects']['output_container'] = active_output_container
            append_user_live_log(chat_id, f"Output RTMP connection established: {session['output_url']}")
        except Exception as e:
            raise Exception(f"Failed to open output RTMP stream: {e}") from e

        # --- Logo Setup (if enabled and data exists) ---
        # Done once per stream start, assuming the logo doesn't change mid-stream.
        # The decoded image (opacity already applied) is stored in
        # session['pyav_objects']['logo_image_pil'] for this thread's use only.
        if session.get('logo_enabled') and session.get('logo_data_bytes'):
            logo_bytes = session.get('logo_data_bytes')
            try:
                logo_image_pil_original = Image.open(io.BytesIO(logo_bytes)).convert("RGBA")
                if session.get('logo_opacity', 1.0) < 1.0:
                    # Scale the alpha band to apply the configured opacity.
                    alpha = logo_image_pil_original.split()[-1]
                    alpha = ImageEnhance.Brightness(alpha).enhance(session['logo_opacity'])
                    logo_image_pil_original.putalpha(alpha)
                with lock:
                    session['pyav_objects']['logo_image_pil'] = logo_image_pil_original
                append_user_live_log(chat_id, f"Logo loaded from in-memory bytes: {session.get('logo_original_filename', 'N/A')}")
            except UnidentifiedImageError:
                append_user_live_log(chat_id, f"Error: Could not load logo image from bytes. Skipping logo.")
            except Exception as e:
                append_user_live_log(chat_id, f"Error processing logo from bytes: {e}. Skipping logo.")
        else:
            append_user_live_log(chat_id, "Logo not enabled or no logo data. Skipping logo.")

        total_loops = session.get('loop_count', 0)
        playlist = session.get('input_url_playlist', [])
        if not playlist:
            raise Exception("Playlist is empty. Nothing to stream.")

        # -1 means "loop until stopped". Every other value is a pass count, and
        # 0 (the default) is treated as a single pass, matching what the status
        # display reports; previously 0 never met the exit condition and the
        # playlist repeated forever.
        max_iterations = None if total_loops == -1 else max(total_loops, 1)

        # --- Main Loop (Loops & Playlist) ---
        while True:
            if _stop_signalled(include_graceful=True):
                append_user_live_log(chat_id, "Graceful stop or abort signal received. Ending outer loop.")
                break
            with lock:
                current_loop_iter = session['current_loop_iteration']
            if max_iterations is not None and current_loop_iter >= max_iterations:
                append_user_live_log(chat_id, f"Completed {max_iterations} loop(s).")
                break

            append_user_live_log(chat_id, f"Starting loop iteration {current_loop_iter + 1}")
            with lock:
                session['current_playlist_index'] = 0

            while True:
                with lock:
                    current_idx = session['current_playlist_index']
                if current_idx >= len(playlist):
                    break
                if _stop_signalled(include_graceful=True):
                    append_user_live_log(chat_id, "Graceful stop or abort signal received. Ending playlist loop.")
                    break

                current_input_url = playlist[current_idx]
                append_user_live_log(chat_id, f"Processing playlist item {current_idx + 1}/{len(playlist)}: {current_input_url}")

                # Input container for this playlist item only. The output
                # streams are created on the first item and then reused.
                _current_input_container = None
                try:
                    _current_input_container = av.open(current_input_url, timeout=10)
                    # Also tracked in the session so cleanup can close it after a failure.
                    with lock:
                        session['pyav_objects']['input_container'] = _current_input_container
                    append_user_live_log(chat_id, f"Input opened: {current_input_url}")

                    in_v_streams = _current_input_container.streams.video
                    in_a_streams = _current_input_container.streams.audio
                    if not in_v_streams:
                        raise Exception("No video stream found in input.")
                    in_v_s = in_v_streams[0]
                    in_a_s = in_a_streams[0] if in_a_streams else None

                    target_width, target_height = in_v_s.width, in_v_s.height
                    if session['resolution'] != "source":
                        try:
                            target_width, target_height = map(int, session['resolution'].split('x'))
                        except ValueError:
                            append_user_live_log(chat_id, f"Invalid target resolution '{session['resolution']}'. Using source.")
                            target_width, target_height = in_v_s.width, in_v_s.height

                    # Retrieve output streams from session if they exist, or create them
                    with lock:
                        active_video_out_stream = session['pyav_objects'].get('video_out_stream')
                        active_audio_out_stream = session['pyav_objects'].get('audio_out_stream')
                        logo_pil_original_ref = session['pyav_objects'].get('logo_image_pil')

                    # --- (Re)configure video output stream ---
                    # Only configured once: the first item that reaches this point.
                    is_first_input_or_reconfig_needed = active_video_out_stream is None
                    if is_first_input_or_reconfig_needed:
                        active_video_out_stream = active_output_container.add_stream(session['video_codec'], rate=session['fps'])
                        active_video_out_stream.width = target_width
                        active_video_out_stream.height = target_height
                        active_video_out_stream.pix_fmt = 'yuv420p'
                        active_video_out_stream.codec_context.gop_size = session.get('gop_size', int(session['fps'] * 2))
                        active_video_out_stream.codec_context.bit_rate = parse_bitrate(session['video_bitrate'])
                        active_video_out_stream.codec_context.thread_type = "AUTO"
                        active_video_out_stream.codec_context.options = {'preset': session.get('ffmpeg_preset', 'medium')}
                        if session['video_codec'] == 'copy':
                            active_video_out_stream.codec_context.copy_parameters(in_v_s.codec_context)
                        with lock:
                            session['pyav_objects']['video_out_stream'] = active_video_out_stream
                        append_user_live_log(chat_id, f"Video output stream configured: {target_width}x{target_height}@{session['fps']}fps, {session['video_bitrate']}")

                    # Logo scaling/positioning is derived from the output stream
                    # size and recomputed for every playlist item, so items after
                    # the first keep their overlay.
                    current_logo_pil_resized = None
                    logo_pos_x, logo_pos_y = 0, 0
                    if logo_pil_original_ref:
                        current_logo_pil_resized, logo_pos_x, logo_pos_y = _prepare_logo_overlay(
                            logo_pil_original_ref,
                            active_video_out_stream.width,
                            active_video_out_stream.height,
                        )

                    # --- (Re)configure audio output stream ---
                    if is_first_input_or_reconfig_needed and in_a_s is not None:
                        active_audio_out_stream = active_output_container.add_stream(session['audio_codec'], rate=in_a_s.rate)
                        active_audio_out_stream.codec_context.bit_rate = parse_bitrate(session['audio_bitrate'])
                        if session['audio_codec'] == 'copy':
                            active_audio_out_stream.codec_context.copy_parameters(in_a_s.codec_context)
                        with lock:
                            session['pyav_objects']['audio_out_stream'] = active_audio_out_stream
                        append_user_live_log(chat_id, f"Audio output stream configured: {in_a_s.rate}Hz, {session['audio_bitrate']}")
                    elif is_first_input_or_reconfig_needed:
                        append_user_live_log(chat_id, "No audio stream in input. Streaming video only.")
                        active_audio_out_stream = None
                        with lock:
                            session['pyav_objects']['audio_out_stream'] = None

                    with lock:
                        # Never overwrite a pause or a pending abort requested during setup.
                        if session['streaming_state'] not in ("paused", "stopping"):
                            session['streaming_state'] = "streaming"

                    # --- Packet Processing Loop (for current input URL) ---
                    for packet in _current_input_container.demux(video=0, audio=0 if in_a_streams else -1):
                        if _wait_while_paused():
                            break
                        if packet.dts is None:
                            continue

                        if packet.stream.type == 'video' and packet.stream.index == in_v_s.index:
                            for frame in packet.decode():
                                if not frame.width or not frame.height:
                                    continue
                                if frame.width != active_video_out_stream.width or frame.height != active_video_out_stream.height:
                                    frame = frame.reformat(active_video_out_stream.width, active_video_out_stream.height, format='yuv420p')

                                if current_logo_pil_resized and active_video_out_stream:
                                    try:
                                        pil_frame = frame.to_image()
                                        # Clamp the paste position so the logo stays inside the frame.
                                        actual_x = max(0, min(logo_pos_x, pil_frame.width - current_logo_pil_resized.width))
                                        actual_y = max(0, min(logo_pos_y, pil_frame.height - current_logo_pil_resized.height))
                                        pil_frame.paste(current_logo_pil_resized, (actual_x, actual_y), current_logo_pil_resized)
                                        # from_image() builds a fresh frame without timing
                                        # information; carry over both pts and time_base so
                                        # the pts keeps its original unit.
                                        overlaid = av.VideoFrame.from_image(pil_frame)
                                        overlaid.pts = frame.pts
                                        if frame.time_base is not None:
                                            overlaid.time_base = frame.time_base
                                        frame = overlaid
                                    except Exception as e_logo:
                                        # Best effort: on failure the frame is sent without the logo.
                                        append_user_live_log(chat_id, f"Error applying logo: {e_logo}.")

                                for out_packet in active_video_out_stream.encode(frame):
                                    active_output_container.mux(out_packet)
                                    with lock:
                                        session['frames_encoded'] += 1
                                        session['bytes_sent'] += out_packet.size

                        elif (active_audio_out_stream and in_a_s is not None
                                and packet.stream.type == 'audio' and packet.stream.index == in_a_s.index):
                            for frame in packet.decode():
                                if frame.pts is None:
                                    continue
                                for out_packet in active_audio_out_stream.encode(frame):
                                    active_output_container.mux(out_packet)
                                    with lock:
                                        session['bytes_sent'] += out_packet.size
                    else:
                        # Reached only when the demux loop ran to the end without a stop request.
                        append_user_live_log(chat_id, f"Finished processing input: {current_input_url}")

                except Exception as e_input:
                    tb_str = traceback.format_exc()
                    append_user_live_log(chat_id, f"Error processing input {current_input_url}: {e_input}\n{tb_str}")
                    with lock:
                        session['error_notification_user'] = f"Error with {current_input_url}: {e_input}"
                    if session.get('stop_on_error_in_playlist', True):
                        append_user_live_log(chat_id, "Stopping stream due to error in playlist and stop_on_error_in_playlist=True.")
                        with lock:
                            session['streaming_state'] = "stopping"
                        break
                    append_user_live_log(chat_id, "Skipping to next item in playlist (if any).")
                finally:
                    if _current_input_container:
                        try:
                            _current_input_container.close()
                        except Exception as e_c:
                            append_user_live_log(chat_id, f"Minor error closing input container: {e_c}")
                    # Clear from session too, as this specific input_container is done
                    with lock:
                        if session['pyav_objects']['input_container'] == _current_input_container:
                            session['pyav_objects']['input_container'] = None

                with lock:
                    if session['streaming_state'] == "stopping":
                        break
                    session['current_playlist_index'] += 1

            with lock:
                if session['streaming_state'] == "stopping":
                    break
                session['current_loop_iteration'] += 1

        append_user_live_log(chat_id, "All playlist items and loops processed or stream stopped.")

    except Exception as e_stream:
        tb_str = traceback.format_exc()
        append_user_live_log(chat_id, f"Fatal stream engine error: {e_stream}\n{tb_str}")
        with lock:
            session['error_notification_user'] = f"Stream failed: {e_stream}"
            session['streaming_state'] = "error"
    finally:
        append_user_live_log(chat_id, "Stream engine finalizing...")
        # Retrieve the output container/streams recorded in the session for flushing.
        with lock:
            _final_output_container = session['pyav_objects'].get('output_container')
            _final_video_out_stream = session['pyav_objects'].get('video_out_stream')
            _final_audio_out_stream = session['pyav_objects'].get('audio_out_stream')

        if _final_output_container:  # Output container was successfully opened
            append_user_live_log(chat_id, "Flushing encoders...")
            for label, out_stream in (("V", _final_video_out_stream), ("A", _final_audio_out_stream)):
                if not out_stream:
                    continue
                # encode() itself may raise here, so it sits inside the try as
                # well; a flush failure must not skip cleanup or the final
                # state update below.
                try:
                    for out_packet in out_stream.encode():
                        _final_output_container.mux(out_packet)
                except Exception as fe:
                    append_user_live_log(chat_id, f"Err flushing {label}: {fe}")
            append_user_live_log(chat_id, "Encoders flushed.")

        _cleanup_pyav_resources()

        with lock:
            final_state = "completed" if not session['error_notification_user'] and session['streaming_state'] != "error" else "error"
            # An explicit stop/abort takes precedence over the error/completed outcome.
            if session['streaming_state'] == "stopping":
                final_state = "stopped"
            session['streaming_state'] = final_state
            session['stream_thread_ref'] = None
        append_user_live_log(chat_id, f"Stream engine finished. Final state: {final_state}.")
        logger.info(f"[Chat {chat_id}] Stream ended with state: {final_state}. Error: {session['error_notification_user']}")
# --- Telegram Command and Callback Handlers ---
# handle_telegram_update, CONVERSATION_STEPS*, start_settings_conversation, handle_conversation_step
# handle_playlist_command, handle_playlist_config_interaction remain similar,
# but they won't call save_user_settings_to_file. Changes are implicitly in-memory.
# handle_logo_upload needs to read file bytes and store them.
# handle_logo_config_interaction needs to reflect in-memory storage.
# schedule_stream_job, handle_schedule_command, handle_schedule_config_interaction are fine as APScheduler will use MemoryJobStore.
# Stream Control Command Handlers (start, pause, resume, abort) are mostly fine, no disk writes.
# get_help_text needs to be updated about non-persistence.
async def handle_telegram_update(update: dict):
if "message" in update:
message = update["message"]
chat_id = message["chat"]["id"]
user_name = message["from"].get("username", f"User_{chat_id}")
session = get_user_session(chat_id)
lock = session_locks[chat_id]
text = message.get("text", "")
command = text.split(' ')[0].lower() if text.startswith('/') else None
if message.get("photo") or message.get("document"):
if session.get("current_step") == "awaiting_logo_upload":
# Need to pass the actual UploadFile if using FastAPI's file handling for webhook
# For direct JSON, we'd get file_id and need a bot instance to download.
# Assuming direct JSON from Telegram with file_id:
return await handle_logo_data_from_telegram(chat_id, message)
else:
return send_message(chat_id, "Received a file, but I wasn't expecting one. Use /set_logo first if you want to upload a logo.")
if command:
logger.info(f"[Chat {chat_id} ({user_name})] Received command: {text}")
if command == "/start" or command == "/menu":
with lock: session['current_step'] = None
return send_message(chat_id, compose_status_message(chat_id, include_config=True), reply_markup=get_main_keyboard(session), parse_mode="HTML")
elif command == "/help":
return send_message(chat_id, get_help_text(), parse_mode="Markdown")
elif command == "/settings":
return start_settings_conversation(chat_id, advanced=True)
elif command == "/stream":
return await start_stream_command_handler(chat_id)
elif command == "/playlist":
return await handle_playlist_command(chat_id, text)
elif command == "/set_logo":
with lock: session["current_step"] = "awaiting_logo_upload"
return send_message(chat_id, "Please upload your logo image (PNG or JPG). Max 1MB recommended due to in-memory storage.")
elif command == "/schedule":
return await handle_schedule_command(chat_id, text)
elif command == "/status":
return send_message(chat_id, compose_status_message(chat_id, include_config=True), reply_markup=get_main_keyboard(session), parse_mode="HTML")
elif command == "/abort":
return await stream_abort_handler(chat_id)
else:
return await handle_conversation_step(chat_id, text)
else:
return await handle_conversation_step(chat_id, text)
elif "callback_query" in update:
cb_query = update["callback_query"]
chat_id = cb_query["message"]["chat"]["id"]
message_id = cb_query["message"]["message_id"]
data = cb_query["data"]
user_name = cb_query["from"].get("username", f"User_{chat_id}")
session = get_user_session(chat_id)
lock = session_locks[chat_id]
logger.info(f"[Chat {chat_id} ({user_name})] Received callback: {data}")
ack = answer_callback_query(cb_query["id"])
if data == "cfg_start":
if not session.get('input_url_playlist') or not session.get('output_url'):
# This is tricky as start_settings_conversation returns a message dict, not just text.
settings_response = start_settings_conversation(chat_id, advanced=session.get('advanced_mode', False))
response_text = "Playlist or Output URL is not set. Let's configure.\n" + settings_response['text']
# If settings_response has its own reply_markup, it might conflict.
# For simplicity, let's assume start_settings_conversation is primarily text.
return [ack, send_message(chat_id, response_text, reply_markup=settings_response.get('reply_markup') or get_main_keyboard(session))]
else:
return [ack, await start_stream_command_handler(chat_id, message_id_to_edit=message_id)]
elif data == "cfg_advanced":
settings_response = start_settings_conversation(chat_id, advanced=True)
return [ack, edit_message_text(chat_id, message_id, settings_response['text'] + "\n\n" + compose_status_message(chat_id, include_config=True), reply_markup=settings_response.get('reply_markup') or get_main_keyboard(session), parse_mode="HTML")]
elif data == "cfg_playlist":
return [ack, await handle_playlist_config_interaction(chat_id, message_id)]
elif data == "cfg_logo":
return [ack, await handle_logo_config_interaction(chat_id, message_id)]
elif data == "cfg_schedule":
return [ack, await handle_schedule_config_interaction(chat_id, message_id)]
elif data == "stream_pause":
return [ack, await stream_pause_handler(chat_id, message_id)]
elif data == "stream_resume":
return [ack, await stream_resume_handler(chat_id, message_id)]
elif data == "stream_abort":
return [ack, await stream_abort_handler(chat_id, message_id_to_edit=message_id)]
elif data == "stream_status":
return [ack, edit_message_text(chat_id, message_id, compose_status_message(chat_id, include_config=False), reply_markup=get_main_keyboard(session), parse_mode="HTML")]
elif data == "stream_stop_loop_graceful":
with lock: session['stop_gracefully_flag'] = True
append_user_live_log(chat_id, "Graceful loop stop requested.")
return [ack, edit_message_text(chat_id, message_id, "Graceful loop stop initiated. Current iteration will complete.\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML")]
elif data == "show_global_logs":
global_logs_text = "š *Global Bot Logs (last 20):*\n" + "\n".join(live_log_lines_global[-20:]) + "" return [ack, edit_message_text(chat_id, message_id, global_logs_text + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML")] elif data == "show_help": return [ack, edit_message_text(chat_id, message_id, get_help_text() + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session))] elif data.startswith("set_quality_"): quality = data.split("_")[-1] with lock: session["quality_settings"] = quality if quality in QUALITY_SETTINGS_MAP and quality != "custom": for key, val in QUALITY_SETTINGS_MAP[quality].items(): session[key] = val session["current_step"] = None # save_user_settings_to_file(chat_id) # REMOVED return [ack, edit_message_text(chat_id, message_id, f"Quality set to `{quality}`.\n" + compose_status_message(chat_id, True), reply_markup=get_main_keyboard(session), parse_mode="HTML")] return [ack, send_message(chat_id, "Unknown action.")] return {"status": "ok", "message": "Update type not handled"} CONVERSATION_STEPS_SIMPLE = [ ("input_url_playlist", "Enter the first Input URL for your playlist (e.g., http://example.com/stream.m3u8). You can add more later.", validate_url), ("output_url", "Enter your RTMP Output URL (e.g., rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY).", validate_url), ] CONVERSATION_STEPS_ADVANCED = [ ("input_url_playlist", "Enter the first Input URL for your playlist. You can add more later.", validate_url), ("output_url", "Enter your RTMP Output URL.", validate_url), ("quality_settings_prompt", "Choose Quality Preset: (low, medium, high, custom). Custom allows setting individual params next.", None), ("video_codec", f"Video Codec (e.g., libx264). Supported: {', '.join(SUPPORTED_VIDEO_CODECS)}.", lambda x: x in SUPPORTED_VIDEO_CODECS), ("audio_codec", f"Audio Codec (e.g., aac). 
Supported: {', '.join(SUPPORTED_AUDIO_CODECS)}.", lambda x: x in SUPPORTED_AUDIO_CODECS), ("resolution", "Resolution (e.g., 1280x720 or 'source').", validate_resolution), ("fps", "FPS (e.g., 30).", lambda x: str(x).isdigit() and 1 <= int(x) <= 120), ("video_bitrate", "Video Bitrate (e.g., 1500k, 3M).", lambda x: True), ("audio_bitrate", "Audio Bitrate (e.g., 128k).", lambda x: True), ("ffmpeg_preset", "FFmpeg Preset (for libx264: ultrafast, medium, slow etc.).", lambda x: True), ("loop_count", "Loop Count (0 for no loop, -1 for infinite, N for N times).", lambda x: str(x).lstrip('-').isdigit()), ] def start_settings_conversation(chat_id: int, advanced: bool): session = get_user_session(chat_id) lock = session_locks[chat_id] with lock: session['advanced_mode'] = advanced session['conversation_fields_list'] = CONVERSATION_STEPS_ADVANCED if advanced else CONVERSATION_STEPS_SIMPLE session['current_step_index'] = 0 session['current_step'] = session['conversation_fields_list'][0][0] first_field_key, first_prompt, _ = session['conversation_fields_list'][0] # Prepare initial message (send_message or part of edit_message_text) initial_message_content = {} if first_field_key == "quality_settings_prompt": initial_message_content = { "text": first_prompt, "reply_markup": { "inline_keyboard": [ [{"text": q.title(), "callback_data": f"set_quality_{q}"} for q in ["low", "medium", "high", "custom"]] ] } } else: current_value_msg = f"(Current: `{session.get(first_field_key, 'Not set')}`)" if session.get(first_field_key) else "" initial_message_content = {"text": f"{first_prompt}\n{current_value_msg}\n\nType /cancel to abort setup."} # This function is now called from callbacks too, so it needs to return a dict that can be used by send_message or edit_message_text return initial_message_content async def handle_conversation_step(chat_id: int, user_text: str): session = get_user_session(chat_id) lock = session_locks[chat_id] current_step_key_local = None validator_local = None 
current_step_index_local = -1 with lock: if not session.get('current_step') or not session.get('conversation_fields_list'): return send_message(chat_id, "No setup in progress. Use /start or a menu button.", reply_markup=get_main_keyboard(session)) if user_text.lower() == "/cancel": session['current_step'] = None session['current_step_index'] = 0 # Reset index return send_message(chat_id, "Setup cancelled.", reply_markup=get_main_keyboard(session)) # Ensure current_step_index is valid if not (0 <= session.get('current_step_index', -1) < len(session['conversation_fields_list'])): session['current_step'] = None # Invalid state, reset session['current_step_index'] = 0 logger.warning(f"[Chat {chat_id}] Invalid current_step_index, resetting conversation.") return send_message(chat_id, "Conversation state error, setup reset. Please start again.", reply_markup=get_main_keyboard(session)) current_step_index_local = session['current_step_index'] current_step_key_local, _, validator_local = session['conversation_fields_list'][current_step_index_local] user_input = user_text.strip() if validator_local and not validator_local(user_input): # Re-fetch prompt as it might have been clobbered if lock wasn't held long enough (should be fine here) prompt, _, _ = session['conversation_fields_list'][current_step_index_local] # Use local index return send_message(chat_id, f"Invalid input for {current_step_key_local.replace('_',' ').title()}. Please try again.\n{prompt}") with lock: # Re-verify current_step_key just in case, though primary logic relies on index if current_step_key_local != session['conversation_fields_list'][session['current_step_index']][0]: logger.error(f"[Chat {chat_id}] Conversation step mismatch! Aborting step.") return send_message(chat_id, "A state error occurred. 
Please try /cancel and restart setup.", reply_markup=get_main_keyboard(session)) if current_step_key_local == "input_url_playlist": session["input_url_playlist"] = [user_input] if user_input else [] elif current_step_key_local == "quality_settings_prompt": quality_input = user_input.lower() if quality_input in QUALITY_SETTINGS_MAP.keys() or quality_input == "custom": session["quality_settings"] = quality_input if quality_input in QUALITY_SETTINGS_MAP and quality_input != "custom": for key, val in QUALITY_SETTINGS_MAP[quality_input].items(): session[key] = val append_user_live_log(chat_id, f"Quality set to {quality_input} via text.") else: # This return is problematic inside a lock. Need to refactor or just set error and continue. # For now, let it pass, but ideally, validation handles this before lock. # Re-prompting from here is complex. logger.warning(f"[Chat {chat_id}] Invalid quality input '{quality_input}' in conversation step. Not applied.") elif current_step_key_local == "loop_count" or current_step_key_local == "fps": session[current_step_key_local] = int(user_input) else: session[current_step_key_local] = user_input append_user_live_log(chat_id, f"Set {current_step_key_local} = {user_input}") session['current_step_index'] += 1 if session['current_step_index'] < len(session['conversation_fields_list']): next_step_key, next_prompt, _ = session['conversation_fields_list'][session['current_step_index']] session['current_step'] = next_step_key if next_step_key == "quality_settings_prompt": return send_message(chat_id, next_prompt, reply_markup={ "inline_keyboard": [ [{"text": q.title(), "callback_data": f"set_quality_{q}"} for q in ["low", "medium", "high", "custom"]] ]}) current_value_msg = f"(Current: `{session.get(next_step_key, 'Not set')}`)" if session.get(next_step_key) else "" return send_message(chat_id, f"{next_prompt}\n{current_value_msg}\n\nType /cancel to abort setup.") else: session['current_step'] = None session['current_step_index'] = 0 # 
save_user_settings_to_file(chat_id) # REMOVED logger.info(f"[Chat {chat_id}] In-memory settings configured.") return send_message(chat_id, "All settings configured (in memory for this session)!\n" + compose_status_message(chat_id, True), reply_markup=get_main_keyboard(session), parse_mode="HTML") async def handle_playlist_command(chat_id: int, text: str): session = get_user_session(chat_id) lock = session_locks[chat_id] parts = text.split(maxsplit=2) action = parts[1].lower() if len(parts) > 1 else "show" msg = "" with lock: playlist = session.get('input_url_playlist', []) if action == "add" and len(parts) > 2: url_to_add = parts[2] if validate_url(url_to_add): playlist.append(url_to_add) # session['input_url_playlist'] = playlist # Modified in place msg = f"Added to playlist. Total items: {len(playlist)}." else: msg = "Invalid URL format." elif action == "remove" and len(parts) > 2: idx_str = parts[2] removed_item = None if idx_str.lower() == "last" and playlist: removed_item = playlist.pop() elif idx_str.isdigit(): idx_to_remove = int(idx_str) -1 if 0 <= idx_to_remove < len(playlist): removed_item = playlist.pop(idx_to_remove) if removed_item: msg = f"Removed '{removed_item[:50]}...' from playlist. Total items: {len(playlist)}." else: msg = "Invalid index or playlist empty." elif action == "clear": session['input_url_playlist'] = [] msg = "Playlist cleared." elif action == "show": if playlist: plist_str = "\n".join([f"{i+1}. `{url}`" for i, url in enumerate(playlist)]) msg = f"*Current Playlist ({len(playlist)} items):*\n{plist_str}" else: msg = "Playlist is empty." else: msg = "Usage: /playlist [add