# ai_exec/src/data_processing/blog_parser.py
"""
Blog Parser Module
Parse and clean raw blog content from data/raw/blogs.txt.
Handles various blog formats and extracts clean text with metadata.
Example usage:
parser = BlogParser()
posts = parser.parse_file("data/raw/blogs.txt")
for post in posts:
print(f"Title: {post['title']}")
print(f"Content: {post['content'][:100]}...")
"""
import re
import html
import unicodedata
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from loguru import logger
@dataclass
class BlogPost:
    """A single parsed blog post plus bookkeeping metadata."""

    title: str
    content: str       # cleaned text
    raw_content: str   # text exactly as found in the source file
    word_count: int
    char_count: int
    index: int         # position of the post within the source file
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the post, suitable for JSON serialization."""
        return dict(
            title=self.title,
            content=self.content,
            raw_content=self.raw_content,
            word_count=self.word_count,
            char_count=self.char_count,
            index=self.index,
            metadata=self.metadata,
        )
class BlogParser:
    """
    Parse and clean blog content from various formats.

    Parsing strategies, tried in order of preference:
    - Explicit markers (=== BLOG START/END ===)
    - Markdown headers as separators
    - Double newlines as fallback
    - Entire content as a single post (last resort)

    Example:
        >>> parser = BlogParser()
        >>> posts = parser.parse_file("data/raw/blogs.txt")
        >>> print(f"Parsed {len(posts)} posts")
    """

    # Regex patterns for different separator formats
    MARKER_PATTERN = re.compile(
        r"===\s*BLOG\s*START\s*===\s*(.*?)\s*===\s*BLOG\s*END\s*===",
        re.DOTALL | re.IGNORECASE,
    )
    MARKDOWN_HEADER_PATTERN = re.compile(r"^#\s+(.+)$", re.MULTILINE)
    # NOTE(review): DATE_PATTERN is not used by any strategy below; kept as a
    # public class attribute for backward compatibility.
    DATE_PATTERN = re.compile(
        r"^\s*(\d{4}[-/]\d{1,2}[-/]\d{1,2}|\d{1,2}[-/]\d{1,2}[-/]\d{4})\s*$",
        re.MULTILINE,
    )
    # HTML tag pattern for cleaning
    HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
    # Multiple whitespace patterns
    MULTI_WHITESPACE_PATTERN = re.compile(r"[ \t]+")
    MULTI_NEWLINE_PATTERN = re.compile(r"\n{3,}")

    def __init__(
        self,
        min_content_length: int = 100,
        max_content_length: int = 50000,
        remove_html: bool = True,
        normalize_unicode: bool = True,
    ):
        """
        Initialize the blog parser.

        Args:
            min_content_length: Minimum characters for a valid post
            max_content_length: Maximum characters for a valid post
            remove_html: Whether to strip HTML tags
            normalize_unicode: Whether to normalize Unicode characters
        """
        self.min_content_length = min_content_length
        self.max_content_length = max_content_length
        self.remove_html = remove_html
        self.normalize_unicode = normalize_unicode

    def parse_file(self, file_path: str | Path) -> list[BlogPost]:
        """
        Parse a blog file and return list of BlogPost objects.

        Args:
            file_path: Path to the blogs.txt file

        Returns:
            List of parsed BlogPost objects

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no valid posts found
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"Blog file not found: {file_path}")
        logger.info(f"Reading blog file: {file_path}")
        with open(file_path, "r", encoding="utf-8") as f:
            raw_content = f.read()
        logger.info(f"Read {len(raw_content):,} characters")
        # Shared strategy cascade (also used by parse_string).
        posts = self._split_into_posts(raw_content, log_fallbacks=True)
        cleaned_posts = self._clean_and_validate(posts)
        logger.info(f"Parsed {len(cleaned_posts)} valid posts from {len(posts)} raw posts")
        if not cleaned_posts:
            raise ValueError("No valid blog posts found in file")
        return cleaned_posts

    def parse_string(self, content: str) -> list[BlogPost]:
        """
        Parse blog content from a string instead of file.

        Args:
            content: Raw blog content string

        Returns:
            List of parsed BlogPost objects
        """
        # Same cascade as parse_file, minus the file I/O and the info logging.
        posts = self._split_into_posts(content)
        return self._clean_and_validate(posts)

    def _split_into_posts(self, content: str, log_fallbacks: bool = False) -> list[dict]:
        """Run the parsing strategies in order of preference; return raw post dicts."""
        posts = self._parse_with_markers(content)
        if not posts:
            if log_fallbacks:
                logger.info("No marker-based posts found, trying markdown headers")
            posts = self._parse_with_markdown_headers(content)
        if not posts:
            if log_fallbacks:
                logger.info("No markdown headers found, trying double newlines")
            posts = self._parse_with_double_newlines(content)
        if not posts:
            if log_fallbacks:
                logger.warning("No separators found, treating entire content as single post")
            posts = self._parse_as_single_post(content)
        return posts

    def _clean_and_validate(self, posts: list[dict]) -> list[BlogPost]:
        """Clean each raw post dict and keep only the ones that pass validation."""
        cleaned_posts = []
        for i, post in enumerate(posts):
            cleaned = self._clean_post(post, i)
            if cleaned and self._is_valid_post(cleaned):
                cleaned_posts.append(cleaned)
            else:
                logger.debug(f"Skipping invalid post at index {i}")
        return cleaned_posts

    def _parse_with_markers(self, content: str) -> list[dict]:
        """Parse using === BLOG START/END === markers."""
        matches = self.MARKER_PATTERN.findall(content)
        posts = [{"raw_content": match.strip()} for match in matches]
        logger.debug(f"Found {len(posts)} posts with markers")
        return posts

    def _parse_with_markdown_headers(self, content: str) -> list[dict]:
        """Parse using markdown # headers as separators."""
        # Split on markdown headers; the capture group keeps the header text
        # in the resulting parts list.
        parts = re.split(r"^(#\s+.+)$", content, flags=re.MULTILINE)
        posts = []
        current_title = None
        current_content = []

        def flush():
            # Join body segments with a blank line. (Joining with "" — the old
            # behavior — glued separate segments together with no separator.)
            body = "\n\n".join(current_content)
            posts.append({
                "raw_content": f"{current_title}\n\n{body}",
                "title_hint": current_title.lstrip("#").strip(),
            })

        for part in parts:
            part = part.strip()
            if not part:
                continue
            if self.MARKDOWN_HEADER_PATTERN.match(part):
                # New header: save previous post if one was accumulated.
                if current_title and current_content:
                    flush()
                current_title = part
                current_content = []
            else:
                current_content.append(part)
        # Don't forget last post
        if current_title and current_content:
            flush()
        logger.debug(f"Found {len(posts)} posts with markdown headers")
        return posts

    def _parse_with_double_newlines(self, content: str) -> list[dict]:
        """Parse using triple+ newlines as separators (common blog format)."""
        # Split on 3+ consecutive newlines
        parts = re.split(r"\n{3,}", content)
        posts = []
        for part in parts:
            part = part.strip()
            # Discard fragments that are obviously too short to be a post.
            if len(part) >= self.min_content_length:
                posts.append({"raw_content": part})
        logger.debug(f"Found {len(posts)} posts with double newlines")
        return posts

    def _parse_as_single_post(self, content: str) -> list[dict]:
        """Treat entire content as a single post."""
        return [{"raw_content": content.strip()}]

    def _clean_post(self, post_data: dict, index: int) -> Optional[BlogPost]:
        """
        Clean and structure a raw post.

        Args:
            post_data: Dictionary with raw_content and optional hints
            index: Post index in sequence

        Returns:
            BlogPost object or None if invalid
        """
        raw_content = post_data.get("raw_content", "")
        if not raw_content:
            return None
        content = raw_content
        # Remove HTML tags if enabled
        if self.remove_html:
            content = self._remove_html_tags(content)
        # Decode HTML entities (&amp;, &quot;, ...)
        content = html.unescape(content)
        # Normalize Unicode if enabled
        if self.normalize_unicode:
            content = self._normalize_unicode(content)
        # Clean whitespace
        content = self._clean_whitespace(content)
        # Extract title
        title = post_data.get("title_hint") or self._extract_title(content)
        # Remove the title from the content if it appears at the start, so it
        # is not duplicated. The content may begin either with the bare title
        # or with a markdown header line ("# Title") when the title hint came
        # from a markdown split — the old startswith() check missed the latter.
        if title:
            if content.startswith(title):
                content = content[len(title):].strip()
            else:
                first_line, _, rest = content.partition("\n")
                if first_line.strip().lstrip("#").strip() == title:
                    content = rest.strip()
        # Calculate stats
        word_count = len(content.split())
        char_count = len(content)
        return BlogPost(
            title=title,
            content=content,
            raw_content=raw_content,
            word_count=word_count,
            char_count=char_count,
            index=index,
            metadata=post_data.get("metadata", {}),
        )

    def _remove_html_tags(self, text: str) -> str:
        """Remove HTML tags from text."""
        return self.HTML_TAG_PATTERN.sub("", text)

    def _normalize_unicode(self, text: str) -> str:
        """Normalize Unicode characters to NFC form."""
        # Normalize to NFC (Canonical Decomposition, followed by Canonical Composition)
        text = unicodedata.normalize("NFC", text)
        # Replace common problematic characters. The em dash (U+2014) is
        # intentionally left untouched — it is valid as-is. (The previous
        # version mapped it to itself, a no-op.)
        replacements = {
            "\u2018": "'",    # Left single quote
            "\u2019": "'",    # Right single quote
            "\u201c": '"',    # Left double quote
            "\u201d": '"',    # Right double quote
            "\u2013": "-",    # En dash
            "\u2026": "...",  # Ellipsis
            "\u00a0": " ",    # Non-breaking space
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
        return text

    def _clean_whitespace(self, text: str) -> str:
        """Clean excessive whitespace while preserving structure."""
        # Replace multiple spaces/tabs with single space
        text = self.MULTI_WHITESPACE_PATTERN.sub(" ", text)
        # Replace 3+ newlines with 2
        text = self.MULTI_NEWLINE_PATTERN.sub("\n\n", text)
        # Strip leading/trailing whitespace from each line
        lines = [line.strip() for line in text.split("\n")]
        text = "\n".join(lines)
        # Final strip
        return text.strip()

    def _extract_title(self, content: str) -> str:
        """
        Extract title from content.

        Strategy:
        1. First line if it's short enough (<= 100 chars)
        2. First sentence if first line is too long
        3. "Untitled" as fallback
        """
        lines = content.split("\n")
        first_line = lines[0].strip() if lines else ""
        # Remove markdown header markers
        first_line = re.sub(r"^#+\s*", "", first_line)
        if first_line and len(first_line) <= 100:
            return first_line
        # Try first sentence of the (overlong) first line
        sentences = re.split(r"[.!?]", first_line)
        if sentences and len(sentences[0]) <= 100:
            return sentences[0].strip()
        # Fallback
        return "Untitled"

    def _is_valid_post(self, post: BlogPost) -> bool:
        """Check if a post meets validity criteria (non-empty, within length bounds)."""
        if not post.content:
            return False
        if post.char_count < self.min_content_length:
            logger.debug(f"Post too short: {post.char_count} chars")
            return False
        if post.char_count > self.max_content_length:
            logger.debug(f"Post too long: {post.char_count} chars")
            return False
        return True
def main():
    """CLI entry point for testing the parser."""
    # Local imports: only needed when the module is run as a script.
    import argparse
    import json

    cli = argparse.ArgumentParser(
        description="Parse blog content from a text file",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python blog_parser.py data/raw/blogs.txt
python blog_parser.py data/raw/blogs.txt --output parsed.json
python blog_parser.py data/raw/blogs.txt --min-length 50
""",
    )
    cli.add_argument("input", help="Path to blogs.txt file")
    cli.add_argument("--output", "-o", help="Output JSON file path")
    cli.add_argument(
        "--min-length",
        type=int,
        default=100,
        help="Minimum content length (default: 100)",
    )
    cli.add_argument(
        "--max-length",
        type=int,
        default=50000,
        help="Maximum content length (default: 50000)",
    )
    cli.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    opts = cli.parse_args()

    # Silence loguru entirely unless verbose output was requested.
    if opts.verbose:
        logger.enable("")
    else:
        logger.disable("")

    blog_parser = BlogParser(
        min_content_length=opts.min_length,
        max_content_length=opts.max_length,
    )

    try:
        posts = blog_parser.parse_file(opts.input)
        print(f"\nParsed {len(posts)} blog posts:")
        print("-" * 50)
        for post in posts:
            print(f"\n[{post.index}] {post.title}")
            print(f" Words: {post.word_count:,} | Chars: {post.char_count:,}")
        if opts.output:
            # Serialize every post and write pretty-printed UTF-8 JSON.
            with open(opts.output, "w", encoding="utf-8") as f:
                json.dump([p.to_dict() for p in posts], f, indent=2, ensure_ascii=False)
            print(f"\nSaved to: {opts.output}")
    except Exception as e:
        print(f"Error: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # Raise SystemExit directly instead of calling the site-injected exit()
    # builtin, which is not guaranteed to exist (e.g. under `python -S` or in
    # frozen applications). Behavior is identical: process exits with main()'s
    # return code.
    raise SystemExit(main())