# Medium-MCP / src/paragraph_parser.py
# Author: Nikhil Pravin Pise
# fix: Upgrade Medium images to high resolution (1400px) across entire app
# Commit: 60742a2
"""
Medium Paragraph Parser
Converts Medium paragraph data to Markdown format.
Handles all 13 paragraph types and 5 markup types.
Ported from Freedium's medium-parser/core.py and markups.py
"""
import difflib
import logging
import re
from typing import Dict, List, Optional, Tuple
# Import centralized image URL utilities
from src.utils import get_medium_image_url, MEDIUM_IMAGE_DEFAULT_WIDTH
try:
import tld
HAS_TLD = True
except ImportError:
HAS_TLD = False
logger = logging.getLogger("ParagraphParser")
def get_percentage_match(string1: Optional[str], string2: Optional[str]) -> float:
    """Return the similarity of two strings as a percentage (0.0-100.0).

    Uses ``difflib.SequenceMatcher.ratio`` scaled to a percentage. ``None``
    or empty input on either side yields 0.0 rather than raising.

    Args:
        string1: First string (may be None or empty).
        string2: Second string (may be None or empty).

    Returns:
        Similarity percentage in [0.0, 100.0].
    """
    # A single truthiness test covers both None and "" — `not None` and
    # `not ""` are both True, so the separate `is None` check was redundant.
    if not string1 or not string2:
        return 0.0
    return difflib.SequenceMatcher(None, string1, string2).ratio() * 100
def get_fld_fallback(url: str) -> str:
    """Return the first-level domain of *url*.

    Prefers the optional ``tld`` package when it was importable at module
    load; if it is missing or cannot parse the URL, falls back to the raw
    network location reported by ``urllib.parse.urlparse``.
    """
    if HAS_TLD:
        try:
            return tld.get_fld(url)
        except Exception:
            # Best-effort: tld raises several parse errors; any failure
            # simply drops us into the urlparse fallback below.
            pass
    from urllib.parse import urlparse
    return urlparse(url).netloc
class MarkupProcessor:
    """
    Processes text with inline markups (bold, italic, code, links).

    Supports:
    - STRONG: **text**
    - EM: *text*
    - CODE: `text`
    - A (LINK): [text](url)
    - A (USER): [text](https://medium.com/u/{userId})

    NOTE(review): markup start/end offsets are applied as Python string
    indices; Medium's API is believed to emit UTF-16 code-unit offsets, so
    text containing astral-plane characters (e.g. emoji) may have markers
    slightly misplaced — TODO confirm against the upstream parser.
    NOTE(review): overlapping or nested ranges sharing a start position can
    interleave incorrectly, since each insertion shifts subsequent indices —
    confirm whether Medium ever emits such ranges.
    """
    def __init__(self, text: str, is_code: bool = False):
        """
        Initialize processor with raw text.

        Args:
            text: The raw paragraph text
            is_code: Whether this is inside a code block
        """
        self.text = text
        self.is_code = is_code
        # Pending insertions: (start, end, prefix, suffix) in text coordinates.
        self.ranges: List[Tuple[int, int, str, str]] = []

    def add_markup(self, start: int, end: int, prefix: str, suffix: str):
        """Add a markup range."""
        self.ranges.append((start, end, prefix, suffix))

    def process_markups(self, markups: List[Dict]) -> str:
        """
        Process all markups and return formatted text.

        Args:
            markups: List of markup dictionaries from Medium API

        Returns:
            Text with markdown formatting applied
        """
        # Code blocks keep their raw text untouched; markup-free plain text
        # goes through the (currently no-op) markdown escaper.
        if not markups or self.is_code:
            return self._escape_markdown(self.text) if not self.is_code else self.text
        # Parse markups into ranges
        for markup in markups:
            markup_type = markup.get("type")
            start = markup.get("start", 0)
            end = markup.get("end", 0)
            if markup_type == "STRONG":
                self.add_markup(start, end, "**", "**")
            elif markup_type == "EM":
                self.add_markup(start, end, "*", "*")
            elif markup_type == "CODE":
                self.add_markup(start, end, "`", "`")
            elif markup_type == "A":
                # Links: either a regular href or a Medium user mention,
                # whose URL is rebuilt from the userId.
                anchor_type = markup.get("anchorType", "LINK")
                if anchor_type == "USER":
                    user_id = markup.get("userId", "")
                    url = f"https://medium.com/u/{user_id}"
                else:
                    url = markup.get("href", "")
                self.add_markup(start, end, "[", f"]({url})")
        # Sort ranges by start position (reverse to apply from end to start,
        # so earlier insertions don't shift offsets of ranges not yet applied).
        self.ranges.sort(key=lambda x: x[0], reverse=True)
        # Apply markups: suffix is inserted first (at the higher index) so
        # the prefix insertion at `start` doesn't displace it.
        result = list(self.text)
        for start, end, prefix, suffix in self.ranges:
            # Bounds guard against out-of-range offsets coming from the API.
            if end <= len(result) and start >= 0:
                result.insert(end, suffix)
                result.insert(start, prefix)
        return "".join(result)

    def _escape_markdown(self, text: str) -> str:
        """Escape special markdown characters.

        Currently a deliberate no-op — text is returned unchanged.
        """
        # Don't escape too aggressively - just handle common issues
        return text
