import re
from functools import lru_cache
from typing import List

import tiktoken
from nltk.tokenize.punkt import PunktSentenceTokenizer
|
|
|
def chunk_text(text, max_tokens=3000):
    """Legacy entry point - kept for backwards compatibility.

    Delegates to :func:`chunk_text_semantic` with the generic
    "Other" interviewee type.
    """
    return chunk_text_semantic(text, interviewee_type="Other", max_tokens=max_tokens)
|
|
|
|
|
@lru_cache(maxsize=1)
def _get_encoder():
    """Load the cl100k_base tiktoken encoder once.

    tiktoken.get_encoding loads a large BPE table; count_tokens is
    called per-sentence/per-segment by the chunkers, so the encoder
    must not be rebuilt on every call.
    """
    return tiktoken.get_encoding("cl100k_base")


def count_tokens(text: str) -> int:
    """Count tokens in *text* using tiktoken's cl100k_base encoding.

    Falls back to a rough word-based estimate (~1.3 tokens per word)
    if tiktoken is unavailable or raises.
    """
    try:
        return len(_get_encoder().encode(text))
    except Exception:
        # Heuristic: English prose averages roughly 1.3 tokens per word.
        return int(len(text.split()) * 1.3)
|
|
|
|
|
@lru_cache(maxsize=1)
def _get_sentence_tokenizer():
    """Build the Punkt sentence tokenizer once instead of per call."""
    return PunktSentenceTokenizer()


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with improved handling.

    Uses NLTK's Punkt tokenizer (cached across calls); falls back to a
    naive period split when NLTK is unavailable or raises.
    """
    try:
        return _get_sentence_tokenizer().tokenize(text)
    except Exception:
        # Naive fallback: split on '.' and re-append the period to each
        # non-empty fragment.
        return [s.strip() + '.' for s in text.split('.') if s.strip()]
|
|
|
|
|
def find_topic_boundaries(text: str, interviewee_type: str, min_gap: int = 100) -> List[int]:
    """
    Identify topic boundaries in the text for smarter chunking.

    Args:
        text: Transcript text, optionally with "[Speaker]" tags.
        interviewee_type: Accepted for API compatibility; not used by
            the current patterns.
        min_gap: Minimum character distance between two reported
            boundaries; matches closer than this to a known boundary
            are discarded as duplicates.

    Returns:
        Sorted, de-duplicated character positions where topics likely
        change; always includes 0 and len(text).
    """
    boundaries = [0]

    # Heuristic markers of a topic change in interview transcripts:
    # paragraph breaks, interviewer transition phrases, and a
    # question/answer hand-off between speakers.
    topic_patterns = [
        r'\n\n+',
        r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
        r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]',
    ]

    for pattern in topic_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            pos = match.start()
            # Skip matches too close to an already-recorded boundary.
            if all(abs(pos - b) > min_gap for b in boundaries):
                boundaries.append(pos)

    boundaries.append(len(text))
    # De-duplicate: empty text would otherwise yield [0, 0].
    return sorted(set(boundaries))
|
|
|
|
|
def extract_speaker_segments(text: str) -> List[dict]:
    """
    Extract segments with speaker labels and content.

    Each "[Speaker] content" run becomes a dict with keys "speaker",
    "content", "start_pos", and "tokens". Tag-only matches with no
    content are skipped.
    """
    speaker_re = re.compile(r'\[([^\]]+)\]\s*([^\[]*)', re.DOTALL)
    results = []

    for m in speaker_re.finditer(text):
        body = m.group(2).strip()
        if not body:
            continue
        results.append({
            "speaker": m.group(1).strip(),
            "content": body,
            "start_pos": m.start(),
            "tokens": count_tokens(body),
        })

    return results
|
|
|
|
|
def _format_segments(segments: List[dict]) -> str:
    """Render speaker segments back into '[Speaker] content' blocks."""
    return "\n\n".join(f"[{s['speaker']}] {s['content']}" for s in segments)


def chunk_text_semantic(
    text: str,
    interviewee_type: str = "Other",
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Advanced chunking that respects:
    1. Speaker boundaries (don't split mid-sentence)
    2. Topic boundaries (keep related Q&A together)
    3. Token limits for LLM context
    4. Overlap for context continuity

    Args:
        text: Transcript text, optionally with "[Speaker]" tags.
        interviewee_type: Accepted for API compatibility; not used here.
        max_tokens: Per-chunk token budget.
        overlap_tokens: Approximate token overlap carried between chunks.

    Returns:
        List of chunk strings; [text] when nothing could be chunked.
    """
    # Without speaker tags there is nothing semantic to key on.
    if not re.search(r'\[[^\]]+\]', text):
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    segments = extract_speaker_segments(text)
    if not segments:
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    chunks: List[str] = []
    current_segments: List[dict] = []
    current_tokens = 0

    for segment in segments:
        segment_tokens = segment["tokens"]

        # A single segment larger than the whole budget gets split by
        # sentences. Flush the accumulated chunk FIRST so output order
        # matches transcript order (previously the sub-chunks were
        # emitted ahead of earlier, still-buffered segments).
        if segment_tokens > max_tokens:
            if current_segments:
                chunks.append(_format_segments(current_segments))
                current_segments = []
                current_tokens = 0
            chunks.extend(chunk_by_sentences(
                f"[{segment['speaker']}] {segment['content']}",
                max_tokens,
                overlap_tokens,
            ))
            continue

        # Budget exceeded: emit the current chunk, then seed the next
        # one with trailing segments as overlap for context continuity.
        if current_tokens + segment_tokens > max_tokens and current_segments:
            chunks.append(_format_segments(current_segments))

            overlap_segments: List[dict] = []
            overlap_count = 0
            for seg in reversed(current_segments):
                if overlap_count + seg["tokens"] >= overlap_tokens:
                    break
                overlap_segments.insert(0, seg)
                overlap_count += seg["tokens"]

            current_segments = overlap_segments
            current_tokens = overlap_count

        current_segments.append(segment)
        current_tokens += segment_tokens

    # Flush whatever is left in the buffer.
    if current_segments:
        chunks.append(_format_segments(current_segments))

    return chunks if chunks else [text]
