Spaces:
Sleeping
Sleeping
| import logging | |
| import threading | |
| import time | |
| import datetime | |
| import traceback | |
| import fractions | |
| import json | |
| import os | |
| import re | |
| from urllib.parse import urlparse | |
| from pathlib import Path # Still useful for parsing, just not disk ops | |
| import io # For handling bytes as files | |
| from fastapi import FastAPI, Request, HTTPException, UploadFile, File, Form | |
| import av | |
| from PIL import Image, ImageEnhance, UnidentifiedImageError # Pillow is still needed for image manipulation | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| # SQLAlchemyJobStore is removed as we can't write to disk. APScheduler will use MemoryJobStore. | |
| # --- Configuration & Global Constants --- | |
| APP_VERSION = "2.0.1-inmemory" # Version reflects in-memory change | |
| # --- FastAPI & Scheduler Initialization --- | |
| app = FastAPI(title="Advanced Stream Bot (In-Memory Edition)", version=APP_VERSION) | |
| # APScheduler for scheduled streams - will use default MemoryJobStore | |
| scheduler = BackgroundScheduler(timezone="UTC") | |
| # --- Global Data Stores (Managed per user where applicable) --- | |
| # user_sessions: chat_id -> session_data_dict | |
| # All data is now in-memory. | |
| user_sessions = {} | |
| # session_locks: chat_id -> threading.Lock() for thread-safe session updates | |
| session_locks = {} | |
| # Global list for general bot logs (rolling) | |
| live_log_lines_global = [] | |
| MAX_GLOBAL_LOG_LINES = 100 | |
| # --- Logging Setup --- | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(threadName)s %(module)s:%(lineno)d %(message)s", | |
| datefmt="%Y-%m-%d %H:%M:%S", | |
| ) | |
| logger = logging.getLogger("stream_bot_inmemory") | |
| def append_global_live_log(line: str): | |
| global live_log_lines_global | |
| live_log_lines_global.append(line) | |
| if len(live_log_lines_global) > MAX_GLOBAL_LOG_LINES: | |
| live_log_lines_global.pop(0) | |
| class GlobalListHandler(logging.Handler): | |
| def emit(self, record): | |
| log_entry = self.format(record) | |
| append_global_live_log(log_entry) | |
| global_list_handler = GlobalListHandler() | |
| global_list_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) | |
| logger.addHandler(global_list_handler) | |
| # --- Per-User Session Management (In-Memory) --- | |
| DEFAULT_USER_SETTINGS = { | |
| "input_url_playlist": [], | |
| "current_playlist_index": 0, | |
| "output_url": "rtmp://a.rtmp.youtube.com/live2/", | |
| "quality_settings": "medium", | |
| "video_codec": "libx264", | |
| "audio_codec": "aac", | |
| "resolution": "source", | |
| "fps": 30, | |
| "gop_size": 60, | |
| "video_bitrate": "1500k", | |
| "audio_bitrate": "128k", | |
| "loop_count": 0, | |
| "stop_on_error_in_playlist": True, | |
| "ffmpeg_preset": "medium", | |
| "logo_enabled": False, | |
| "logo_data_bytes": None, # Store logo as bytes | |
| "logo_mime_type": None, # e.g., 'image/png' | |
| "logo_original_filename": None, # For display | |
| "logo_position": "top_right", | |
| "logo_scale": 0.1, | |
| "logo_opacity": 0.8, | |
| "advanced_mode": False, | |
| "current_step": None, | |
| "conversation_fields_list": [], | |
| } | |
| DEFAULT_SESSION_RUNTIME_STATE = { | |
| "streaming_state": "idle", | |
| "stream_start_time": None, | |
| "frames_encoded": 0, | |
| "bytes_sent": 0, | |
| "stream_thread_ref": None, | |
| "pyav_objects": {"input_container": None, "output_container": None, "video_out_stream": None, "audio_out_stream": None, "logo_image_pil": None}, | |
| "live_log_lines_user": [], | |
| "error_notification_user": "", | |
| "stop_gracefully_flag": False, | |
| "current_loop_iteration": 0, | |
| } | |
| def get_user_session(chat_id: int) -> dict: | |
| if chat_id not in user_sessions: | |
| session_locks[chat_id] = threading.Lock() | |
| with session_locks[chat_id]: | |
| # Initialize new session with defaults as there's no file to load from | |
| user_sessions[chat_id] = {} | |
| # Deep copy default settings to avoid shared mutable objects | |
| for key, value in DEFAULT_USER_SETTINGS.items(): | |
| user_sessions[chat_id][key] = list(value) if isinstance(value, list) else value | |
| for key, value in DEFAULT_SESSION_RUNTIME_STATE.items(): | |
| user_sessions[chat_id][key] = list(value) if isinstance(value, list) else value | |
| logger.info(f"[Chat {chat_id}] New IN-MEMORY session created. Settings are NOT persistent.") | |
| return user_sessions[chat_id] | |
| # save_user_settings_to_file, load_user_settings_from_file, load_all_sessions_on_startup are REMOVED. | |
| # Settings are only saved in memory for the current runtime. | |
| # --- User-specific Live Logging --- | |
| def append_user_live_log(chat_id: int, line: str): | |
| session = get_user_session(chat_id) # Ensures session exists | |
| # No need for lock here if list append is atomic and reading is infrequent / tolerant | |
| # but for consistency with other session modifications: | |
| lock = session_locks.get(chat_id) | |
| if lock: # Should always exist after get_user_session | |
| with lock: | |
| session['live_log_lines_user'].append(f"{datetime.datetime.now().strftime('%H:%M:%S')} {line}") | |
| if len(session['live_log_lines_user']) > 50: | |
| session['live_log_lines_user'].pop(0) | |
| else: # Fallback if lock somehow not initialized | |
| logger.warning(f"[Chat {chat_id}] Lock not found for appending user log. This is unexpected.") | |
| session['live_log_lines_user'].append(f"{datetime.datetime.now().strftime('%H:%M:%S')} {line}") | |
| if len(session['live_log_lines_user']) > 50: | |
| session['live_log_lines_user'].pop(0) | |
| # --- Input Validation --- | |
| SUPPORTED_VIDEO_CODECS = ["libx264", "h264_nvenc", "h264_qsv", "libx265", "hevc_nvenc", "hevc_qsv", "copy"] | |
| SUPPORTED_AUDIO_CODECS = ["aac", "opus", "libmp3lame", "copy"] | |
| LOGO_POSITIONS = ["top_left", "top_right", "bottom_left", "bottom_right", "center"] | |
| QUALITY_SETTINGS_MAP = { | |
| "low": {"video_bitrate": "800k", "audio_bitrate": "96k", "ffmpeg_preset": "superfast", "resolution": "854x480"}, | |
| "medium": {"video_bitrate": "1500k", "audio_bitrate": "128k", "ffmpeg_preset": "medium", "resolution": "1280x720"}, | |
| "high": {"video_bitrate": "3000k", "audio_bitrate": "192k", "ffmpeg_preset": "fast", "resolution": "1920x1080"}, | |
| "source": {} | |
| } | |
| def validate_url(url: str) -> bool: | |
| try: | |
| result = urlparse(url) | |
| return all([result.scheme, result.netloc]) and result.scheme in ['http', 'https', 'rtmp', 'rtsp', 'udp', 'srt', 'file'] | |
| except: | |
| return False | |
| def validate_resolution(res: str) -> bool: | |
| if res.lower() == "source": | |
| return True | |
| return bool(re.match(r"^\d{3,4}x\d{3,4}$", res)) | |
| def parse_bitrate(bitrate_str: str) -> int: | |
| bitrate_str = str(bitrate_str).lower() # Ensure it's a string | |
| if bitrate_str.endswith('k'): | |
| return int(float(bitrate_str[:-1]) * 1000) | |
| elif bitrate_str.endswith('m'): | |
| return int(float(bitrate_str[:-1]) * 1000000) | |
| try: | |
| return int(bitrate_str) | |
| except ValueError: | |
| return 1500000 | |
| def format_settings_for_display(session: dict) -> str: | |
| logo_status = "Disabled" | |
| if session.get('logo_enabled') and session.get('logo_data_bytes'): | |
| logo_status = f"Enabled ({session.get('logo_original_filename', 'Unknown name')})" | |
| display_data = { | |
| "Output URL": session.get('output_url'), | |
| "Quality": session.get('quality_settings'), | |
| "Video Codec": session.get('video_codec'), | |
| "Audio Codec": session.get('audio_codec'), | |
| "Resolution": session.get('resolution'), | |
| "FPS": session.get('fps'), | |
| "Video Bitrate": session.get('video_bitrate'), | |
| "Audio Bitrate": session.get('audio_bitrate'), | |
| "FFmpeg Preset": session.get('ffmpeg_preset'), | |
| "Loop Count": "Infinite" if session.get('loop_count') == -1 else session.get('loop_count'), | |
| "Playlist Length": len(session.get('input_url_playlist', [])), | |
| "Logo": logo_status, | |
| } | |
| if session.get('logo_enabled') and session.get('logo_data_bytes'): | |
| display_data["Logo Position"] = session.get('logo_position') | |
| display_data["Logo Scale"] = session.get('logo_scale') | |
| display_data["Logo Opacity"] = session.get('logo_opacity') | |
| return "\n".join([f"• *{k}*: `{v}`" for k, v in display_data.items() if v is not None]) | |
| # --- UI Helpers (send_message, edit_message_text, answer_callback_query, get_main_keyboard, get_uptime, compose_status_message) --- | |
| # These functions remain largely the same as they don't interact with disk. | |
| # Minor change in compose_status_message for logo display from bytes. | |
| def send_message(chat_id: int, text: str, parse_mode="Markdown", reply_markup=None): | |
| logger.info(f"[Chat {chat_id}] Sending message: {text[:100]}...") | |
| response = { | |
| "method": "sendMessage", | |
| "chat_id": chat_id, | |
| "text": text, | |
| "parse_mode": parse_mode | |
| } | |
| if reply_markup: | |
| response["reply_markup"] = reply_markup | |
| return response | |
| def edit_message_text(chat_id: int, message_id: int, text: str, parse_mode="Markdown", reply_markup=None): | |
| logger.info(f"[Chat {chat_id}] Editing message {message_id}: {text[:100]}...") | |
| response = { | |
| "method": "editMessageText", | |
| "chat_id": chat_id, | |
| "message_id": message_id, | |
| "text": text, | |
| "parse_mode": parse_mode | |
| } | |
| if reply_markup: | |
| response["reply_markup"] = reply_markup | |
| return response | |
| def answer_callback_query(callback_query_id: str, text: str = None, show_alert: bool = False): | |
| response = { | |
| "method": "answerCallbackQuery", | |
| "callback_query_id": callback_query_id, | |
| } | |
| if text: | |
| response["text"] = text | |
| if show_alert: | |
| response["show_alert"] = True | |
| return response | |
| def get_main_keyboard(session: dict): | |
| streaming = session.get('streaming_state') in ["streaming", "paused", "starting"] | |
| buttons = [] | |
| if not streaming: | |
| buttons.append([{"text": "🚀 Start/Configure Stream", "callback_data": "cfg_start"}]) | |
| buttons.append([ | |
| {"text": "⚙️ Settings", "callback_data": "cfg_advanced"}, | |
| {"text": "📜 Playlist (" + str(len(session.get('input_url_playlist',[]))) + ")", "callback_data": "cfg_playlist"} | |
| ]) | |
| buttons.append([ | |
| {"text": "🖼️ Logo", "callback_data": "cfg_logo"}, | |
| {"text": "🕒 Schedule", "callback_data": "cfg_schedule"} | |
| ]) | |
| else: | |
| if session.get('streaming_state') == "streaming": | |
| buttons.append([{"text": "⏸ Pause", "callback_data": "stream_pause"}]) | |
| elif session.get('streaming_state') == "paused": | |
| buttons.append([{"text": "▶️ Resume", "callback_data": "stream_resume"}]) | |
| buttons.append([ | |
| {"text": "⏹ Abort Stream", "callback_data": "stream_abort"}, | |
| {"text": "📊 Status Update", "callback_data": "stream_status"} | |
| ]) | |
| if session.get('loop_count', 0) == -1: | |
| buttons.append([{"text": "⏳ Stop Loop Gracefully", "callback_data": "stream_stop_loop_graceful"}]) | |
| buttons.append([{"text": "📜 Global Logs", "callback_data": "show_global_logs"}, {"text": "❔ Help", "callback_data": "show_help"}]) | |
| return {"inline_keyboard": buttons} | |
| def get_uptime(start_time_obj): | |
| if start_time_obj: | |
| # Ensure start_time_obj is offset-aware if comparing with offset-aware now() | |
| if start_time_obj.tzinfo is None: | |
| start_time_obj = start_time_obj.replace(tzinfo=datetime.timezone.utc) # Assume UTC if naive | |
| uptime_delta = datetime.datetime.now(datetime.timezone.utc) - start_time_obj | |
| return str(uptime_delta).split('.')[0] | |
| return "0s" | |
| def compose_status_message(chat_id: int, include_config: bool = False) -> str: | |
| session = get_user_session(chat_id) | |
| state = session['streaming_state'] | |
| status_lines = [ | |
| f"🤖 *Stream Bot Status for Chat ID {chat_id} (In-Memory Mode)*", | |
| f"_Settings, logos, and schedules are NOT persistent across bot restarts._", | |
| f"State: `{state}`", | |
| ] | |
| if session.get('error_notification_user'): | |
| status_lines.append(f"⚠️ Error: `{session['error_notification_user']}`") | |
| if state in ["streaming", "paused", "starting", "stopping"]: | |
| current_input_url_display = "N/A" | |
| playlist = session.get('input_url_playlist', []) | |
| playlist_idx = session.get('current_playlist_index', 0) | |
| if playlist and 0 <= playlist_idx < len(playlist): | |
| current_input_url_display = playlist[playlist_idx] | |
| status_lines.extend([ | |
| f"Uptime: `{get_uptime(session['stream_start_time'])}`", | |
| f"Frames Encoded: `{session['frames_encoded']}`", | |
| f"Bytes Sent: `{session['bytes_sent'] / (1024*1024):.2f} MB`", | |
| f"Current Input: `{current_input_url_display}` (Item {playlist_idx+1}/{len(playlist)})", | |
| f"Loop Iteration: {session.get('current_loop_iteration',0)+1}/{'Infinite' if session.get('loop_count',0) == -1 else session.get('loop_count',0) or 1}" | |
| ]) | |
| if include_config or state not in ["streaming", "paused"]: | |
| status_lines.append("\n📝 *Current Configuration:*") | |
| status_lines.append(format_settings_for_display(session)) | |
| if state in ["streaming", "paused", "starting"]: | |
| status_lines.append("\n📋 *User Live Logs (last 10):*") | |
| user_logs_preview = "\n".join(session['live_log_lines_user'][-10:]) | |
| status_lines.append(f"<pre>{user_logs_preview if user_logs_preview else 'No user logs yet.'}</pre>") | |
| return "\n".join(status_lines) | |
| # --- Core Streaming Logic --- | |
| # stream_engine_thread_target needs to load logo from session['logo_data_bytes'] | |
| def stream_engine_thread_target(chat_id: int): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| active_input_container = None | |
| active_output_container = None | |
| active_video_out_stream = None | |
| active_audio_out_stream = None | |
| # logo_image_pil is now loaded from bytes each time, or stored in pyav_objects for the thread's duration | |
| # logo_width, logo_height = 0, 0 # These will be calculated | |
| # logo_pos_x, logo_pos_y = 0, 0 # These will be calculated | |
| def _cleanup_pyav_resources(): | |
| # This function is called within the stream thread | |
| # Access pyav_objects directly from session inside the lock | |
| append_user_live_log(chat_id, "Cleaning up PyAV resources...") | |
| with lock: # Ensure thread-safe access to session's pyav_objects | |
| pyav_objs = session.get('pyav_objects', {}) | |
| if pyav_objs.get('output_container'): | |
| try: pyav_objs['output_container'].close() | |
| except Exception as e_c: append_user_live_log(chat_id, f"Error closing output_container: {e_c}") | |
| pyav_objs['output_container'] = None | |
| pyav_objs['video_out_stream'] = None | |
| pyav_objs['audio_out_stream'] = None | |
| if pyav_objs.get('input_container'): | |
| try: pyav_objs['input_container'].close() | |
| except Exception as e_c: append_user_live_log(chat_id, f"Error closing input_container: {e_c}") | |
| pyav_objs['input_container'] = None | |
| pyav_objs['logo_image_pil'] = None # Clear processed PIL image | |
| # session['pyav_objects'] = pyav_objs # Already modified in place | |
| append_user_live_log(chat_id, "PyAV resources cleanup attempt complete.") | |
| try: | |
| with lock: | |
| session['streaming_state'] = "starting" | |
| session['error_notification_user'] = "" | |
| session['stream_start_time'] = datetime.datetime.now(datetime.timezone.utc) | |
| session['frames_encoded'] = 0 | |
| session['bytes_sent'] = 0 | |
| session['current_loop_iteration'] = 0 | |
| session['current_playlist_index'] = 0 | |
| session['stop_gracefully_flag'] = False | |
| # Reset pyav_objects for this run, ensures clean state | |
| session['pyav_objects'] = { | |
| "input_container": None, "output_container": None, | |
| "video_out_stream": None, "audio_out_stream": None, | |
| "logo_image_pil": None | |
| } | |
| append_user_live_log(chat_id, f"Stream engine started. Output: {session['output_url']}") | |
| try: | |
| active_output_container = av.open(session['output_url'], mode='w', format='flv', timeout=10) | |
| with lock: session['pyav_objects']['output_container'] = active_output_container | |
| append_user_live_log(chat_id, f"Output RTMP connection established: {session['output_url']}") | |
| except Exception as e: | |
| raise Exception(f"Failed to open output RTMP stream: {e}") | |
| # --- Logo Setup (if enabled and data exists) --- | |
| # This is done once per stream start, assuming logo doesn't change mid-stream | |
| # The processed PIL image is stored in session['pyav_objects']['logo_image_pil'] | |
| # For this thread's use only. | |
| if session.get('logo_enabled') and session.get('logo_data_bytes'): | |
| logo_bytes = session.get('logo_data_bytes') | |
| try: | |
| logo_image_pil_original = Image.open(io.BytesIO(logo_bytes)).convert("RGBA") | |
| if session.get('logo_opacity', 1.0) < 1.0: | |
| alpha = logo_image_pil_original.split()[-1] | |
| alpha = ImageEnhance.Brightness(alpha).enhance(session['logo_opacity']) | |
| logo_image_pil_original.putalpha(alpha) | |
| with lock: # Store the processed PIL image for this stream thread | |
| session['pyav_objects']['logo_image_pil'] = logo_image_pil_original | |
| append_user_live_log(chat_id, f"Logo loaded from in-memory bytes: {session.get('logo_original_filename', 'N/A')}") | |
| except UnidentifiedImageError: | |
| append_user_live_log(chat_id, f"Error: Could not load logo image from bytes. Skipping logo.") | |
| except Exception as e: | |
| append_user_live_log(chat_id, f"Error processing logo from bytes: {e}. Skipping logo.") | |
| else: | |
| append_user_live_log(chat_id, "Logo not enabled or no logo data. Skipping logo.") | |
| total_loops = session.get('loop_count', 0) | |
| playlist = session.get('input_url_playlist', []) | |
| if not playlist: | |
| raise Exception("Playlist is empty. Nothing to stream.") | |
| # --- Main Loop (Loops & Playlist) --- | |
| while True: | |
| with lock: | |
| if session['stop_gracefully_flag'] or session['streaming_state'] == "stopping": | |
| append_user_live_log(chat_id, "Graceful stop or abort signal received. Ending outer loop.") | |
| break | |
| current_loop_iter = session['current_loop_iteration'] | |
| if total_loops != -1 and current_loop_iter >= total_loops and total_loops != 0: | |
| append_user_live_log(chat_id, f"Completed {total_loops} loop(s).") | |
| break | |
| append_user_live_log(chat_id, f"Starting loop iteration {current_loop_iter + 1}") | |
| with lock: session['current_playlist_index'] = 0 | |
| while session['current_playlist_index'] < len(playlist): | |
| with lock: | |
| if session['stop_gracefully_flag'] or session['streaming_state'] == "stopping": | |
| append_user_live_log(chat_id, "Graceful stop or abort signal received. Ending playlist loop.") | |
| break | |
| current_idx = session['current_playlist_index'] | |
| current_input_url = playlist[current_idx] | |
| append_user_live_log(chat_id, f"Processing playlist item {current_idx + 1}/{len(playlist)}: {current_input_url}") | |
| # Local vars for this specific input processing cycle. | |
| # These do not directly use session['pyav_objects'] for their primary reference | |
| # but update it upon successful creation/assignment. | |
| _current_input_container = None | |
| # Output streams are more persistent across playlist items IF compatible. | |
| # For simplicity, we define them once based on the first item or re-check. | |
| try: | |
| _current_input_container = av.open(current_input_url, timeout=10) | |
| # Store in session for potential cleanup if error occurs before local var is cleared | |
| with lock: session['pyav_objects']['input_container'] = _current_input_container | |
| append_user_live_log(chat_id, f"Input opened: {current_input_url}") | |
| in_v_streams = _current_input_container.streams.video | |
| in_a_streams = _current_input_container.streams.audio | |
| if not in_v_streams: raise Exception("No video stream found in input.") | |
| in_v_s = in_v_streams[0] | |
| target_width, target_height = in_v_s.width, in_v_s.height | |
| if session['resolution'] != "source": | |
| try: | |
| target_width, target_height = map(int, session['resolution'].split('x')) | |
| except ValueError: | |
| append_user_live_log(chat_id, f"Invalid target resolution '{session['resolution']}'. Using source.") | |
| target_width, target_height = in_v_s.width, in_v_s.height | |
| # Retrieve output streams from session if they exist, or create them | |
| with lock: | |
| active_video_out_stream = session['pyav_objects'].get('video_out_stream') | |
| active_audio_out_stream = session['pyav_objects'].get('audio_out_stream') | |
| logo_pil_original_ref = session['pyav_objects'].get('logo_image_pil') # This is the pre-opacity PIL image | |
| # --- (Re)configure video output stream --- | |
| # Only configure if not already done or if significantly changed (here, simplified to "first time") | |
| is_first_input_or_reconfig_needed = active_video_out_stream is None | |
| # Local var for resized logo for current video dimensions | |
| current_logo_pil_resized = None | |
| logo_pos_x, logo_pos_y = 0,0 # Reset for each potential reconfig | |
| if is_first_input_or_reconfig_needed: | |
| active_video_out_stream = active_output_container.add_stream(session['video_codec'], rate=session['fps']) | |
| active_video_out_stream.width = target_width | |
| active_video_out_stream.height = target_height | |
| active_video_out_stream.pix_fmt = 'yuv420p' | |
| active_video_out_stream.codec_context.gop_size = session.get('gop_size', int(session['fps'] * 2)) | |
| active_video_out_stream.codec_context.bit_rate = parse_bitrate(session['video_bitrate']) | |
| active_video_out_stream.codec_context.thread_type = "AUTO" | |
| active_video_out_stream.codec_context.options = {'preset': session.get('ffmpeg_preset', 'medium')} | |
| if session['video_codec'] == 'copy': | |
| active_video_out_stream.codec_context.copy_parameters(in_v_s.codec_context) | |
| with lock: session['pyav_objects']['video_out_stream'] = active_video_out_stream | |
| append_user_live_log(chat_id, f"Video output stream configured: {target_width}x{target_height}@{session['fps']}fps, {session['video_bitrate']}") | |
| # Logo scaling and positioning based on this video stream's output height | |
| if logo_pil_original_ref: # The pre-opacity, original ratio PIL object | |
| logo_aspect_ratio = logo_pil_original_ref.width / logo_pil_original_ref.height | |
| logo_h = int(active_video_out_stream.height * session.get('logo_scale', 0.1)) | |
| logo_w = int(logo_h * logo_aspect_ratio) | |
| logo_w = min(logo_w, active_video_out_stream.width) # cap width | |
| logo_h = min(logo_h, active_video_out_stream.height) # cap height | |
| if logo_w > 0 and logo_h > 0: | |
| current_logo_pil_resized = logo_pil_original_ref.resize((logo_w, logo_h), Image.Resampling.LANCZOS) | |
| append_user_live_log(chat_id, f"Logo resized to {logo_w}x{logo_h} for current video item.") | |
| # Calculate position | |
| margin = 10 | |
| if session['logo_position'] == "top_right": | |
| logo_pos_x = active_video_out_stream.width - logo_w - margin | |
| logo_pos_y = margin | |
| elif session['logo_position'] == "top_left": | |
| logo_pos_x = margin; logo_pos_y = margin | |
| elif session['logo_position'] == "bottom_left": | |
| logo_pos_x = margin; logo_pos_y = active_video_out_stream.height - logo_h - margin | |
| elif session['logo_position'] == "bottom_right": | |
| logo_pos_x = active_video_out_stream.width - logo_w - margin | |
| logo_pos_y = active_video_out_stream.height - logo_h - margin | |
| elif session['logo_position'] == "center": | |
| logo_pos_x = (active_video_out_stream.width - logo_w) // 2 | |
| logo_pos_y = (active_video_out_stream.height - logo_h) // 2 | |
| else: # Default top_right | |
| logo_pos_x = active_video_out_stream.width - logo_w - margin; logo_pos_y = margin | |
| append_user_live_log(chat_id, f"Logo position set to ({logo_pos_x}, {logo_pos_y}) for current video item.") | |
| else: | |
| append_user_live_log(chat_id, "Logo resulted in 0 dimension after scaling for current video. Logo disabled for this item.") | |
| current_logo_pil_resized = None # Disable for this item | |
| # --- (Re)configure audio output stream --- | |
| if is_first_input_or_reconfig_needed and in_a_streams: | |
| in_a_s = in_a_streams[0] | |
| active_audio_out_stream = active_output_container.add_stream(session['audio_codec'], rate=in_a_s.rate) | |
| active_audio_out_stream.codec_context.bit_rate = parse_bitrate(session['audio_bitrate']) | |
| if session['audio_codec'] == 'copy': | |
| active_audio_out_stream.codec_context.copy_parameters(in_a_s.codec_context) | |
| with lock: session['pyav_objects']['audio_out_stream'] = active_audio_out_stream | |
| append_user_live_log(chat_id, f"Audio output stream configured: {in_a_s.rate}Hz, {session['audio_bitrate']}") | |
| elif is_first_input_or_reconfig_needed and not in_a_streams: | |
| append_user_live_log(chat_id, "No audio stream in input. Streaming video only.") | |
| active_audio_out_stream = None | |
| with lock: session['pyav_objects']['audio_out_stream'] = None | |
| with lock: | |
| if session['streaming_state'] != "paused": | |
| session['streaming_state'] = "streaming" | |
| # --- Packet Processing Loop (for current input URL) --- | |
| for packet in _current_input_container.demux(video=0, audio=0 if in_a_streams else -1): | |
| with lock: | |
| if session['streaming_state'] == "stopping": break | |
| while session['streaming_state'] == "paused": | |
| lock.release(); time.sleep(0.2); lock.acquire() | |
| if session['streaming_state'] == "stopping": break | |
| if session['streaming_state'] == "stopping": break | |
| if packet.dts is None: continue | |
| if packet.stream.type == 'video' and packet.stream.index == in_v_s.index: | |
| for frame in packet.decode(): | |
| if not frame.width or not frame.height: continue | |
| if frame.width != active_video_out_stream.width or frame.height != active_video_out_stream.height: | |
| frame = frame.reformat(active_video_out_stream.width, active_video_out_stream.height, format='yuv420p') | |
| if current_logo_pil_resized and active_video_out_stream: # Use the resized logo for *this* video item | |
| try: | |
| pil_frame = frame.to_image() | |
| # Ensure logo_w, logo_h from current_logo_pil_resized are used for bounds | |
| actual_x = min(logo_pos_x, pil_frame.width - current_logo_pil_resized.width) | |
| actual_y = min(logo_pos_y, pil_frame.height - current_logo_pil_resized.height) | |
| actual_x = max(0, actual_x) | |
| actual_y = max(0, actual_y) | |
| pil_frame.paste(current_logo_pil_resized, (actual_x, actual_y), current_logo_pil_resized) | |
| # frame = av.VideoFrame.from_image(pil_frame) # This creates a new frame | |
| # Preserve original frame's PTS if possible, or let encoder handle it | |
| new_pts = frame.pts # Keep original PTS | |
| frame = av.VideoFrame.from_image(pil_frame) | |
| frame.pts = new_pts | |
| except Exception as e_logo: | |
| append_user_live_log(chat_id, f"Error applying logo: {e_logo}.") | |
| for out_packet in active_video_out_stream.encode(frame): | |
| active_output_container.mux(out_packet) | |
| with lock: | |
| session['frames_encoded'] += 1 | |
| session['bytes_sent'] += out_packet.size | |
| elif active_audio_out_stream and packet.stream.type == 'audio' and packet.stream.index == in_a_streams[0].index : | |
| for frame in packet.decode(): | |
| if frame.pts is None: continue | |
| for out_packet in active_audio_out_stream.encode(frame): | |
| active_output_container.mux(out_packet) | |
| with lock: | |
| session['bytes_sent'] += out_packet.size | |
| else: | |
| append_user_live_log(chat_id, f"Finished processing input: {current_input_url}") | |
| except Exception as e_input: | |
| tb_str = traceback.format_exc() | |
| append_user_live_log(chat_id, f"Error processing input {current_input_url}: {e_input}\n{tb_str}") | |
| with lock: session['error_notification_user'] = f"Error with {current_input_url}: {e_input}" | |
| if session.get('stop_on_error_in_playlist', True): | |
| append_user_live_log(chat_id, "Stopping stream due to error in playlist and stop_on_error_in_playlist=True.") | |
| with lock: session['streaming_state'] = "stopping" | |
| break | |
| else: | |
| append_user_live_log(chat_id, "Skipping to next item in playlist (if any).") | |
| finally: | |
| if _current_input_container: # Use the local var for this input item | |
| try: _current_input_container.close() | |
| except Exception as e_c: append_user_live_log(chat_id, f"Minor error closing input container: {e_c}") | |
| # Clear from session too, as this specific input_container is done | |
| with lock: | |
| if session['pyav_objects']['input_container'] == _current_input_container: | |
| session['pyav_objects']['input_container'] = None | |
| with lock: | |
| if session['streaming_state'] == "stopping": break | |
| session['current_playlist_index'] += 1 | |
| with lock: | |
| if session['streaming_state'] == "stopping": break | |
| session['current_loop_iteration'] += 1 | |
| append_user_live_log(chat_id, "All playlist items and loops processed or stream stopped.") | |
| except Exception as e_stream: | |
| tb_str = traceback.format_exc() | |
| append_user_live_log(chat_id, f"Fatal stream engine error: {e_stream}\n{tb_str}") | |
| with lock: | |
| session['error_notification_user'] = f"Stream failed: {e_stream}" | |
| session['streaming_state'] = "error" | |
| finally: | |
| append_user_live_log(chat_id, "Stream engine finalizing...") | |
| # Retrieve output streams from session for flushing | |
| # This assumes active_output_container is still the one from the start of the stream. | |
| _final_output_container = None | |
| _final_video_out_stream = None | |
| _final_audio_out_stream = None | |
| with lock: | |
| _final_output_container = session['pyav_objects'].get('output_container') | |
| _final_video_out_stream = session['pyav_objects'].get('video_out_stream') | |
| _final_audio_out_stream = session['pyav_objects'].get('audio_out_stream') | |
| if _final_output_container: # Check if output container was successfully opened | |
| append_user_live_log(chat_id, "Flushing encoders...") | |
| if _final_video_out_stream: | |
| for out_packet in _final_video_out_stream.encode(): | |
| try: _final_output_container.mux(out_packet) | |
| except Exception as fe: append_user_live_log(chat_id, f"Err flushing V: {fe}") | |
| if _final_audio_out_stream: | |
| for out_packet in _final_audio_out_stream.encode(): | |
| try: _final_output_container.mux(out_packet) | |
| except Exception as fe: append_user_live_log(chat_id, f"Err flushing A: {fe}") | |
| append_user_live_log(chat_id, "Encoders flushed.") | |
| _cleanup_pyav_resources() | |
| with lock: | |
| final_state = "completed" if not session['error_notification_user'] and session['streaming_state'] != "error" else "error" | |
| if session['streaming_state'] == "stopping": | |
| final_state = "stopped" | |
| session['streaming_state'] = final_state | |
| session['stream_thread_ref'] = None | |
| append_user_live_log(chat_id, f"Stream engine finished. Final state: {final_state}.") | |
| logger.info(f"[Chat {chat_id}] Stream ended with state: {final_state}. Error: {session['error_notification_user']}") | |
| # --- Telegram Command and Callback Handlers --- | |
| # handle_telegram_update, CONVERSATION_STEPS*, start_settings_conversation, handle_conversation_step | |
| # handle_playlist_command, handle_playlist_config_interaction remain similar, | |
| # but they won't call save_user_settings_to_file. Changes are implicitly in-memory. | |
| # handle_logo_upload needs to read file bytes and store them. | |
| # handle_logo_config_interaction needs to reflect in-memory storage. | |
| # schedule_stream_job, handle_schedule_command, handle_schedule_config_interaction are fine as APScheduler will use MemoryJobStore. | |
| # Stream Control Command Handlers (start, pause, resume, abort) are mostly fine, no disk writes. | |
| # get_help_text needs to be updated about non-persistence. | |
| async def handle_telegram_update(update: dict): | |
| if "message" in update: | |
| message = update["message"] | |
| chat_id = message["chat"]["id"] | |
| user_name = message["from"].get("username", f"User_{chat_id}") | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| text = message.get("text", "") | |
| command = text.split(' ')[0].lower() if text.startswith('/') else None | |
| if message.get("photo") or message.get("document"): | |
| if session.get("current_step") == "awaiting_logo_upload": | |
| # Need to pass the actual UploadFile if using FastAPI's file handling for webhook | |
| # For direct JSON, we'd get file_id and need a bot instance to download. | |
| # Assuming direct JSON from Telegram with file_id: | |
| return await handle_logo_data_from_telegram(chat_id, message) | |
| else: | |
| return send_message(chat_id, "Received a file, but I wasn't expecting one. Use /set_logo first if you want to upload a logo.") | |
| if command: | |
| logger.info(f"[Chat {chat_id} ({user_name})] Received command: {text}") | |
| if command == "/start" or command == "/menu": | |
| with lock: session['current_step'] = None | |
| return send_message(chat_id, compose_status_message(chat_id, include_config=True), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| elif command == "/help": | |
| return send_message(chat_id, get_help_text(), parse_mode="Markdown") | |
| elif command == "/settings": | |
| return start_settings_conversation(chat_id, advanced=True) | |
| elif command == "/stream": | |
| return await start_stream_command_handler(chat_id) | |
| elif command == "/playlist": | |
| return await handle_playlist_command(chat_id, text) | |
| elif command == "/set_logo": | |
| with lock: session["current_step"] = "awaiting_logo_upload" | |
| return send_message(chat_id, "Please upload your logo image (PNG or JPG). Max 1MB recommended due to in-memory storage.") | |
| elif command == "/schedule": | |
| return await handle_schedule_command(chat_id, text) | |
| elif command == "/status": | |
| return send_message(chat_id, compose_status_message(chat_id, include_config=True), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| elif command == "/abort": | |
| return await stream_abort_handler(chat_id) | |
| else: | |
| return await handle_conversation_step(chat_id, text) | |
| else: | |
| return await handle_conversation_step(chat_id, text) | |
| elif "callback_query" in update: | |
| cb_query = update["callback_query"] | |
| chat_id = cb_query["message"]["chat"]["id"] | |
| message_id = cb_query["message"]["message_id"] | |
| data = cb_query["data"] | |
| user_name = cb_query["from"].get("username", f"User_{chat_id}") | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| logger.info(f"[Chat {chat_id} ({user_name})] Received callback: {data}") | |
| ack = answer_callback_query(cb_query["id"]) | |
| if data == "cfg_start": | |
| if not session.get('input_url_playlist') or not session.get('output_url'): | |
| # This is tricky as start_settings_conversation returns a message dict, not just text. | |
| settings_response = start_settings_conversation(chat_id, advanced=session.get('advanced_mode', False)) | |
| response_text = "Playlist or Output URL is not set. Let's configure.\n" + settings_response['text'] | |
| # If settings_response has its own reply_markup, it might conflict. | |
| # For simplicity, let's assume start_settings_conversation is primarily text. | |
| return [ack, send_message(chat_id, response_text, reply_markup=settings_response.get('reply_markup') or get_main_keyboard(session))] | |
| else: | |
| return [ack, await start_stream_command_handler(chat_id, message_id_to_edit=message_id)] | |
| elif data == "cfg_advanced": | |
| settings_response = start_settings_conversation(chat_id, advanced=True) | |
| return [ack, edit_message_text(chat_id, message_id, settings_response['text'] + "\n\n" + compose_status_message(chat_id, include_config=True), reply_markup=settings_response.get('reply_markup') or get_main_keyboard(session), parse_mode="HTML")] | |
| elif data == "cfg_playlist": | |
| return [ack, await handle_playlist_config_interaction(chat_id, message_id)] | |
| elif data == "cfg_logo": | |
| return [ack, await handle_logo_config_interaction(chat_id, message_id)] | |
| elif data == "cfg_schedule": | |
| return [ack, await handle_schedule_config_interaction(chat_id, message_id)] | |
| elif data == "stream_pause": | |
| return [ack, await stream_pause_handler(chat_id, message_id)] | |
| elif data == "stream_resume": | |
| return [ack, await stream_resume_handler(chat_id, message_id)] | |
| elif data == "stream_abort": | |
| return [ack, await stream_abort_handler(chat_id, message_id_to_edit=message_id)] | |
| elif data == "stream_status": | |
| return [ack, edit_message_text(chat_id, message_id, compose_status_message(chat_id, include_config=False), reply_markup=get_main_keyboard(session), parse_mode="HTML")] | |
| elif data == "stream_stop_loop_graceful": | |
| with lock: session['stop_gracefully_flag'] = True | |
| append_user_live_log(chat_id, "Graceful loop stop requested.") | |
| return [ack, edit_message_text(chat_id, message_id, "Graceful loop stop initiated. Current iteration will complete.\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML")] | |
| elif data == "show_global_logs": | |
| global_logs_text = "📜 *Global Bot Logs (last 20):*\n<pre>" + "\n".join(live_log_lines_global[-20:]) + "</pre>" | |
| return [ack, edit_message_text(chat_id, message_id, global_logs_text + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML")] | |
| elif data == "show_help": | |
| return [ack, edit_message_text(chat_id, message_id, get_help_text() + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session))] | |
| elif data.startswith("set_quality_"): | |
| quality = data.split("_")[-1] | |
| with lock: | |
| session["quality_settings"] = quality | |
| if quality in QUALITY_SETTINGS_MAP and quality != "custom": | |
| for key, val in QUALITY_SETTINGS_MAP[quality].items(): | |
| session[key] = val | |
| session["current_step"] = None | |
| # save_user_settings_to_file(chat_id) # REMOVED | |
| return [ack, edit_message_text(chat_id, message_id, f"Quality set to `{quality}`.\n" + compose_status_message(chat_id, True), reply_markup=get_main_keyboard(session), parse_mode="HTML")] | |
| return [ack, send_message(chat_id, "Unknown action.")] | |
| return {"status": "ok", "message": "Update type not handled"} | |
| CONVERSATION_STEPS_SIMPLE = [ | |
| ("input_url_playlist", "Enter the first Input URL for your playlist (e.g., http://example.com/stream.m3u8). You can add more later.", validate_url), | |
| ("output_url", "Enter your RTMP Output URL (e.g., rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY).", validate_url), | |
| ] | |
| CONVERSATION_STEPS_ADVANCED = [ | |
| ("input_url_playlist", "Enter the first Input URL for your playlist. You can add more later.", validate_url), | |
| ("output_url", "Enter your RTMP Output URL.", validate_url), | |
| ("quality_settings_prompt", "Choose Quality Preset: (low, medium, high, custom). Custom allows setting individual params next.", None), | |
| ("video_codec", f"Video Codec (e.g., libx264). Supported: {', '.join(SUPPORTED_VIDEO_CODECS)}.", lambda x: x in SUPPORTED_VIDEO_CODECS), | |
| ("audio_codec", f"Audio Codec (e.g., aac). Supported: {', '.join(SUPPORTED_AUDIO_CODECS)}.", lambda x: x in SUPPORTED_AUDIO_CODECS), | |
| ("resolution", "Resolution (e.g., 1280x720 or 'source').", validate_resolution), | |
| ("fps", "FPS (e.g., 30).", lambda x: str(x).isdigit() and 1 <= int(x) <= 120), | |
| ("video_bitrate", "Video Bitrate (e.g., 1500k, 3M).", lambda x: True), | |
| ("audio_bitrate", "Audio Bitrate (e.g., 128k).", lambda x: True), | |
| ("ffmpeg_preset", "FFmpeg Preset (for libx264: ultrafast, medium, slow etc.).", lambda x: True), | |
| ("loop_count", "Loop Count (0 for no loop, -1 for infinite, N for N times).", lambda x: str(x).lstrip('-').isdigit()), | |
| ] | |
| def start_settings_conversation(chat_id: int, advanced: bool): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| with lock: | |
| session['advanced_mode'] = advanced | |
| session['conversation_fields_list'] = CONVERSATION_STEPS_ADVANCED if advanced else CONVERSATION_STEPS_SIMPLE | |
| session['current_step_index'] = 0 | |
| session['current_step'] = session['conversation_fields_list'][0][0] | |
| first_field_key, first_prompt, _ = session['conversation_fields_list'][0] | |
| # Prepare initial message (send_message or part of edit_message_text) | |
| initial_message_content = {} | |
| if first_field_key == "quality_settings_prompt": | |
| initial_message_content = { | |
| "text": first_prompt, | |
| "reply_markup": { | |
| "inline_keyboard": [ | |
| [{"text": q.title(), "callback_data": f"set_quality_{q}"} for q in ["low", "medium", "high", "custom"]] | |
| ] | |
| } | |
| } | |
| else: | |
| current_value_msg = f"(Current: `{session.get(first_field_key, 'Not set')}`)" if session.get(first_field_key) else "" | |
| initial_message_content = {"text": f"{first_prompt}\n{current_value_msg}\n\nType /cancel to abort setup."} | |
| # This function is now called from callbacks too, so it needs to return a dict that can be used by send_message or edit_message_text | |
| return initial_message_content | |
| async def handle_conversation_step(chat_id: int, user_text: str): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| current_step_key_local = None | |
| validator_local = None | |
| current_step_index_local = -1 | |
| with lock: | |
| if not session.get('current_step') or not session.get('conversation_fields_list'): | |
| return send_message(chat_id, "No setup in progress. Use /start or a menu button.", reply_markup=get_main_keyboard(session)) | |
| if user_text.lower() == "/cancel": | |
| session['current_step'] = None | |
| session['current_step_index'] = 0 # Reset index | |
| return send_message(chat_id, "Setup cancelled.", reply_markup=get_main_keyboard(session)) | |
| # Ensure current_step_index is valid | |
| if not (0 <= session.get('current_step_index', -1) < len(session['conversation_fields_list'])): | |
| session['current_step'] = None # Invalid state, reset | |
| session['current_step_index'] = 0 | |
| logger.warning(f"[Chat {chat_id}] Invalid current_step_index, resetting conversation.") | |
| return send_message(chat_id, "Conversation state error, setup reset. Please start again.", reply_markup=get_main_keyboard(session)) | |
| current_step_index_local = session['current_step_index'] | |
| current_step_key_local, _, validator_local = session['conversation_fields_list'][current_step_index_local] | |
| user_input = user_text.strip() | |
| if validator_local and not validator_local(user_input): | |
| # Re-fetch prompt as it might have been clobbered if lock wasn't held long enough (should be fine here) | |
| prompt, _, _ = session['conversation_fields_list'][current_step_index_local] # Use local index | |
| return send_message(chat_id, f"Invalid input for {current_step_key_local.replace('_',' ').title()}. Please try again.\n{prompt}") | |
| with lock: | |
| # Re-verify current_step_key just in case, though primary logic relies on index | |
| if current_step_key_local != session['conversation_fields_list'][session['current_step_index']][0]: | |
| logger.error(f"[Chat {chat_id}] Conversation step mismatch! Aborting step.") | |
| return send_message(chat_id, "A state error occurred. Please try /cancel and restart setup.", reply_markup=get_main_keyboard(session)) | |
| if current_step_key_local == "input_url_playlist": | |
| session["input_url_playlist"] = [user_input] if user_input else [] | |
| elif current_step_key_local == "quality_settings_prompt": | |
| quality_input = user_input.lower() | |
| if quality_input in QUALITY_SETTINGS_MAP.keys() or quality_input == "custom": | |
| session["quality_settings"] = quality_input | |
| if quality_input in QUALITY_SETTINGS_MAP and quality_input != "custom": | |
| for key, val in QUALITY_SETTINGS_MAP[quality_input].items(): session[key] = val | |
| append_user_live_log(chat_id, f"Quality set to {quality_input} via text.") | |
| else: | |
| # This return is problematic inside a lock. Need to refactor or just set error and continue. | |
| # For now, let it pass, but ideally, validation handles this before lock. | |
| # Re-prompting from here is complex. | |
| logger.warning(f"[Chat {chat_id}] Invalid quality input '{quality_input}' in conversation step. Not applied.") | |
| elif current_step_key_local == "loop_count" or current_step_key_local == "fps": | |
| session[current_step_key_local] = int(user_input) | |
| else: | |
| session[current_step_key_local] = user_input | |
| append_user_live_log(chat_id, f"Set {current_step_key_local} = {user_input}") | |
| session['current_step_index'] += 1 | |
| if session['current_step_index'] < len(session['conversation_fields_list']): | |
| next_step_key, next_prompt, _ = session['conversation_fields_list'][session['current_step_index']] | |
| session['current_step'] = next_step_key | |
| if next_step_key == "quality_settings_prompt": | |
| return send_message(chat_id, next_prompt, reply_markup={ | |
| "inline_keyboard": [ | |
| [{"text": q.title(), "callback_data": f"set_quality_{q}"} for q in ["low", "medium", "high", "custom"]] | |
| ]}) | |
| current_value_msg = f"(Current: `{session.get(next_step_key, 'Not set')}`)" if session.get(next_step_key) else "" | |
| return send_message(chat_id, f"{next_prompt}\n{current_value_msg}\n\nType /cancel to abort setup.") | |
| else: | |
| session['current_step'] = None | |
| session['current_step_index'] = 0 | |
| # save_user_settings_to_file(chat_id) # REMOVED | |
| logger.info(f"[Chat {chat_id}] In-memory settings configured.") | |
| return send_message(chat_id, "All settings configured (in memory for this session)!\n" + compose_status_message(chat_id, True), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| async def handle_playlist_command(chat_id: int, text: str): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| parts = text.split(maxsplit=2) | |
| action = parts[1].lower() if len(parts) > 1 else "show" | |
| msg = "" | |
| with lock: | |
| playlist = session.get('input_url_playlist', []) | |
| if action == "add" and len(parts) > 2: | |
| url_to_add = parts[2] | |
| if validate_url(url_to_add): | |
| playlist.append(url_to_add) | |
| # session['input_url_playlist'] = playlist # Modified in place | |
| msg = f"Added to playlist. Total items: {len(playlist)}." | |
| else: | |
| msg = "Invalid URL format." | |
| elif action == "remove" and len(parts) > 2: | |
| idx_str = parts[2] | |
| removed_item = None | |
| if idx_str.lower() == "last" and playlist: removed_item = playlist.pop() | |
| elif idx_str.isdigit(): | |
| idx_to_remove = int(idx_str) -1 | |
| if 0 <= idx_to_remove < len(playlist): removed_item = playlist.pop(idx_to_remove) | |
| if removed_item: msg = f"Removed '{removed_item[:50]}...' from playlist. Total items: {len(playlist)}." | |
| else: msg = "Invalid index or playlist empty." | |
| elif action == "clear": | |
| session['input_url_playlist'] = [] | |
| msg = "Playlist cleared." | |
| elif action == "show": | |
| if playlist: | |
| plist_str = "\n".join([f"{i+1}. `{url}`" for i, url in enumerate(playlist)]) | |
| msg = f"*Current Playlist ({len(playlist)} items):*\n{plist_str}" | |
| else: msg = "Playlist is empty." | |
| else: msg = "Usage: /playlist [add <url>|remove <index|last>|clear|show]" | |
| # save_user_settings_to_file(chat_id) # REMOVED (for playlist changes) | |
| return send_message(chat_id, msg, reply_markup=get_main_keyboard(session)) | |
| async def handle_playlist_config_interaction(chat_id: int, message_id: int): | |
| session = get_user_session(chat_id) | |
| playlist = session.get('input_url_playlist', []) | |
| msg = "" | |
| if playlist: | |
| plist_str = "\n".join([f"{i+1}. `{url}`" for i, url in enumerate(playlist)]) | |
| msg = f"*Current Playlist ({len(playlist)} items):*\n{plist_str}\n\nUse `/playlist add <url>`, `/playlist remove <index>`, `/playlist clear`." | |
| else: | |
| msg = "Playlist is empty. Use `/playlist add <url>` to add your first stream input." | |
| return edit_message_text(chat_id, message_id, msg + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session)) | |
| async def handle_logo_data_from_telegram(chat_id: int, message: dict): | |
| """ | |
| This function is a placeholder. | |
| To get file bytes, the bot needs to: | |
| 1. Get `file_id` from `message` (photo or document). | |
| 2. Call Telegram API's `getFile` method with `file_id` to get a `file_path`. | |
| 3. Download the file content from `https://api.telegram.org/file/bot<YOUR_BOT_TOKEN>/<file_path>`. | |
| This webhook setup cannot make these outgoing API calls. | |
| If you are using a library like `python-telegram-bot` which handles the webhook and provides | |
| the downloaded file bytes directly (e.g. via `message.effective_attachment.download_as_bytearray()`), | |
| then you would use that. | |
| For this example, we'll simulate that the bytes are somehow obtained. | |
| """ | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| # --- SIMULATION --- | |
| # In a real bot that can make API calls or using a full bot framework: | |
| # actual_logo_bytes = await download_file_from_telegram(file_id) | |
| # actual_mime_type = message["document"].get("mime_type") or "image/png" (if photo) | |
| # actual_filename = message["document"].get("file_name") or f"photo_logo_{chat_id}.png" | |
| # For now, we can't implement the download here. User will be told it's conceptual. | |
| # Simulate receiving some dummy data | |
| dummy_logo_bytes = b"dummy_logo_data_placeholder_not_a_real_image" | |
| dummy_mime_type = "image/png" | |
| dummy_filename = f"concept_logo_{chat_id}.png" | |
| # --- END SIMULATION --- | |
| # Check file size (conceptual, as we don't have real bytes here) | |
| MAX_LOGO_SIZE_BYTES = 1 * 1024 * 1024 # 1 MB | |
| # if len(actual_logo_bytes) > MAX_LOGO_SIZE_BYTES: | |
| # return send_message(chat_id, f"Logo is too large (max {MAX_LOGO_SIZE_BYTES/1024/1024:.1f} MB). Please upload a smaller one.") | |
| with lock: | |
| session['logo_data_bytes'] = dummy_logo_bytes # actual_logo_bytes | |
| session['logo_mime_type'] = dummy_mime_type # actual_mime_type | |
| session['logo_original_filename'] = dummy_filename # actual_filename | |
| session['logo_enabled'] = True | |
| session['current_step'] = None | |
| # save_user_settings_to_file(chat_id) # REMOVED | |
| append_user_live_log(chat_id, f"CONCEPTUAL: Logo data for '{dummy_filename}' stored in memory.") | |
| return send_message(chat_id, f"Conceptual: Logo '{dummy_filename}' data stored in memory and enabled! (Actual file download/processing requires full bot capabilities beyond this webhook example.)\nLogo settings (position, etc.) are in Advanced Settings.", reply_markup=get_main_keyboard(session)) | |
| async def handle_logo_config_interaction(chat_id: int, message_id: int): | |
| session = get_user_session(chat_id) | |
| msg_parts = ["*Logo Configuration (In-Memory):*"] | |
| if session.get('logo_data_bytes') and session.get('logo_enabled'): | |
| msg_parts.append(f"Status: Enabled (`{session.get('logo_original_filename', 'N/A')}`)") | |
| msg_parts.append(f"Position: `{session.get('logo_position')}` (Change in Advanced Settings)") | |
| msg_parts.append(f"Scale: `{session.get('logo_scale')}` (Change in Advanced Settings)") | |
| msg_parts.append(f"Opacity: `{session.get('logo_opacity')}` (Change in Advanced Settings)") | |
| msg_parts.append("\nTo change logo, send /set_logo then upload a new image.") | |
| msg_parts.append("To disable: `/configure_logo enabled false` (conceptual command).") | |
| elif session.get('logo_data_bytes') and not session.get('logo_enabled'): | |
| msg_parts.append(f"Status: Disabled (`{session.get('logo_original_filename', 'N/A')}`)") | |
| msg_parts.append("To enable: `/configure_logo enabled true` (conceptual command).") | |
| else: | |
| msg_parts.append("No logo uploaded/stored. Send /set_logo then upload an image.") | |
| return edit_message_text(chat_id, message_id, "\n".join(msg_parts) + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session)) | |
| def schedule_stream_job(chat_id: int, trigger_time: datetime.datetime, job_name: str = None): | |
| session = get_user_session(chat_id) | |
| job_id = f"stream_{chat_id}_{int(trigger_time.timestamp())}" | |
| if not job_name: job_name = f"Scheduled Stream for {chat_id} at {trigger_time.strftime('%Y-%m-%d %H:%M')}" | |
| scheduler.add_job( | |
| start_stream_for_scheduler, | |
| 'date', | |
| run_date=trigger_time, | |
| args=[chat_id], | |
| id=job_id, | |
| name=job_name, | |
| replace_existing=True | |
| ) | |
| append_user_live_log(chat_id, f"Stream scheduled (in-memory): {job_name} (ID: {job_id}) at {trigger_time.strftime('%Y-%m-%d %H:%M:%S %Z')}") | |
| logger.info(f"[Chat {chat_id}] Job {job_id} scheduled (in-memory) for {trigger_time}") | |
| def start_stream_for_scheduler(chat_id: int): | |
| logger.info(f"[Scheduler][Chat {chat_id}] Triggering scheduled stream (in-memory).") | |
| session = get_user_session(chat_id) | |
| # Ensure the asyncio event loop is available if start_stream_command_handler is async | |
| # This can be tricky when APScheduler runs in a separate thread. | |
| # A common pattern is to use asyncio.run_coroutine_threadsafe if the scheduler | |
| # has access to the main event loop, or just run the core logic synchronously if possible. | |
| # For simplicity, let's assume start_stream_command_handler can be adapted or called carefully. | |
| # Simplified: this call will need an event loop if `start_stream_command_handler` is async. | |
| # If `start_stream_command_handler` is sync, it's fine. | |
| # Since it *is* async, this needs proper async handling from a sync thread. | |
| loop = asyncio.get_event_loop() # Get loop from the thread APScheduler is running in | |
| if loop.is_running(): | |
| asyncio.create_task(start_stream_command_handler(chat_id, is_scheduled=True)) | |
| else: # Fallback for environments where loop might not be running in APScheduler's thread | |
| asyncio.run(start_stream_command_handler(chat_id, is_scheduled=True)) | |
| logger.info(f"[Scheduler][Chat {chat_id}] Scheduled stream start attempt initiated.") | |
| async def handle_schedule_command(chat_id: int, text: str): | |
| parts = text.split(maxsplit=3) | |
| if len(parts) < 3: | |
| return send_message(chat_id, "Usage: `/schedule YYYY-MM-DD HH:MM:SS [Optional Job Name]`\nExample: `/schedule 2024-12-31 23:55:00 NYE`\nTimezone is UTC. Schedules are in-memory and lost on restart.") | |
| try: | |
| date_str, time_str = parts[1], parts[2] | |
| job_name = parts[3] if len(parts) > 3 else f"Stream for {chat_id}" | |
| trigger_time_naive = datetime.datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S") | |
| trigger_time_utc = trigger_time_naive.replace(tzinfo=datetime.timezone.utc) | |
| if trigger_time_utc <= datetime.datetime.now(datetime.timezone.utc): | |
| return send_message(chat_id, "Scheduled time must be in the future.") | |
| schedule_stream_job(chat_id, trigger_time_utc, job_name) | |
| return send_message(chat_id, f"Stream scheduled (in-memory) as '{job_name}' for {trigger_time_utc.strftime('%Y-%m-%d %H:%M:%S %Z')}. Current settings will be used.", reply_markup=get_main_keyboard(session)) | |
| except ValueError as e: | |
| return send_message(chat_id, f"Invalid date/time format: {e}. Use YYYY-MM-DD HH:MM:SS.") | |
| async def handle_schedule_config_interaction(chat_id: int, message_id: int): | |
| jobs = scheduler.get_jobs() | |
| user_jobs = [job for job in jobs if job.id.startswith(f"stream_{chat_id}_")] | |
| msg_parts = ["*Scheduled Streams (In-Memory - Lost on Restart):*"] | |
| if user_jobs: | |
| for job in user_jobs: | |
| run_time_str = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S %Z') if job.next_run_time else "N/A" | |
| msg_parts.append(f"• `{job.name}` at `{run_time_str}` (ID: `{job.id}`)") | |
| msg_parts.append(f" To cancel: `/cancel_schedule {job.id}` (conceptual command).") | |
| else: | |
| msg_parts.append("No streams currently scheduled.") | |
| msg_parts.append("\nUse `/schedule YYYY-MM-DD HH:MM:SS [Name]` to schedule a new stream.") | |
| return edit_message_text(chat_id, message_id, "\n".join(msg_parts) + "\n\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session)) | |
| async def start_stream_command_handler(chat_id: int, message_id_to_edit: int = None, is_scheduled: bool = False): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| response = None | |
| with lock: | |
| if session['streaming_state'] in ["streaming", "paused", "starting"]: | |
| msg = f"A stream is already active ({session['streaming_state']})." | |
| if message_id_to_edit: response = edit_message_text(chat_id, message_id_to_edit, msg + "\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| else: response = send_message(chat_id, msg, reply_markup=get_main_keyboard(session)) | |
| return response | |
| if not session.get('input_url_playlist') or not session.get('output_url'): | |
| msg = "Input playlist or Output URL is not set. Please configure first using /settings or menu." | |
| if message_id_to_edit: response = edit_message_text(chat_id, message_id_to_edit, msg + "\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| else: response = send_message(chat_id, msg, reply_markup=get_main_keyboard(session)) | |
| return response | |
| session['streaming_state'] = "starting" | |
| session['error_notification_user'] = "" | |
| # save_user_settings_to_file(chat_id) # REMOVED | |
| thread = threading.Thread(target=stream_engine_thread_target, args=(chat_id,), name=f"StreamThread-{chat_id}", daemon=True) | |
| session['stream_thread_ref'] = thread | |
| thread.start() | |
| append_user_live_log(chat_id, "Stream thread initiated.") | |
| await asyncio.sleep(0.2) | |
| status_msg = compose_status_message(chat_id) | |
| if is_scheduled and not message_id_to_edit: | |
| logger.info(f"[Chat {chat_id}] Scheduled stream started. Status: {status_msg[:100]}") | |
| # Conceptual proactive message: | |
| # await send_proactive_message(chat_id, "Your scheduled stream is now starting!\n" + status_msg) | |
| return {"status": "scheduled_stream_started_silently_inmemory"} | |
| elif message_id_to_edit: | |
| response = edit_message_text(chat_id, message_id_to_edit, "Stream starting...\n" + status_msg, reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| else: | |
| response = send_message(chat_id, "Stream starting...\n" + status_msg, reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| return response | |
| async def stream_pause_handler(chat_id: int, message_id: int): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| with lock: | |
| if session['streaming_state'] == "streaming": | |
| session['streaming_state'] = "paused" | |
| append_user_live_log(chat_id, "Stream paused by user.") | |
| msg = "Stream paused." | |
| else: | |
| msg = f"Stream is not streaming (state: {session['streaming_state']}). Cannot pause." | |
| return edit_message_text(chat_id, message_id, msg + "\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| async def stream_resume_handler(chat_id: int, message_id: int): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| with lock: | |
| if session['streaming_state'] == "paused": | |
| session['streaming_state'] = "streaming" | |
| append_user_live_log(chat_id, "Stream resumed by user.") | |
| msg = "Stream resumed." | |
| else: | |
| msg = f"Stream is not paused (state: {session['streaming_state']}). Cannot resume." | |
| return edit_message_text(chat_id, message_id, msg + "\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| async def stream_abort_handler(chat_id: int, message_id_to_edit: int = None): | |
| session = get_user_session(chat_id) | |
| lock = session_locks[chat_id] | |
| thread_to_join = None | |
| msg = "" | |
| with lock: | |
| if session['streaming_state'] in ["streaming", "paused", "starting"]: | |
| session['streaming_state'] = "stopping" | |
| thread_to_join = session.get('stream_thread_ref') | |
| append_user_live_log(chat_id, "Stream abort sequence initiated by user.") | |
| msg = "Stream aborting..." | |
| else: | |
| msg = f"No active stream to abort (state: {session['streaming_state']})." | |
| if message_id_to_edit: return edit_message_text(chat_id, message_id_to_edit, msg + "\n" + compose_status_message(chat_id), reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| return send_message(chat_id, msg, reply_markup=get_main_keyboard(session)) | |
| if thread_to_join and thread_to_join.is_alive(): | |
| logger.info(f"[Chat {chat_id}] Waiting for stream thread {thread_to_join.name} to join...") | |
| thread_to_join.join(timeout=7.0) | |
| if thread_to_join.is_alive(): | |
| append_user_live_log(chat_id, "Warning: Stream thread did not terminate cleanly after abort signal.") | |
| logger.warning(f"[Chat {chat_id}] Stream thread {thread_to_join.name} still alive after join timeout.") | |
| else: | |
| append_user_live_log(chat_id, "Stream thread terminated.") | |
| with lock: | |
| if session['streaming_state'] == "stopping": | |
| session['streaming_state'] = "stopped" | |
| session['stream_thread_ref'] = None | |
| final_status_msg = "Stream aborted.\n" + compose_status_message(chat_id) | |
| if message_id_to_edit: | |
| return edit_message_text(chat_id, message_id_to_edit, final_status_msg, reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| else: | |
| return send_message(chat_id, final_status_msg, reply_markup=get_main_keyboard(session), parse_mode="HTML") | |
| def get_help_text(): | |
| return ( | |
| f"*Advanced Stream Bot v{APP_VERSION} (In-Memory Edition) Help*\n\n" | |
| "⚠️ *IMPORTANT: All settings, uploaded logos, and schedules are stored in memory " | |
| "and will be LOST if the bot restarts due to environment restrictions.*\n\n" | |
| "Use the interactive menu buttons or the following commands:\n" | |
| "*/start* or */menu* - Show main menu and status.\n" | |
| "*/settings* - Start advanced configuration.\n" | |
| "*/stream* - Start stream with current settings.\n" | |
| "*/playlist add <url>* - Add URL to playlist.\n" | |
| "*/playlist remove <index|last>* - Remove URL from playlist.\n" | |
| "*/playlist clear* - Clear entire playlist.\n" | |
| "*/playlist show* - Display current playlist.\n" | |
| "*/set_logo* - Prompts to upload a logo image (stored in memory).\n" | |
| " (Logo settings like position, scale are in Advanced Settings via menu)\n" | |
| "*/schedule YYYY-MM-DD HH:MM:SS [Name]* - Schedule a stream (UTC time, in-memory).\n" | |
| " Example: `/schedule 2024-07-04 18:30:00 My Stream`\n" | |
| "*/status* - Show detailed stream status and config.\n" | |
| "*/abort* - Force stop the current stream.\n" | |
| "\n*While Streaming (via menu buttons):*\n" | |
| "• Pause/Resume Stream.\n" | |
| "• Abort Stream.\n" | |
| "• Status Update (refreshes message).\n" | |
| "• Stop Loop Gracefully (for infinite loops, current iteration completes).\n" | |
| "\n*Configuration Conversation:*\n" | |
| "When in a setup process (e.g., after /settings), reply to prompts.\n" | |
| "Type `/cancel` to exit setup anytime.\n" | |
| ) | |
| # --- FastAPI Webhook Endpoint --- | |
| import asyncio | |
| async def startup_event(): | |
| # load_all_sessions_on_startup() # REMOVED | |
| if not scheduler.running: | |
| scheduler.start() | |
| logger.info("APScheduler started (using MemoryJobStore).") | |
| logger.info(f"FastAPI application startup complete. Version: {APP_VERSION}. Data is IN-MEMORY.") | |
| logger.warning("IMPORTANT: All user settings, logos, and schedules will be lost on bot restart.") | |
| async def shutdown_event(): | |
| if scheduler.running: | |
| scheduler.shutdown() | |
| logger.info("APScheduler shutdown.") | |
| # No settings to save to file | |
| logger.info("FastAPI application shutdown. In-memory data will be lost.") | |
| async def telegram_webhook_endpoint(request: Request): | |
| try: | |
| update = await request.json() | |
| # logger.debug(f"Webhook received: {update}") # Can be too verbose | |
| response_data = await handle_telegram_update(update) | |
| if isinstance(response_data, list): | |
| main_response = None | |
| for r_item in response_data: | |
| if r_item and r_item.get("method") in ["sendMessage", "editMessageText", "sendPhoto", "sendDocument"]: # Added more types | |
| main_response = r_item | |
| break | |
| if not main_response: | |
| main_response = response_data[0] if response_data else {"status": "ok", "message": "Empty response list from handler"} | |
| return main_response | |
| return response_data | |
| except json.JSONDecodeError: | |
| logger.error("Invalid JSON received in webhook.") | |
| raise HTTPException(status_code=400, detail="Invalid JSON payload") | |
| except Exception as e: | |
| logger.error(f"Error processing webhook: {e}", exc_info=True) | |
| return {"status": "ok", "message": "Internal server error processing update."} # Ack to Telegram | |
| async def root(): | |
| return {"message": f"Advanced Stream Bot v{APP_VERSION} (In-Memory Edition) is running. Webhook is at /webhook."} | |
| # --- Main Execution --- | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # logger.info(f"Starting Advanced Stream Bot v{APP_VERSION} (In-Memory Edition)...") | |
| # logger.warning("This version stores all data in memory. NO DATA WILL PERSIST ACROSS RESTARTS.") | |
| # uvicorn.run("__main__:app", host="127.0.0.1", port=8000, reload=True) |