AstraOS commited on
Commit
b0d6e6a
Β·
verified Β·
1 Parent(s): f304f06

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +137 -61
app.py CHANGED
@@ -1,5 +1,11 @@
1
  """
2
  Advanced Stream Bot β€” v3.3.0
 
 
 
 
 
 
3
  - Live view: status+log bubble auto-edits every 2s (logg.py pattern)
4
  - Pause/Resume/Refresh/Close live view controls
5
  - Force Reboot button + /reboot command β€” last-resort recovery
@@ -11,9 +17,6 @@ Advanced Stream Bot β€” v3.3.0
11
  - Fixed: stale edits never pile up in outbound queue (cap=1 per message)
12
  - SSE /events/{chat_id} for real-time web dashboard
13
  - Scheduler-triggered streams queue outbound notifications
14
- - NEW: Raw FFmpeg/libav log lines piped into user live log (bitrate, fps, codec init, errors)
15
- - Fixed: stats now pushed every frame for first 300 frames, then every 100
16
- - Fixed: delayed live-view force-refresh 4s after stream starts so stats show non-zero
17
  """
18
 
19
  import logging
@@ -156,6 +159,9 @@ DEFAULT_USER_SETTINGS = {
156
  "logo_opacity": 0.85,
157
  "logo_margin_px": 10,
158
 
 
 
 
159
  # Conversation state
160
  "current_step": None,
161
  "current_step_index": 0,
@@ -394,7 +400,7 @@ def _build_live_view_text(chat_id: int, mode: str = "status") -> str:
394
  else:
395
  status = compose_status_message(chat_id, include_config=False, include_logs=False)
396
  logs = session.get('live_log_lines_user', [])
397
- log_tail = "\n".join(logs[-8:]) if logs else "No logs yet."
398
  pause_note = "\n\n<i>⏸ Auto-updates paused β€” tap ▢️ Resume to continue</i>" if is_paused else ""
399
 
400
  # Only show log tail while actively streaming
@@ -687,6 +693,7 @@ def get_settings_keyboard(session: dict = None):
687
  {"text": "πŸ”„ Auto-Reconnect", "callback_data": "toggle_reconnect"}],
688
  [{"text": "πŸ›‘ Stop on Error", "callback_data": "toggle_stop_on_error"},
689
  {"text": "⏰ Open Timeout", "callback_data": "set_open_timeout"}],
 
690
  [{"text": "🎨 Quality Preset", "callback_data": "pick_quality_preset"}],
691
  [{"text": ux_label, "callback_data": "toggle_ux_mode"}],
692
  [{"text": "βœ… Done", "callback_data": "settings_done"}],
@@ -824,6 +831,7 @@ def format_settings_display(session: dict) -> str:
824
 
825
  auto_rc = "βœ…" if session.get('reconnect_on_stream_error') else "❌"
826
  stop_err = "βœ…" if session.get('stop_on_error_in_playlist') else "❌"
 
827
 
828
  lines = [
829
  f"πŸ“‘ <b>Output URL:</b> <code>{esc(session.get('output_url', 'β€”'))}</code>",
@@ -838,6 +846,7 @@ def format_settings_display(session: dict) -> str:
838
  f"πŸ” <b>Loop:</b> <code>{loop_disp}</code>",
839
  f"πŸ”„ <b>Auto-Reconnect:</b> {auto_rc} (delay: <code>{session.get('reconnect_delay_seconds')}s</code>, max: <code>{session.get('max_reconnect_attempts')}</code>)",
840
  f"πŸ›‘ <b>Stop on Error:</b> {stop_err}",
 
841
  f"⏱ <b>Timeouts:</b> open <code>{session.get('open_timeout_seconds')}s</code> read <code>{session.get('read_timeout_seconds')}s</code>",
842
  f"πŸ–Ό <b>Logo:</b> {logo_status}",
843
  ]
@@ -1040,54 +1049,50 @@ def stream_engine_thread_target(chat_id: int):
1040
 
1041
  active_output_container = None
1042
 
