import re
from functools import lru_cache
from typing import List

import tiktoken
from nltk.tokenize.punkt import PunktSentenceTokenizer


def chunk_text(text: str, max_tokens: int = 3000) -> List[str]:
    """Legacy entry point, kept for backwards compatibility."""
    return chunk_text_semantic(text, "Other", max_tokens)


@lru_cache(maxsize=1)
def _get_encoding():
    """Build the tiktoken encoding once and reuse it across calls."""
    return tiktoken.get_encoding("cl100k_base")


def count_tokens(text: str) -> int:
    """Count tokens using tiktoken, falling back to a word-based estimate."""
    try:
        return len(_get_encoding().encode(text))
    except Exception:
        # Rough heuristic: English text averages about 1.3 tokens per word.
        return int(len(text.split()) * 1.3)


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences, preferring NLTK's Punkt tokenizer."""
    try:
        tokenizer = PunktSentenceTokenizer()
        return tokenizer.tokenize(text)
    except Exception:
        # Fallback: naive split on periods.
        return [s.strip() + '.' for s in text.split('.') if s.strip()]


def find_topic_boundaries(text: str, interviewee_type: str) -> List[int]:
    """
    Identify likely topic boundaries in the text for smarter chunking.

    Returns a sorted list of character positions where topics probably
    change. Note: `interviewee_type` is currently unused; it is kept so
    existing callers do not break.
    """
    boundaries = [0]  # Start position

    # Topic-change indicators
    topic_patterns = [
        r'\n\n+',  # Paragraph breaks
        r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
        r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]',  # Q&A pairs
    ]

    # Collect boundary candidates from every pattern
    for pattern in topic_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            pos = match.start()
            # Skip candidates within 100 characters of an existing boundary
            if all(abs(pos - b) > 100 for b in boundaries):
                boundaries.append(pos)

    boundaries.append(len(text))  # End position
    boundaries.sort()
    return boundaries


def extract_speaker_segments(text: str) -> List[dict]:
    """Extract [Speaker]-tagged segments with their content and token counts."""
    pattern = r'\[([^\]]+)\]\s*([^\[]*)'
    segments = []

    for match in re.finditer(pattern, text, re.DOTALL):
        speaker = match.group(1).strip()
        content = match.group(2).strip()
        if content:
            segments.append({
                "speaker": speaker,
                "content": content,
                "start_pos": match.start(),
                "tokens": count_tokens(content),
            })

    return segments


def chunk_text_semantic(
    text: str,
    interviewee_type: str = "Other",
    max_tokens: int = 3000,
    overlap_tokens: int = 150,
) -> List[str]:
    """
    Advanced chunking that respects:
    1. Speaker boundaries (never splits mid-turn)
    2. Topic boundaries (keeps related Q&A pairs together)
    3. Token limits for the LLM context window
    4. Overlap between chunks for context continuity
    """
    # Check whether the text carries [Speaker] tags
    has_tags = bool(re.search(r'\[[^\]]+\]', text))
    if not has_tags:
        # No speaker structure: fall back to sentence-based chunking
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    segments = extract_speaker_segments(text)
    if not segments:
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    def join_segments(segs: List[dict]) -> str:
        """Render a list of segments back into tagged transcript text."""
        return "\n\n".join(f"[{s['speaker']}] {s['content']}" for s in segs)

    # Group segments into chunks
    chunks = []
    current_chunk_segments = []
    current_tokens = 0

    for segment in segments:
        segment_tokens = segment["tokens"]

        # A single oversized segment gets split by sentences. Flush the
        # accumulated chunk first so the output stays in document order.
        if segment_tokens > max_tokens:
            if current_chunk_segments:
                chunks.append(join_segments(current_chunk_segments))
                current_chunk_segments = []
                current_tokens = 0
            chunks.extend(chunk_by_sentences(
                f"[{segment['speaker']}] {segment['content']}",
                max_tokens,
                overlap_tokens,
            ))
            continue

        # Finalize the current chunk if this segment would push it over the limit
        if current_tokens + segment_tokens > max_tokens and current_chunk_segments:
            chunks.append(join_segments(current_chunk_segments))

            # Start the next chunk with the last few segments for context
            overlap_segments = []
            overlap_token_count = 0
            for seg in reversed(current_chunk_segments):
                if overlap_token_count + seg["tokens"] < overlap_tokens:
                    overlap_segments.insert(0, seg)
                    overlap_token_count += seg["tokens"]
                else:
                    break

            current_chunk_segments = overlap_segments
            current_tokens = overlap_token_count

        # Add the segment to the current chunk
        current_chunk_segments.append(segment)
        current_tokens += segment_tokens

    # Add the final chunk
    if current_chunk_segments:
        chunks.append(join_segments(current_chunk_segments))

    return chunks if chunks else [text]


def chunk_by_sentences(
    text: str,
    max_tokens: int = 3000,
    overlap_tokens: int = 150,
) -> List[str]:
    """Fallback chunking based on sentence boundaries."""
    sentences = split_into_sentences(text)
    chunks = []
    current_chunk = []
    current_tokens = 0

    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)

        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            # Finalize the current chunk
            chunks.append(" ".join(current_chunk))

            # Carry a short tail of sentences forward as overlap
            overlap_sents = []
            overlap_token_count = 0
            for sent in reversed(current_chunk):
                sent_tokens = count_tokens(sent)
                if overlap_token_count + sent_tokens < overlap_tokens:
                    overlap_sents.insert(0, sent)
                    overlap_token_count += sent_tokens
                else:
                    break

            current_chunk = overlap_sents
            current_tokens = overlap_token_count

        current_chunk.append(sentence)
        current_tokens += sentence_tokens

    # Add the final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks if chunks else [text]


def analyze_chunk_quality(chunks: List[str], token_limit: int = 3000) -> dict:
    """Summarize chunk token statistics for debugging."""
    if not chunks:
        return {"error": "No chunks"}

    token_counts = [count_tokens(chunk) for chunk in chunks]

    return {
        "num_chunks": len(chunks),
        "avg_tokens": sum(token_counts) / len(token_counts),
        "min_tokens": min(token_counts),
        "max_tokens": max(token_counts),
        "total_tokens": sum(token_counts),
        "chunks_over_limit": sum(1 for t in token_counts if t > token_limit),
    }
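

# --- Usage sketch ---
# A minimal, illustrative example, not part of the original module's API:
# the transcript below is fabricated sample data, and the tiny token limits
# are chosen only to force a split so the overlap behavior is visible.
if __name__ == "__main__":
    sample = (
        "[Interviewer] Let's talk about your daily routine. How do you start?\n"
        "[Doctor] I usually begin rounds at seven and review overnight notes.\n"
        "[Interviewer] Moving on, how do you handle urgent cases?\n"
        "[Doctor] Urgent cases jump the queue; everything else waits."
    )

    chunks = chunk_text_semantic(sample, "Doctor", max_tokens=40, overlap_tokens=15)
    for idx, chunk in enumerate(chunks, start=1):
        print(f"--- chunk {idx} ({count_tokens(chunk)} tokens) ---")
        print(chunk)

    print(analyze_chunk_quality(chunks, token_limit=40))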