def get_image_url(image_id: str, width: int = MEDIUM_IMAGE_DEFAULT_WIDTH) -> str:
    """Build Medium image URL from image ID. Uses high-res by default.

    Thin wrapper over the centralized ``get_medium_image_url`` helper so
    all callers in this module share one URL-building implementation.

    Args:
        image_id: Medium CDN image identifier.
        width: Desired width in pixels; defaults to the module-wide
            high-resolution constant.

    Returns:
        Fully qualified Medium CDN image URL.
    """
    return get_medium_image_url(image_id, width)
def parse_paragraphs_to_markdown(
    paragraphs: List[Dict],
    title: str = "",
    subtitle: str = "",
    tags: Optional[List[str]] = None,
    preview_image_id: Optional[str] = None,
    highlights: Optional[List[Dict]] = None,
) -> str:
    """
    Convert Medium paragraph objects to clean Markdown.

    Handles:
    - Title/subtitle deduplication (>80% match skipped)
    - Consecutive list items (ULI/OLI) grouped
    - Code block language detection
    - Image URL construction
    - IFRAME embedding (as links)
    - Highlight annotations

    Args:
        paragraphs: List of paragraph dictionaries from Medium API
        title: Article title (for deduplication)
        subtitle: Article subtitle (for deduplication)
        tags: List of tag names (for deduplication)
        preview_image_id: Preview image ID (for deduplication)
        highlights: List of highlight annotations

    Returns:
        Markdown formatted content
    """
    # Normalize None defaults to fresh lists (avoids the mutable-default trap).
    if tags is None:
        tags = []
    if highlights is None:
        highlights = []
    out_lines: List[str] = []
    current_pos = 0
    # NOTE(review): detected_title / detected_subtitle are assigned below but
    # never read or returned — confirm whether they were meant to be surfaced
    # to the caller.
    detected_title = title
    detected_subtitle = subtitle
    # Manual index loop (not for-each): several handlers consume multiple
    # consecutive paragraphs (lists, code blocks, galleries) and advance
    # current_pos themselves.
    while current_pos < len(paragraphs):
        paragraph = paragraphs[current_pos]
        para_type = paragraph.get("type", "")
        para_text = paragraph.get("text", "") or ""
        para_markups = paragraph.get("markups", [])
        para_name = paragraph.get("name", "")
        logger.debug(f"Processing paragraph {current_pos}: type={para_type}")
        # Title/subtitle deduplication (first 4 paragraphs only)
        if current_pos < 4:
            # Skip headings that fuzzily repeat the article title.
            if para_type in ["H3", "H4", "H2"]:
                if get_percentage_match(para_text, title) > 80:
                    # A truncated title ("…") means this paragraph carries the
                    # full text; keep it as the better candidate.
                    if title.endswith("…"):
                        detected_title = para_text
                    current_pos += 1
                    continue
            # Skip H4 headers that merely repeat one of the article tags.
            if para_type == "H4" and para_text in tags:
                current_pos += 1
                continue
            # Skip paragraphs that fuzzily repeat the subtitle.
            if para_type in ["H4", "P"]:
                if get_percentage_match(para_text, subtitle) > 80:
                    if not subtitle.endswith("…"):
                        detected_subtitle = para_text
                    current_pos += 1
                    continue
                elif subtitle and subtitle.endswith("…") and len(para_text) > 100:
                    # Truncated subtitle followed by a long paragraph: the
                    # "subtitle" was likely just a content preview — drop it.
                    detected_subtitle = ""
            # Skip the preview image if it reappears as a body paragraph.
            if para_type == "IMG":
                metadata = paragraph.get("metadata") or {}
                if metadata.get("id") == preview_image_id:
                    current_pos += 1
                    continue
        # Process text with inline markups; PRE paragraphs keep raw text.
        processor = MarkupProcessor(para_text, is_code=(para_type == "PRE"))
        formatted_text = processor.process_markups(para_markups)
        # Apply highlights if any — informational only: Markdown has no
        # native highlight syntax, so matches are merely logged.
        for highlight in highlights:
            for h_para in highlight.get("paragraphs", []):
                if h_para.get("name") == para_name:
                    # Mark highlighted text
                    start = highlight.get("startOffset", 0)
                    end = highlight.get("endOffset", len(para_text))
                    # Note: Markdown doesn't have native highlight, use bold
                    logger.debug(f"Highlight found: {start}-{end}")
        # === Paragraph Type Handlers ===
        if para_type == "H2":
            out_lines.append(f"## {formatted_text}")
            out_lines.append("")
        elif para_type == "H3":
            out_lines.append(f"### {formatted_text}")
            out_lines.append("")
        elif para_type == "H4":
            out_lines.append(f"#### {formatted_text}")
            out_lines.append("")
        elif para_type == "P":
            # Check for drop cap (informational only, Markdown doesn't support)
            has_drop_cap = paragraph.get("hasDropCap", False)
            if has_drop_cap:
                logger.debug("Paragraph has drop cap styling")
            out_lines.append(formatted_text)
            out_lines.append("")
        elif para_type == "IMG":
            metadata = paragraph.get("metadata") or {}
            image_id = metadata.get("id", "")
            alt_text = metadata.get("alt", "")
            layout = paragraph.get("layout", "")
            if layout == "OUTSET_ROW":
                # Gallery: collect this and all following OUTSET_ROW /
                # OUTSET_ROW_CONTINUE images into one run.
                images = []
                tmp_pos = current_pos
                while tmp_pos < len(paragraphs):
                    p = paragraphs[tmp_pos]
                    p_layout = p.get("layout", "")
                    if p.get("type") == "IMG" and (p_layout == "OUTSET_ROW" or p_layout == "OUTSET_ROW_CONTINUE"):
                        p_meta = p.get("metadata") or {}
                        p_id = p_meta.get("id", "")
                        p_alt = p_meta.get("alt", "")
                        if p_id:
                            images.append((p_id, p_alt))
                        tmp_pos += 1
                    else:
                        break
                for img_id, img_alt in images:
                    img_url = get_image_url(img_id)
                    out_lines.append(f"![{img_alt}]({img_url})")
                out_lines.append("")
                # Rewind by one: the shared increment at the bottom of the
                # loop advances past the last consumed gallery image.
                current_pos = tmp_pos - 1
            elif layout == "FULL_WIDTH":
                logger.warning("FULL_WIDTH image layout not fully supported")
                if image_id:
                    img_url = get_image_url(image_id, width=1400)
                    out_lines.append(f"![{alt_text}]({img_url})")
                    out_lines.append("")
            else:
                # Standard image
                if image_id:
                    img_url = get_image_url(image_id)
                    out_lines.append(f"![{alt_text}]({img_url})")
                # Caption: the IMG paragraph's own text, rendered in italics.
                if formatted_text:
                    out_lines.append(f"*{formatted_text}*")
                out_lines.append("")
        elif para_type == "ULI":
            # Unordered list: collect consecutive items
            list_items = []
            tmp_pos = current_pos
            while tmp_pos < len(paragraphs):
                p = paragraphs[tmp_pos]
                if p.get("type") == "ULI":
                    p_text = p.get("text", "") or ""
                    p_markups = p.get("markups", [])
                    proc = MarkupProcessor(p_text)
                    list_items.append(proc.process_markups(p_markups))
                    tmp_pos += 1
                else:
                    break
            for item in list_items:
                out_lines.append(f"- {item}")
            out_lines.append("")
            # Rewind by one for the shared increment below.
            current_pos = tmp_pos - 1
        elif para_type == "OLI":
            # Ordered list: collect consecutive items
            list_items = []
            tmp_pos = current_pos
            while tmp_pos < len(paragraphs):
                p = paragraphs[tmp_pos]
                if p.get("type") == "OLI":
                    p_text = p.get("text", "") or ""
                    p_markups = p.get("markups", [])
                    proc = MarkupProcessor(p_text)
                    list_items.append(proc.process_markups(p_markups))
                    tmp_pos += 1
                else:
                    break
            # Renumber from 1 regardless of any numbering in the source.
            for i, item in enumerate(list_items, 1):
                out_lines.append(f"{i}. {item}")
            out_lines.append("")
            current_pos = tmp_pos - 1
        elif para_type == "PRE":
            # Code block: merge consecutive PRE paragraphs into one fence.
            code_lines = []
            language = ""
            tmp_pos = current_pos
            while tmp_pos < len(paragraphs):
                p = paragraphs[tmp_pos]
                if p.get("type") == "PRE":
                    p_text = p.get("text", "") or ""
                    code_lines.append(p_text)
                    # Get language from first block
                    if not language:
                        code_meta = p.get("codeBlockMetadata") or {}
                        language = code_meta.get("lang", "")
                    tmp_pos += 1
                else:
                    break
            out_lines.append(f"```{language}")
            out_lines.extend(code_lines)
            out_lines.append("```")
            out_lines.append("")
            current_pos = tmp_pos - 1
        elif para_type == "BQ":
            # Block quote
            out_lines.append(f"> {formatted_text}")
            out_lines.append("")
        elif para_type == "PQ":
            # Pull quote (styled blockquote)
            out_lines.append(f"> *{formatted_text}*")
            out_lines.append("")
        elif para_type == "MIXTAPE_EMBED":
            # Link preview card
            mixtape = paragraph.get("mixtapeMetadata") or {}
            url = mixtape.get("href", "")
            if not url:
                logger.warning("MIXTAPE_EMBED missing href, skipping")
                current_pos += 1
                continue
            # Try to extract title and description from markups.
            # NOTE(review): assumes markups[1]/markups[2] delimit the card's
            # title and description spans — TODO confirm against the API.
            raw_text = para_text
            markups = para_markups
            embed_title = ""
            embed_description = ""
            if len(markups) >= 3:
                title_range = markups[1]
                desc_range = markups[2]
                embed_title = raw_text[title_range.get("start", 0):title_range.get("end", 0)]
                embed_description = raw_text[desc_range.get("start", 0):desc_range.get("end", 0)]
            elif raw_text:
                embed_title = raw_text
            embed_site = get_fld_fallback(url)
            # Format as link card
            if embed_title:
                out_lines.append(f"[**{embed_title}**]({url})")
            else:
                out_lines.append(f"[{url}]({url})")
            if embed_description:
                out_lines.append(f"> {embed_description}")
            if embed_site:
                out_lines.append(f"*— {embed_site}*")
            out_lines.append("")
        elif para_type == "IFRAME":
            # Embedded content: rendered as a plain link (Markdown can't
            # embed iframes).
            iframe_data = paragraph.get("iframe") or {}
            media_resource = iframe_data.get("mediaResource") or {}
            iframe_src = media_resource.get("iframeSrc", "")
            iframe_id = media_resource.get("id", "")
            iframe_title = media_resource.get("title", "Embedded content")
            if iframe_src:
                out_lines.append(f"[📺 {iframe_title}]({iframe_src})")
            elif iframe_id:
                # Fallback - reference to iframe ID
                out_lines.append(f"[📺 Embedded content (ID: {iframe_id})]")
            else:
                logger.warning("IFRAME missing source, skipping")
            out_lines.append("")
        else:
            # Unknown paragraph type: emit its text as-is so content isn't lost.
            logger.warning(f"Unknown paragraph type: {para_type}")
            if formatted_text:
                out_lines.append(formatted_text)
                out_lines.append("")
        current_pos += 1
    # Clean up excessive blank lines: collapse 3+ newlines to exactly 2.
    result = "\n".join(out_lines)
    result = re.sub(r'\n{3,}', '\n\n', result)
    return result.strip()
