""" Blog Parser Module Parse and clean raw blog content from data/raw/blogs.txt. Handles various blog formats and extracts clean text with metadata. Example usage: parser = BlogParser() posts = parser.parse_file("data/raw/blogs.txt") for post in posts: print(f"Title: {post['title']}") print(f"Content: {post['content'][:100]}...") """ import re import html import unicodedata import logging from pathlib import Path from dataclasses import dataclass, field from typing import Optional from loguru import logger @dataclass class BlogPost: """Represents a parsed blog post.""" title: str content: str raw_content: str word_count: int char_count: int index: int metadata: dict = field(default_factory=dict) def to_dict(self) -> dict: """Convert to dictionary for serialization.""" return { "title": self.title, "content": self.content, "raw_content": self.raw_content, "word_count": self.word_count, "char_count": self.char_count, "index": self.index, "metadata": self.metadata, } class BlogParser: """ Parse and clean blog content from various formats. Supports: - Explicit markers (=== BLOG START/END ===) - Markdown headers as separators - Double newlines as fallback - Date patterns as indicators Example: >>> parser = BlogParser() >>> posts = parser.parse_file("data/raw/blogs.txt") >>> print(f"Parsed {len(posts)} posts") """ # Regex patterns for different separator formats MARKER_PATTERN = re.compile( r"===\s*BLOG\s*START\s*===\s*(.*?)\s*===\s*BLOG\s*END\s*===", re.DOTALL | re.IGNORECASE ) MARKDOWN_HEADER_PATTERN = re.compile(r"^#\s+(.+)$", re.MULTILINE) DATE_PATTERN = re.compile( r"^\s*(\d{4}[-/]\d{1,2}[-/]\d{1,2}|\d{1,2}[-/]\d{1,2}[-/]\d{4})\s*$", re.MULTILINE ) # HTML tag pattern for cleaning HTML_TAG_PATTERN = re.compile(r"<[^>]+>") # Multiple whitespace pattern MULTI_WHITESPACE_PATTERN = re.compile(r"[ \t]+") MULTI_NEWLINE_PATTERN = re.compile(r"\n{3,}") def __init__( self, min_content_length: int = 100, max_content_length: int = 50000, remove_html: bool = True, normalize_unicode: bool = True, ): """ Initialize the blog parser. Args: min_content_length: Minimum characters for a valid post max_content_length: Maximum characters for a valid post remove_html: Whether to strip HTML tags normalize_unicode: Whether to normalize Unicode characters """ self.min_content_length = min_content_length self.max_content_length = max_content_length self.remove_html = remove_html self.normalize_unicode = normalize_unicode def parse_file(self, file_path: str | Path) -> list[BlogPost]: """ Parse a blog file and return list of BlogPost objects. Args: file_path: Path to the blogs.txt file Returns: List of parsed BlogPost objects Raises: FileNotFoundError: If file doesn't exist ValueError: If no valid posts found """ file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"Blog file not found: {file_path}") logger.info(f"Reading blog file: {file_path}") with open(file_path, "r", encoding="utf-8") as f: raw_content = f.read() logger.info(f"Read {len(raw_content):,} characters") # Try different parsing strategies in order of preference posts = self._parse_with_markers(raw_content) if not posts: logger.info("No marker-based posts found, trying markdown headers") posts = self._parse_with_markdown_headers(raw_content) if not posts: logger.info("No markdown headers found, trying double newlines") posts = self._parse_with_double_newlines(raw_content) if not posts: logger.warning("No separators found, treating entire content as single post") posts = self._parse_as_single_post(raw_content) # Clean and validate posts cleaned_posts = [] for i, post in enumerate(posts): cleaned = self._clean_post(post, i) if cleaned and self._is_valid_post(cleaned): cleaned_posts.append(cleaned) else: logger.debug(f"Skipping invalid post at index {i}") logger.info(f"Parsed {len(cleaned_posts)} valid posts from {len(posts)} raw posts") if not cleaned_posts: raise ValueError("No valid blog posts found in file") return cleaned_posts def _parse_with_markers(self, content: str) -> list[dict]: """Parse using === BLOG START/END === markers.""" matches = self.MARKER_PATTERN.findall(content) posts = [] for match in matches: posts.append({"raw_content": match.strip()}) logger.debug(f"Found {len(posts)} posts with markers") return posts def _parse_with_markdown_headers(self, content: str) -> list[dict]: """Parse using markdown # headers as separators.""" # Split on markdown headers parts = re.split(r"^(#\s+.+)$", content, flags=re.MULTILINE) posts = [] current_title = None current_content = [] for part in parts: part = part.strip() if not part: continue if self.MARKDOWN_HEADER_PATTERN.match(part): # Save previous post if exists if current_title and current_content: posts.append({ "raw_content": f"{current_title}\n\n{''.join(current_content)}", "title_hint": current_title.lstrip("#").strip(), }) current_title = part current_content = [] else: current_content.append(part) # Don't forget last post if current_title and current_content: posts.append({ "raw_content": f"{current_title}\n\n{''.join(current_content)}", "title_hint": current_title.lstrip("#").strip(), }) logger.debug(f"Found {len(posts)} posts with markdown headers") return posts def _parse_with_double_newlines(self, content: str) -> list[dict]: """Parse using triple+ newlines as separators (common blog format).""" # Split on 3+ consecutive newlines parts = re.split(r"\n{3,}", content) posts = [] for part in parts: part = part.strip() if len(part) >= self.min_content_length: posts.append({"raw_content": part}) logger.debug(f"Found {len(posts)} posts with double newlines") return posts def _parse_as_single_post(self, content: str) -> list[dict]: """Treat entire content as a single post.""" return [{"raw_content": content.strip()}] def _clean_post(self, post_data: dict, index: int) -> Optional[BlogPost]: """ Clean and structure a raw post. Args: post_data: Dictionary with raw_content and optional hints index: Post index in sequence Returns: BlogPost object or None if invalid """ raw_content = post_data.get("raw_content", "") if not raw_content: return None content = raw_content # Remove HTML tags if enabled if self.remove_html: content = self._remove_html_tags(content) # Decode HTML entities content = html.unescape(content) # Normalize Unicode if enabled if self.normalize_unicode: content = self._normalize_unicode(content) # Clean whitespace content = self._clean_whitespace(content) # Extract title title = post_data.get("title_hint") or self._extract_title(content) # Remove title from content if it appears at the start if title and content.startswith(title): content = content[len(title):].strip() # Calculate stats word_count = len(content.split()) char_count = len(content) return BlogPost( title=title, content=content, raw_content=raw_content, word_count=word_count, char_count=char_count, index=index, metadata=post_data.get("metadata", {}), ) def _remove_html_tags(self, text: str) -> str: """Remove HTML tags from text.""" return self.HTML_TAG_PATTERN.sub("", text) def _normalize_unicode(self, text: str) -> str: """Normalize Unicode characters to NFC form.""" # Normalize to NFC (Canonical Decomposition, followed by Canonical Composition) text = unicodedata.normalize("NFC", text) # Replace common problematic characters replacements = { "\u2018": "'", # Left single quote "\u2019": "'", # Right single quote "\u201c": '"', # Left double quote "\u201d": '"', # Right double quote "\u2013": "-", # En dash "\u2014": "—", # Em dash (keep as-is, it's valid) "\u2026": "...", # Ellipsis "\u00a0": " ", # Non-breaking space } for old, new in replacements.items(): text = text.replace(old, new) return text def _clean_whitespace(self, text: str) -> str: """Clean excessive whitespace while preserving structure.""" # Replace multiple spaces/tabs with single space text = self.MULTI_WHITESPACE_PATTERN.sub(" ", text) # Replace 3+ newlines with 2 text = self.MULTI_NEWLINE_PATTERN.sub("\n\n", text) # Strip leading/trailing whitespace from each line lines = [line.strip() for line in text.split("\n")] text = "\n".join(lines) # Final strip return text.strip() def _extract_title(self, content: str) -> str: """ Extract title from content. Strategy: 1. First line if it's short enough 2. First sentence if first line is too long 3. "Untitled" as fallback """ lines = content.split("\n") first_line = lines[0].strip() if lines else "" # Remove markdown header markers first_line = re.sub(r"^#+\s*", "", first_line) if first_line and len(first_line) <= 100: return first_line # Try first sentence sentences = re.split(r"[.!?]", first_line) if sentences and len(sentences[0]) <= 100: return sentences[0].strip() # Fallback return "Untitled" def _is_valid_post(self, post: BlogPost) -> bool: """Check if a post meets validity criteria.""" if not post.content: return False if post.char_count < self.min_content_length: logger.debug(f"Post too short: {post.char_count} chars") return False if post.char_count > self.max_content_length: logger.debug(f"Post too long: {post.char_count} chars") return False return True def parse_string(self, content: str) -> list[BlogPost]: """ Parse blog content from a string instead of file. Args: content: Raw blog content string Returns: List of parsed BlogPost objects """ # Use same logic as file parsing posts = self._parse_with_markers(content) if not posts: posts = self._parse_with_markdown_headers(content) if not posts: posts = self._parse_with_double_newlines(content) if not posts: posts = self._parse_as_single_post(content) cleaned_posts = [] for i, post in enumerate(posts): cleaned = self._clean_post(post, i) if cleaned and self._is_valid_post(cleaned): cleaned_posts.append(cleaned) return cleaned_posts def main(): """CLI entry point for testing the parser.""" import argparse import json parser = argparse.ArgumentParser( description="Parse blog content from a text file", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python blog_parser.py data/raw/blogs.txt python blog_parser.py data/raw/blogs.txt --output parsed.json python blog_parser.py data/raw/blogs.txt --min-length 50 """, ) parser.add_argument("input", help="Path to blogs.txt file") parser.add_argument("--output", "-o", help="Output JSON file path") parser.add_argument( "--min-length", type=int, default=100, help="Minimum content length (default: 100)", ) parser.add_argument( "--max-length", type=int, default=50000, help="Maximum content length (default: 50000)", ) parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") args = parser.parse_args() if args.verbose: logger.enable("") else: logger.disable("") blog_parser = BlogParser( min_content_length=args.min_length, max_content_length=args.max_length, ) try: posts = blog_parser.parse_file(args.input) print(f"\nParsed {len(posts)} blog posts:") print("-" * 50) for post in posts: print(f"\n[{post.index}] {post.title}") print(f" Words: {post.word_count:,} | Chars: {post.char_count:,}") if args.output: output_data = [p.to_dict() for p in posts] with open(args.output, "w", encoding="utf-8") as f: json.dump(output_data, f, indent=2, ensure_ascii=False) print(f"\nSaved to: {args.output}") except Exception as e: print(f"Error: {e}") return 1 return 0 if __name__ == "__main__": exit(main())