| """ | |
| Blog Parser Module | |
| Parse and clean raw blog content from data/raw/blogs.txt. | |
| Handles various blog formats and extracts clean text with metadata. | |
| Example usage: | |
| parser = BlogParser() | |
| posts = parser.parse_file("data/raw/blogs.txt") | |
| for post in posts: | |
| print(f"Title: {post['title']}") | |
| print(f"Content: {post['content'][:100]}...") | |
| """ | |
| import re | |
| import html | |
| import unicodedata | |
| import logging | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from loguru import logger | |
@dataclass
class BlogPost:
    """Represents a parsed blog post.

    `content` is the cleaned text produced by the parser; `raw_content`
    is the post exactly as it appeared in the source file.
    """

    title: str          # extracted or hinted title
    content: str        # cleaned post body
    raw_content: str    # original, uncleaned text
    word_count: int     # whitespace-delimited word count of `content`
    char_count: int     # len(content)
    index: int          # position of the post in the parsed sequence
    # Mutable default must go through default_factory, never a shared dict.
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to dictionary for serialization."""
        return {
            "title": self.title,
            "content": self.content,
            "raw_content": self.raw_content,
            "word_count": self.word_count,
            "char_count": self.char_count,
            "index": self.index,
            "metadata": self.metadata,
        }
class BlogParser:
    """
    Parse and clean blog content from various formats.

    Splitting strategies are tried in order of preference:
    1. Explicit markers (``=== BLOG START/END ===``)
    2. Markdown ``#`` headers as separators
    3. Runs of 3+ newlines (blank-line gaps) as separators
    4. The entire content as a single post (last resort)

    Example:
        >>> parser = BlogParser()
        >>> posts = parser.parse_file("data/raw/blogs.txt")
        >>> print(f"Parsed {len(posts)} posts")
    """

    # Regex patterns for different separator formats
    MARKER_PATTERN = re.compile(
        r"===\s*BLOG\s*START\s*===\s*(.*?)\s*===\s*BLOG\s*END\s*===",
        re.DOTALL | re.IGNORECASE,
    )
    MARKDOWN_HEADER_PATTERN = re.compile(r"^#\s+(.+)$", re.MULTILINE)
    # NOTE(review): DATE_PATTERN is not used by any strategy below; kept as a
    # public class attribute for backward compatibility — confirm it has
    # external users before removing.
    DATE_PATTERN = re.compile(
        r"^\s*(\d{4}[-/]\d{1,2}[-/]\d{1,2}|\d{1,2}[-/]\d{1,2}[-/]\d{4})\s*$",
        re.MULTILINE,
    )
    # HTML tag pattern for cleaning
    HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
    # Whitespace-normalization patterns
    MULTI_WHITESPACE_PATTERN = re.compile(r"[ \t]+")
    MULTI_NEWLINE_PATTERN = re.compile(r"\n{3,}")
    # Markdown heading prefix ("# ", "## ", ...) at the start of a string
    HEADING_PREFIX_PATTERN = re.compile(r"^#+\s*")

    def __init__(
        self,
        min_content_length: int = 100,
        max_content_length: int = 50000,
        remove_html: bool = True,
        normalize_unicode: bool = True,
    ):
        """
        Initialize the blog parser.

        Args:
            min_content_length: Minimum characters for a valid post
            max_content_length: Maximum characters for a valid post
            remove_html: Whether to strip HTML tags
            normalize_unicode: Whether to normalize Unicode characters
        """
        self.min_content_length = min_content_length
        self.max_content_length = max_content_length
        self.remove_html = remove_html
        self.normalize_unicode = normalize_unicode

    def parse_file(self, file_path: str | Path) -> list[BlogPost]:
        """
        Parse a blog file and return list of BlogPost objects.

        Args:
            file_path: Path to the blogs.txt file

        Returns:
            List of parsed BlogPost objects

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no valid posts found
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"Blog file not found: {file_path}")
        logger.info(f"Reading blog file: {file_path}")
        raw_content = file_path.read_text(encoding="utf-8")
        logger.info(f"Read {len(raw_content):,} characters")

        posts = self._split_posts(raw_content)
        cleaned_posts = self._clean_and_validate(posts)

        logger.info(f"Parsed {len(cleaned_posts)} valid posts from {len(posts)} raw posts")
        if not cleaned_posts:
            raise ValueError("No valid blog posts found in file")
        return cleaned_posts

    def parse_string(self, content: str) -> list[BlogPost]:
        """
        Parse blog content from a string instead of file.

        Unlike parse_file, an empty result is returned (not raised) when no
        valid posts are found.

        Args:
            content: Raw blog content string

        Returns:
            List of parsed BlogPost objects
        """
        # Shares the exact strategy cascade and cleaning with parse_file.
        posts = self._split_posts(content)
        return self._clean_and_validate(posts)

    def _split_posts(self, content: str) -> list[dict]:
        """Try each splitting strategy in order of preference and return raw posts."""
        posts = self._parse_with_markers(content)
        if not posts:
            logger.info("No marker-based posts found, trying markdown headers")
            posts = self._parse_with_markdown_headers(content)
        if not posts:
            logger.info("No markdown headers found, trying double newlines")
            posts = self._parse_with_double_newlines(content)
        if not posts:
            logger.warning("No separators found, treating entire content as single post")
            posts = self._parse_as_single_post(content)
        return posts

    def _clean_and_validate(self, posts: list[dict]) -> list[BlogPost]:
        """Clean each raw post and keep only those passing validity checks."""
        cleaned_posts = []
        for i, post in enumerate(posts):
            cleaned = self._clean_post(post, i)
            if cleaned and self._is_valid_post(cleaned):
                cleaned_posts.append(cleaned)
            else:
                logger.debug(f"Skipping invalid post at index {i}")
        return cleaned_posts

    def _parse_with_markers(self, content: str) -> list[dict]:
        """Parse using === BLOG START/END === markers."""
        matches = self.MARKER_PATTERN.findall(content)
        posts = [{"raw_content": match.strip()} for match in matches]
        logger.debug(f"Found {len(posts)} posts with markers")
        return posts

    def _parse_with_markdown_headers(self, content: str) -> list[dict]:
        """Parse using markdown # headers as separators."""
        # The capture group keeps the header lines in the split result so
        # they can be re-attached to their bodies below.
        parts = re.split(r"^(#\s+.+)$", content, flags=re.MULTILINE)
        posts = []
        current_title = None
        current_content = []
        for part in parts:
            part = part.strip()
            if not part:
                continue
            if self.MARKDOWN_HEADER_PATTERN.match(part):
                # Save previous post if exists
                if current_title and current_content:
                    posts.append({
                        "raw_content": f"{current_title}\n\n{''.join(current_content)}",
                        "title_hint": current_title.lstrip("#").strip(),
                    })
                current_title = part
                current_content = []
            else:
                current_content.append(part)
        # Don't forget the last post
        if current_title and current_content:
            posts.append({
                "raw_content": f"{current_title}\n\n{''.join(current_content)}",
                "title_hint": current_title.lstrip("#").strip(),
            })
        logger.debug(f"Found {len(posts)} posts with markdown headers")
        return posts

    def _parse_with_double_newlines(self, content: str) -> list[dict]:
        """Parse using runs of 3+ newlines (i.e. blank-line gaps) as separators."""
        parts = re.split(r"\n{3,}", content)
        posts = []
        for part in parts:
            part = part.strip()
            # Pre-filter obvious fragments; full validation happens later.
            if len(part) >= self.min_content_length:
                posts.append({"raw_content": part})
        logger.debug(f"Found {len(posts)} posts with double newlines")
        return posts

    def _parse_as_single_post(self, content: str) -> list[dict]:
        """Treat entire content as a single post."""
        return [{"raw_content": content.strip()}]

    def _clean_post(self, post_data: dict, index: int) -> Optional[BlogPost]:
        """
        Clean and structure a raw post.

        Args:
            post_data: Dictionary with raw_content and optional hints
            index: Post index in sequence

        Returns:
            BlogPost object or None if invalid
        """
        raw_content = post_data.get("raw_content", "")
        if not raw_content:
            return None
        content = raw_content
        # Remove HTML tags if enabled
        if self.remove_html:
            content = self._remove_html_tags(content)
        # Decode HTML entities (&amp; -> &, etc.)
        content = html.unescape(content)
        # Normalize Unicode if enabled
        if self.normalize_unicode:
            content = self._normalize_unicode(content)
        # Clean whitespace
        content = self._clean_whitespace(content)
        # Extract title
        title = post_data.get("title_hint") or self._extract_title(content)
        # Remove the title if it repeats at the start of the content, either
        # verbatim or as a markdown heading ("# Title"). The heading case is
        # needed because title hints have the "#" markers stripped, so a
        # plain startswith() would never match the heading line.
        if title:
            if content.startswith(title):
                content = content[len(title):].strip()
            else:
                first_line, _, rest = content.partition("\n")
                if self.HEADING_PREFIX_PATTERN.sub("", first_line).strip() == title:
                    content = rest.strip()
        # Calculate stats
        word_count = len(content.split())
        char_count = len(content)
        return BlogPost(
            title=title,
            content=content,
            raw_content=raw_content,
            word_count=word_count,
            char_count=char_count,
            index=index,
            metadata=post_data.get("metadata", {}),
        )

    def _remove_html_tags(self, text: str) -> str:
        """Remove HTML tags from text."""
        return self.HTML_TAG_PATTERN.sub("", text)

    def _normalize_unicode(self, text: str) -> str:
        """Normalize Unicode characters to NFC form."""
        # Normalize to NFC (Canonical Decomposition, followed by Canonical Composition)
        text = unicodedata.normalize("NFC", text)
        # Replace common problematic characters. Em dash (U+2014) is
        # intentionally left alone — it is valid output.
        replacements = {
            "\u2018": "'",    # Left single quote
            "\u2019": "'",    # Right single quote
            "\u201c": '"',    # Left double quote
            "\u201d": '"',    # Right double quote
            "\u2013": "-",    # En dash
            "\u2026": "...",  # Ellipsis
            "\u00a0": " ",    # Non-breaking space
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
        return text

    def _clean_whitespace(self, text: str) -> str:
        """Clean excessive whitespace while preserving structure."""
        # Replace multiple spaces/tabs with single space
        text = self.MULTI_WHITESPACE_PATTERN.sub(" ", text)
        # Replace 3+ newlines with 2
        text = self.MULTI_NEWLINE_PATTERN.sub("\n\n", text)
        # Strip leading/trailing whitespace from each line
        lines = [line.strip() for line in text.split("\n")]
        text = "\n".join(lines)
        # Final strip
        return text.strip()

    def _extract_title(self, content: str) -> str:
        """
        Extract title from content.

        Strategy:
        1. First line if it's short enough (<= 100 chars)
        2. First sentence if first line is too long
        3. "Untitled" as fallback
        """
        lines = content.split("\n")
        first_line = lines[0].strip() if lines else ""
        # Remove markdown header markers
        first_line = self.HEADING_PREFIX_PATTERN.sub("", first_line)
        if first_line and len(first_line) <= 100:
            return first_line
        # Try first sentence
        sentences = re.split(r"[.!?]", first_line)
        if sentences and len(sentences[0]) <= 100:
            return sentences[0].strip()
        # Fallback
        return "Untitled"

    def _is_valid_post(self, post: BlogPost) -> bool:
        """Check if a post meets validity criteria (non-empty, within length bounds)."""
        if not post.content:
            return False
        if post.char_count < self.min_content_length:
            logger.debug(f"Post too short: {post.char_count} chars")
            return False
        if post.char_count > self.max_content_length:
            logger.debug(f"Post too long: {post.char_count} chars")
            return False
        return True
def main() -> int:
    """CLI entry point for testing the parser.

    Returns:
        Process exit code: 0 on success, 1 on failure.
    """
    import argparse
    import json
    import sys

    parser = argparse.ArgumentParser(
        description="Parse blog content from a text file",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python blog_parser.py data/raw/blogs.txt
    python blog_parser.py data/raw/blogs.txt --output parsed.json
    python blog_parser.py data/raw/blogs.txt --min-length 50
""",
    )
    parser.add_argument("input", help="Path to blogs.txt file")
    parser.add_argument("--output", "-o", help="Output JSON file path")
    parser.add_argument(
        "--min-length",
        type=int,
        default=100,
        help="Minimum content length (default: 100)",
    )
    parser.add_argument(
        "--max-length",
        type=int,
        default=50000,
        help="Maximum content length (default: 50000)",
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # loguru: enable/disable all logging emitted by this process's modules.
    if args.verbose:
        logger.enable("")
    else:
        logger.disable("")

    blog_parser = BlogParser(
        min_content_length=args.min_length,
        max_content_length=args.max_length,
    )
    try:
        posts = blog_parser.parse_file(args.input)
        print(f"\nParsed {len(posts)} blog posts:")
        print("-" * 50)
        for post in posts:
            print(f"\n[{post.index}] {post.title}")
            print(f"  Words: {post.word_count:,} | Chars: {post.char_count:,}")
        if args.output:
            output_data = [p.to_dict() for p in posts]
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2, ensure_ascii=False)
            print(f"\nSaved to: {args.output}")
    except (OSError, ValueError) as e:
        # parse_file raises FileNotFoundError (an OSError) or ValueError;
        # writing --output can raise OSError. Report on stderr so stdout
        # stays clean for piping.
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code via SystemExit instead of the builtin
    # exit(), which is injected by the `site` module and not guaranteed to
    # exist (e.g. when running under `python -S`).
    raise SystemExit(main())