1043
- # ── Raw FFmpeg/libav log capture ──────────────────────────────────────────
1044
- # PyAV exposes av.logging.set_log_level() and av.logging.log_callback so we
1045
- # can intercept every libav log line (mux stats, bitrate, fps, speed, codec
1046
- # messages, errors) and pipe them straight into the user's live log.
1047
- #
1048
- # Level map (libav levels):
1049
- # QUIET=-8 PANIC=0 FATAL=8 ERROR=16 WARNING=24 INFO=32
1050
- # VERBOSE=40 DEBUG=48 TRACE=56
1051
- #
1052
- # We use INFO (32) so we get: stream open/close, mux progress, codec init,
1053
- # and error messages β€” without drowning in debug noise.
1054
- _LIBAV_LEVEL_NAMES = {
1055
- -8: "QUIET", 0: "PANIC", 8: "FATAL", 16: "ERROR",
1056
- 24: "WARN", 32: "INFO", 40: "VERBOSE", 48: "DEBUG", 56: "TRACE",
1057
- }
1058
- _libav_log_active = threading.Event()
1059
- _libav_log_active.set()
1060
-
1061
- # Deduplicate repeated identical lines (libav repeats some lines many times)
1062
- _libav_last_line = {"v": ""}
1063
-
1064
- def _libav_log_callback(level, message, _user_ptr=None):
1065
- if not _libav_log_active.is_set():
1066
- return
1067
- msg = message.strip()
1068
- if not msg:
1069
- return
1070
- # Skip pure noise lines
1071
- if msg in ("Past duration 0.999992 too large",) or msg.startswith("deprecated pixel format"):
1072
- return
1073
- # Collapse duplicate consecutive lines
1074
- if msg == _libav_last_line["v"]:
1075
- return
1076
- _libav_last_line["v"] = msg
1077
-
1078
- lvl_name = _LIBAV_LEVEL_NAMES.get(level, f"L{level}")
1079
- # Only prefix with level tag for warnings/errors; keep info lines clean
1080
- if level <= 24: # WARNING and above
1081
- entry = f"[ffmpeg/{lvl_name}] {msg}"
1082
- else:
1083
- entry = f"[ffmpeg] {msg}"
1084
- append_user_live_log(chat_id, entry)
1085
 
 
 
1086
  try:
1087
- av.logging.set_log_level(av.logging.INFO)
1088
- av.logging.set_log_callback(_libav_log_callback)
1089
- except Exception as _e_log:
1090
- logger.warning(f"Could not set av log callback: {_e_log}")
1091
  # ─────────────────────────────────────────────────────────────────────────
1092
 
1093
  try:
@@ -1102,11 +1107,16 @@ def stream_engine_thread_target(chat_id: int):
1102
  session['stop_gracefully_flag'] = False
1103
  session['reconnect_attempt'] = 0
1104
  session['last_frame_time'] = None
 
1105
  session['pyav_objects'] = {
1106
  "input_container": None, "output_container": None,
1107
  "video_out_stream": None, "audio_out_stream": None,
1108
  "logo_image_pil": None,
1109
  }
 
 
 
 
1110
 
1111
  append_user_live_log(chat_id, f"Stream engine started β†’ {session['output_url']}")
1112
 
@@ -1323,15 +1333,22 @@ def stream_engine_thread_target(chat_id: int):
1323
  except Exception as e_logo:
1324
  pass
1325
 
 
 
 
 
 
 
 
 
 
 
 
 
1326
  for out_pkt in active_video_out_stream.encode(frame):
1327
  active_output_container.mux(out_pkt)
1328
  with lock:
1329
- session['frames_encoded'] += 1
1330
  session['bytes_sent'] += out_pkt.size
1331
- session['last_frame_time'] = datetime.datetime.now()
1332
- # Push live stats to SSE subscribers every 100 frames
1333
- if session['frames_encoded'] % 100 == 0:
1334
- threading.Thread(target=_push_state_sse, args=(chat_id,), daemon=True).start()
1335
 
1336
  elif active_audio_out_stream and packet.stream.type == 'audio' and in_a_streams and packet.stream.index == in_a_streams[0].index:
1337
  for frame in packet.decode():
@@ -1428,12 +1445,12 @@ def stream_engine_thread_target(chat_id: int):
1428
  notify_state_change(chat_id, "error", f"Fatal error: {esc(str(e_fatal))}")
1429
 
1430
  finally:
1431
- # Stop raw FFmpeg log capture first so no stray lines land after teardown
1432
- _libav_log_active.clear()
1433
  try:
1434
  av.logging.restore_default_callback()
1435
  except Exception:
1436
  pass
 
1437
 
1438
  append_user_live_log(chat_id, "Finalizing stream…")
1439
 
@@ -2588,17 +2605,28 @@ async def _handle_update_inner(update: dict):
2588
  f"Expected: {esc(desc)}\n\n"
2589
  f"Type the new value now, or /cancel to abort.")]
2590
 
