""" Medium Paragraph Parser Converts Medium paragraph data to Markdown format. Handles all 13 paragraph types and 5 markup types. Ported from Freedium's medium-parser/core.py and markups.py """ import difflib import logging import re from typing import Dict, List, Optional, Tuple # Import centralized image URL utilities from src.utils import get_medium_image_url, MEDIUM_IMAGE_DEFAULT_WIDTH try: import tld HAS_TLD = True except ImportError: HAS_TLD = False logger = logging.getLogger("ParagraphParser") def get_percentage_match(string1: str, string2: str) -> float: """Calculate similarity percentage between two strings.""" if string1 is None or string2 is None: return 0.0 if not string1 or not string2: # Empty strings return 0.0 return difflib.SequenceMatcher(None, string1, string2).ratio() * 100 def get_fld_fallback(url: str) -> str: """Get first-level domain from URL.""" if HAS_TLD: try: return tld.get_fld(url) except Exception: pass # Fallback from urllib.parse import urlparse parsed = urlparse(url) return parsed.netloc class MarkupProcessor: """ Processes text with inline markups (bold, italic, code, links). Supports: - STRONG: **text** - EM: *text* - CODE: `text` - A (LINK): [text](url) - A (USER): [text](https://medium.com/u/{userId}) """ def __init__(self, text: str, is_code: bool = False): """ Initialize processor with raw text. Args: text: The raw paragraph text is_code: Whether this is inside a code block """ self.text = text self.is_code = is_code self.ranges: List[Tuple[int, int, str, str]] = [] def add_markup(self, start: int, end: int, prefix: str, suffix: str): """Add a markup range.""" self.ranges.append((start, end, prefix, suffix)) def process_markups(self, markups: List[Dict]) -> str: """ Process all markups and return formatted text. Args: markups: List of markup dictionaries from Medium API Returns: Text with markdown formatting applied """ if not markups or self.is_code: return self._escape_markdown(self.text) if not self.is_code else self.text # Parse markups into ranges for markup in markups: markup_type = markup.get("type") start = markup.get("start", 0) end = markup.get("end", 0) if markup_type == "STRONG": self.add_markup(start, end, "**", "**") elif markup_type == "EM": self.add_markup(start, end, "*", "*") elif markup_type == "CODE": self.add_markup(start, end, "`", "`") elif markup_type == "A": anchor_type = markup.get("anchorType", "LINK") if anchor_type == "USER": user_id = markup.get("userId", "") url = f"https://medium.com/u/{user_id}" else: url = markup.get("href", "") self.add_markup(start, end, "[", f"]({url})") # Sort ranges by start position (reverse to apply from end to start) self.ranges.sort(key=lambda x: x[0], reverse=True) # Apply markups result = list(self.text) for start, end, prefix, suffix in self.ranges: if end <= len(result) and start >= 0: result.insert(end, suffix) result.insert(start, prefix) return "".join(result) def _escape_markdown(self, text: str) -> str: """Escape special markdown characters.""" # Don't escape too aggressively - just handle common issues return text def get_image_url(image_id: str, width: int = MEDIUM_IMAGE_DEFAULT_WIDTH) -> str: """Build Medium image URL from image ID. Uses high-res by default.""" return get_medium_image_url(image_id, width) def parse_paragraphs_to_markdown( paragraphs: List[Dict], title: str = "", subtitle: str = "", tags: List[str] = None, preview_image_id: str = None, highlights: List[Dict] = None, ) -> str: """ Convert Medium paragraph objects to clean Markdown. Handles: - Title/subtitle deduplication (>80% match skipped) - Consecutive list items (ULI/OLI) grouped - Code block language detection - Image URL construction - IFRAME embedding (as links) - Highlight annotations Args: paragraphs: List of paragraph dictionaries from Medium API title: Article title (for deduplication) subtitle: Article subtitle (for deduplication) tags: List of tag names (for deduplication) preview_image_id: Preview image ID (for deduplication) highlights: List of highlight annotations Returns: Markdown formatted content """ if tags is None: tags = [] if highlights is None: highlights = [] out_lines: List[str] = [] current_pos = 0 detected_title = title detected_subtitle = subtitle while current_pos < len(paragraphs): paragraph = paragraphs[current_pos] para_type = paragraph.get("type", "") para_text = paragraph.get("text", "") or "" para_markups = paragraph.get("markups", []) para_name = paragraph.get("name", "") logger.debug(f"Processing paragraph {current_pos}: type={para_type}") # Title/subtitle deduplication (first 4 paragraphs only) if current_pos < 4: # Skip title duplicates if para_type in ["H3", "H4", "H2"]: if get_percentage_match(para_text, title) > 80: if title.endswith("…"): detected_title = para_text current_pos += 1 continue # Skip tag headers if para_type == "H4" and para_text in tags: current_pos += 1 continue # Skip subtitle duplicates if para_type in ["H4", "P"]: if get_percentage_match(para_text, subtitle) > 80: if not subtitle.endswith("…"): detected_subtitle = para_text current_pos += 1 continue elif subtitle and subtitle.endswith("…") and len(para_text) > 100: detected_subtitle = "" # Skip preview image if para_type == "IMG": metadata = paragraph.get("metadata") or {} if metadata.get("id") == preview_image_id: current_pos += 1 continue # Process text with markups processor = MarkupProcessor(para_text, is_code=(para_type == "PRE")) formatted_text = processor.process_markups(para_markups) # Apply highlights if any for highlight in highlights: for h_para in highlight.get("paragraphs", []): if h_para.get("name") == para_name: # Mark highlighted text start = highlight.get("startOffset", 0) end = highlight.get("endOffset", len(para_text)) # Note: Markdown doesn't have native highlight, use bold logger.debug(f"Highlight found: {start}-{end}") # === Paragraph Type Handlers === if para_type == "H2": out_lines.append(f"## {formatted_text}") out_lines.append("") elif para_type == "H3": out_lines.append(f"### {formatted_text}") out_lines.append("") elif para_type == "H4": out_lines.append(f"#### {formatted_text}") out_lines.append("") elif para_type == "P": # Check for drop cap (informational only, Markdown doesn't support) has_drop_cap = paragraph.get("hasDropCap", False) if has_drop_cap: logger.debug("Paragraph has drop cap styling") out_lines.append(formatted_text) out_lines.append("") elif para_type == "IMG": metadata = paragraph.get("metadata") or {} image_id = metadata.get("id", "") alt_text = metadata.get("alt", "") layout = paragraph.get("layout", "") if layout == "OUTSET_ROW": # Gallery: collect consecutive OUTSET_ROW images images = [] tmp_pos = current_pos while tmp_pos < len(paragraphs): p = paragraphs[tmp_pos] p_layout = p.get("layout", "") if p.get("type") == "IMG" and (p_layout == "OUTSET_ROW" or p_layout == "OUTSET_ROW_CONTINUE"): p_meta = p.get("metadata") or {} p_id = p_meta.get("id", "") p_alt = p_meta.get("alt", "") if p_id: images.append((p_id, p_alt)) tmp_pos += 1 else: break for img_id, img_alt in images: img_url = get_image_url(img_id) out_lines.append(f"![{img_alt}]({img_url})") out_lines.append("") current_pos = tmp_pos - 1 elif layout == "FULL_WIDTH": logger.warning("FULL_WIDTH image layout not fully supported") if image_id: img_url = get_image_url(image_id, width=1400) out_lines.append(f"![{alt_text}]({img_url})") out_lines.append("") else: # Standard image if image_id: img_url = get_image_url(image_id) out_lines.append(f"![{alt_text}]({img_url})") # Caption if formatted_text: out_lines.append(f"*{formatted_text}*") out_lines.append("") elif para_type == "ULI": # Unordered list: collect consecutive items list_items = [] tmp_pos = current_pos while tmp_pos < len(paragraphs): p = paragraphs[tmp_pos] if p.get("type") == "ULI": p_text = p.get("text", "") or "" p_markups = p.get("markups", []) proc = MarkupProcessor(p_text) list_items.append(proc.process_markups(p_markups)) tmp_pos += 1 else: break for item in list_items: out_lines.append(f"- {item}") out_lines.append("") current_pos = tmp_pos - 1 elif para_type == "OLI": # Ordered list: collect consecutive items list_items = [] tmp_pos = current_pos while tmp_pos < len(paragraphs): p = paragraphs[tmp_pos] if p.get("type") == "OLI": p_text = p.get("text", "") or "" p_markups = p.get("markups", []) proc = MarkupProcessor(p_text) list_items.append(proc.process_markups(p_markups)) tmp_pos += 1 else: break for i, item in enumerate(list_items, 1): out_lines.append(f"{i}. {item}") out_lines.append("") current_pos = tmp_pos - 1 elif para_type == "PRE": # Code block: collect consecutive blocks code_lines = [] language = "" tmp_pos = current_pos while tmp_pos < len(paragraphs): p = paragraphs[tmp_pos] if p.get("type") == "PRE": p_text = p.get("text", "") or "" code_lines.append(p_text) # Get language from first block if not language: code_meta = p.get("codeBlockMetadata") or {} language = code_meta.get("lang", "") tmp_pos += 1 else: break out_lines.append(f"```{language}") out_lines.extend(code_lines) out_lines.append("```") out_lines.append("") current_pos = tmp_pos - 1 elif para_type == "BQ": # Block quote out_lines.append(f"> {formatted_text}") out_lines.append("") elif para_type == "PQ": # Pull quote (styled blockquote) out_lines.append(f"> *{formatted_text}*") out_lines.append("") elif para_type == "MIXTAPE_EMBED": # Link preview card mixtape = paragraph.get("mixtapeMetadata") or {} url = mixtape.get("href", "") if not url: logger.warning("MIXTAPE_EMBED missing href, skipping") current_pos += 1 continue # Try to extract title and description from markups raw_text = para_text markups = para_markups embed_title = "" embed_description = "" if len(markups) >= 3: title_range = markups[1] desc_range = markups[2] embed_title = raw_text[title_range.get("start", 0):title_range.get("end", 0)] embed_description = raw_text[desc_range.get("start", 0):desc_range.get("end", 0)] elif raw_text: embed_title = raw_text embed_site = get_fld_fallback(url) # Format as link card if embed_title: out_lines.append(f"[**{embed_title}**]({url})") else: out_lines.append(f"[{url}]({url})") if embed_description: out_lines.append(f"> {embed_description}") if embed_site: out_lines.append(f"*— {embed_site}*") out_lines.append("") elif para_type == "IFRAME": # Embedded content iframe_data = paragraph.get("iframe") or {} media_resource = iframe_data.get("mediaResource") or {} iframe_src = media_resource.get("iframeSrc", "") iframe_id = media_resource.get("id", "") iframe_title = media_resource.get("title", "Embedded content") if iframe_src: out_lines.append(f"[📺 {iframe_title}]({iframe_src})") elif iframe_id: # Fallback - reference to iframe ID out_lines.append(f"[📺 Embedded content (ID: {iframe_id})]") else: logger.warning("IFRAME missing source, skipping") out_lines.append("") else: # Unknown paragraph type logger.warning(f"Unknown paragraph type: {para_type}") if formatted_text: out_lines.append(formatted_text) out_lines.append("") current_pos += 1 # Clean up excessive blank lines result = "\n".join(out_lines) result = re.sub(r'\n{3,}', '\n\n', result) return result.strip() def extract_article_metadata(post_data: Dict) -> Dict: """ Extract article metadata from GraphQL response. Args: post_data: The raw GraphQL response data Returns: Dict with title, subtitle, author, publication, tags, etc. """ post = post_data.get("data", {}).get("post", {}) if not post: return {} # Author info creator = post.get("creator", {}) author = { "name": creator.get("name", ""), "username": creator.get("username", ""), "id": creator.get("id", ""), "bio": creator.get("bio", ""), "imageId": creator.get("imageId", ""), } # Publication info collection = post.get("collection") or {} publication = collection.get("name", "") # Tags tags_raw = post.get("tags", []) tags = [tag.get("displayTitle", "") for tag in tags_raw] # Preview content preview = post.get("previewContent") or {} subtitle = preview.get("subtitle", "") # Preview image preview_image = post.get("previewImage") or {} preview_image_id = preview_image.get("id", "") # Highlights highlights = post.get("highlights", []) return { "title": post.get("title", ""), "subtitle": subtitle, "author": author, "publication": publication, "tags": tags, "previewImageId": preview_image_id, "highlights": highlights, "readingTime": post.get("readingTime", 0), "clapCount": post.get("clapCount", 0), "mediumUrl": post.get("mediumUrl", ""), "canonicalUrl": post.get("canonicalUrl", ""), "firstPublishedAt": post.get("firstPublishedAt"), "updatedAt": post.get("updatedAt"), "isLocked": post.get("isLocked", False), "detectedLanguage": post.get("detectedLanguage", "en"), } def extract_paragraphs(post_data: Dict) -> List[Dict]: """ Extract paragraph list from GraphQL response. Args: post_data: The raw GraphQL response data Returns: List of paragraph dictionaries """ post = post_data.get("data", {}).get("post", {}) content = post.get("content") or {} body_model = content.get("bodyModel") or {} return body_model.get("paragraphs", []) def parse_graphql_response_to_markdown(post_data: Dict) -> Tuple[str, Dict]: """ Parse full GraphQL response to Markdown content and metadata. Args: post_data: The raw GraphQL response data Returns: Tuple of (markdown_content, metadata_dict) """ metadata = extract_article_metadata(post_data) paragraphs = extract_paragraphs(post_data) if not paragraphs: logger.warning("No paragraphs found in response") return "", metadata markdown = parse_paragraphs_to_markdown( paragraphs=paragraphs, title=metadata.get("title", ""), subtitle=metadata.get("subtitle", ""), tags=metadata.get("tags", []), preview_image_id=metadata.get("previewImageId"), highlights=metadata.get("highlights", []), ) return markdown, metadata