import tiktoken
import re
from typing import List
from nltk.tokenize.punkt import PunktSentenceTokenizer

def chunk_text(text, max_tokens=3000):
    """Legacy function - kept for backwards compatibility"""
    return chunk_text_semantic(text, "Other", max_tokens)


def count_tokens(text: str) -> int:
    """Count tokens using tiktoken"""
    try:
        enc = tiktoken.get_encoding("cl100k_base")
        return len(enc.encode(text))
    except Exception:
        # Fallback to word-based estimate
        return int(len(text.split()) * 1.3)


def split_into_sentences(text: str) -> List[str]:
    """Split text into sentences with improved handling"""
    try:
        tokenizer = PunktSentenceTokenizer()
        sentences = tokenizer.tokenize(text)
        return sentences
    except Exception:
        # Fallback to simple split
        return [s.strip() + '.' for s in text.split('.') if s.strip()]


def find_topic_boundaries(text: str, interviewee_type: str) -> List[int]:
    """

    Identify topic boundaries in the text for smarter chunking

    Returns list of character positions where topics likely change

    """
    
    boundaries = [0]  # Start position
    
    # Topic change indicators
    topic_patterns = [
        r'\n\n+',  # Paragraph breaks
        r'\[Interviewer\].*?(next|another|different|moving on|let\'s talk about)',
        r'\[Interviewer\].*?\?.*?\n.*?\[(?:Doctor|Patient|Respondent)\]',  # Q&A pairs
    ]
    
    # Find all topic boundaries
    for pattern in topic_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            pos = match.start()
            # Only add if not too close to existing boundary
            if all(abs(pos - b) > 100 for b in boundaries):
                boundaries.append(pos)
    
    boundaries.append(len(text))  # End position
    boundaries.sort()
    
    return boundaries


def extract_speaker_segments(text: str) -> List[dict]:
    """

    Extract segments with speaker labels and content

    """
    
    pattern = r'\[([^\]]+)\]\s*([^\[]*)'
    segments = []
    
    for match in re.finditer(pattern, text, re.DOTALL):
        speaker = match.group(1).strip()
        content = match.group(2).strip()
        if content:
            segments.append({
                "speaker": speaker,
                "content": content,
                "start_pos": match.start(),
                "tokens": count_tokens(content)
            })
    
    return segments
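

# Example (illustrative only; the transcript snippet and expected output below are not from
# any real interview): extract_speaker_segments returns one dict per "[Speaker] text" turn.
#
#   extract_speaker_segments("[Interviewer] How often do you use it?\n[Doctor] About twice a week.")
#   -> [{"speaker": "Interviewer", "content": "How often do you use it?", "start_pos": 0, "tokens": ...},
#       {"speaker": "Doctor", "content": "About twice a week.", "start_pos": ..., "tokens": ...}]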


def chunk_text_semantic(
    text: str,
    interviewee_type: str = "Other",
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Advanced chunking that respects:

    1. Speaker boundaries (don't split mid-sentence)
    2. Topic boundaries (keep related Q&A together)
    3. Token limits for LLM context
    4. Overlap for context continuity
    """
    
    # Check if text has speaker tags
    has_tags = bool(re.search(r'\[[^\]]+\]', text))
    
    if not has_tags:
        # Fallback to sentence-based chunking
        return chunk_by_sentences(text, max_tokens, overlap_tokens)
    
    # Extract speaker segments
    segments = extract_speaker_segments(text)
    
    if not segments:
        return chunk_by_sentences(text, max_tokens, overlap_tokens)
    
    # Group segments into chunks
    chunks = []
    current_chunk_segments = []
    current_tokens = 0
    
    i = 0
    while i < len(segments):
        segment = segments[i]
        segment_tokens = segment["tokens"]
        
        # If single segment exceeds max_tokens, split it
        if segment_tokens > max_tokens:
            # Split long segment by sentences
            sub_chunks = chunk_by_sentences(
                f"[{segment['speaker']}] {segment['content']}",
                max_tokens,
                overlap_tokens
            )
            chunks.extend(sub_chunks)
            i += 1
            continue
        
        # Check if adding this segment would exceed limit
        if current_tokens + segment_tokens > max_tokens and current_chunk_segments:
            # Finalize current chunk
            chunk_str = "\n\n".join([
                f"[{s['speaker']}] {s['content']}"
                for s in current_chunk_segments
            ])
            chunks.append(chunk_str)
            
            # Start new chunk with overlap
            # Keep last few segments for context
            overlap_segments = []
            overlap_token_count = 0
            
            for seg in reversed(current_chunk_segments):
                if overlap_token_count + seg["tokens"] < overlap_tokens:
                    overlap_segments.insert(0, seg)
                    overlap_token_count += seg["tokens"]
                else:
                    break
            
            current_chunk_segments = overlap_segments
            current_tokens = overlap_token_count
        
        # Add segment to current chunk
        current_chunk_segments.append(segment)
        current_tokens += segment_tokens
        i += 1
    
    # Add final chunk
    if current_chunk_segments:
        chunk_str = "\n\n".join([
            f"[{s['speaker']}] {s['content']}"
            for s in current_chunk_segments
        ])
        chunks.append(chunk_str)
    
    return chunks if chunks else [text]
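

# Usage sketch (a minimal example; `transcript_path` and the parameter values are illustrative
# assumptions, not fixed by this module):
#
#   with open(transcript_path, encoding="utf-8") as f:
#       transcript = f.read()
#   chunks = chunk_text_semantic(transcript, interviewee_type="Doctor",
#                                max_tokens=3000, overlap_tokens=150)
#   for i, chunk in enumerate(chunks):
#       print(i, count_tokens(chunk))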


def chunk_by_sentences(
    text: str,
    max_tokens: int = 3000,
    overlap_tokens: int = 150
) -> List[str]:
    """
    Fallback chunking method based on sentences.
    """
    
    sentences = split_into_sentences(text)
    
    chunks = []
    current_chunk = []
    current_tokens = 0
    
    for sentence in sentences:
        sentence_tokens = count_tokens(sentence)
        
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            # Finalize current chunk
            chunks.append(" ".join(current_chunk))
            
            # Create overlap
            overlap_sents = []
            overlap_token_count = 0
            
            for sent in reversed(current_chunk):
                sent_tokens = count_tokens(sent)
                if overlap_token_count + sent_tokens < overlap_tokens:
                    overlap_sents.insert(0, sent)
                    overlap_token_count += sent_tokens
                else:
                    break
            
            current_chunk = overlap_sents
            current_tokens = overlap_token_count
        
        current_chunk.append(sentence)
        current_tokens += sentence_tokens
    
    # Add final chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    
    return chunks if chunks else [text]


def analyze_chunk_quality(chunks: List[str], max_tokens: int = 3000) -> dict:
    """
    Analyze chunking quality for debugging.
    """
    
    if not chunks:
        return {"error": "No chunks"}
    
    token_counts = [count_tokens(chunk) for chunk in chunks]
    
    return {
        "num_chunks": len(chunks),
        "avg_tokens": sum(token_counts) / len(token_counts),
        "min_tokens": min(token_counts),
        "max_tokens": max(token_counts),
        "total_tokens": sum(token_counts),
        "chunks_over_limit": sum(1 for t in token_counts if t > 3000)
    }
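

# Minimal smoke test, run only when this module is executed directly. The sample
# transcript below is illustrative and not taken from any real interview.
if __name__ == "__main__":
    sample = (
        "[Interviewer] Thanks for joining. How often do you use the product?\n"
        "[Doctor] About twice a week, mostly for follow-up visits.\n"
        "[Interviewer] Moving on, let's talk about side effects. Any concerns?\n"
        "[Doctor] Occasionally patients report mild headaches, nothing serious.\n"
    )
    demo_chunks = chunk_text_semantic(sample, interviewee_type="Doctor",
                                      max_tokens=100, overlap_tokens=20)
    for idx, chunk in enumerate(demo_chunks):
        print(f"--- chunk {idx} ({count_tokens(chunk)} tokens) ---")
        print(chunk)
    print(analyze_chunk_quality(demo_chunks))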