"""Telegram MarkdownV2 utilities.

Renders common Markdown into Telegram MarkdownV2 format.
Used by the message handler and Telegram platform adapter.
"""

from __future__ import annotations

import re
from typing import Optional

from markdown_it import MarkdownIt

# Characters that must be backslash-escaped in ordinary MarkdownV2 text.
MDV2_SPECIAL_CHARS = set("\\_*[]()~`>#+-=|{}.!")
# Characters that must be backslash-escaped inside a link destination.
MDV2_LINK_ESCAPE = set("\\)")

_MD = MarkdownIt("commonmark", {"html": False, "breaks": False})
_MD.enable("strikethrough")
_MD.enable("table")

_TABLE_SEP_RE = re.compile(r"^\s*\|?\s*:?-{3,}:?\s*(\|\s*:?-{3,}:?\s*)+\|?\s*$")
# Group 1 is the fence marker run (backticks or tildes), group 2 is whatever
# follows it on the line (the info string on an opener, blank on a closer).
_FENCE_RE = re.compile(r"^\s*(`{3,}|~{3,})(.*)$")
_LEADING_WS_RE = re.compile(r"^(\s*)")


def _is_gfm_table_header_line(line: str) -> bool:
    """Return True if *line* looks like a GFM table header row.

    A header row contains at least one pipe, is not itself a separator row,
    and has at least two non-empty cells.
    """
    if "|" not in line:
        return False
    if _TABLE_SEP_RE.match(line):
        return False
    cells = [cell.strip() for cell in line.strip().strip("|").split("|")]
    return sum(1 for cell in cells if cell) >= 2


def _normalize_gfm_tables(text: str) -> str:
    """Insert a blank line before tables that directly follow other text.

    Many LLMs emit tables immediately after a paragraph line (no blank line).
    markdown-it treats that as a softbreak within the paragraph, so the table
    extension does not trigger. A whitespace-only line (carrying the header's
    indentation) is inserted before each detected header/separator pair.

    Lines inside fenced code blocks are never touched. Both backtick and
    tilde fences are tracked; a fence is closed only by a run of the same
    character that is at least as long as the opener and has nothing but
    whitespace after it.
    """
    lines = text.splitlines()
    if len(lines) < 2:
        return text

    out_lines: list[str] = []
    open_fence: Optional[str] = None  # marker run of the currently open fence
    for idx, line in enumerate(lines):
        fence = _FENCE_RE.match(line)
        if fence:
            marker, trailing = fence.group(1), fence.group(2)
            if open_fence is None:
                # The info string of a backtick fence may not contain
                # backticks; such a line is inline code, not a fence opener.
                if marker[0] != "`" or "`" not in trailing:
                    open_fence = marker
                    out_lines.append(line)
                    continue
            elif (
                marker[0] == open_fence[0]
                and len(marker) >= len(open_fence)
                and not trailing.strip()
            ):
                open_fence = None
                out_lines.append(line)
                continue

        if (
            open_fence is None
            and idx + 1 < len(lines)
            and _is_gfm_table_header_line(line)
            and _TABLE_SEP_RE.match(lines[idx + 1])
            and out_lines
            and out_lines[-1].strip() != ""
        ):
            m = _LEADING_WS_RE.match(line)
            out_lines.append(m.group(1) if m else "")

        out_lines.append(line)

    return "\n".join(out_lines)


def escape_md_v2(text: str) -> str:
    """Escape text for Telegram MarkdownV2."""
    return "".join(f"\\{ch}" if ch in MDV2_SPECIAL_CHARS else ch for ch in text)


def escape_md_v2_code(text: str) -> str:
    """Escape text for Telegram MarkdownV2 code spans/blocks."""
    # Backslashes first, so the ones added for backticks are not doubled.
    return text.replace("\\", "\\\\").replace("`", "\\`")


def escape_md_v2_link_url(text: str) -> str:
    """Escape URL for Telegram MarkdownV2 link destination."""
    return "".join(f"\\{ch}" if ch in MDV2_LINK_ESCAPE else ch for ch in text)


def mdv2_bold(text: str) -> str:
    """Format text as bold in MarkdownV2."""
    return f"*{escape_md_v2(text)}*"


def mdv2_code_inline(text: str) -> str:
    """Format text as inline code in MarkdownV2."""
    return f"`{escape_md_v2_code(text)}`"


def format_status(emoji: str, label: str, suffix: Optional[str] = None) -> str:
    """Format a status message with emoji and optional suffix.

    The label is rendered bold; the suffix, when given and non-empty, is
    appended as escaped plain text. The emoji is emitted as-is.
    """
    base = f"{emoji} {mdv2_bold(label)}"
    if suffix:
        return f"{base} {escape_md_v2(suffix)}"
    return base


