File size: 9,478 Bytes
a989b46
3460f59
 
 
 
 
 
 
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
7013831
167cd77
3460f59
 
 
167cd77
7013831
167cd77
 
7013831
 
167cd77
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
 
 
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
 
3460f59
7013831
8e698f2
 
3460f59
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7013831
3460f59
 
 
 
 
7013831
3460f59
 
 
 
 
 
 
 
 
 
 
 
 
a989b46
3460f59
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import streamlit as st
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
import torch
import pypdf
import os
import pandas as pd
import json

# --- Windows fix (harmless on Linux): suppress torch.compile/dynamo errors ---
import torch._dynamo
torch._dynamo.config.suppress_errors = True

# Page setup
st.set_page_config(page_title="CTI NER Analyzer", page_icon="🛡️", layout="wide")

st.title("🛡️ CTI NER Analyzer")
st.markdown("Detekce entit v textu pomocí modelu **attack-vector/SecureModernBERT-NER**.")

# --- Functions ---

@st.cache_resource
def load_model():
    """Build and cache the NER pipeline.

    Forces CPU-friendly settings so the model runs on the Hugging Face
    Free Tier, which provides no GPU.
    """
    model_name = "attack-vector/SecureModernBERT-NER"
    cpu_device = -1  # transformers convention: -1 selects the CPU

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Disable GPU-oriented attention kernels and force full float32
    # precision — both are needed to prevent crashes on CPU-only hosts.
    model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        attn_implementation="eager",
        torch_dtype=torch.float32,
    )

    return pipeline(
        "ner",
        model=model,
        tokenizer=tokenizer,
        aggregation_strategy="simple",
        device=cpu_device,
    )

def extract_text_from_pdf(uploaded_file):
    """Return the concatenated text of all pages in *uploaded_file*.

    Pages with no extractable text are skipped; each extracted page is
    followed by a blank line. On any read failure an error is shown in
    the UI and an empty string is returned.
    """
    try:
        pages = pypdf.PdfReader(uploaded_file).pages
        page_texts = (page.extract_text() for page in pages)
        return "".join(content + "\n\n" for content in page_texts if content)
    except Exception as e:
        st.error(f"Chyba při čtení PDF: {e}")
        return ""

def analyze_long_text_batched(pipeline, text, chunk_size=3000, batch_size=1):
    """Run *pipeline* over *text* in fixed-size character chunks.

    Whitespace-only chunks are skipped. Entity 'start'/'end' offsets in
    the returned list are shifted back into the coordinate space of the
    full *text*. Small chunks and batch_size=1 keep memory usage low on
    CPU-only Spaces.
    """
    chunks, offsets = [], []
    for pos in range(0, len(text), chunk_size):
        piece = text[pos:pos + chunk_size]
        if piece.strip():
            chunks.append(piece)
            offsets.append(pos)

    entities = []
    # batch_size=1 is the safest setting for CPU inference.
    for idx, chunk_entities in enumerate(pipeline(chunks, batch_size=batch_size)):
        shift = offsets[idx]
        for ent in chunk_entities:
            ent['start'] += shift
            ent['end'] += shift
            entities.append(ent)

    return entities

def merge_close_entities(results, original_text, max_char_distance=2):
    """Fuse consecutive entities of the same type separated by a tiny gap.

    Two neighbouring entities are merged when they share an
    'entity_group', the text between them is at most
    *max_char_distance* characters, and that gap contains no period.
    The fused entity spans both originals and keeps the higher score.
    """
    if not results:
        return []

    merged = []
    active = results[0].copy()

    for candidate in results[1:]:
        lo, hi = active['end'], candidate['start']
        # Overlapping spans would give a negative slice; clamp to empty.
        if lo > hi:
            lo = hi
        between = original_text[lo:hi]

        same_type = active['entity_group'] == candidate['entity_group']
        close_enough = len(between) <= max_char_distance and "." not in between

        if same_type and close_enough:
            active['end'] = candidate['end']
            active['score'] = float(max(active['score'], candidate['score']))
        else:
            merged.append(active)
            active = candidate.copy()

    merged.append(active)
    return merged

# --- Model loading (cached via @st.cache_resource on load_model) ---
with st.spinner('Načítám model (může trvat minutu)...'):
    try:
        nlp_pipeline = load_model()
    except Exception as e:
        # Without a model the app cannot do anything useful; stop rendering.
        st.error(f"Chyba při načítání modelu: {e}")
        st.stop()

# --- UI: input column (left) and results column (right) ---
col1, col2 = st.columns([1, 2])

with col1:
    st.subheader("📂 Vstup dat")
    uploaded_file = st.file_uploader("Nahrajte PDF", type=["pdf"])
    # Manual text entry is disabled while a PDF is uploaded.
    manual_text = st.text_area("Vložte text:", height=300, disabled=(uploaded_file is not None))

    # An uploaded PDF takes precedence over pasted text.
    text_to_analyze = ""
    if uploaded_file:
        with st.spinner("Čtu PDF..."):
            text_to_analyze = extract_text_from_pdf(uploaded_file)
            if text_to_analyze: st.success(f"PDF načteno: {len(text_to_analyze)} znaků.")
    else:
        text_to_analyze = manual_text

    analyze_button = st.button("Analyzovat", type="primary")

