File size: 7,605 Bytes
6ef4823
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""Document processor for parsing and chunking HPMOR HTML."""

import re
import json
from pathlib import Path
from typing import List, Dict, Optional
from bs4 import BeautifulSoup
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from src.config import config


class HPMORProcessor:
    """Process the HPMOR HTML document into chunked Documents for RAG.

    Pipeline: parse the HTML into per-chapter dicts, split each chapter into
    overlapping chunks via llama-index's SentenceSplitter, and cache both the
    chunks and the chapter metadata as JSON under ``config.processed_data_dir``.
    """

    def __init__(self):
        # Chunking parameters and the on-disk cache location all come from
        # the shared project config object.
        self.chunk_size = config.chunk_size
        self.chunk_overlap = config.chunk_overlap
        self.processed_dir = config.processed_data_dir

    @staticmethod
    def _chapter_record(chapter: Dict, content: List[str]) -> Dict:
        """Build the serializable dict for one finished chapter."""
        return {
            'chapter_number': chapter['number'],
            'chapter_title': chapter['title'],
            'content': '\n'.join(content)
        }

    def parse_html(self, file_path: Path) -> List[Dict]:
        """Parse an HTML file and extract chapters with metadata.

        Args:
            file_path: Path to the HPMOR HTML file.

        Returns:
            A list of dicts with keys ``chapter_number``, ``chapter_title``
            and ``content``. If no "Chapter N" headers are found, the whole
            document is returned as a single chapter-0 entry.
        """
        print(f"Parsing HTML file: {file_path}")

        html_content = file_path.read_text(encoding='utf-8')
        soup = BeautifulSoup(html_content, 'lxml')

        # Remove style and script tags so their text can't leak into content.
        for tag in soup(['style', 'script']):
            tag.decompose()

        chapters: List[Dict] = []
        chapter_pattern = re.compile(r'Chapter\s+(\d+)', re.IGNORECASE)

        # h1/h2/h3 tags are the candidate chapter headers.
        headers = soup.find_all(['h1', 'h2', 'h3'])

        current_chapter: Optional[Dict] = None
        current_content: List[str] = []

        for header in headers:
            header_text = header.get_text(strip=True)
            match = chapter_pattern.search(header_text)
            if not match:
                continue

            # Flush the previous chapter before starting a new one.
            if current_chapter and current_content:
                chapters.append(self._chapter_record(current_chapter, current_content))

            current_chapter = {
                'number': int(match.group(1)),
                'title': header_text
            }
            current_content = []

            # Collect the text of following siblings until the next header
            # that itself matches the chapter pattern.
            # NOTE(review): this only walks *siblings* of the header — content
            # nested under a different parent would be missed; confirm against
            # the actual HTML structure.
            for sibling in header.find_next_siblings():
                if sibling.name in ['h1', 'h2', 'h3']:
                    if chapter_pattern.search(sibling.get_text()):
                        break
                text = sibling.get_text(strip=True)
                if text:
                    current_content.append(text)

        # Flush the final chapter, which has no following header to trigger it.
        if current_chapter and current_content:
            chapters.append(self._chapter_record(current_chapter, current_content))

        # Fallback: no chapter structure found, treat everything as one doc.
        if not chapters:
            print("No chapter structure found, processing as single document")
            chapters = [{
                'chapter_number': 0,
                'chapter_title': 'Harry Potter and the Methods of Rationality',
                'content': soup.get_text(separator='\n', strip=True)
            }]

        print(f"Extracted {len(chapters)} chapters")
        return chapters

    def create_chunks(self, chapters: List[Dict]) -> List[Document]:
        """Create overlapping chunks from chapters.

        Args:
            chapters: Chapter dicts as produced by :meth:`parse_html`.

        Returns:
            One Document per chunk, carrying the chapter metadata plus a
            stable ``chunk_id``, its ``chunk_index`` and the chapter's
            ``total_chunks_in_chapter``.
        """
        print(f"Creating chunks with size={self.chunk_size}, overlap={self.chunk_overlap}")

        documents: List[Document] = []
        splitter = SentenceSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        for chapter in chapters:
            # Wrap the chapter so the splitter can carry its metadata through.
            chapter_doc = Document(
                text=chapter['content'],
                metadata={
                    'chapter_number': chapter['chapter_number'],
                    'chapter_title': chapter['chapter_title'],
                    'source': 'hpmor.html'
                }
            )

            nodes = splitter.get_nodes_from_documents([chapter_doc])

            # Convert nodes back to Documents with per-chunk metadata added.
            for i, node in enumerate(nodes):
                documents.append(Document(
                    text=node.text,
                    metadata={
                        **chapter_doc.metadata,
                        'chunk_id': f"ch{chapter['chapter_number']}_chunk{i}",
                        'chunk_index': i,
                        'total_chunks_in_chapter': len(nodes)
                    }
                ))

        print(f"Created {len(documents)} chunks total")
        return documents

    def save_processed_data(self, documents: List[Document], chapters: List[Dict]) -> None:
        """Save processed documents and chapter metadata to disk as JSON."""
        # Ensure the cache directory exists — without this, the first run on
        # a clean checkout fails with FileNotFoundError on open().
        self.processed_dir.mkdir(parents=True, exist_ok=True)

        # Serialize documents as plain dicts for easy reloading.
        docs_data = [
            {'text': doc.text, 'metadata': doc.metadata}
            for doc in documents
        ]

        docs_file = self.processed_dir / 'documents.json'
        with open(docs_file, 'w', encoding='utf-8') as f:
            json.dump(docs_data, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(docs_data)} documents to {docs_file}")

        chapters_file = self.processed_dir / 'chapters.json'
        with open(chapters_file, 'w', encoding='utf-8') as f:
            json.dump(chapters, f, indent=2, ensure_ascii=False)
        print(f"Saved chapter metadata to {chapters_file}")

    def load_processed_data(self) -> Optional[List[Document]]:
        """Load previously processed documents, or None if no cache exists."""
        docs_file = self.processed_dir / 'documents.json'

        if not docs_file.exists():
            return None

        with open(docs_file, 'r', encoding='utf-8') as f:
            docs_data = json.load(f)

        documents = [
            Document(text=doc_data['text'], metadata=doc_data['metadata'])
            for doc_data in docs_data
        ]

        print(f"Loaded {len(documents)} documents from cache")
        return documents

    def process(self, force_reprocess: bool = False) -> List[Document]:
        """Main processing pipeline: load from cache or parse, chunk, save.

        Args:
            force_reprocess: When True, ignore any cached documents.json
                and rebuild everything from the source HTML.

        Returns:
            The list of chunked Documents.

        Raises:
            FileNotFoundError: If the source HTML file is missing.
        """
        # Fast path: reuse the cached chunks when allowed and present.
        if not force_reprocess:
            documents = self.load_processed_data()
            if documents:
                return documents

        print("Processing HPMOR document from scratch...")

        if not config.hpmor_file.exists():
            raise FileNotFoundError(f"HPMOR file not found: {config.hpmor_file}")

        chapters = self.parse_html(config.hpmor_file)
        documents = self.create_chunks(chapters)
        self.save_processed_data(documents, chapters)

        return documents


def main():
    """Run the full HPMOR processing pipeline and print a sample chunk."""
    documents = HPMORProcessor().process(force_reprocess=True)
    print(f"\nProcessing complete! Created {len(documents)} document chunks.")

    # Preview the first chunk so the operator can sanity-check the output.
    if not documents:
        return
    first = documents[0]
    print("\nSample chunk:")
    print(f"Text: {first.text[:200]}...")
    print(f"Metadata: {first.metadata}")


if __name__ == "__main__":
    main()