File size: 7,141 Bytes
4e5fc16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""Data processing pipeline for Francis Botcon."""

import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from src.text_processor import TextCleaner, TextSegmenter, process_raw_file
from src.logger import LoggerSetup
from src.config_loader import config

logger = LoggerSetup.setup().getChild(__name__)


class DataProcessor:
    """Process raw texts into cleaned and segmented datasets."""

    def __init__(self, raw_dir: Optional[str] = None, processed_dir: Optional[str] = None):
        """Initialize data processor.

        Args:
            raw_dir: Directory containing raw text files. Defaults to the
                ``data.raw_dir`` config value (or ``./data/raw``).
            processed_dir: Directory for processed outputs. Defaults to the
                ``data.processed_dir`` config value (or ``./data/processed``).
        """
        self.raw_dir = Path(raw_dir or config.get("data.raw_dir", "./data/raw"))
        self.processed_dir = Path(processed_dir or config.get("data.processed_dir", "./data/processed"))

        # Create the output directory if it doesn't exist. raw_dir is left
        # alone: if it is missing, the glob in process_all_files simply
        # yields no files.
        self.processed_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Raw data directory: {self.raw_dir}")
        logger.info(f"Processed data directory: {self.processed_dir}")

    def process_all_files(self) -> List[Dict[str, str]]:
        """Process all raw ``*.txt`` files in the raw directory.

        Each file is cleaned, title/author are extracted, and the text is
        split into paragraph segments of at least 100 characters. Files
        that fail to process are logged and skipped (best-effort).

        Returns:
            List of segment records with keys: ``id``, ``source``,
            ``title``, ``author``, ``segment_index``, ``text``, ``length``.
        """
        raw_files = list(self.raw_dir.glob("*.txt"))
        logger.info(f"Found {len(raw_files)} raw text files")

        all_segments = []

        for i, file_path in enumerate(raw_files, 1):
            logger.info(f"Processing [{i}/{len(raw_files)}]: {file_path.name}")

            try:
                cleaned_text, filename = process_raw_file(file_path)
                title, author = TextSegmenter.extract_title_and_author(cleaned_text)

                # Segment text into paragraphs; fragments shorter than
                # min_length characters are dropped by the segmenter.
                segments = TextSegmenter.segment_by_paragraphs(cleaned_text, min_length=100)
                logger.info(f"  → Segmented into {len(segments)} paragraphs")

                # Create one record per segment. BUG FIX: the id previously
                # used a hard-coded placeholder string instead of the file
                # stem, so ids collided across different source files.
                for j, segment in enumerate(segments):
                    record = {
                        "id": f"{file_path.stem}_para_{j}",
                        "source": filename,
                        "title": title,
                        "author": author,
                        "segment_index": j,
                        "text": segment,
                        "length": len(segment)
                    }
                    all_segments.append(record)

            except Exception as e:
                # Best-effort pipeline: log and continue with the next file.
                logger.error(f"  ✗ Error processing {file_path.name}: {str(e)}")
                continue

        logger.info(f"Total segments created: {len(all_segments)}")
        return all_segments

    def save_processed_data(self, segments: List[Dict[str, str]]) -> Path:
        """Save processed segments to a JSONL file (one record per line).

        Args:
            segments: List of processed segment records.

        Returns:
            Path to the saved ``processed_segments.jsonl`` file.
        """
        output_path = self.processed_dir / "processed_segments.jsonl"

        logger.info(f"Saving {len(segments)} segments to {output_path}")

        with open(output_path, 'w', encoding='utf-8') as f:
            for segment in segments:
                # ensure_ascii=False keeps non-ASCII text readable on disk.
                f.write(json.dumps(segment, ensure_ascii=False) + '\n')

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def create_training_examples(self, segments: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Create instruction-response training examples from segments.

        Segments shorter than 20 words are skipped. Currently only the
        first template is applied per segment (see NOTE below).

        Args:
            segments: Processed text segments (as produced by
                :meth:`process_all_files`).

        Returns:
            List of training examples with keys: ``instruction``,
            ``input``, ``output``, ``source``, ``segment_id``.
        """
        examples = []

        # Example templates for generating instruction-response pairs.
        templates = [
            {
                "instruction": "Explain this passage from your works as if speaking to a contemporary scholar:",
                "prefix": "In this passage, I discuss: "
            },
            {
                "instruction": "What philosophical principle does this text embody?",
                "prefix": "This passage exemplifies the principle that "
            },
            {
                "instruction": "Summarize the main argument of this passage:",
                "prefix": "The essential point I make here is that "
            },
        ]

        logger.info(f"Creating training examples from {len(segments)} segments")

        for segment in segments:
            text = segment["text"]

            # Skip very short segments — too little signal for training.
            if len(text.split()) < 20:
                continue

            # NOTE(review): only the first template is used, so exactly one
            # example is produced per segment. Widen the slice to generate
            # multiple examples per segment if desired.
            for template in templates[:1]:
                example = {
                    "instruction": template["instruction"],
                    # Truncate long passages for the prompt input.
                    "input": text[:200] + "..." if len(text) > 200 else text,
                    "output": template["prefix"] + text[:300],
                    "source": segment["source"],
                    "segment_id": segment["id"]
                }
                examples.append(example)

        logger.info(f"Created {len(examples)} training examples")
        return examples

    def save_training_data(self, examples: List[Dict[str, str]]) -> Path:
        """Save training examples to a pretty-printed JSON file.

        Args:
            examples: Training examples.

        Returns:
            Path to the saved ``training_examples.json`` file.
        """
        output_path = self.processed_dir / "training_examples.json"

        logger.info(f"Saving {len(examples)} training examples to {output_path}")

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(examples, f, ensure_ascii=False, indent=2)

        logger.info(f"✓ Saved to {output_path}")
        return output_path

    def process_pipeline(self) -> Tuple[Path, Path]:
        """Run the complete data processing pipeline.

        Steps: process all raw files → save segments as JSONL → build
        training examples → save them as JSON.

        Returns:
            Tuple of (processed_segments_path, training_data_path).
        """
        logger.info("=" * 60)
        logger.info("Starting data processing pipeline")
        logger.info("=" * 60)

        # Process all files
        segments = self.process_all_files()

        # Save processed segments
        segments_path = self.save_processed_data(segments)

        # Create training examples
        training_examples = self.create_training_examples(segments)

        # Save training data
        training_path = self.save_training_data(training_examples)

        logger.info("=" * 60)
        logger.info("Data processing pipeline completed successfully!")
        logger.info(f"Processed segments: {len(segments)}")
        logger.info(f"Training examples: {len(training_examples)}")
        logger.info("=" * 60)

        return segments_path, training_path


def main():
    """Entry point: build a processor and run the full pipeline."""
    DataProcessor().process_pipeline()


# Run the pipeline when this module is executed as a script.
if __name__ == "__main__":
    main()