def _token_attr(tok, name: str, default=None):
    """Return attribute *name* of a markdown-it token, or *default*.

    Handles ``tok.attrs`` being either a dict or a sequence of
    ``(key, value)`` pairs.
    """
    attrs = tok.attrs
    if not attrs:
        return default
    if isinstance(attrs, dict):
        return attrs.get(name, default)
    for key, val in attrs:
        if key == name:
            return val
    return default


def _apply_blockquote(val: str, level: int) -> str:
    """Prefix every line of *val* with ``level`` blockquote markers."""
    if level <= 0:
        return val
    prefix = "> " * level
    return prefix + val.replace("\n", "\n" + prefix)


def _render_inline_table_plain(children: list) -> str:
    """Render table-cell inline tokens as unescaped plain text.

    No escaping happens here because the whole table is emitted inside a code
    block and escaped once as code. Line breaks collapse to a single space.
    """
    out: list[str] = []
    for tok in children:
        if tok.type == "text" or tok.type == "code_inline":
            out.append(tok.content)
        elif tok.type in {"softbreak", "hardbreak"}:
            out.append(" ")
        elif tok.type == "image" and tok.content:
            out.append(tok.content)
    return "".join(out)


def _render_inline_plain(children: list) -> str:
    """Render inline tokens as escaped text without any inline formatting.

    Used for headings, which are wrapped in bold as a whole. Text, inline
    code and image alt text are kept; formatting markers are dropped.
    """
    out: list[str] = []
    for tok in children:
        if tok.type == "text" or tok.type == "code_inline":
            out.append(escape_md_v2(tok.content))
        elif tok.type in {"softbreak", "hardbreak"}:
            out.append("\n")
        elif tok.type == "image" and tok.content:
            # Keep the alt text rather than silently dropping the image.
            out.append(escape_md_v2(tok.content))
    return "".join(out)


def _render_inline(children: list) -> str:
    """Render inline tokens into MarkdownV2 with emphasis, code and links."""
    out: list[str] = []
    i = 0
    while i < len(children):
        tok = children[i]
        t = tok.type
        if t == "text":
            out.append(escape_md_v2(tok.content))
        elif t in {"softbreak", "hardbreak"}:
            out.append("\n")
        elif t in {"em_open", "em_close"}:
            out.append("_")
        elif t in {"strong_open", "strong_close"}:
            out.append("*")
        elif t in {"s_open", "s_close"}:
            out.append("~")
        elif t == "code_inline":
            out.append(f"`{escape_md_v2_code(tok.content)}`")
        elif t == "link_open":
            href = _token_attr(tok, "href", "")
            # Consume everything up to the matching link_close; only the
            # plain text of the label is kept.
            i += 1
            label_parts: list[str] = []
            while i < len(children) and children[i].type != "link_close":
                child = children[i]
                if child.type == "text" or child.type == "code_inline":
                    label_parts.append(child.content)
                i += 1
            link_text = "".join(label_parts)
            out.append(f"[{escape_md_v2(link_text)}]({escape_md_v2_link_url(href)})")
        elif t == "image":
            # Images are rendered as "alt (url)", or just the url when there
            # is no alt text.
            src = _token_attr(tok, "src", "")
            alt = tok.content or ""
            if alt:
                out.append(f"{escape_md_v2(alt)} ({escape_md_v2_link_url(src)})")
            else:
                out.append(escape_md_v2_link_url(src))
        else:
            out.append(escape_md_v2(tok.content or ""))
        i += 1
    return "".join(out)


def _collect_table_rows(tokens: list, start: int):
    """Collect the cell texts of a table starting at ``tokens[start]``.

    ``start`` is the index just after ``table_open``. Returns
    ``(rows, header_flags, end)`` where ``rows`` is a list of rows of plain
    cell strings, ``header_flags[k]`` tells whether row ``k`` came from the
    table head, and ``end`` is the index of ``table_close`` (or
    ``len(tokens)`` if none was found).
    """
    rows: list[list[str]] = []
    header_flags: list[bool] = []
    in_thead = False
    in_row = False
    in_cell = False
    current_row: list[str] = []
    current_row_header = False
    cell_parts: list[str] = []

    j = start
    while j < len(tokens):
        tt = tokens[j].type
        if tt == "thead_open":
            in_thead = True
        elif tt == "thead_close":
            in_thead = False
        elif tt == "tr_open":
            in_row = True
            current_row = []
            current_row_header = in_thead
        elif tt in {"th_open", "td_open"}:
            in_cell = True
            cell_parts = []
        elif tt == "inline" and in_cell:
            cell_parts.append(_render_inline_table_plain(tokens[j].children or []))
        elif tt in {"th_close", "td_close"} and in_cell:
            current_row.append(" ".join(cell_parts).strip())
            in_cell = False
            cell_parts = []
        elif tt == "tr_close" and in_row:
            rows.append(current_row)
            header_flags.append(bool(current_row_header))
            in_row = False
        elif tt == "table_close":
            break
        j += 1
    return rows, header_flags, j