def extract_article_metadata(post_data: Dict) -> Dict:
    """
    Extract article metadata from GraphQL response.

    Args:
        post_data: The raw GraphQL response data

    Returns:
        Dict with title, subtitle, author, publication, tags, etc.
        Empty dict when the response carries no post.
    """
    post = post_data.get("data", {}).get("post", {})
    if not post:
        return {}

    # All author fields share the same "" default, so gather them in one sweep.
    creator = post.get("creator", {})
    author = {
        field: creator.get(field, "")
        for field in ("name", "username", "id", "bio", "imageId")
    }

    # collection / previewContent / previewImage may be explicit nulls,
    # hence the `or {}` guards before the nested lookups.
    publication = (post.get("collection") or {}).get("name", "")
    subtitle = (post.get("previewContent") or {}).get("subtitle", "")
    preview_image_id = (post.get("previewImage") or {}).get("id", "")

    tags = [t.get("displayTitle", "") for t in post.get("tags", [])]

    return {
        "title": post.get("title", ""),
        "subtitle": subtitle,
        "author": author,
        "publication": publication,
        "tags": tags,
        "previewImageId": preview_image_id,
        "highlights": post.get("highlights", []),
        "readingTime": post.get("readingTime", 0),
        "clapCount": post.get("clapCount", 0),
        "mediumUrl": post.get("mediumUrl", ""),
        "canonicalUrl": post.get("canonicalUrl", ""),
        "firstPublishedAt": post.get("firstPublishedAt"),
        "updatedAt": post.get("updatedAt"),
        "isLocked": post.get("isLocked", False),
        "detectedLanguage": post.get("detectedLanguage", "en"),
    }
