"""Data processing pipeline for Francis Botcon."""

import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from src.text_processor import TextCleaner, TextSegmenter, process_raw_file
from src.logger import LoggerSetup
from src.config_loader import config

logger = LoggerSetup.setup().getChild(__name__)


class DataProcessor:
    """Process raw texts into cleaned and segmented datasets."""

    def __init__(self, raw_dir: Optional[str] = None, processed_dir: Optional[str] = None):
        """Initialize data processor.

        Args:
            raw_dir: Directory containing raw text files. When falsy, falls
                back to the ``data.raw_dir`` config key, then ``./data/raw``.
            processed_dir: Directory for processed outputs. When falsy, falls
                back to the ``data.processed_dir`` config key, then
                ``./data/processed``. Created (with parents) if missing.
        """
        self.raw_dir = Path(raw_dir or config.get("data.raw_dir", "./data/raw"))
        self.processed_dir = Path(
            processed_dir or config.get("data.processed_dir", "./data/processed")
        )

        # Only the output directory is created here; a missing raw directory
        # is reported when the input files are collected.
        self.processed_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Raw data directory: {self.raw_dir}")
        logger.info(f"Processed data directory: {self.processed_dir}")

    def process_all_files(self) -> List[Dict[str, Any]]:
        """Process all raw ``*.txt`` files found directly in ``raw_dir``.

        Files are processed in sorted path order so that the order of the
        returned records (and therefore of the saved outputs) is reproducible.
        A file that fails to process is logged with its traceback and skipped;
        the remaining files are still processed.

        Returns:
            List of segment records with keys ``id``, ``source``, ``title``,
            ``author``, ``segment_index`` (int), ``text`` and ``length``
            (int, number of characters in ``text``). Empty if ``raw_dir``
            does not exist.
        """
        if not self.raw_dir.is_dir():
            logger.warning(f"Raw data directory does not exist: {self.raw_dir}")
            return []

        # Path.glob yields entries in filesystem order, which is arbitrary;
        # sort so segment order is stable across runs and platforms.
        raw_files = sorted(self.raw_dir.glob("*.txt"))
        logger.info(f"Found {len(raw_files)} raw text files")

        all_segments: List[Dict[str, Any]] = []
        for i, file_path in enumerate(raw_files, 1):
            logger.info(f"Processing [{i}/{len(raw_files)}]: {file_path.name}")
            try:
                cleaned_text, filename = process_raw_file(file_path)
                title, author = TextSegmenter.extract_title_and_author(cleaned_text)
                segments = TextSegmenter.segment_by_paragraphs(cleaned_text, min_length=100)
            except Exception as e:
                # Deliberate best-effort: one bad file must not abort the run,
                # but keep the traceback so the failure can be diagnosed.
                logger.exception(f" ✗ Error processing {file_path.name}: {e}")
                continue

            logger.info(f" → Segmented into {len(segments)} paragraphs")

            all_segments.extend(
                {
                    "id": f"{filename}_para_{j}",
                    "source": filename,
                    "title": title,
                    "author": author,
                    "segment_index": j,
                    "text": segment,
                    "length": len(segment),
                }
                for j, segment in enumerate(segments)
            )

        logger.info(f"Total segments created: {len(all_segments)}")
        return all_segments

    def save_processed_data(self, segments: List[Dict[str, Any]]) -> Path:
        """Save processed segments to a JSONL file (one JSON object per line).

        The file is ``processed_segments.jsonl`` in ``processed_dir`` and is
        overwritten if it already exists. Non-ASCII characters are written
        as-is in UTF-8.

        Args:
            segments: List of processed segment records.

        Returns:
            Path to the saved file.
        """
        output_path = self.processed_dir / "processed_segments.jsonl"
        logger.info(f"Saving {len(segments)} segments to {output_path}")

        with open(output_path, 'w', encoding='utf-8') as f:
            for segment in segments:
                f.write(json.dumps(segment, ensure_ascii=False) + '\n')

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def create_training_examples(
        self,
        segments: List[Dict[str, Any]],
        max_templates: int = 1,
        min_words: int = 20,
    ) -> List[Dict[str, Any]]:
        """Create instruction-response training examples from segments.

        For each segment with at least ``min_words`` whitespace-separated
        words, one example is produced per template, using the first
        ``max_templates`` templates. The ``input`` is the segment text
        truncated to 200 characters (with ``"..."`` appended when truncated).
        The ``output`` is the template prefix followed by the first 300
        characters of the segment.

        Args:
            segments: Processed text segments; each must have ``text``,
                ``source`` and ``id`` keys.
            max_templates: How many of the built-in templates to apply per
                segment (default 1, i.e. only the first template).
            min_words: Segments with fewer words than this are skipped.

        Returns:
            List of training examples.

        Raises:
            ValueError: If ``max_templates`` is negative.
        """
        if max_templates < 0:
            raise ValueError(f"max_templates must be >= 0, got {max_templates}")

        # Example templates for generating instruction-response pairs
        templates = [
            {
                "instruction": "Explain this passage from your works as if speaking to a contemporary scholar:",
                "prefix": "In this passage, I discuss: "
            },
            {
                "instruction": "What philosophical principle does this text embody?",
                "prefix": "This passage exemplifies the principle that "
            },
            {
                "instruction": "Summarize the main argument of this passage:",
                "prefix": "The essential point I make here is that "
            },
        ]
        active_templates = templates[:max_templates]

        logger.info(f"Creating training examples from {len(segments)} segments")

        examples: List[Dict[str, Any]] = []
        for segment in segments:
            text = segment["text"]

            # Skip very short segments
            if len(text.split()) < min_words:
                continue

            truncated_input = (text[:200] + "...") if len(text) > 200 else text
            for template in active_templates:
                examples.append({
                    "instruction": template["instruction"],
                    "input": truncated_input,
                    "output": template["prefix"] + text[:300],
                    "source": segment["source"],
                    "segment_id": segment["id"],
                })

        logger.info(f"Created {len(examples)} training examples")
        return examples

    def save_training_data(self, examples: List[Dict[str, Any]]) -> Path:
        """Save training examples to a pretty-printed JSON array file.

        The file is ``training_examples.json`` in ``processed_dir`` and is
        overwritten if it already exists.

        Args:
            examples: Training examples.

        Returns:
            Path to the saved file.
        """
        output_path = self.processed_dir / "training_examples.json"
        logger.info(f"Saving {len(examples)} training examples to {output_path}")

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(examples, f, ensure_ascii=False, indent=2)

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def process_pipeline(self) -> Tuple[Path, Path]:
        """Run the complete data processing pipeline.

        Steps: segment all raw files, save the segments as JSONL, build
        training examples from the segments, and save them as JSON.

        Returns:
            Tuple of (processed_segments_path, training_data_path).
        """
        logger.info("=" * 60)
        logger.info("Starting data processing pipeline")
        logger.info("=" * 60)

        segments = self.process_all_files()
        segments_path = self.save_processed_data(segments)

        training_examples = self.create_training_examples(segments)
        training_path = self.save_training_data(training_examples)

        logger.info("=" * 60)
        logger.info("Data processing pipeline completed successfully!")
        logger.info(f"Processed segments: {len(segments)}")
        logger.info(f"Training examples: {len(training_examples)}")
        logger.info("=" * 60)

        return segments_path, training_path


def main():
    """Main entry point for data processing."""
    processor = DataProcessor()
    processor.process_pipeline()


if __name__ == "__main__":
    main()