|
|
|
|
|
def chunk_by_sentences(
    text: str,
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Fallback chunking method based on sentences.

    Args:
        text: Raw text to chunk.
        max_tokens: Per-chunk token budget.
        overlap_tokens: Approximate token overlap carried between chunks.

    Returns:
        List of chunk strings; [text] when nothing could be chunked.
    """
    sentences = split_into_sentences(text)

    chunks: List[str] = []
    # Each buffer entry pairs a sentence with its token count so the
    # overlap pass below does not re-tokenize already-counted sentences.
    current: List[tuple] = []
    current_tokens = 0

    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)

        # Budget exceeded: emit the current chunk, then seed the next
        # one with trailing sentences as overlap.
        if current_tokens + sentence_tokens > max_tokens and current:
            chunks.append(" ".join(s for s, _ in current))

            overlap: List[tuple] = []
            overlap_count = 0
            for sent, n_tokens in reversed(current):
                if overlap_count + n_tokens >= overlap_tokens:
                    break
                overlap.insert(0, (sent, n_tokens))
                overlap_count += n_tokens

            current = overlap
            current_tokens = overlap_count

        current.append((sentence, sentence_tokens))
        current_tokens += sentence_tokens

    # Flush whatever is left in the buffer.
    if current:
        chunks.append(" ".join(s for s, _ in current))

    return chunks if chunks else [text]
|
|
|
|
|
def analyze_chunk_quality(chunks: List[str], token_limit: int = 3000) -> dict:
    """
    Analyze chunking quality for debugging.

    Args:
        chunks: Chunk strings to measure.
        token_limit: Budget used for the over-limit count; defaults to
            the chunkers' default max_tokens of 3000.

    Returns:
        Summary statistics dict, or {"error": "No chunks"} when
        *chunks* is empty.
    """
    if not chunks:
        return {"error": "No chunks"}

    token_counts = [count_tokens(chunk) for chunk in chunks]

    return {
        "num_chunks": len(chunks),
        "avg_tokens": sum(token_counts) / len(token_counts),
        "min_tokens": min(token_counts),
        "max_tokens": max(token_counts),
        "total_tokens": sum(token_counts),
        "chunks_over_limit": sum(1 for t in token_counts if t > token_limit),
    }