def extract_paragraphs(post_data: Dict) -> List[Dict]:
    """
    Extract paragraph list from GraphQL response.

    Args:
        post_data: The raw GraphQL response data

    Returns:
        List of paragraph dictionaries (empty when any level is missing
        or explicitly null).
    """
    post = post_data.get("data", {}).get("post", {})
    # content / bodyModel may be explicit nulls, hence the `or {}` guards.
    body_model = (post.get("content") or {}).get("bodyModel") or {}
    return body_model.get("paragraphs", [])
def parse_graphql_response_to_markdown(post_data: Dict) -> Tuple[str, Dict]:
    """
    Parse full GraphQL response to Markdown content and metadata.

    Args:
        post_data: The raw GraphQL response data

    Returns:
        Tuple of (markdown_content, metadata_dict); the markdown string is
        empty when the response contains no paragraphs.
    """
    metadata = extract_article_metadata(post_data)
    paragraphs = extract_paragraphs(post_data)

    # No body paragraphs — return whatever metadata we could salvage.
    if not paragraphs:
        logger.warning("No paragraphs found in response")
        return "", metadata

    content = parse_paragraphs_to_markdown(
        paragraphs=paragraphs,
        title=metadata.get("title", ""),
        subtitle=metadata.get("subtitle", ""),
        tags=metadata.get("tags", []),
        preview_image_id=metadata.get("previewImageId"),
        highlights=metadata.get("highlights", []),
    )
    return content, metadata