| import logging |
| import threading |
| import time |
| import datetime |
| import traceback |
| import fractions |
|
|
| from fastapi import FastAPI, Request |
| import av |
|
|
| app = FastAPI() |
|
|
| |
| |
| |
| |
| |
| user_inputs = {} |
| |
| |
| |
| conversation_fields = [] |
| current_step = None |
| advanced_mode = False |
|
|
| |
| default_settings = { |
| "quality_settings": "medium", |
| "video_codec": "libx264", |
| "audio_codec": "aac", |
| "output_url": "rtmp://a.rtmp.youtube.com/live2" |
| } |
|
|
| |
| streaming_state = "idle" |
| stream_chat_id = None |
| stream_start_time = None |
| frames_encoded = 0 |
| bytes_sent = 0 |
|
|
| |
| video_stream = None |
| audio_stream_in = None |
| output_stream = None |
|
|
| |
| stream_thread = None |
| live_log_thread = None |
|
|
| |
| live_log_lines = [] |
| live_log_display = "" |
| error_notification = "" |
|
|
| |
| |
| |
| logging.basicConfig( |
| level=logging.DEBUG, |
| format="%(asctime)s [%(levelname)s] %(message)s", |
| datefmt="%Y-%m-%d %H:%M:%S", |
| ) |
| logger = logging.getLogger() |
|
|
| def append_live_log(line: str): |
| global live_log_lines |
| live_log_lines.append(line) |
| if len(live_log_lines) > 50: |
| live_log_lines.pop(0) |
|
|
| class ListHandler(logging.Handler): |
| def emit(self, record): |
| log_entry = self.format(record) |
| append_live_log(log_entry) |
|
|
| list_handler = ListHandler() |
| list_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")) |
| logger.addHandler(list_handler) |
|
|
| |
| |
| |
| def compose_streaming_message(): |
| global live_log_display, error_notification, streaming_state, frames_encoded, bytes_sent |
| stats = f"State: {streaming_state} | Uptime: {get_uptime()} | Frames: {frames_encoded} | Bytes: {bytes_sent}\n" |
| msg = "" |
| if error_notification: |
| msg += "<b>ERROR:</b> " + error_notification + "\n\n" |
| msg += "🚀 <b>Streaming in progress!</b>\n" |
| msg += stats + "\n" |
| msg += "Live Logs:\n" + live_log_display + "\n\nUse the inline keyboard to control the stream." |
| return msg |
|
|
| |
| |
| |
| def create_html_message(text: str): |
| |
| return {"parse_mode": "HTML", "text": f"<pre>{text}</pre>"} |
|
|
| def get_inline_keyboard_for_stream(): |
| |
| keyboard = { |
| "inline_keyboard": [ |
| [ |
| {"text": "⏸ Pause", "callback_data": "pause"}, |
| {"text": "▶️ Resume", "callback_data": "resume"}, |
| {"text": "⏹ Abort", "callback_data": "abort"} |
| ], |
| [ |
| {"text": "📊 Status", "callback_data": "status"} |
| ] |
| ] |
| } |
| return keyboard |
|
|
| def get_inline_keyboard_for_start(): |
| |
| keyboard = { |
| "inline_keyboard": [ |
| [ |
| {"text": "🚀 Start Streaming", "callback_data": "start_stream"} |
| ] |
| ] |
| } |
| return keyboard |
|
|
| def help_text(): |
| return ( |
| "*Stream Bot Help*\n\n" |
| "*/start* - Begin setup for streaming (simple mode: only Input & Output URL)\n" |
| "*/setting* - Enter advanced settings (Input URL, Quality Settings, Video Codec, Audio Codec, Output URL)\n" |
| "*/help* - Display this help text\n" |
| "*/logs* - Show the log history (live log display)\n\n" |
| "After inputs are collected, press the inline *Start Streaming* button.\n\n" |
| "While streaming, you can use inline buttons or commands:\n" |
| "*/pause* - Pause the stream\n" |
| "*/resume* - Resume a paused stream\n" |
| "*/abort* - Abort the stream\n" |
| "*/status* - Get current stream statistics" |
| ) |
|
|
| def send_guide_message(chat_id, message): |
| |
| logging.info(f"Sending message to chat {chat_id}: {message}") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": message, |
| "parse_mode": "Markdown" |
| } |
|
|
| def reset_statistics(): |
| global stream_start_time, frames_encoded, bytes_sent |
| stream_start_time = datetime.datetime.now() |
| frames_encoded = 0 |
| bytes_sent = 0 |
|
|
| def get_uptime(): |
| if stream_start_time: |
| uptime = datetime.datetime.now() - stream_start_time |
| return str(uptime).split('.')[0] |
| return "0" |
|
|
| def validate_inputs(): |
| |
| missing = [field for field in conversation_fields if field not in user_inputs or not user_inputs[field]] |
| if missing: |
| return False, f"Missing fields: {', '.join(missing)}" |
| return True, "" |
|
|
| |
| |
| |
| def notify_error(chat_id, error_message): |
| global error_notification |
| error_notification = error_message |
| logging.error(f"Error for chat {chat_id}: {error_message}") |
|
|
| |
| |
| |
| def live_log_updater(): |
| global live_log_display, streaming_state |
| try: |
| while streaming_state in ["streaming", "paused"]: |
| |
| live_log_display = "<pre>" + "\n".join(live_log_lines[-15:]) + "</pre>" |
| time.sleep(1) |
| except Exception as e: |
| logging.error(f"Error in live log updater: {e}") |
|
|
| |
| |
| |
| def logs_history(chat_id): |
| global live_log_display, error_notification |
| log_text = live_log_display if live_log_display else "<pre>No logs available yet.</pre>" |
| if error_notification: |
| if log_text.startswith("<pre>"): |
| log_text = f"<pre>ERROR: {error_notification}\n\n" + log_text[5:] |
| else: |
| log_text = f"<pre>ERROR: {error_notification}\n\n{log_text}</pre>" |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": log_text, |
| "parse_mode": "HTML" |
| } |
|
|
| |
| |
| |
| def handle_start(chat_id): |
| global current_step, user_inputs, conversation_fields, advanced_mode |
| |
| user_inputs = {} |
| if not advanced_mode: |
| conversation_fields = ["input_url", "output_url"] |
| else: |
| conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"] |
| current_step = conversation_fields[0] |
| text = ("👋 *Welcome to the Stream Bot!*\n\n" |
| "Let's set up your stream.\n" |
| f"Please enter the *{current_step.replace('_', ' ')}*" |
| f"{' (no default)' if current_step not in default_settings else f' _(default: {default_settings[current_step]})_'}:") |
| logging.info(f"/start command from chat {chat_id} (advanced_mode={advanced_mode})") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": text, |
| "parse_mode": "Markdown" |
| } |
|
|
| def handle_setting(chat_id): |
| global advanced_mode, conversation_fields, current_step, user_inputs |
| advanced_mode = True |
| conversation_fields = ["input_url", "quality_settings", "video_codec", "audio_codec", "output_url"] |
| user_inputs = {} |
| current_step = conversation_fields[0] |
| text = ("⚙️ *Advanced Mode Activated!*\n\n" |
| "Please enter the *input url*:") |
| logging.info(f"/setting command from chat {chat_id} - advanced mode enabled") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": text, |
| "parse_mode": "Markdown" |
| } |
|
|
| def handle_help(chat_id): |
| logging.info(f"/help command from chat {chat_id}") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": help_text(), |
| "parse_mode": "Markdown" |
| } |
|
|
| def handle_conversation(chat_id, text): |
| global current_step, user_inputs, conversation_fields |
| if current_step: |
| if text.strip() == "" and current_step in default_settings: |
| user_inputs[current_step] = default_settings[current_step] |
| logging.info(f"Using default for {current_step}: {default_settings[current_step]}") |
| else: |
| user_inputs[current_step] = text.strip() |
| logging.info(f"Received {current_step}: {text.strip()}") |
|
|
| idx = conversation_fields.index(current_step) |
| if idx < len(conversation_fields) - 1: |
| current_step = conversation_fields[idx + 1] |
| prompt = f"Please enter the *{current_step.replace('_', ' ')}*" |
| if current_step in default_settings: |
| prompt += f" _(default: {default_settings[current_step]})_" |
| return send_guide_message(chat_id, prompt) |
| else: |
| current_step = None |
| valid, msg = validate_inputs() |
| if not valid: |
| return send_guide_message(chat_id, f"Validation error: {msg}") |
| if not advanced_mode: |
| user_inputs.setdefault("quality_settings", default_settings["quality_settings"]) |
| user_inputs.setdefault("video_codec", default_settings["video_codec"]) |
| user_inputs.setdefault("audio_codec", default_settings["audio_codec"]) |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": "All inputs received. Press *🚀 Start Streaming* to begin.", |
| "reply_markup": get_inline_keyboard_for_start(), |
| "parse_mode": "Markdown" |
| } |
| else: |
| return send_guide_message(chat_id, "Unrecognized input. Type /help for available commands.") |
|
|
| |
| |
| |
| def stream_to_youtube(input_url, quality_settings, video_codec, audio_codec, output_url, chat_id): |
| global video_stream, audio_stream_in, output_stream, streaming_state, frames_encoded, bytes_sent |
| logging.info("Initiating streaming to YouTube") |
| try: |
| streaming_state = "streaming" |
| reset_statistics() |
|
|
| input_stream = av.open(input_url) |
| output_stream = av.open(output_url, mode='w', format='flv') |
|
|
| |
| video_stream = output_stream.add_stream(video_codec, rate=30) |
| video_stream.width = input_stream.streams.video[0].width |
| video_stream.height = input_stream.streams.video[0].height |
| video_stream.pix_fmt = input_stream.streams.video[0].format.name |
| video_stream.codec_context.options.update({'g': '30'}) |
|
|
| if quality_settings.lower() == "high": |
| video_stream.bit_rate = 3000000 |
| video_stream.bit_rate_tolerance = 1000000 |
| elif quality_settings.lower() == "medium": |
| video_stream.bit_rate = 1500000 |
| video_stream.bit_rate_tolerance = 500000 |
| elif quality_settings.lower() == "low": |
| video_stream.bit_rate = 800000 |
| video_stream.bit_rate_tolerance = 200000 |
|
|
| |
| audio_stream_in = input_stream.streams.audio[0] |
| out_audio_stream = output_stream.add_stream(audio_codec, rate=audio_stream_in.rate) |
| out_audio_stream.layout = "stereo" |
|
|
| |
| video_stream.codec_context.time_base = fractions.Fraction(1, 30) |
|
|
| logging.info("Streaming started successfully.") |
|
|
| |
| global live_log_thread |
| if live_log_thread is None or not live_log_thread.is_alive(): |
| live_log_thread = threading.Thread(target=live_log_updater) |
| live_log_thread.daemon = True |
| live_log_thread.start() |
| logging.info("Live log updater thread started.") |
|
|
| |
| while streaming_state in ["streaming", "paused"]: |
| for packet in input_stream.demux(): |
| if streaming_state == "stopped": |
| break |
| if packet.stream == input_stream.streams.video[0]: |
| for frame in packet.decode(): |
| if streaming_state == "paused": |
| time.sleep(0.5) |
| continue |
| for out_packet in video_stream.encode(frame): |
| output_stream.mux(out_packet) |
| frames_encoded += 1 |
| if hasattr(out_packet, "size"): |
| bytes_sent += out_packet.size |
| elif packet.stream == audio_stream_in: |
| for frame in packet.decode(): |
| if streaming_state == "paused": |
| time.sleep(0.5) |
| continue |
| for out_packet in out_audio_stream.encode(frame): |
| output_stream.mux(out_packet) |
| if hasattr(out_packet, "size"): |
| bytes_sent += out_packet.size |
|
|
| |
| for out_packet in video_stream.encode(): |
| output_stream.mux(out_packet) |
| for out_packet in out_audio_stream.encode(): |
| output_stream.mux(out_packet) |
|
|
| if streaming_state == "paused": |
| time.sleep(1) |
|
|
| |
| try: |
| video_stream.close() |
| out_audio_stream.close() |
| output_stream.close() |
| input_stream.close() |
| except Exception as cleanup_error: |
| logging.error(f"Error during cleanup: {cleanup_error}") |
|
|
| logging.info("Streaming complete, resources cleaned up.") |
| streaming_state = "idle" |
| except Exception as e: |
| error_message = f"An error occurred during streaming: {str(e)}\n\n{traceback.format_exc()}" |
| logging.error(error_message) |
| streaming_state = "idle" |
| notify_error(chat_id, error_message) |
|
|
| def start_streaming(chat_id): |
| global stream_thread, stream_chat_id |
| valid, msg = validate_inputs() |
| if not valid: |
| return send_guide_message(chat_id, f"Validation error: {msg}") |
|
|
| stream_chat_id = chat_id |
| try: |
| stream_thread = threading.Thread( |
| target=stream_to_youtube, |
| args=( |
| user_inputs["input_url"], |
| user_inputs["quality_settings"], |
| user_inputs["video_codec"], |
| user_inputs["audio_codec"], |
| user_inputs["output_url"], |
| chat_id, |
| ) |
| ) |
| stream_thread.daemon = True |
| stream_thread.start() |
| logging.info("Streaming thread started.") |
| |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": "🚀 <b>Streaming initiated!</b>\n\n" + compose_streaming_message(), |
| "reply_markup": get_inline_keyboard_for_stream(), |
| "parse_mode": "HTML" |
| } |
| except Exception as e: |
| error_message = f"Failed to start streaming: {str(e)}" |
| logging.error(error_message) |
| notify_error(chat_id, error_message) |
| return send_guide_message(chat_id, error_message) |
|
|
| |
| |
| |
| def pause_stream(chat_id): |
| global streaming_state |
| if streaming_state == "streaming": |
| streaming_state = "paused" |
| logging.info("Streaming paused.") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": "⏸ <b>Streaming paused.</b>", |
| "parse_mode": "HTML" |
| } |
| return send_guide_message(chat_id, "Streaming is not active.") |
|
|
| def resume_stream(chat_id): |
| global streaming_state |
| if streaming_state == "paused": |
| streaming_state = "streaming" |
| logging.info("Streaming resumed.") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": "▶️ <b>Streaming resumed.</b>", |
| "parse_mode": "HTML" |
| } |
| return send_guide_message(chat_id, "Streaming is not paused.") |
|
|
| def abort_stream(chat_id): |
| global streaming_state |
| if streaming_state in ["streaming", "paused"]: |
| streaming_state = "stopped" |
| logging.info("Streaming aborted by user.") |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": "⏹ <b>Streaming aborted.</b>", |
| "parse_mode": "HTML" |
| } |
| return send_guide_message(chat_id, "No active streaming to abort.") |
|
|
| def stream_status(chat_id): |
| stats = ( |
| f"*Stream Status:*\n\n" |
| f"• **State:** {streaming_state}\n" |
| f"• **Uptime:** {get_uptime()}\n" |
| f"• **Frames Encoded:** {frames_encoded}\n" |
| f"• **Bytes Sent:** {bytes_sent}\n" |
| ) |
| return { |
| "method": "sendMessage", |
| "chat_id": chat_id, |
| "text": stats, |
| "parse_mode": "Markdown" |
| } |
|
|
| |
| |
| |
| @app.post("/webhook") |
| async def telegram_webhook(request: Request): |
| update = await request.json() |
| logging.debug(f"Received update: {update}") |
|
|
| |
| if "message" in update: |
| chat_id = update["message"]["chat"]["id"] |
| text = update["message"].get("text", "").strip() |
|
|
| if text.startswith("/setting"): |
| return handle_setting(chat_id) |
| elif text.startswith("/start"): |
| return handle_start(chat_id) |
| elif text.startswith("/help"): |
| return handle_help(chat_id) |
| elif text.startswith("/logs"): |
| return logs_history(chat_id) |
| elif text.startswith("/pause"): |
| return pause_stream(chat_id) |
| elif text.startswith("/resume"): |
| return resume_stream(chat_id) |
| elif text.startswith("/abort"): |
| return abort_stream(chat_id) |
| elif text.startswith("/status"): |
| return stream_status(chat_id) |
| else: |
| return handle_conversation(chat_id, text) |
|
|
| |
| elif "callback_query" in update: |
| callback_data = update["callback_query"]["data"] |
| chat_id = update["callback_query"]["message"]["chat"]["id"] |
| message_id = update["callback_query"]["message"]["message_id"] |
|
|
| if callback_data == "pause": |
| response = pause_stream(chat_id) |
| elif callback_data == "resume": |
| response = resume_stream(chat_id) |
| elif callback_data == "abort": |
| response = abort_stream(chat_id) |
| elif callback_data == "status": |
| response = stream_status(chat_id) |
| elif callback_data == "start_stream": |
| response = start_streaming(chat_id) |
| else: |
| response = send_guide_message(chat_id, "❓ Unknown callback command.") |
|
|
| |
| if callback_data in ["pause", "resume", "abort", "status", "start_stream"]: |
| response["method"] = "editMessageText" |
| response["message_id"] = message_id |
| response["text"] = compose_streaming_message() |
| response["parse_mode"] = "HTML" |
| response["reply_markup"] = get_inline_keyboard_for_stream() |
| return response |
|
|
| return {"status": "ok"} |
|
|