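"""
Utilities for splitting interview transcripts into LLM-sized chunks.

Provides token counting, sentence splitting, speaker-aware semantic chunking
with overlap between chunks, and a helper for inspecting chunk quality.
"""
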
import tiktoken
import re
from typing import List
from nltk.tokenize.punkt import PunktSentenceTokenizer


def chunk_text(text, max_tokens=3000):
    """Legacy function - kept for backwards compatibility"""
    return chunk_text_semantic(text, "Other", max_tokens)


def count_tokens(text: str) -> int:
    """Count tokens using tiktoken"""
    try:
        enc = tiktoken.get_encoding("cl100k_base")
        return len(enc.encode(text))
    except Exception:
        # Fallback to a rough word-based estimate (~1.3 tokens per word)
        return int(len(text.split()) * 1.3)


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with improved handling"""
    try:
        tokenizer = PunktSentenceTokenizer()
        sentences = tokenizer.tokenize(text)
        return sentences
    except Exception:
        # Fallback to a simple period-based split
        return [s.strip() + '.' for s in text.split('.') if s.strip()]


def find_topic_boundaries(text: str, interviewee_type: str) -> List[int]:
    """
    Identify topic boundaries in the text for smarter chunking.
    Returns a list of character positions where topics likely change.
    """
    boundaries = [0]  # Start position

    # Topic change indicators
    topic_patterns = [
        r'\n\n+',  # Paragraph breaks
        r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
        r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]',  # Q&A pairs
    ]

    # Find all topic boundaries
    for pattern in topic_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            pos = match.start()
            # Only add if not too close to an existing boundary
            if all(abs(pos - b) > 100 for b in boundaries):
                boundaries.append(pos)

    boundaries.append(len(text))  # End position
    boundaries.sort()
    return boundaries


def extract_speaker_segments(text: str) -> List[dict]:
    """
    Extract segments with speaker labels and content.
    """
    pattern = r'\[([^\]]+)\]\s*([^\[]*)'
    segments = []
    for match in re.finditer(pattern, text, re.DOTALL):
        speaker = match.group(1).strip()
        content = match.group(2).strip()
        if content:
            segments.append({
                "speaker": speaker,
                "content": content,
                "start_pos": match.start(),
                "tokens": count_tokens(content)
            })
    return segments


def chunk_text_semantic(
    text: str,
    interviewee_type: str = "Other",
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Advanced chunking that respects:
    1. Speaker boundaries (never split inside a speaker turn)
    2. Topic boundaries (keep related Q&A together)
    3. Token limits for LLM context
    4. Overlap for context continuity
    """
    # Check if text has speaker tags
    has_tags = bool(re.search(r'\[[^\]]+\]', text))
    if not has_tags:
        # Fall back to sentence-based chunking
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    # Extract speaker segments
    segments = extract_speaker_segments(text)
    if not segments:
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    # Group segments into chunks
    chunks = []
    current_chunk_segments = []
    current_tokens = 0

    i = 0
    while i < len(segments):
        segment = segments[i]
        segment_tokens = segment["tokens"]

        # If a single segment exceeds max_tokens, split it
        if segment_tokens > max_tokens:
            # Split the long segment by sentences
            sub_chunks = chunk_by_sentences(
                f"[{segment['speaker']}] {segment['content']}",
                max_tokens,
                overlap_tokens
            )
            chunks.extend(sub_chunks)
            i += 1
            continue

        # Check if adding this segment would exceed the limit
        if current_tokens + segment_tokens > max_tokens and current_chunk_segments:
            # Finalize the current chunk (local name chosen to avoid shadowing chunk_text)
            chunk_str = "\n\n".join([
                f"[{s['speaker']}] {s['content']}"
                for s in current_chunk_segments
            ])
            chunks.append(chunk_str)

            # Start a new chunk with overlap:
            # keep the last few segments for context
            overlap_segments = []
            overlap_token_count = 0
            for seg in reversed(current_chunk_segments):
                if overlap_token_count + seg["tokens"] < overlap_tokens:
                    overlap_segments.insert(0, seg)
                    overlap_token_count += seg["tokens"]
                else:
                    break
            current_chunk_segments = overlap_segments
            current_tokens = overlap_token_count

        # Add the segment to the current chunk
        current_chunk_segments.append(segment)
        current_tokens += segment_tokens
        i += 1

    # Add the final chunk
    if current_chunk_segments:
        chunk_str = "\n\n".join([
            f"[{s['speaker']}] {s['content']}"
            for s in current_chunk_segments
        ])
        chunks.append(chunk_str)

    return chunks if chunks else [text]


def chunk_by_sentences(
    text: str,
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Fallback chunking method based on sentences.
    """
    sentences = split_into_sentences(text)
    chunks = []
    current_chunk = []
    current_tokens = 0

    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)

        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            # Finalize the current chunk
            chunks.append(" ".join(current_chunk))

            # Create overlap from the trailing sentences
            overlap_sents = []
            overlap_token_count = 0
            for sent in reversed(current_chunk):
                sent_tokens = count_tokens(sent)
                if overlap_token_count + sent_tokens < overlap_tokens:
                    overlap_sents.insert(0, sent)
                    overlap_token_count += sent_tokens
                else:
                    break
            current_chunk = overlap_sents
            current_tokens = overlap_token_count

        current_chunk.append(sentence)
        current_tokens += sentence_tokens

    # Add the final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks if chunks else [text]


def analyze_chunk_quality(chunks: List[str]) -> dict:
    """
    Analyze chunking quality for debugging.
    """
    if not chunks:
        return {"error": "No chunks"}

    token_counts = [count_tokens(chunk) for chunk in chunks]

    return {
        "num_chunks": len(chunks),
        "avg_tokens": sum(token_counts) / len(token_counts),
        "min_tokens": min(token_counts),
        "max_tokens": max(token_counts),
        "total_tokens": sum(token_counts),
        "chunks_over_limit": sum(1 for t in token_counts if t > 3000)
    }
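

if __name__ == "__main__":
    # Minimal usage sketch. The transcript below is invented for illustration
    # only; real inputs are expected to carry [Speaker] tags like these.
    sample = (
        "[Interviewer] Let's talk about your daily routine. How does it start?\n"
        "[Doctor] I usually begin rounds at seven and review overnight charts.\n"
        "[Interviewer] Moving on, how do you handle urgent referrals?\n"
        "[Doctor] They go straight to triage and I am paged immediately."
    )

    # Small max_tokens here just to exercise the chunking logic on a short text
    chunks = chunk_text_semantic(sample, interviewee_type="Doctor", max_tokens=100)
    for idx, chunk in enumerate(chunks, start=1):
        print(f"--- chunk {idx} ({count_tokens(chunk)} tokens) ---")
        print(chunk)

    print(analyze_chunk_quality(chunks))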