"""Semantic chunking utilities for interview transcripts with bracketed speaker tags."""

import re
from typing import List

import tiktoken
from nltk.tokenize.punkt import PunktSentenceTokenizer

def chunk_text(text, max_tokens=3000):
    """Legacy function - kept for backwards compatibility"""
    return chunk_text_semantic(text, "Other", max_tokens)

def count_tokens(text: str) -> int:
    """Count tokens using tiktoken"""
    try:
        enc = tiktoken.get_encoding("cl100k_base")
        return len(enc.encode(text))
    except Exception:
        # Fallback to word-based estimate
        return int(len(text.split()) * 1.3)
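# Note: with tiktoken installed this is an exact cl100k_base count; without it,
# the word-count * 1.3 fallback is only a rough estimate (English text averages
# roughly 1.3 tokens per whitespace-separated word under this encoding).
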
def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with improved handling"""
    try:
        tokenizer = PunktSentenceTokenizer()
        sentences = tokenizer.tokenize(text)
        return sentences
    except Exception:
        # Fallback to simple split on periods
        return [s.strip() + '.' for s in text.split('.') if s.strip()]

def find_topic_boundaries(text: str, interviewee_type: str) -> List[int]:
    """
    Identify topic boundaries in the text for smarter chunking.
    Returns a list of character positions where topics likely change.
    Note: interviewee_type is accepted for future use but not currently consulted.
    """
    boundaries = [0]  # Start position

    # Topic change indicators
    topic_patterns = [
        r'\n\n+',  # Paragraph breaks
        r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
        r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]',  # Q&A pairs
    ]

    # Find all topic boundaries
    for pattern in topic_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            pos = match.start()
            # Only add if not too close to an existing boundary
            if all(abs(pos - b) > 100 for b in boundaries):
                boundaries.append(pos)

    boundaries.append(len(text))  # End position
    boundaries.sort()
    return boundaries
def extract_speaker_segments(text: str) -> List[dict]:
    """
    Extract segments with speaker labels and content
    """
    pattern = r'\[([^\]]+)\]\s*([^\[]*)'
    segments = []
    for match in re.finditer(pattern, text, re.DOTALL):
        speaker = match.group(1).strip()
        content = match.group(2).strip()
        if content:
            segments.append({
                "speaker": speaker,
                "content": content,
                "start_pos": match.start(),
                "tokens": count_tokens(content)
            })
    return segments
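# Illustrative sketch with a hypothetical two-turn transcript:
#   extract_speaker_segments("[Interviewer] How often?\n[Patient] Daily.")
# yields dicts like
#   {"speaker": "Interviewer", "content": "How often?", "start_pos": 0, "tokens": ...}
#   {"speaker": "Patient", "content": "Daily.", "start_pos": 25, "tokens": ...}
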
def chunk_text_semantic(
    text: str,
    interviewee_type: str = "Other",
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Advanced chunking that respects:
    1. Speaker boundaries (don't split mid-utterance)
    2. Topic boundaries (keep related Q&A together)
    3. Token limits for LLM context
    4. Overlap for context continuity
    """
    # Check if text has speaker tags
    has_tags = bool(re.search(r'\[[^\]]+\]', text))
    if not has_tags:
        # Fallback to sentence-based chunking
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    # Extract speaker segments
    segments = extract_speaker_segments(text)
    if not segments:
        return chunk_by_sentences(text, max_tokens, overlap_tokens)

    # Group segments into chunks
    chunks = []
    current_chunk_segments = []
    current_tokens = 0

    i = 0
    while i < len(segments):
        segment = segments[i]
        segment_tokens = segment["tokens"]

        # If a single segment exceeds max_tokens, split it
        if segment_tokens > max_tokens:
            # Split the long segment by sentences
            sub_chunks = chunk_by_sentences(
                f"[{segment['speaker']}] {segment['content']}",
                max_tokens,
                overlap_tokens
            )
            chunks.extend(sub_chunks)
            i += 1
            continue

        # Check if adding this segment would exceed the limit
        if current_tokens + segment_tokens > max_tokens and current_chunk_segments:
            # Finalize current chunk (named chunk_str to avoid shadowing
            # the module-level chunk_text function)
            chunk_str = "\n\n".join([
                f"[{s['speaker']}] {s['content']}"
                for s in current_chunk_segments
            ])
            chunks.append(chunk_str)

            # Start new chunk with overlap:
            # keep the last few segments for context
            overlap_segments = []
            overlap_token_count = 0
            for seg in reversed(current_chunk_segments):
                if overlap_token_count + seg["tokens"] < overlap_tokens:
                    overlap_segments.insert(0, seg)
                    overlap_token_count += seg["tokens"]
                else:
                    break
            current_chunk_segments = overlap_segments
            current_tokens = overlap_token_count

        # Add segment to current chunk
        current_chunk_segments.append(segment)
        current_tokens += segment_tokens
        i += 1

    # Add final chunk
    if current_chunk_segments:
        chunk_str = "\n\n".join([
            f"[{s['speaker']}] {s['content']}"
            for s in current_chunk_segments
        ])
        chunks.append(chunk_str)

    return chunks if chunks else [text]
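# Note on overlap: the tail segments of each finalized chunk (up to just under
# overlap_tokens' worth) are re-used as the head of the next chunk, so a
# question that closes one chunk stays adjacent to its answer in the next.
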
def chunk_by_sentences(
    text: str,
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Fallback chunking method based on sentences
    """
    sentences = split_into_sentences(text)
    chunks = []
    current_chunk = []
    current_tokens = 0

    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)

        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            # Finalize current chunk
            chunks.append(" ".join(current_chunk))

            # Create overlap
            overlap_sents = []
            overlap_token_count = 0
            for sent in reversed(current_chunk):
                sent_tokens = count_tokens(sent)
                if overlap_token_count + sent_tokens < overlap_tokens:
                    overlap_sents.insert(0, sent)
                    overlap_token_count += sent_tokens
                else:
                    break
            current_chunk = overlap_sents
            current_tokens = overlap_token_count

        current_chunk.append(sentence)
        current_tokens += sentence_tokens

    # Add final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks if chunks else [text]

def analyze_chunk_quality(chunks: List[str]) -> dict:
    """
    Analyze chunking quality for debugging
    """
    if not chunks:
        return {"error": "No chunks"}

    token_counts = [count_tokens(chunk) for chunk in chunks]
    return {
        "num_chunks": len(chunks),
        "avg_tokens": sum(token_counts) / len(token_counts),
        "min_tokens": min(token_counts),
        "max_tokens": max(token_counts),
        "total_tokens": sum(token_counts),
        # 3000 mirrors the default max_tokens used elsewhere in this module
        "chunks_over_limit": sum(1 for t in token_counts if t > 3000)
    }
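
if __name__ == "__main__":
    # Minimal smoke test. The transcript below is hypothetical, written only to
    # exercise the tagged-speaker path; the small limits force multiple chunks.
    sample = (
        "[Interviewer] How long have you been using the device?\n\n"
        "[Patient] About two years now. The daily routine took some getting "
        "used to, but it is manageable.\n\n"
        "[Interviewer] Moving on, let's talk about side effects.\n\n"
        "[Patient] Mostly mild headaches in the first month, nothing since."
    )
    demo_chunks = chunk_text_semantic(sample, "Patient", max_tokens=40, overlap_tokens=15)
    for idx, chunk in enumerate(demo_chunks, 1):
        print(f"--- chunk {idx} ({count_tokens(chunk)} tokens) ---")
        print(chunk)
    print(analyze_chunk_quality(demo_chunks))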