# TranscriptWriting/chunking.py
import re
from typing import List

import tiktoken
from nltk.tokenize.punkt import PunktSentenceTokenizer
def chunk_text(text, max_tokens=3000):
"""Legacy function - kept for backwards compatibility"""
return chunk_text_semantic(text, "Other", max_tokens)
def count_tokens(text: str) -> int:
"""Count tokens using tiktoken"""
try:
enc = tiktoken.get_encoding("cl100k_base")
return len(enc.encode(text))
except Exception:
        # Fallback: rough heuristic of ~1.3 tokens per English word
        # (e.g. a 100-word passage is estimated at ~130 tokens)
        return int(len(text.split()) * 1.3)
def split_into_sentences(text: str) -> List[str]:
"""Split text into sentences with improved handling"""
try:
        # Untrained Punkt tokenizer with default parameters; no nltk.download("punkt") needed
        tokenizer = PunktSentenceTokenizer()
        return tokenizer.tokenize(text)
    except Exception:
        # Fallback: naive period-based split (drops '?' and '!' sentence boundaries)
        return [s.strip() + '.' for s in text.split('.') if s.strip()]
def find_topic_boundaries(text: str, interviewee_type: str) -> List[int]:
"""
Identify topic boundaries in the text for smarter chunking
Returns list of character positions where topics likely change
"""
boundaries = [0] # Start position
# Topic change indicators
topic_patterns = [
r'\n\n+', # Paragraph breaks
r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]', # Q&A pairs
]
# Find all topic boundaries
for pattern in topic_patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
pos = match.start()
# Only add if not too close to existing boundary
if all(abs(pos - b) > 100 for b in boundaries):
boundaries.append(pos)
boundaries.append(len(text)) # End position
boundaries.sort()
return boundaries
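# Illustrative sketch only: one way to consume the boundary list is to slice the
# transcript into the spans between consecutive positions. The helper name
# _split_on_boundaries is hypothetical and not part of the original module.
def _split_on_boundaries(text: str, interviewee_type: str = "Other") -> List[str]:
    """Return the raw text spans between detected topic boundaries."""
    bounds = find_topic_boundaries(text, interviewee_type)
    return [
        span for span in
        (text[start:end] for start, end in zip(bounds, bounds[1:]))
        if span.strip()
    ]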
def extract_speaker_segments(text: str) -> List[dict]:
"""
    Extract speaker-labelled segments of the form "[Speaker] content",
    with each segment's start position and token count
"""
pattern = r'\[([^\]]+)\]\s*([^\[]*)'
segments = []
for match in re.finditer(pattern, text, re.DOTALL):
speaker = match.group(1).strip()
content = match.group(2).strip()
if content:
segments.append({
"speaker": speaker,
"content": content,
"start_pos": match.start(),
"tokens": count_tokens(content)
})
return segments
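# Example of the expected transcript format and the resulting segment dicts
# (illustrative comment, not a doctest; token counts depend on the tiktoken encoding):
#
#   text = "[Interviewer] How has your sleep been?\n[Patient] Not great, honestly."
#   extract_speaker_segments(text) ->
#       [{"speaker": "Interviewer", "content": "How has your sleep been?", ...},
#        {"speaker": "Patient", "content": "Not great, honestly.", ...}]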
def chunk_text_semantic(
text: str,
interviewee_type: str = "Other",
max_tokens: int = 3000,
overlap_tokens: int = 150
) -> List[str]:
"""
Advanced chunking that respects:
1. Speaker boundaries (don't split mid-sentence)
2. Topic boundaries (keep related Q&A together)
3. Token limits for LLM context
4. Overlap for context continuity
"""
# Check if text has speaker tags
has_tags = bool(re.search(r'\[[^\]]+\]', text))
if not has_tags:
# Fallback to sentence-based chunking
return chunk_by_sentences(text, max_tokens, overlap_tokens)
# Extract speaker segments
segments = extract_speaker_segments(text)
if not segments:
return chunk_by_sentences(text, max_tokens, overlap_tokens)
# Group segments into chunks
chunks = []
current_chunk_segments = []
current_tokens = 0
i = 0
while i < len(segments):
segment = segments[i]
segment_tokens = segment["tokens"]
# If single segment exceeds max_tokens, split it
if segment_tokens > max_tokens:
# Split long segment by sentences
sub_chunks = chunk_by_sentences(
f"[{segment['speaker']}] {segment['content']}",
max_tokens,
overlap_tokens
)
chunks.extend(sub_chunks)
i += 1
continue
# Check if adding this segment would exceed limit
if current_tokens + segment_tokens > max_tokens and current_chunk_segments:
# Finalize current chunk
            # Use a local name that does not shadow the module-level chunk_text()
            chunk_str = "\n\n".join(
                f"[{s['speaker']}] {s['content']}"
                for s in current_chunk_segments
            )
            chunks.append(chunk_str)
# Start new chunk with overlap
# Keep last few segments for context
overlap_segments = []
overlap_token_count = 0
for seg in reversed(current_chunk_segments):
if overlap_token_count + seg["tokens"] < overlap_tokens:
overlap_segments.insert(0, seg)
overlap_token_count += seg["tokens"]
else:
break
current_chunk_segments = overlap_segments
current_tokens = overlap_token_count
# Add segment to current chunk
current_chunk_segments.append(segment)
current_tokens += segment_tokens
i += 1
# Add final chunk
if current_chunk_segments:
        chunk_str = "\n\n".join(
            f"[{s['speaker']}] {s['content']}"
            for s in current_chunk_segments
        )
        chunks.append(chunk_str)
return chunks if chunks else [text]
def chunk_by_sentences(
text: str,
max_tokens: int = 3000,
overlap_tokens: int = 150
) -> List[str]:
"""
    Fallback chunking based on sentence boundaries, used when no speaker tags are found
"""
sentences = split_into_sentences(text)
chunks = []
current_chunk = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = count_tokens(sentence)
if current_tokens + sentence_tokens > max_tokens and current_chunk:
# Finalize current chunk
chunks.append(" ".join(current_chunk))
# Create overlap
overlap_sents = []
overlap_token_count = 0
for sent in reversed(current_chunk):
sent_tokens = count_tokens(sent)
if overlap_token_count + sent_tokens < overlap_tokens:
overlap_sents.insert(0, sent)
overlap_token_count += sent_tokens
else:
break
current_chunk = overlap_sents
current_tokens = overlap_token_count
current_chunk.append(sentence)
current_tokens += sentence_tokens
# Add final chunk
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks if chunks else [text]
def analyze_chunk_quality(chunks: List[str], max_tokens: int = 3000) -> dict:
"""
Analyze chunking quality for debugging
"""
if not chunks:
return {"error": "No chunks"}
token_counts = [count_tokens(chunk) for chunk in chunks]
return {
"num_chunks": len(chunks),
"avg_tokens": sum(token_counts) / len(token_counts),
"min_tokens": min(token_counts),
"max_tokens": max(token_counts),
"total_tokens": sum(token_counts),
"chunks_over_limit": sum(1 for t in token_counts if t > 3000)
}
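# Minimal usage sketch, assuming the module may be run directly for a quick sanity
# check. The sample transcript and parameter values below are invented for illustration.
if __name__ == "__main__":
    sample = (
        "[Interviewer] Let's talk about your diagnosis. When did the symptoms start?\n"
        "[Patient] About two years ago, mostly fatigue and joint pain.\n\n"
        "[Interviewer] Moving on, how has the treatment been going?\n"
        "[Patient] The new medication helps, but the side effects are rough."
    )
    demo_chunks = chunk_text_semantic(
        sample, interviewee_type="Patient", max_tokens=100, overlap_tokens=20
    )
    for idx, chunk in enumerate(demo_chunks, 1):
        print(f"--- chunk {idx} ({count_tokens(chunk)} tokens) ---")
        print(chunk)
    print(analyze_chunk_quality(demo_chunks))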