File size: 13,550 Bytes
a0f20a0
 
 
 
 
 
12f4fc7
a0f20a0
83175f3
a0f20a0
 
c759ad8
a0f20a0
 
 
 
 
 
 
 
9614b5c
a0f20a0
 
 
 
c759ad8
 
 
 
 
 
 
a197ca5
 
 
c759ad8
a197ca5
c759ad8
a197ca5
 
 
 
c759ad8
a197ca5
 
9614b5c
 
 
 
 
 
 
a0f20a0
 
83175f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c759ad8
83175f3
 
 
 
 
 
 
 
 
 
 
c759ad8
 
83175f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c759ad8
 
 
83175f3
 
 
 
 
 
ee6b298
 
 
 
 
 
12f4fc7
ee6b298
 
 
 
12f4fc7
 
ee6b298
 
 
12f4fc7
 
ee6b298
12f4fc7
ee6b298
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12f4fc7
ee6b298
 
 
 
12f4fc7
ee6b298
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12f4fc7
ee6b298
12f4fc7
c759ad8
12f4fc7
c759ad8
 
 
 
12f4fc7
c759ad8
 
 
 
 
 
 
 
 
a0f20a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ee6b298
 
 
 
12f4fc7
9614b5c
a0f20a0
ee6b298
 
 
 
 
12f4fc7
ee6b298
a0f20a0
12f4fc7
a0f20a0
 
 
ee6b298
 
 
 
 
12f4fc7
ee6b298
a0f20a0
c759ad8
12f4fc7
 
 
ee6b298
 
 
 
 
12f4fc7
 
 
 
 
 
 
a0f20a0
 
 
 
12f4fc7
c759ad8
12f4fc7
a0f20a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c759ad8
a0f20a0
c759ad8
 
a0f20a0
c759ad8
a0f20a0
 
 
 
12f4fc7
a0f20a0
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""Citation validation and formatting for RAG system.

This module handles structured citations with validation to prevent hallucination.
"""

import json
from typing import List, Dict, Any, Tuple
from urllib.parse import quote
from rapidfuzz import fuzz


FUZZY_THRESHOLD = 90

def create_highlighted_url(base_url: str, quote_text: str) -> str:
    """Build a link that scrolls to and highlights *quote_text* on the page.

    Appends a ``#:~:text=`` scroll-to-text fragment to the article URL so
    supporting browsers jump straight to the cited passage.

    Args:
        base_url: The base article URL
        quote_text: The text to highlight (should be the exact text from source)

    Returns:
        URL with text fragment
    """
    # Text fragments cannot match across block elements, so use only the
    # first line/paragraph of the quote.
    snippet = quote_text.split('\n')[0].strip()

    # A leading "- " is list formatting, not page content.
    if snippet.startswith('- '):
        snippet = snippet[2:].strip()

    # Keep the fragment to ~80 chars, trimming back to the last whole word
    # so we never cut a word in half.
    limit = 80
    if len(snippet) > limit:
        truncated = snippet[:limit]
        cut = truncated.rfind(' ')
        snippet = truncated[:cut] if cut > 0 else truncated

    snippet = snippet.strip()

    encoded = quote(snippet, safe='')
    # quote() leaves the unreserved set (- . _ ~) untouched; percent-encode
    # those by hand as well so the fragment text is fully escaped.
    for char, escape in (('-', '%2D'), ('.', '%2E'), ('_', '%5F'), ('~', '%7E')):
        encoded = encoded.replace(char, escape)
    return f"{base_url}#:~:text={encoded}"

def parse_llm_response(response_content: str) -> Dict[str, Any]:
    """Parse and validate LLM JSON response.

    Args:
        response_content: Raw JSON string from LLM

    Returns:
        Dict with answer and citations, or error information
    """
    def _fallback(message: str) -> Dict[str, Any]:
        # Degrade gracefully: surface the raw text as the answer so the
        # caller still has something to display, with no citations.
        return {
            "answer": response_content,
            "citations": [],
            "validation_errors": [message],
        }

    try:
        parsed = json.loads(response_content)
    except json.JSONDecodeError:
        return _fallback("Failed to parse JSON response")

    # Enforce strict shape: must have 'answer' (str) and 'citations' (list)
    if not isinstance(parsed, dict) or 'answer' not in parsed or 'citations' not in parsed:
        return _fallback("Response JSON missing required keys 'answer' and/or 'citations'.")
    if not isinstance(parsed['answer'], str) or not isinstance(parsed['citations'], list):
        return _fallback("Response JSON has incorrect types for 'answer' or 'citations'.")
    return parsed

def build_citation_entry(citation: Dict[str, Any], validation_result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a citation entry from validation result.

    Args:
        citation: Raw citation dict from LLM with citation_id, source_id, quote
        validation_result: Result from validate_citation()

    Returns:
        Complete citation entry with URL and metadata
    """
    matched = validation_result["matched_text"]
    return {
        "citation_id": citation.get("citation_id", 0),
        "source_id": validation_result["source_id"],
        "quote": citation.get("quote", ""),  # AI's claimed quote
        "matched_text": matched,  # Actual text from source
        "title": validation_result["title"],
        # Deep-link directly to the matched passage on the source page.
        "url": create_highlighted_url(validation_result["url"], matched),
        "fuzzy_match_score": validation_result["fuzzy_match_score"],
        "remapped": validation_result.get("remapped", False),
    }