2591
- elif data in ("toggle_reconnect", "toggle_stop_on_error"):
2592
  field_map = {
2593
- "toggle_reconnect": "reconnect_on_stream_error",
2594
  "toggle_stop_on_error": "stop_on_error_in_playlist",
 
 
 
 
 
 
2595
  }
2596
  field = field_map[data]
2597
  with lock:
2598
- session[field] = not session.get(field, True)
2599
  new_val = session[field]
 
 
 
 
 
2600
  return [ack, edit(
2601
- f"βœ… <b>{field}</b> β†’ <code>{'on' if new_val else 'off'}</code>",
2602
  get_settings_keyboard(session))]
2603
 
2604
  # Playlist view
@@ -3109,6 +3137,54 @@ async def get_status_endpoint(chat_id: int):
3109
  }
3110
 
3111
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3112
  # ──────────────────────────────────────────────
3113
  # MAIN
3114
  # ──────────────────────────────────────────────
 
1
  """
2
  Advanced Stream Bot β€” v3.3.0
3
+ - Fixed: frames_encoded now counts decoded input frames (encoder buffering no longer causes 0)
4
+ - Fixed: live_log_lines_user cleared on stream start so stale logs don't persist
5
+ - Fixed: last_sent reset on stream start so bubble refreshes immediately
6
+ - Fixed: raw FFmpeg logs via queue-drained thread (bitrate, fps, codec init, errors)
7
+ - Fixed: log tail display increased 8β†’15 lines
8
+ - New: GET /poll/{chat_id} endpoint β€” returns pending live-view edit for proactive refresh
9
  - Live view: status+log bubble auto-edits every 2s (logg.py pattern)
10
  - Pause/Resume/Refresh/Close live view controls
11
  - Force Reboot button + /reboot command β€” last-resort recovery
 
17
  - Fixed: stale edits never pile up in outbound queue (cap=1 per message)
18
  - SSE /events/{chat_id} for real-time web dashboard
19
  - Scheduler-triggered streams queue outbound notifications
 
 
 
20
  """
21
 
22
  import logging
 
159
  "logo_opacity": 0.85,
160
  "logo_margin_px": 10,
161
 
162
+ # Logging
163
+ "verbose_ffmpeg_log": False, # show raw FFmpeg/libav lines in live log
164
+
165
  # Conversation state
166
  "current_step": None,
167
  "current_step_index": 0,
 
400
  else:
401
  status = compose_status_message(chat_id, include_config=False, include_logs=False)
402
  logs = session.get('live_log_lines_user', [])
403
+ log_tail = "\n".join(logs[-15:]) if logs else "No logs yet."
404
  pause_note = "\n\n<i>⏸ Auto-updates paused β€” tap ▢️ Resume to continue</i>" if is_paused else ""
405
 
406
  # Only show log tail while actively streaming
 
693
  {"text": "πŸ”„ Auto-Reconnect", "callback_data": "toggle_reconnect"}],
694
  [{"text": "πŸ›‘ Stop on Error", "callback_data": "toggle_stop_on_error"},
695
  {"text": "⏰ Open Timeout", "callback_data": "set_open_timeout"}],
696
+ [{"text": "πŸ“‹ Verbose FFmpeg Log: " + ("βœ… On" if (session or {}).get("verbose_ffmpeg_log") else "❌ Off"), "callback_data": "toggle_verbose_log"}],
697
  [{"text": "🎨 Quality Preset", "callback_data": "pick_quality_preset"}],
698
  [{"text": ux_label, "callback_data": "toggle_ux_mode"}],
699
  [{"text": "βœ… Done", "callback_data": "settings_done"}],
 
831
 
832
  auto_rc = "βœ…" if session.get('reconnect_on_stream_error') else "❌"
833
  stop_err = "βœ…" if session.get('stop_on_error_in_playlist') else "❌"
834
+ verbose_log = "βœ… On" if session.get('verbose_ffmpeg_log') else "❌ Off"
835
 
836
  lines = [
837
  f"πŸ“‘ <b>Output URL:</b> <code>{esc(session.get('output_url', 'β€”'))}</code>",
 
846
  f"πŸ” <b>Loop:</b> <code>{loop_disp}</code>",
847
  f"πŸ”„ <b>Auto-Reconnect:</b> {auto_rc} (delay: <code>{session.get('reconnect_delay_seconds')}s</code>, max: <code>{session.get('max_reconnect_attempts')}</code>)",
848
  f"πŸ›‘ <b>Stop on Error:</b> {stop_err}",
849
+ f"πŸ“‹ <b>Verbose FFmpeg Log:</b> {verbose_log}",
850
  f"⏱ <b>Timeouts:</b> open <code>{session.get('open_timeout_seconds')}s</code> read <code>{session.get('read_timeout_seconds')}s</code>",
851
  f"πŸ–Ό <b>Logo:</b> {logo_status}",
852
  ]
 