# --- Analysis (right column) ---
with col2:
    if analyze_button and text_to_analyze.strip():
        progress_bar = st.progress(0, text="Zahajuji analýzu...")
        
        try:
            # 1. Run the NER model over the text in chunks (the slow part on CPU)
            progress_bar.progress(10, text="Běží AI model (bude to chvíli trvat)...")
            
            # batch_size=1 for CPU stability
            raw_results = analyze_long_text_batched(nlp_pipeline, text_to_analyze, batch_size=1)

            # 2. Merge adjacent entities of the same type
            progress_bar.progress(90, text="Čištění výsledků...")
            results = merge_close_entities(raw_results, text_to_analyze)
            
            progress_bar.progress(100, text="Hotovo!")
            progress_bar.empty()

            if not results:
                st.info("Nic nenalezeno.")
            else:
                st.subheader("📝 Výsledky")

                # --- VISUALIZATION: render the first display_limit characters
                # with entities highlighted as colored <mark> spans ---
                display_limit = 5000
                st.caption(f"🎨 Náhled barevného textu (prvních {display_limit} znaků):")
                
                visible_results = [r for r in results if r['end'] < display_limit]
                html_string = "<div style='line-height: 2.0; font-family: sans-serif;'>"
                last_idx = 0
                
                for entity in visible_results:
                    start = entity['start']
                    end = entity['end']
                    label = entity['entity_group']
                    word = text_to_analyze[start:end]
                    
                    # Plain text between the previous entity and this one.
                    html_string += text_to_analyze[last_idx:start].replace("\n", "<br>")
                    
                    color_map = {
                        "MALWARE": "#ff4b4b", "ACTOR": "#ffa421", "THREAT-ACTOR": "#ffa421",
                        "TOOL": "#1c83e1", "MITRE-TACTIC": "#800080", "INDICATOR": "#21c354",
                        "FILEPATH": "#6c757d", "DOMAIN": "#21c354", "IP": "#21c354"
                    }
                    # Grey fallback for labels not in the map.
                    color = color_map.get(label, "#6c757d")
                    
                    html_string += f"<mark style='background-color: {color}; color: white; border-radius: 4px; padding: 2px 4px;'>{word} <sub style='font-size: 0.6em'>{label}</sub></mark>"
                    last_idx = end
                
                # Remaining plain text up to the preview limit.
                html_string += text_to_analyze[last_idx:display_limit].replace("\n", "<br>")
                if len(text_to_analyze) > display_limit:
                    html_string += "<br><br><i>... (zbytek textu je v tabulce níže) ...</i>"
                html_string += "</div>"
                
                with st.expander("Rozbalit barevný náhled", expanded=True):
                    st.markdown(html_string, unsafe_allow_html=True)
                
                st.divider()

                # --- TABLE: deduplicated (entity, type) pairs with best score ---
                st.subheader("📊 Kompletní přehled nalezených entit")
                
                unique_entities = {}
                full_export_data = [] 

                for res in results:
                    raw_word = text_to_analyze[res['start']:res['end']]
                    # Strip surrounding punctuation; drop one-character noise.
                    clean_word = raw_word.strip(" .,;:)('\"")
                    if len(clean_word) < 2: continue
                    
                    score_float = float(res['score'])

                    # Keep the highest confidence seen per (word, type) pair.
                    key = (clean_word, res['entity_group'])
                    if key not in unique_entities:
                        unique_entities[key] = score_float
                    else:
                        unique_entities[key] = max(unique_entities[key], score_float)
                    
                    full_export_data.append({
                        "Entity": clean_word,
                        "Type": res['entity_group'],
                        "Confidence": score_float,
                        "Start_Char": int(res['start']),
                        "End_Char": int(res['end'])
                    })

                table_data = [
                    {"Entity": k[0], "Type": k[1], "Confidence": v}
                    for k, v in unique_entities.items()
                ]
                df_unique = pd.DataFrame(table_data).sort_values(by=["Type", "Entity"])
                
                # Display copy keeps the numeric frame intact for the CSV export.
                df_display = df_unique.copy()
                df_display["Confidence"] = df_display["Confidence"].apply(lambda x: f"{x:.2%}")
                st.dataframe(df_display, use_container_width=True)

                # --- EXPORT: deduplicated CSV and full positional JSON ---
                col_exp1, col_exp2 = st.columns(2)
                
                with col_exp1:
                    csv = df_unique.to_csv(index=False).encode('utf-8')
                    st.download_button(
                        label="📥 Stáhnout CSV",
                        data=csv,
                        file_name='cti_analyza.csv',
                        mime='text/csv',
                    )
                
                with col_exp2:
                    json_str = json.dumps(full_export_data, indent=4)
                    st.download_button(
                        label="📥 Stáhnout JSON",
                        data=json_str,
                        file_name='cti_analyza_full.json',
                        mime='application/json',
                    )

        except Exception as e:
            st.error(f"Chyba při analýze: {e}")