# embeddings/text_chunking.py
"""
Multiple text chunking strategies for research papers
"""
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class ChunkingStrategy(ABC):
    """Interface that every text-chunking strategy must implement."""

    @abstractmethod
    def chunk_text(self, text: str, **kwargs) -> List[Dict[str, Any]]:
        """Split *text* into a list of chunk dictionaries (one dict per chunk)."""
        pass
class FixedSizeChunking(ChunkingStrategy):
    """Fixed-size chunking with overlap, preferring sentence/word boundaries."""

    def chunk_text(self, text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[Dict[str, Any]]:
        """Split *text* into overlapping chunks of roughly *chunk_size* characters.

        Each chunk dict carries 'text', 'start_char', 'end_char' and 'chunk_size'.

        Raises:
            ValueError: if chunk_size <= 0 (the scan window would never advance,
                looping forever) or chunk_overlap < 0 (the cursor would jump
                past `end` and silently skip text).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        chunks: List[Dict[str, Any]] = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = min(start + chunk_size, text_length)
            if end < text_length:
                # Prefer ending on a sentence boundary, but only if it falls in
                # the second half of the window (avoids tiny chunks).
                sentence_end = text.rfind('. ', start, end)
                if sentence_end != -1 and sentence_end > start + chunk_size // 2:
                    end = sentence_end + 1  # keep the period with this chunk
                else:
                    # Otherwise back up to a word boundary so words aren't split.
                    word_end = text.rfind(' ', start, end)
                    if word_end != -1 and word_end > start + chunk_size // 2:
                        end = word_end
            chunk = text[start:end].strip()
            if chunk:
                chunks.append({
                    'text': chunk,
                    'start_char': start,
                    'end_char': end,
                    'chunk_size': len(chunk),
                })
            # Step forward with overlap; the guard guarantees forward progress
            # even when chunk_overlap >= the distance actually covered.
            start = end - chunk_overlap if end - chunk_overlap > start else end
        return chunks
class SemanticChunking(ChunkingStrategy):
    """Semantic chunking: groups whole paragraphs up to a character budget."""

    def chunk_text(self, text: str, max_chunk_size: int = 512) -> List[Dict[str, Any]]:
        """Group blank-line-separated paragraphs into chunks of <= *max_chunk_size* chars.

        A single paragraph larger than the budget still becomes its own chunk.
        Each chunk dict carries 'text', 'start_char', 'end_char', 'chunk_size'
        and 'type' == 'semantic'. 'end_char' is approximate when the original
        separator was not exactly one blank line.
        """
        chunks: List[Dict[str, Any]] = []
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        current_chunk = ""
        current_start = 0
        # Forward-only search cursor: without it, text.find(paragraph) maps a
        # repeated paragraph back to its FIRST occurrence. (The original code
        # also updated current_start only after current_chunk was cleared,
        # making that update a no-op.)
        search_from = 0
        for paragraph in paragraphs:
            # Flush the current chunk if adding this paragraph would exceed the budget.
            if current_chunk and len(current_chunk) + len(paragraph) > max_chunk_size:
                chunks.append(self._build_chunk(current_chunk, current_start))
                current_chunk = ""
            if current_chunk:
                current_chunk += "\n\n" + paragraph
            else:
                current_chunk = paragraph
                pos = text.find(paragraph, search_from)
                current_start = pos if pos != -1 else search_from
            search_from = current_start + len(current_chunk)
        # Add the last chunk
        if current_chunk:
            chunks.append(self._build_chunk(current_chunk, current_start))
        return chunks

    @staticmethod
    def _build_chunk(chunk_body: str, start: int) -> Dict[str, Any]:
        """Build one chunk record from the accumulated paragraph text."""
        return {
            'text': chunk_body.strip(),
            'start_char': start,
            'end_char': start + len(chunk_body),
            'chunk_size': len(chunk_body),
            'type': 'semantic',
        }
class ResearchPaperChunker:
    """Specialized chunker for research papers (title + abstract)."""

    def __init__(self, strategy: str = "semantic"):
        """strategy: 'fixed' or 'semantic'; unknown names fall back to 'semantic'."""
        self.strategy = strategy
        self.chunkers: Dict[str, ChunkingStrategy] = {
            "fixed": FixedSizeChunking(),
            "semantic": SemanticChunking(),
        }

    def chunk_paper(self, paper: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Chunk a research paper into manageable pieces.

        Only 'title' and 'abstract' are chunked; the other paper fields
        ('source', 'domain', 'publication_date', 'authors') are copied onto
        every chunk as metadata. Missing fields default to empty values.
        """
        paper_id = paper.get('id', 'unknown')
        title = paper.get('title', '')
        abstract = paper.get('abstract', '')
        # Combine title and abstract into one text to chunk.
        full_text = f"Title: {title}\n\nAbstract: {abstract}"
        # Unknown strategy names silently fall back to semantic chunking.
        chunker = self.chunkers.get(self.strategy, self.chunkers["semantic"])
        chunks = chunker.chunk_text(full_text)
        # Attach paper metadata to each chunk.
        for chunk in chunks:
            chunk.update({
                'paper_id': paper_id,
                'paper_title': title,
                'source': paper.get('source', ''),
                'domain': paper.get('domain', ''),
                'publication_date': paper.get('publication_date', ''),
                'authors': paper.get('authors', []),
                'chunk_strategy': self.strategy,
            })
        return chunks

    def batch_chunk_papers(self, papers: List[Dict], strategy: Optional[str] = None) -> List[Dict[str, Any]]:
        """Chunk multiple papers into one flat chunk list.

        NOTE: passing *strategy* permanently switches self.strategy for later
        calls too (existing behavior, kept for compatibility). Papers that
        raise during chunking are skipped best-effort with a printed error.
        """
        if strategy:
            self.strategy = strategy
        all_chunks: List[Dict[str, Any]] = []
        for paper in papers:
            try:
                all_chunks.extend(self.chunk_paper(paper))
            except Exception as e:  # best-effort batch: report and continue
                print(f"❌ Error chunking paper {paper.get('id', 'unknown')}: {e}")
                continue
        print(f"✅ Chunked {len(papers)} papers into {len(all_chunks)} chunks")
        return all_chunks
# Quick test
def test_chunking_strategies():
    """Smoke-test both chunking strategies on a sample paper and print results."""
    test_paper = {
        'id': 'test_001',
        'title': 'Deep Learning for Medical Image Analysis',
        'abstract': 'This paper explores the application of deep learning techniques in medical image analysis. We propose a novel transformer-based architecture that achieves state-of-the-art performance on multiple benchmark datasets. Our method improves accuracy by 15% compared to previous approaches. The model is evaluated on CT, MRI, and X-ray datasets showing consistent improvements across modalities.',
        'source': 'test',
        'domain': 'medical_imaging'
    }
    print("🧪 Testing Chunking Strategies")
    print("=" * 50)
    for strategy in ["fixed", "semantic"]:
        print(f"\n🔬 Strategy: {strategy}")
        # BUG FIX: the loop variable was previously never applied — a single
        # chunker built outside the loop chunked with the default ('semantic')
        # strategy on every iteration.
        chunker = ResearchPaperChunker(strategy=strategy)
        chunks = chunker.chunk_paper(test_paper)
        print(f"   Number of chunks: {len(chunks)}")
        for i, chunk in enumerate(chunks):
            print(f"   Chunk {i + 1}: {chunk['chunk_size']} chars - {chunk['text'][:80]}...")


if __name__ == "__main__":
    test_chunking_strategies()