#!/usr/bin/env python3
# app.py
# Streamlit app for link detection with word-level highlighting
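#
# Usage sketch (assumes a fine-tuned token-classification checkpoint has been
# saved under ./model_link_token_cls next to this script, as load_model expects):
#     streamlit run app.py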

import streamlit as st
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForTokenClassification
import html

st.set_page_config(page_title="Link Detection", page_icon="🔗")

@st.cache_resource
def load_model(model_path="model_link_token_cls"):
    """Load model and tokenizer."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
    model = AutoModelForTokenClassification.from_pretrained(model_path)
    model = model.to(device)
    model.eval()
    return tokenizer, model, device

def group_tokens_into_words(tokens, offset_mapping, link_probs):
    """Group tokens into words based on tokenizer patterns."""
    words = []
    current_word_tokens = []
    current_word_offsets = []
    current_word_probs = []
    
    for i, (token, offsets, prob) in enumerate(zip(tokens, offset_mapping, link_probs)):
        # Skip special tokens
        if offsets == [0, 0]:
            if current_word_tokens:
                words.append({
                    'tokens': current_word_tokens,
                    'offsets': current_word_offsets,
                    'probs': current_word_probs
                })
                current_word_tokens = []
                current_word_offsets = []
                current_word_probs = []
            continue
        
        # Check if this is a new word or continuation
        is_new_word = False
        
        # DeBERTa uses ▁ for word boundaries
        if token.startswith("▁"):
            is_new_word = True
        # BERT uses ## for subword continuation
        elif i == 0 or not token.startswith("##"):
            # First content token, or the previous token was a special token
            if i == 0 or offset_mapping[i-1] == [0, 0]:
                is_new_word = True
            # Check if there's a gap between tokens (indicates new word)
            elif current_word_offsets and offsets[0] > current_word_offsets[-1][1]:
                is_new_word = True
        
        if is_new_word and current_word_tokens:
            # Save current word
            words.append({
                'tokens': current_word_tokens,
                'offsets': current_word_offsets,
                'probs': current_word_probs
            })
            current_word_tokens = []
            current_word_offsets = []
            current_word_probs = []
        
        # Add token to current word
        current_word_tokens.append(token)
        current_word_offsets.append(offsets)
        current_word_probs.append(prob)
    
    # Add last word if exists
    if current_word_tokens:
        words.append({
            'tokens': current_word_tokens,
            'offsets': current_word_offsets,
            'probs': current_word_probs
        })
    
    return words

def predict_links(text, tokenizer, model, device, threshold=0.5,
                  max_length=512, doc_stride=128):
    """Predict link spans with word-level highlighting using sliding windows.

    Returns (link_spans, link_details): link_spans is a list of (start, end)
    character offsets, and link_details is a list of dicts with the matched
    text, its offsets, and per-word max/avg confidence.
    """
    if not text.strip():
        return [], []

    # Tokenize full text without truncation or special tokens
    full_enc = tokenizer(
        text,
        add_special_tokens=False,
        truncation=False,
        return_offsets_mapping=True,
    )
    all_ids = full_enc["input_ids"]
    all_offsets = full_enc["offset_mapping"]
    n_tokens = len(all_ids)

    # Accumulate probabilities per token position (for averaging overlaps)
    prob_sums = [0.0] * n_tokens
    prob_counts = [0] * n_tokens

    # Sliding window parameters (matching training _prep.py)
    specials = tokenizer.num_special_tokens_to_add(pair=False)  # 2 for DeBERTa
    cap = max_length - specials  # 510 content tokens per window
    step = max(cap - doc_stride, 1)  # 382
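    # Worked example (illustrative): with max_length=512, doc_stride=128 and a
    # model that adds 2 special tokens, cap = 510 and step = 382, so windows
    # cover tokens [0, 510), [382, 892), [764, 1274), ... with 128 tokens of overlap.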

    # Generate windows and run inference
    start = 0
    while start < n_tokens:
        end = min(start + cap, n_tokens)
        window_ids = all_ids[start:end]

        # Add special tokens (CLS + content + SEP)
        input_ids = torch.tensor(
            [tokenizer.build_inputs_with_special_tokens(window_ids)],
            device=device
        )
        attention_mask = torch.ones_like(input_ids)

        with torch.no_grad():
            logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
            probs = F.softmax(logits, dim=-1)[0].cpu()
            # Drop the special tokens at positions 0 and -1; column 1 is taken
            # as the probability of the positive "link" label
            content_probs = probs[1:-1, 1].tolist()

        # Map back to original token positions
        for i, p in enumerate(content_probs):
            orig_idx = start + i
            if orig_idx < n_tokens:
                prob_sums[orig_idx] += p
                prob_counts[orig_idx] += 1

        if end == n_tokens:
            break
        start += step

    # Average probabilities across overlapping windows
    link_probs = [
        prob_sums[i] / prob_counts[i] if prob_counts[i] > 0 else 0.0
        for i in range(n_tokens)
    ]

    # Get tokens and offsets for word grouping
    tokens = tokenizer.convert_ids_to_tokens(all_ids)
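    # Fast tokenizers return offsets as tuples; convert to lists so the
    # [0, 0] special-token comparison in group_tokens_into_words matches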
    offset_mapping = [list(o) for o in all_offsets]

    # Group tokens into words
    words = group_tokens_into_words(tokens, offset_mapping, link_probs)
    
    # Extract link spans - if ANY token in a word meets threshold, highlight entire word
    link_spans = []
    link_details = []
    
    for word_group in words:
        word_offsets = word_group['offsets']
        word_probs = word_group['probs']
        
        # Check if any token in the word meets the threshold
        if any(prob >= threshold for prob in word_probs):
            # Get the span of the entire word
            start = word_offsets[0][0]
            end = word_offsets[-1][1]
            link_spans.append((start, end))
            
            # Calculate max confidence for the word
            max_confidence = max(word_probs)
            avg_confidence = sum(word_probs) / len(word_probs)
            
            link_text = text[start:end]
            link_details.append({
                "text": link_text,
                "start": start,
                "end": end,
                "max_confidence": round(max_confidence, 4),
                "avg_confidence": round(avg_confidence, 4)
            })
    
    return link_spans, link_details

def render_highlighted_text(text, link_spans):
    """Render text with highlighted link spans."""
    if not text:
        return ""
    
    # Sort spans by start position
    link_spans = sorted(link_spans, key=lambda x: x[0])
    
    # Build HTML with highlights
    html_parts = []
    last_end = 0
    
    for start, end in link_spans:
        # Add text before the link
        if start > last_end:
            html_parts.append(html.escape(text[last_end:start]))
        # Add highlighted link
        html_parts.append(
            f'<span style="background-color: #90EE90; padding: 2px 4px; '
            f'border-radius: 3px; font-weight: 500;">{html.escape(text[start:end])}</span>'
        )
        last_end = end
    
    # Add remaining text
    if last_end < len(text):
        html_parts.append(html.escape(text[last_end:]))
    
    html_content = "".join(html_parts)
    
    # Wrap in a div
    full_html = f"""

    <div style="

        padding: 20px;

        background-color: #f8f9fa;

        border-radius: 8px;

        line-height: 1.8;

        font-size: 16px;

        white-space: pre-wrap;

        word-wrap: break-word;

        font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;

    ">

        {html_content}

    </div>

    """
    
    return full_html

def main():
    st.title("Link Detection")
    
    # Load model
    try:
        tokenizer, model, device = load_model()
        st.success(f"Model loaded on {device}")
    except Exception as e:
        st.error(f"Failed to load model: {e}")
        return
    
    # Threshold slider
    threshold = st.slider(
        "Confidence Threshold (%)",
        min_value=0,
        max_value=100,
        value=5,
        step=1,
        help="Highlights entire word if ANY of its tokens meet this threshold"
    ) / 100.0
    
    # Text input
    text = st.text_area("Input text:", height=200)
    
    if st.button("Detect Links"):
        if text:
            link_spans, link_details = predict_links(text, tokenizer, model, device, threshold)
            
            # Display highlighted text
            st.subheader("Text with Highlighted Links")
            highlighted_html = render_highlighted_text(text, link_spans)
            st.markdown(highlighted_html, unsafe_allow_html=True)
            
            # Show statistics
            st.info(f"Found {len(link_details)} words with link confidence above {threshold:.0%}")
            
            # Display JSON details
            if link_details:
                st.subheader("Link Details (JSON)")
                st.json(link_details)
        else:
            st.warning("Please enter text")

if __name__ == "__main__":
    main()