File size: 4,623 Bytes
26fe9a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from typing import List, Dict, Optional
import re

class Chunk:
    """A piece of a document paired with its provenance metadata.

    Attributes:
        content: the chunk text (possibly prefixed with a section title
            by the caller for retrieval context).
        metadata: arbitrary metadata dict (doc id, section title, chunk id, ...).
    """

    def __init__(self, content: str, metadata: Dict) -> None:
        self.content = content
        self.metadata = metadata

    def __repr__(self) -> str:
        # Truncate the content so reprs of large chunks stay readable in logs.
        preview = self.content[:40] + ('...' if len(self.content) > 40 else '')
        return f"Chunk(content={preview!r}, metadata={self.metadata!r})"

def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into chunks of at most ``chunk_size`` characters.

    Each chunk preferentially ends at a paragraph break (``\\n\\n``), then a
    single newline, then a space — searched only within the trailing
    ``chunk_overlap`` characters of the current window.  When no such break
    exists, the text is cut hard at ``chunk_size`` and the next chunk
    restarts ``chunk_overlap`` characters earlier, so forced cuts overlap.

    Args:
        text: the text to split; an empty string yields ``[]``.
        chunk_size: maximum characters per chunk.
        chunk_overlap: size of the break-search window, and the overlap
            applied after a forced cut.

    Returns:
        List of chunk strings whose concatenation covers all of ``text``
        (consecutive chunks may share up to ``chunk_overlap`` characters
        after a forced cut).
    """
    if not text:
        return []

    chunks: List[str] = []
    start = 0
    text_len = len(text)

    while start < text_len:
        end = start + chunk_size
        if end >= text_len:
            chunks.append(text[start:])
            break

        # Look for a nice break point, but only inside the trailing
        # overlap window so chunks stay close to chunk_size.
        search_start = max(start, end - chunk_overlap)

        boundary = -1
        double_newline_pos = text.rfind('\n\n', search_start, end)
        if double_newline_pos != -1:
            boundary = double_newline_pos + 2
        else:
            newline_pos = text.rfind('\n', search_start, end)
            if newline_pos != -1:
                boundary = newline_pos + 1
            else:
                space_pos = text.rfind(' ', search_start, end)
                if space_pos != -1:
                    boundary = space_pos + 1

        if boundary != -1:
            chunks.append(text[start:boundary])
            start = boundary
        else:
            # No delimiter found: hard cut at `end`, then back up by the
            # overlap so the next chunk repeats the tail of this one.
            # Bug fix vs. original: it assigned `start` twice (the second
            # `max(start, end - chunk_overlap)` was a no-op on the value
            # just assigned) and looped forever when
            # chunk_overlap >= chunk_size; `start + 1` guarantees progress.
            chunks.append(text[start:end])
            start = max(start + 1, end - chunk_overlap)

    return chunks

def extract_sections(text: str) -> List[Dict]:
    """Split markdown-ish text into sections keyed by ``#`` headers.

    Text before the first header is collected under the synthetic title
    ``"Introduction"`` at level 0.  Sections whose body is empty (e.g. a
    header immediately followed by another header) are omitted.

    Returns:
        ``[{'title': str, 'content': str, 'level': int}, ...]``
    """
    header_re = re.compile(r'^(#+)\s+(.*)')

    sections: List[Dict] = []
    title = "Introduction"
    level = 0
    body: List[str] = []

    def _flush() -> None:
        # Emit the accumulated section only if it collected any lines.
        if body:
            sections.append({
                "title": title,
                "content": '\n'.join(body).strip(),
                "level": level,
            })

    for line in text.split('\n'):
        match = header_re.match(line)
        if match is None:
            body.append(line)
            continue
        _flush()
        level = len(match.group(1))
        title = match.group(2).strip()
        body = []

    _flush()
    return sections

def create_chunks(text: str, metadata: Dict, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Chunk]:
    """Turn a document into ``Chunk`` objects, splitting per markdown section.

    Each section body is chunked independently via ``split_text``; chunks
    from non-intro sections get a ``"Section: <title>"`` prefix so the
    title travels with the text for retrieval.  The un-prefixed text is
    kept in metadata under ``original_text`` for precise citation.

    Args:
        text: full document text.
        metadata: base metadata copied into every chunk (``doc_id`` is
            used to build ``chunk_id``).
        chunk_size / chunk_overlap: forwarded to ``split_text``.
    """
    doc_id = metadata.get('doc_id', 'unknown')
    result: List[Chunk] = []

    for section in extract_sections(text):
        body = section['content']
        if not body:
            continue

        title = section['title']
        for idx, piece in enumerate(split_text(body, chunk_size, chunk_overlap)):
            # Prepend the section title for context, except for the
            # synthetic "Introduction" section which has no real header.
            if title != 'Introduction':
                content = f"Section: {title}\n{piece}"
            else:
                content = piece

            meta = metadata.copy()
            meta["section_title"] = title
            meta["chunk_id"] = f"{doc_id}_{title[:10]}_{idx}"
            # Un-prefixed chunk text, for exact citation if needed.
            meta["original_text"] = piece

            result.append(Chunk(content=content, metadata=meta))

    return result