1049
 
1050
  active_output_container = None
1051
 
1052
+ # ── Raw FFmpeg/libav log capture via queue ────────────────────────────────
1053
+ # PyAV's av.logging.set_log_callback installs a C-level callback. We route
1054
+ # every libav message into a thread-safe queue and drain it in a background
1055
+ # thread so we never call Python from a C signal context.
1056
+ _fflog_queue: queue.Queue = queue.Queue(maxsize=500)
1057
+ _fflog_stop = threading.Event()
1058
+
1059
+ def _fflog_drain():
1060
+ _LEVELS = {0:"PANIC",8:"FATAL",16:"ERROR",24:"WARN",32:"INFO",40:"VERBOSE"}
1061
+ _NOISE = ("Past duration", "deprecated pixel format", "DTS", "PTS", "non monotonous")
1062
+ _last = ""
1063
+ while not _fflog_stop.is_set() or not _fflog_queue.empty():
1064
+ try:
1065
+ level, msg = _fflog_queue.get(timeout=0.3)
1066
+ except queue.Empty:
1067
+ continue
1068
+ msg = msg.strip()
1069
+ if not msg or msg == _last:
1070
+ continue
1071
+ # Verbose mode off β†’ only pass warnings/errors (level <= 24)
1072
+ verbose_on = session.get('verbose_ffmpeg_log', False)
1073
+ if not verbose_on and level > 24:
1074
+ continue
1075
+ # Skip high-volume noise regardless of verbose setting
1076
+ if any(s in msg for s in _NOISE):
1077
+ continue
1078
+ _last = msg
1079
+ lvl = _LEVELS.get(level, f"L{level}")
1080
+ prefix = f"[ffmpeg/{lvl}]" if level <= 24 else "[ffmpeg]"
1081
+ append_user_live_log(chat_id, f"{prefix} {msg}")
1082
+
1083
+ def _fflog_callback(level, message):
1084
+ try:
1085
+ _fflog_queue.put_nowait((level, message))
1086
+ except queue.Full:
1087
+ pass
 
 
 
 
 
 
1088
 
1089
+ _fflog_thread = threading.Thread(target=_fflog_drain, name=f"FFlogDrain-{chat_id}", daemon=True)
1090
+ _fflog_thread.start()
1091
  try:
1092
+ av.logging.set_log_level(av.logging.VERBOSE)
1093
+ av.logging.set_log_callback(_fflog_callback)
1094
+ except Exception as _e_avlog:
1095
+ append_user_live_log(chat_id, f"[warn] av log callback unavailable: {_e_avlog}")
1096
  # ─────────────────────────────────────────────────────────────────────────
1097
 
1098
  try:
 
1107
  session['stop_gracefully_flag'] = False
1108
  session['reconnect_attempt'] = 0
1109
  session['last_frame_time'] = None
1110
+ session['live_log_lines_user'] = [] # clear stale logs from previous run
1111
  session['pyav_objects'] = {
1112
  "input_container": None, "output_container": None,
1113
  "video_out_stream": None, "audio_out_stream": None,
1114
  "logo_image_pil": None,
1115
  }
1116
+ # Also reset last_sent so the live bubble always refreshes immediately
1117
+ with _live_view_lock:
1118
+ if chat_id in _live_views:
1119
+ _live_views[chat_id]["last_sent"] = ""
1120
 
1121
  append_user_live_log(chat_id, f"Stream engine started β†’ {session['output_url']}")
1122
 
 
1333
  except Exception as e_logo:
1334
  pass
1335
 
1336
+ # Count decoded input frames immediately.
1337
+ # The encoder buffers many frames before emitting
1338
+ # the first output packet, so counting only encoded
1339
+ # output kept stats stuck at 0 for a long time.
1340
+ with lock:
1341
+ session['frames_encoded'] += 1
1342
+ session['last_frame_time'] = datetime.datetime.now()
1343
+ _fc = session['frames_encoded']
1344
+ # Push live stats every 30 decoded frames (~1s at 30fps)
1345
+ if _fc % 30 == 0:
1346
+ threading.Thread(target=_push_state_sse, args=(chat_id,), daemon=True).start()
1347
+
1348
  for out_pkt in active_video_out_stream.encode(frame):
1349
  active_output_container.mux(out_pkt)
