import html
from collections import defaultdict

import streamlit as st
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
| |
|
| | |
| | path_to_checkpoint = 'PranavaKailash/CyNER-2.0-DeBERTa-v3-base' |
| |
|
| | @st.cache_resource |
| | def load_model(): |
| | """Load model and tokenizer with proper error handling""" |
| | try: |
| | tokenizer = AutoTokenizer.from_pretrained(path_to_checkpoint, use_fast=True) |
| | model = AutoModelForTokenClassification.from_pretrained(path_to_checkpoint) |
| | |
| | |
| | ner_pipeline = pipeline( |
| | "ner", |
| | model=model, |
| | tokenizer=tokenizer, |
| | device=-1 |
| | ) |
| | return ner_pipeline |
| | except Exception as e: |
| | st.error(f"Error loading model: {str(e)}") |
| | return None |
| |
|
| | def tag_sentence(sentence, entities): |
| | """ |
| | Add HTML tags to entities for visualization. |
| | """ |
| | if not entities: |
| | return sentence |
| | |
| | |
| | sorted_entities = sorted(entities, key=lambda x: x['start']) |
| | |
| | tagged_sentence = "" |
| | last_idx = 0 |
| | |
| | for entity in sorted_entities: |
| | |
| | tagged_sentence += sentence[last_idx:entity['start']] |
| | |
| | |
| | entity_text = sentence[entity['start']:entity['end']] |
| | entity_label = entity['entity_group'] if 'entity_group' in entity else entity['entity'] |
| | confidence = entity.get('score', 0) |
| | |
| | tagged_sentence += f""" |
| | <span style='background-color: #e6f3ff; padding: 2px 6px; border-radius: 4px; border-left: 3px solid #007acc; margin: 1px;'> |
| | <strong style='color: #005299;'>{entity_label}</strong> |
| | <span style='color: #333;'>{entity_text}</span> |
| | <small style='color: #666; font-size: 0.8em;'>({confidence:.2f})</small> |
| | </span> |
| | """ |
| | |
| | last_idx = entity['end'] |
| | |
| | |
| | tagged_sentence += sentence[last_idx:] |
| | return tagged_sentence |
| |
|
| | @st.cache_data |
| | def perform_ner(text, _pipeline): |
| | """ |
| | Run NER pipeline and prepare results for display. |
| | """ |
| | if not _pipeline: |
| | return [], text |
| | |
| | try: |
| | |
| | entities = _pipeline(text) |
| | |
| | |
| | entities_by_type = defaultdict(list) |
| | for entity in entities: |
| | entity_type = entity.get('entity_group', entity.get('entity', 'Unknown')) |
| | entities_by_type[entity_type].append({ |
| | 'text': text[entity['start']:entity['end']], |
| | 'confidence': round(entity['score'], 3), |
| | 'start': entity['start'], |
| | 'end': entity['end'] |
| | }) |
| | |
| | |
| | tagged_sentence = tag_sentence(text, entities) |
| | |
| | return dict(entities_by_type), tagged_sentence, entities |
| | except Exception as e: |
| | st.error(f"Error during NER processing: {str(e)}") |
| | return {}, text, [] |
| |
|
| | |
| | st.set_page_config( |
| | page_title="CyNER 2.0", |
| | page_icon="π", |
| | layout="wide" |
| | ) |
| |
|
| | |
| | ner_pipeline = load_model() |
| |
|
| | if not ner_pipeline: |
| | st.error("β Failed to load the model. Please refresh the page or contact support.") |
| | st.stop() |
| |
|
| | st.title("π CyNER 2.0 - Cybersecurity Named Entity Recognition") |
| | st.markdown("**Advanced NER for Cybersecurity Text Analysis using DeBERTa-v3**") |
| | st.write("Enter cybersecurity-related text to identify and extract named entities.") |
| |
|
| | |
| | examples = { |
| | "Malware Analysis": "The Zeus trojan was detected on the victim's Windows 10 system at IP address 192.168.1.100. The malware communicated with command and control server evil.example.com using port 8080.", |
| | "Vulnerability Report": "CVE-2021-44228 affects Apache Log4j versions 2.0 to 2.15.0. The vulnerability allows remote code execution through LDAP injection.", |
| | "Incident Response": "Suspicious network traffic detected from IP 203.0.113.1 attempting to access /admin/login.php on our web server nginx running on Ubuntu 20.04.", |
| | "Phishing Attack": "Users received emails from admin@secur3-bank.com asking them to update their credentials by clicking on https://phishing-site.malicious.com/login" |
| | } |
| |
|
| | |
| | with st.sidebar: |
| | st.header("π Example Texts") |
| | st.write("Click to load example cybersecurity text:") |
| | for title, text in examples.items(): |
| | if st.button(f"π {title}", key=f"example_{title}"): |
| | st.session_state.input_text = text |
| |
|
| | |
| | input_text = st.text_area( |
| | "**Input Text**", |
| | value=st.session_state.get('input_text', "Enter your cybersecurity text here..."), |
| | height=150, |
| | help="Paste any cybersecurity-related text to analyze", |
| | key='input_text' |
| | ) |
| |
|
| | col1, col2, col3 = st.columns([2, 1, 3]) |
| | with col1: |
| | analyze_button = st.button("π Analyze Text", type="primary") |
| | with col2: |
| | clear_button = st.button("ποΈ Clear") |
| |
|
| | if clear_button: |
| | st.session_state.input_text = "" |
| | st.experimental_rerun() |
| |
|
| | if analyze_button and ner_pipeline: |
| | if input_text.strip() and input_text != "Enter your cybersecurity text here...": |
| | with st.spinner("π€ Processing text with CyNER 2.0..."): |
| | entities_dict, tagged_sentence, raw_entities = perform_ner(input_text, ner_pipeline) |
| | |
| | if entities_dict: |
| | st.success(f"β
Analysis complete! Found {sum(len(v) for v in entities_dict.values())} entities") |
| | |
| | |
| | st.subheader("π Analysis Results") |
| | |
| | |
| | st.markdown("**π·οΈ Tagged Entities:**") |
| | st.markdown(tagged_sentence, unsafe_allow_html=True) |
| | |
| | |
| | st.markdown("**π Entity Summary:**") |
| | if len(entities_dict) > 0: |
| | cols = st.columns(min(len(entities_dict), 4)) |
| | for i, (entity_type, entities_list) in enumerate(entities_dict.items()): |
| | with cols[i % 4]: |
| | st.metric( |
| | label=entity_type.replace('B-', '').replace('I-', ''), |
| | value=len(entities_list) |
| | ) |
| | |
| | |
| | with st.expander("π Detailed Entity Breakdown", expanded=True): |
| | for entity_type, entities_list in entities_dict.items(): |
| | st.markdown(f"**{entity_type}:**") |
| | for entity in entities_list: |
| | st.markdown(f"- `{entity['text']}` (confidence: {entity['confidence']})") |
| | |
| | |
| | with st.expander("π§ Raw JSON Data", expanded=False): |
| | st.json(entities_dict) |
| | else: |
| | st.info("βΉοΈ No cybersecurity entities detected in the provided text. Try using text with security-related terms like IP addresses, malware names, CVEs, etc.") |
| | else: |
| | st.warning("β οΈ Please enter some text for analysis.") |
| |
|
| | |
| | st.markdown("---") |
| | st.markdown(""" |
| | <div style='text-align: center; color: #666; font-size: 0.9em;'> |
| | <strong>CyNER 2.0</strong> - Cybersecurity Named Entity Recognition<br> |
| | Model: <code>PranavaKailash/CyNER-2.0-DeBERTa-v3-base</code> | Built with Streamlit |
| | </div> |
| | """, unsafe_allow_html=True) |