def process_citations(citations: List[Dict[str, Any]], source_chunks: List[Any]) -> Dict[str, Any]:
    """Validate and process a batch of citations.

    Args:
        citations: List of citation dicts from LLM
        source_chunks: List of source chunks from Qdrant

    Returns:
        Dict with validated_citations and validation_errors lists
    """
    accepted: List[Dict[str, Any]] = []
    rejected: List[Dict[str, Any]] = []

    for raw in citations:
        result = validate_citation(
            raw.get("quote", ""),
            source_chunks,
            raw.get("source_id", 0),
        )

        if result["valid"]:
            accepted.append(build_citation_entry(raw, result))
        else:
            # Keep the citation_id on the failure record so the bad
            # citation can be traced back to the LLM output.
            result["citation_id"] = raw.get("citation_id", 0)
            rejected.append(result)

    return {
        "validated_citations": accepted,
        "validation_errors": rejected,
    }

def _is_word_char(char: str) -> bool:
    """Check if character is part of a word (alphanumeric, comma, hyphen, apostrophe)."""
    return char.isalnum() or char in (',', '-', "'", "'")

def _find_best_match_position(quote: str, source_text: str, alignment_hint=None) -> Tuple[int, int, float]:
    """Find the best matching position for a quote in source text using sliding window.

    This method is better than partial_ratio_alignment because it:
    1. Uses word boundaries naturally
    2. Finds the best matching substring at the token level
    3. Returns positions that align with actual text segments

    Args:
        quote: The text to find
        source_text: The text to search in
        alignment_hint: Optional alignment result from partial_ratio_alignment to focus search
            (presumably a rapidfuzz ScoreAlignment; only its `dest_start` is read — verify)

    Returns:
        Tuple of (start_pos, end_pos, score). Returns (-1, -1, 0) if no good match.
        Positions are character offsets into source_text; score is a fuzz.ratio
        value in the 0-100 range.
    """
    import re
    
    # Normalize whitespace for matching
    quote_normalized = ' '.join(quote.split())
    
    # Split source into words with their positions
    # This regex splits on whitespace while preserving positions
    word_pattern = re.compile(r'\S+')
    source_words = []
    for match in word_pattern.finditer(source_text):
        source_words.append({
            'word': match.group(),
            'start': match.start(),
            'end': match.end()
        })
    
    quote_words = quote_normalized.split()
    
    # Nothing to match against: empty quote or empty source.
    if not quote_words or not source_words:
        return -1, -1, 0
    
    # Determine search range based on alignment hint
    if alignment_hint:
        # Find which word index contains the alignment position
        center_word_idx = 0
        for idx, word_info in enumerate(source_words):
            if word_info['start'] <= alignment_hint.dest_start < word_info['end']:
                center_word_idx = idx
                break
        
        # Search within +/- 5 words of the hint position
        search_start_idx = max(0, center_word_idx - 5)
        search_end_idx = min(len(source_words), center_word_idx + len(quote_words) + 5)
    else:
        # No hint found, search entire text (fallback)
        search_start_idx = 0
        search_end_idx = len(source_words)
    
    best_score = 0
    best_start = -1
    best_end = -1
    
    # Try different window sizes around the quote length
    # Quote should never be longer than source, so only check smaller windows
    # (a window up to 3 words shorter tolerates minor LLM paraphrasing).
    min_window = max(1, len(quote_words) - 3)
    max_window = min(search_end_idx - search_start_idx, len(quote_words))
    
    # NOTE: if the search range is smaller than min_window, the range() below
    # is empty and we fall through to the (-1, -1, 0) "no match" return.
    for window_size in range(min_window, max_window + 1):
        for i in range(search_start_idx, min(search_end_idx - window_size + 1, len(source_words) - window_size + 1)):
            # Get window of words
            window_words = [source_words[j]['word'] for j in range(i, i + window_size)]
            window_text = ' '.join(window_words)
            
            # Calculate similarity score
            score = fuzz.ratio(quote_normalized, window_text)
            
            if score > best_score:
                best_score = score
                # Use the start of the first word and end of the last word
                best_start = source_words[i]['start']
                best_end = source_words[i + window_size - 1]['end']
                
                # Strip trailing punctuation from the end position
                # so the returned span ends on word content.
                while best_end > best_start and source_text[best_end - 1] in '.,;:!?)':
                    best_end -= 1
    
    return best_start, best_end, best_score

