Spaces:
Sleeping
Sleeping
File size: 4,623 Bytes
26fe9a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
from typing import List, Dict, Optional
import re
class Chunk:
    """A piece of a document paired with its retrieval metadata.

    Attributes are plain instance fields:
      content  -- the chunk text (possibly prefixed with section context)
      metadata -- arbitrary dict, e.g. doc_id, section_title, chunk_id
    """

    def __init__(self, content: str, metadata: Dict):
        self.content = content
        self.metadata = metadata

    def __repr__(self) -> str:
        # Truncate content so reprs stay readable in logs.
        preview = self.content[:40].replace('\n', ' ')
        return f"Chunk(content={preview!r}..., metadata={self.metadata!r})"
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split *text* into chunks of at most ``chunk_size`` characters.

    The cut point of each chunk is chosen, in order of preference, at the
    last double newline, single newline, or space found within the final
    ``chunk_overlap`` characters of the window. When no delimiter exists
    the text is cut hard at ``chunk_size`` and the next chunk restarts
    ``chunk_overlap`` characters earlier, so only forced cuts overlap.

    Args:
        text: Input string; an empty string yields an empty list.
        chunk_size: Maximum length of each returned chunk.
        chunk_overlap: Overlap for forced cuts and the delimiter search window.

    Returns:
        List of chunk strings that together cover the whole input.

    Raises:
        ValueError: If ``chunk_overlap >= chunk_size`` — a forced cut would
            then make no forward progress and the loop would never end.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []

    chunks: List[str] = []
    start = 0
    text_len = len(text)
    while start < text_len:
        end = start + chunk_size
        if end >= text_len:
            chunks.append(text[start:])
            break

        # Search only the tail of the window so chunks stay near chunk_size.
        search_start = max(start, end - chunk_overlap)
        boundary = -1
        for delim in ('\n\n', '\n', ' '):  # best break point first
            pos = text.rfind(delim, search_start, end)
            if pos != -1:
                boundary = pos + len(delim)
                break

        if boundary != -1:
            chunks.append(text[start:boundary])
            start = boundary
        else:
            # No delimiter found: hard cut, then back up so the next
            # chunk overlaps the forced cut by chunk_overlap characters.
            chunks.append(text[start:end])
            start = end - chunk_overlap
    return chunks
def extract_sections(text: str) -> List[Dict]:
"""
Extract high-level sections based on markdown headers.
Returns: [{'title': '...', 'content': '...', 'level': 1}, ...]
"""
lines = text.split('\n')
sections = []
current_section = {"title": "Introduction", "content": [], "level": 0}
for line in lines:
match = re.match(r'^(#+)\s+(.*)', line)
if match:
# Save previous section
if current_section["content"]:
sections.append({
"title": current_section["title"],
"content": '\n'.join(current_section["content"]).strip(),
"level": current_section["level"]
})
level = len(match.group(1))
title = match.group(2).strip()
current_section = {"title": title, "content": [], "level": level}
else:
current_section["content"].append(line)
# Append last
if current_section["content"]:
sections.append({
"title": current_section["title"],
"content": '\n'.join(current_section["content"]).strip(),
"level": current_section["level"]
})
return sections
def create_chunks(text: str, metadata: Dict, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Chunk]:
    """Turn a document into section-aware Chunk objects.

    Each markdown section is split independently so chunks never straddle
    a section boundary. The section title is prepended to the chunk text
    (except for the implicit "Introduction" section) and also recorded in
    the chunk metadata alongside a deterministic chunk_id.

    Args:
        text: Full document text (markdown).
        metadata: Base metadata copied into every chunk; may contain doc_id.
        chunk_size: Maximum chunk length passed to split_text.
        chunk_overlap: Overlap passed to split_text.

    Returns:
        A list of Chunk instances (one per text chunk per section).
    """
    result: List[Chunk] = []
    doc_id = metadata.get('doc_id', 'unknown')
    for sec in extract_sections(text):
        body = sec['content']
        if not body:
            continue
        title = sec['title']
        for idx, piece in enumerate(split_text(body, chunk_size, chunk_overlap)):
            # Prefix the section title so retrieval sees the context;
            # leading introduction text has no title to add.
            if title == 'Introduction':
                contextualized = piece
            else:
                contextualized = f"Section: {title}\n{piece}"
            meta = dict(metadata)
            meta["section_title"] = title
            meta["chunk_id"] = f"{doc_id}_{title[:10]}_{idx}"
            # Keep the raw piece for exact citation, independent of the prefix.
            meta["original_text"] = piece
            result.append(Chunk(content=contextualized, metadata=meta))
    return result
|