1350
  with lock:
 
1351
  session['bytes_sent'] += out_pkt.size
 
 
 
 
1352
 
1353
  elif active_audio_out_stream and packet.stream.type == 'audio' and in_a_streams and packet.stream.index == in_a_streams[0].index:
1354
  for frame in packet.decode():
 
1445
  notify_state_change(chat_id, "error", f"Fatal error: {esc(str(e_fatal))}")
1446
 
1447
  finally:
1448
+ # Shut down raw ffmpeg log capture
 
1449
  try:
1450
  av.logging.restore_default_callback()
1451
  except Exception:
1452
  pass
1453
+ _fflog_stop.set()
1454
 
1455
  append_user_live_log(chat_id, "Finalizing stream…")
1456
 
 
2605
  f"Expected: {esc(desc)}\n\n"
2606
  f"Type the new value now, or /cancel to abort.")]
2607
 
2608
+ elif data in ("toggle_reconnect", "toggle_stop_on_error", "toggle_verbose_log"):
2609
  field_map = {
2610
+ "toggle_reconnect": "reconnect_on_stream_error",
2611
  "toggle_stop_on_error": "stop_on_error_in_playlist",
2612
+ "toggle_verbose_log": "verbose_ffmpeg_log",
2613
+ }
2614
+ default_map = {
2615
+ "reconnect_on_stream_error": True,
2616
+ "stop_on_error_in_playlist": True,
2617
+ "verbose_ffmpeg_log": False,
2618
  }
2619
  field = field_map[data]
2620
  with lock:
2621
+ session[field] = not session.get(field, default_map[field])
2622
  new_val = session[field]
2623
+ labels = {
2624
+ "reconnect_on_stream_error": "Auto-Reconnect",
2625
+ "stop_on_error_in_playlist": "Stop on Error",
2626
+ "verbose_ffmpeg_log": "Verbose FFmpeg Log",
2627
+ }
2628
  return [ack, edit(
2629
+ f"βœ… <b>{labels[field]}</b> β†’ <code>{'on' if new_val else 'off'}</code>",
2630
  get_settings_keyboard(session))]
2631
 
2632
  # Playlist view
 
3137
  }
3138
 
3139
 
3140
+ @app.get("/poll/{chat_id}")
3141
+ async def poll_pending_edit(chat_id: int):
3142
+ """
3143
+ Lightweight polling endpoint β€” returns the pending live-view edit (if any)
3144
+ as a Telegram Bot API method dict, then clears it.
3145
+
3146
+ The Telegram Mini App / web dashboard can call this every 2s and POST the
3147
+ returned payload directly to api.telegram.org/bot<TOKEN>/editMessageText to
3148
+ achieve real-time live-view updates without waiting for an incoming webhook.
3149
+
3150
+ If no edit is pending, returns {"pending": false}.
3151
+ Also always includes a fresh status snapshot so callers can update their UI.
3152
+ """
3153
+ if chat_id not in user_sessions:
3154
+ raise HTTPException(status_code=404, detail="No session for this chat_id")
3155
+
3156
+ # Force-build a fresh live-view edit right now
3157
+ _do_live_view_edit(chat_id, force=True)
3158
+
3159
+ with _outbound_queue_lock:
3160
+ msgs = _outbound_message_queue.pop(chat_id, [])
3161
+
3162
+ # Pick the most recent editMessageText (last one wins β€” it's the freshest)
3163
+ edit_msg = None
3164
+ for m in reversed(msgs):
3165
+ if m.get("method") == "editMessageText":
3166
+ edit_msg = m
3167
+ break
3168
+ # Re-queue any non-edit messages (sendMessage notifications etc.)
3169
+ for m in msgs:
3170
+ if m.get("method") != "editMessageText":
3171
+ enqueue_outbound_message(chat_id, m)
3172
+
3173
+ session = get_user_session(chat_id)
3174
+ return {
3175
+ "pending": edit_msg is not None,
3176
+ "edit": edit_msg,
3177
+ "status": {
3178
+ "state": session.get("streaming_state", "idle"),
3179
+ "frames_encoded": session.get("frames_encoded", 0),
3180
+ "bytes_sent": session.get("bytes_sent", 0),
3181
+ "uptime_str": get_uptime(session.get("stream_start_time")),
3182
+ "logs": session.get("live_log_lines_user", [])[-15:],
3183
+ },
3184
+ "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
3185
+ }
3186
+
3187
+
3188
  # ──────────────────────────────────────────────
3189
  # MAIN
3190
  # ──────────────────────────────────────────────