#!/usr/bin/env python3
"""
Process Blogs CLI
Parse raw blog content, segment into chunks, and analyze writing style.
This is the first step in the data processing pipeline.
Usage:
python scripts/process_blogs.py --input data/raw/blogs.txt --output data/processed/
Output:
- posts.json: Parsed blog posts with metadata
- segments.json: Text segments for training
- style_profile.json: Writing style analysis
"""
import argparse
import json
import sys
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from src.data_processing.blog_parser import BlogParser
from src.data_processing.text_segmenter import TextSegmenter
from src.data_processing.style_analyzer import StyleAnalyzer
console = Console()
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the blog-processing pipeline."""
    parser = argparse.ArgumentParser(
        description="Process raw blog content into training-ready data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic usage
  python scripts/process_blogs.py --input data/raw/blogs.txt --output data/processed/

  # Custom segmentation
  python scripts/process_blogs.py --input data/raw/blogs.txt --output data/processed/ \\
      --target-tokens 256 --overlap 30

  # Skip style analysis
  python scripts/process_blogs.py --input data/raw/blogs.txt --output data/processed/ \\
      --skip-style-analysis

Input format (blogs.txt):
  === BLOG START ===
  Title of first blog post

  Content of first blog post...
  === BLOG END ===

  === BLOG START ===
  Title of second blog post

  Content...
  === BLOG END ===
""",
    )
    parser.add_argument(
        "--input", "-i",
        default="data/raw/blogs.txt",
        help="Path to blogs.txt file (default: data/raw/blogs.txt)",
    )
    parser.add_argument(
        "--output", "-o",
        default="data/processed/",
        help="Output directory (default: data/processed/)",
    )
    parser.add_argument(
        "--target-tokens",
        type=int,
        default=384,
        help="Target tokens per segment (default: 384)",
    )
    parser.add_argument(
        "--min-tokens",
        type=int,
        default=100,
        help="Minimum tokens per segment (default: 100)",
    )
    parser.add_argument(
        "--max-tokens",
        type=int,
        default=512,
        help="Maximum tokens per segment (default: 512)",
    )
    parser.add_argument(
        "--overlap",
        type=int,
        default=50,
        help="Overlap tokens between segments (default: 50)",
    )
    parser.add_argument(
        "--min-post-length",
        type=int,
        default=100,
        help="Minimum characters for a valid post (default: 100)",
    )
    parser.add_argument(
        "--skip-style-analysis",
        action="store_true",
        help="Skip writing style analysis",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output",
    )
    return parser