def _build_valid_result(quote: str, chunk: Any, chunk_id: int, score: float, 
                        matched_text: str, remapped: bool = False) -> Dict[str, Any]:
    """Build a valid citation result dict."""
    result = {
        "valid": True,
        "quote": quote,
        "matched_text": matched_text,
        "source_id": chunk_id,
        "title": chunk.payload['title'],
        "url": chunk.payload['url'],
        "fuzzy_match_score": score
    }
    if remapped:
        result["remapped"] = True
    return result

def validate_citation(quote: str, source_chunks: List[Any], source_id: int) -> Dict[str, Any]:
    """Validate that a quote exists in the specified source chunk.

    Checks the AI's cited source first (fast path); if the quote is not
    found there with a score >= FUZZY_THRESHOLD, searches the remaining
    sources and remaps the citation when a strong match exists elsewhere.

    Args:
        quote: The quoted text to validate
        source_chunks: List of source chunks from Qdrant
        source_id: 1-indexed source ID

    Returns:
        Dict with validation result and metadata. On success the dict comes
        from _build_valid_result(); on failure it has "valid": False plus a
        human-readable "reason" and best-effort debug match info.
    """
    # Reject out-of-range IDs up front (source_id is 1-indexed).
    if source_id < 1 or source_id > len(source_chunks):
        return {
            "valid": False,
            "quote": quote,
            "source_id": source_id,
            "reason": "Invalid source ID",
            "source_text": None
        }
    
    # If quote contains ellipsis, only match the part before it
    # (the elided tail cannot be located contiguously in the source).
    if '...' in quote:
        quote = quote.split('...')[0].strip()
    
    # Step 1: Check the AI's cited source first (fast path)
    source_text = source_chunks[source_id - 1].payload['text']
    
    # Get alignment hint from partial_ratio_alignment to narrow the search window
    alignment_hint = fuzz.partial_ratio_alignment(quote, source_text, score_cutoff=70)
    start, end, score = _find_best_match_position(quote, source_text, alignment_hint)
    
    if score >= FUZZY_THRESHOLD and start != -1:
        matched_text = source_text[start:end].strip()
        return _build_valid_result(quote, source_chunks[source_id - 1], source_id, score, matched_text)
    
    # Step 2: Search other sources for remapping (AI cited wrong source)
    for idx, chunk in enumerate(source_chunks, 1):
        if idx == source_id:
            continue  # Already checked
        
        # Get alignment hint for this chunk
        alignment_hint = fuzz.partial_ratio_alignment(quote, chunk.payload['text'], score_cutoff=70)
        start, end, score = _find_best_match_position(quote, chunk.payload['text'], alignment_hint)
        if score >= FUZZY_THRESHOLD and start != -1:
            matched_text = chunk.payload['text'][start:end].strip()
            return _build_valid_result(quote, chunk, idx, score, matched_text, remapped=True)
    
    # Validation failed - find closest match for debugging (lower cutoffs)
    matched_text = ""
    actual_score = 0
    try:
        debug_hint = fuzz.partial_ratio_alignment(quote, source_text, score_cutoff=60)
        debug_start, debug_end, debug_score = _find_best_match_position(quote, source_text, debug_hint)
        if debug_score >= 70 and debug_start != -1:
            matched_text = source_text[debug_start:debug_end].strip()
            actual_score = debug_score
    except Exception:
        # Diagnostics are best-effort only; never let them break the failure
        # report. (Narrowed from a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit.)
        pass
    
    # If no decent match found, show snippet of source
    if not matched_text:
        matched_text = source_text[:200].strip() + "..." if len(source_text) > 200 else source_text
    
    return {
        "valid": False,
        "quote": quote,
        "source_id": source_id,
        "reason": f"Quote not found in any source (AI's cited source: {actual_score:.1f}% fuzzy match)",
        "matched_text": matched_text,
        "fuzzy_match_score": actual_score
    }


def format_citations_display(citations: List[Dict[str, Any]]) -> str:
    """Format validated citations in order with article title, URL, and quoted text.

    Args:
        citations: List of validated citation dicts

    Returns:
        Formatted string for display
    """
    if not citations:
        return "No citations available."

    # Present citations in citation_id order regardless of input order.
    ordered = sorted(citations, key=lambda entry: entry.get('citation_id', 0))

    blocks = []
    for entry in ordered:
        score = entry.get('fuzzy_match_score', 100)
        suffix = ", remapped" if entry.get('remapped') else ""
        blocks.append(
            f"[{entry['citation_id']}] {entry['title']} ({score:.1f}% fuzzy match{suffix})\n"
            f"    URL: {entry['url']}\n"
            f"    Quote: \"{entry['matched_text']}\"\n"
        )
    return "\n".join(blocks)