Spaces:
Sleeping
Sleeping
| """Data processing pipeline for Francis Botcon.""" | |
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from src.text_processor import TextCleaner, TextSegmenter, process_raw_file
from src.logger import LoggerSetup
from src.config_loader import config
# Module-level logger: child of the app-wide root logger, namespaced by module.
logger = LoggerSetup.setup().getChild(__name__)
class DataProcessor:
    """Process raw texts into cleaned and segmented datasets.

    Pipeline: read ``*.txt`` files from the raw directory, clean and segment
    them into paragraph records, persist the records as JSONL, then derive
    instruction-response training examples and persist them as JSON.
    """

    def __init__(self, raw_dir: Optional[str] = None, processed_dir: Optional[str] = None):
        """Initialize data processor.

        Args:
            raw_dir: Directory containing raw text files. Falls back to the
                ``data.raw_dir`` config key, then ``./data/raw``.
            processed_dir: Directory for processed outputs. Falls back to the
                ``data.processed_dir`` config key, then ``./data/processed``.
        """
        self.raw_dir = Path(raw_dir or config.get("data.raw_dir", "./data/raw"))
        self.processed_dir = Path(processed_dir or config.get("data.processed_dir", "./data/processed"))

        # The output directory must exist before any save_* method writes to it.
        self.processed_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Raw data directory: {self.raw_dir}")
        logger.info(f"Processed data directory: {self.processed_dir}")

    def process_all_files(self) -> List[Dict[str, str]]:
        """Process all raw text files.

        Files that fail to process are logged and skipped; the pipeline
        continues with the remaining files.

        Returns:
            List of processed document records, one per paragraph segment.
        """
        raw_files = list(self.raw_dir.glob("*.txt"))
        logger.info(f"Found {len(raw_files)} raw text files")

        all_segments = []
        for i, file_path in enumerate(raw_files, 1):
            logger.info(f"Processing [{i}/{len(raw_files)}]: {file_path.name}")
            try:
                cleaned_text, filename = process_raw_file(file_path)
                title, author = TextSegmenter.extract_title_and_author(cleaned_text)

                # Segment text into paragraphs of at least 100 characters.
                segments = TextSegmenter.segment_by_paragraphs(cleaned_text, min_length=100)
                logger.info(f"  → Segmented into {len(segments)} paragraphs")

                # Create records. BUGFIX: the id previously used a constant
                # placeholder, so ids collided across source files; derive it
                # from the file stem to keep ids globally unique.
                for j, segment in enumerate(segments):
                    record = {
                        "id": f"{file_path.stem}_para_{j}",
                        "source": filename,
                        "title": title,
                        "author": author,
                        "segment_index": j,
                        "text": segment,
                        "length": len(segment)
                    }
                    all_segments.append(record)
            except Exception as e:
                # Best-effort per file: log and move on rather than abort the run.
                logger.error(f"  ✗ Error processing {file_path.name}: {str(e)}")
                continue

        logger.info(f"Total segments created: {len(all_segments)}")
        return all_segments

    def save_processed_data(self, segments: List[Dict[str, str]]) -> Path:
        """Save processed segments to JSONL file.

        Args:
            segments: List of processed segments

        Returns:
            Path to saved file
        """
        output_path = self.processed_dir / "processed_segments.jsonl"
        logger.info(f"Saving {len(segments)} segments to {output_path}")

        # One JSON object per line; ensure_ascii=False keeps non-Latin text readable.
        with open(output_path, 'w', encoding='utf-8') as f:
            for segment in segments:
                f.write(json.dumps(segment, ensure_ascii=False) + '\n')

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def create_training_examples(self, segments: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Create instruction-response training examples from segments.

        Args:
            segments: Processed text segments

        Returns:
            List of training examples
        """
        examples = []

        # Example templates for generating instruction-response pairs.
        templates = [
            {
                "instruction": "Explain this passage from your works as if speaking to a contemporary scholar:",
                "prefix": "In this passage, I discuss: "
            },
            {
                "instruction": "What philosophical principle does this text embody?",
                "prefix": "This passage exemplifies the principle that "
            },
            {
                "instruction": "Summarize the main argument of this passage:",
                "prefix": "The essential point I make here is that "
            },
        ]

        logger.info(f"Creating training examples from {len(segments)} segments")

        for i, segment in enumerate(segments):
            text = segment["text"]

            # Skip very short segments (fewer than 20 words).
            if len(text.split()) < 20:
                continue

            # Currently only the first template is applied per segment;
            # the slice keeps the remaining templates available for later use.
            for template in templates[:1]:
                example = {
                    # Truncate long passages so prompts stay compact.
                    "instruction": template["instruction"],
                    "input": text[:200] + "..." if len(text) > 200 else text,
                    "output": template["prefix"] + text[:300],
                    "source": segment["source"],
                    "segment_id": segment["id"]
                }
                examples.append(example)

        logger.info(f"Created {len(examples)} training examples")
        return examples

    def save_training_data(self, examples: List[Dict[str, str]]) -> Path:
        """Save training examples to JSON file.

        Args:
            examples: Training examples

        Returns:
            Path to saved file
        """
        output_path = self.processed_dir / "training_examples.json"
        logger.info(f"Saving {len(examples)} training examples to {output_path}")

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(examples, f, ensure_ascii=False, indent=2)

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def process_pipeline(self) -> Tuple[Path, Path]:
        """Run complete data processing pipeline.

        Returns:
            Tuple of (processed_segments_path, training_data_path)
        """
        logger.info("=" * 60)
        logger.info("Starting data processing pipeline")
        logger.info("=" * 60)

        # Process all files
        segments = self.process_all_files()

        # Save processed segments
        segments_path = self.save_processed_data(segments)

        # Create training examples
        training_examples = self.create_training_examples(segments)

        # Save training data
        training_path = self.save_training_data(training_examples)

        logger.info("=" * 60)
        logger.info("Data processing pipeline completed successfully!")
        logger.info(f"Processed segments: {len(segments)}")
        logger.info(f"Training examples: {len(training_examples)}")
        logger.info("=" * 60)

        return segments_path, training_path
def main():
    """Main entry point for data processing."""
    DataProcessor().process_pipeline()


if __name__ == "__main__":
    main()