def _save_json(data, path: Path) -> None:
    """Write *data* to *path* as human-readable, non-ASCII-escaped JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def _show_posts_table(posts) -> None:
    """Render a summary table of the first 10 parsed posts (verbose mode)."""
    table = Table(title="Parsed Posts")
    table.add_column("Index", justify="right", style="cyan")
    table.add_column("Title", style="white")
    table.add_column("Words", justify="right", style="green")
    for post in posts[:10]:  # Show first 10
        # Truncate long titles so the table stays narrow.
        title = post.title[:50] + "..." if len(post.title) > 50 else post.title
        table.add_row(str(post.index), title, f"{post.word_count:,}")
    if len(posts) > 10:
        table.add_row("...", f"({len(posts) - 10} more)", "...")
    console.print(table)


def _analyze_style(posts, output_dir: Path, verbose: bool) -> None:
    """Run the writing-style analyzer over *posts* and persist the profile."""
    console.print("\n[yellow]Step 3:[/yellow] Analyzing writing style...")
    analyzer = StyleAnalyzer()
    profile = analyzer.analyze_posts(posts)

    profile_path = output_dir / "style_profile.json"
    profile.save(profile_path)
    console.print(f" [green]✓[/green] Vocabulary size: {profile.vocabulary_size:,} words")
    console.print(f" [green]✓[/green] Avg sentence length: {profile.avg_words_per_sentence:.1f} words")
    console.print(f" [green]✓[/green] Formality score: {profile.formality_score:.2f}")
    console.print(f" [green]✓[/green] Saved to: {profile_path}")

    if verbose and profile.top_words:
        console.print("\n [dim]Top 10 words:[/dim]")
        for item in profile.top_words[:10]:
            console.print(f" {item['word']}: {item['count']}")


def main() -> int:
    """Entry point: parse blogs, segment them, and optionally analyze style.

    Returns:
        Process exit status: 0 on success, 1 when the input file is
        missing or the parser fails.
    """
    args = _build_arg_parser().parse_args()

    # Validate input file before touching the output directory.
    input_path = Path(args.input)
    if not input_path.exists():
        console.print(f"[red]Error:[/red] Input file not found: {input_path}")
        console.print("\nPlease create the file with your blog content in this format:")
        console.print(" === BLOG START ===")
        console.print(" [Title]")
        console.print(" [Content...]")
        console.print(" === BLOG END ===")
        return 1

    # Create output directory
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    console.print("\n[bold blue]AI Executive - Blog Processor[/bold blue]")
    console.print("=" * 50)

    # Step 1: Parse raw blog text into structured posts.
    console.print("\n[yellow]Step 1:[/yellow] Parsing blog content...")
    blog_parser = BlogParser(min_content_length=args.min_post_length)
    try:
        posts = blog_parser.parse_file(input_path)
    except Exception as e:
        # Any parser failure is fatal for the whole pipeline: report and bail.
        console.print(f"[red]Error parsing blogs:[/red] {e}")
        return 1

    # Save posts
    posts_path = output_dir / "posts.json"
    _save_json([p.to_dict() for p in posts], posts_path)
    console.print(f" [green]✓[/green] Parsed {len(posts)} blog posts")
    console.print(f" [green]✓[/green] Saved to: {posts_path}")

    if args.verbose:
        _show_posts_table(posts)

    # Step 2: Chunk posts into token-bounded training segments.
    console.print("\n[yellow]Step 2:[/yellow] Segmenting into chunks...")
    segmenter = TextSegmenter(
        target_tokens=args.target_tokens,
        min_tokens=args.min_tokens,
        max_tokens=args.max_tokens,
        overlap_tokens=args.overlap,
    )
    segments = segmenter.segment_posts(posts)

    segments_path = output_dir / "segments.json"
    _save_json([s.to_dict() for s in segments], segments_path)
    console.print(f" [green]✓[/green] Created {len(segments)} segments")
    console.print(f" [green]✓[/green] Saved to: {segments_path}")

    # Segment statistics. min()/max() raise ValueError on an empty sequence,
    # so the range line is guarded just like the average already was.
    token_counts = [s.token_count for s in segments]
    avg_tokens = sum(token_counts) / len(token_counts) if token_counts else 0
    console.print(f" [dim]Average tokens/segment: {avg_tokens:.1f}[/dim]")
    if token_counts:
        console.print(f" [dim]Token range: {min(token_counts)} - {max(token_counts)}[/dim]")

    # Step 3: Style analysis (optional).
    if not args.skip_style_analysis:
        _analyze_style(posts, output_dir, args.verbose)
    else:
        console.print("\n[yellow]Step 3:[/yellow] [dim]Skipped style analysis[/dim]")

    # Summary
    console.print("\n" + "=" * 50)
    console.print("[bold green]Processing complete![/bold green]")
    console.print(f"\nOutput files in: {output_dir}")
    console.print(f" - posts.json ({len(posts)} posts)")
    console.print(f" - segments.json ({len(segments)} segments)")
    if not args.skip_style_analysis:
        console.print(" - style_profile.json")
    console.print("\n[dim]Next step: Generate Q&A training pairs[/dim]")
    console.print(f"[dim] python scripts/generate_training_data.py --input {segments_path}[/dim]")
    return 0
if __name__ == "__main__":
    # Use sys.exit rather than the exit() builtin: exit() is injected by the
    # `site` module for interactive use and is not guaranteed to exist when
    # the interpreter runs with site initialization disabled (python -S).
    sys.exit(main())