def _format_table(rows: list, header_flags: list) -> str:
    """Lay out table rows as aligned, pipe-delimited monospace text.

    Short rows are padded with empty cells, every column is at least three
    characters wide, and a dashed separator follows the last header row.
    """
    col_count = max((len(r) for r in rows), default=0)
    padded = [r + [""] * (col_count - len(r)) for r in rows]
    widths = [
        max(max((len(r[c]) for r in padded), default=0), 3)
        for c in range(col_count)
    ]
    separator = "| " + " | ".join("-" * w for w in widths) + " |"
    last_header_idx = max(
        (idx for idx, is_header in enumerate(header_flags) if is_header),
        default=-1,
    )

    lines: list[str] = []
    for idx, row in enumerate(padded):
        cells = [cell.ljust(w) for cell, w in zip(row, widths)]
        lines.append("| " + " | ".join(cells) + " |")
        if idx == last_header_idx:
            lines.append(separator)
    return "\n".join(lines).rstrip()


def render_markdown_to_mdv2(text: str) -> str:
    """Render common Markdown into Telegram MarkdownV2.

    Supported constructs: paragraphs, headings (rendered bold), emphasis,
    strong, strikethrough, inline code, links, images (as text), bullet and
    ordered lists, blockquotes, fenced/indented code blocks, and GFM tables
    (rendered as an aligned code block).

    Returns an empty string for empty input. Trailing whitespace is stripped
    from the result.
    """
    if not text:
        return ""

    tokens = _MD.parse(_normalize_gfm_tables(text))

    out: list[str] = []
    list_stack: list[dict] = []
    # List marker waiting to be prepended to the next inline content.
    pending_prefix: Optional[str] = None
    blockquote_level = 0
    in_heading = False

    i = 0
    while i < len(tokens):
        tok = tokens[i]
        t = tok.type
        if t == "paragraph_open":
            pass
        elif t == "paragraph_close":
            out.append("\n")
        elif t == "heading_open":
            in_heading = True
        elif t == "heading_close":
            in_heading = False
            out.append("\n")
        elif t == "bullet_list_open":
            list_stack.append({"type": "bullet", "index": 1})
        elif t == "ordered_list_open":
            start = 1
            raw_start = _token_attr(tok, "start")
            if raw_start is not None:
                try:
                    start = int(raw_start)
                except (TypeError, ValueError):
                    start = 1
            list_stack.append({"type": "ordered", "index": start})
        elif t in {"bullet_list_close", "ordered_list_close"}:
            if list_stack:
                list_stack.pop()
            out.append("\n")
        elif t == "list_item_open":
            if list_stack:
                top = list_stack[-1]
                if top["type"] == "bullet":
                    pending_prefix = "\\- "
                else:
                    pending_prefix = f"{top['index']}\\. "
                    top["index"] += 1
        elif t == "list_item_close":
            out.append("\n")
        elif t == "blockquote_open":
            blockquote_level += 1
        elif t == "blockquote_close":
            blockquote_level = max(0, blockquote_level - 1)
            out.append("\n")
        elif t == "table_open":
            # A list marker cannot be attached to a code block, so emit it
            # on its own line first.
            if pending_prefix:
                out.append(_apply_blockquote(pending_prefix.rstrip(), blockquote_level))
                out.append("\n")
                pending_prefix = None
            rows, header_flags, end = _collect_table_rows(tokens, i + 1)
            if rows:
                table_text = _format_table(rows, header_flags)
                out.append(f"```\n{escape_md_v2_code(table_text)}\n```")
                out.append("\n")
            # Skip past table_close; the table's tokens are fully consumed.
            i = end + 1
            continue
        elif t in {"code_block", "fence"}:
            code = escape_md_v2_code(tok.content.rstrip("\n"))
            out.append(f"```\n{code}\n```")
            out.append("\n")
        elif t == "inline":
            children = tok.children or []
            if in_heading:
                # Headings have no MarkdownV2 equivalent; render them bold.
                rendered = f"*{_render_inline_plain(children)}*"
            else:
                rendered = _render_inline(children)
            if pending_prefix:
                rendered = pending_prefix + rendered
                pending_prefix = None
            out.append(_apply_blockquote(rendered, blockquote_level))
        else:
            if tok.content:
                out.append(escape_md_v2(tok.content))
        i += 1

    return "".join(out).rstrip()


__all__ = [
    "escape_md_v2",
    "escape_md_v2_code",
    "escape_md_v2_link_url",
    "format_status",
    "mdv2_bold",
    "mdv2_code_inline",
    "render_markdown_to_mdv2",
]