import streamlit as st
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from collections import defaultdict

# Hugging Face checkpoint for the cybersecurity NER model.
path_to_checkpoint = 'PranavaKailash/CyNER-2.0-DeBERTa-v3-base'

# Sentinel text shown in the input box before the user types anything; the
# analyze handler treats it as "no input".
PLACEHOLDER_TEXT = "Enter your cybersecurity text here..."


@st.cache_resource
def load_model():
    """Load model and tokenizer and wrap them in a token-classification pipeline.

    Returns:
        The ready-to-use Hugging Face ``pipeline``, or ``None`` when loading
        fails (the error is surfaced in the UI via ``st.error``).
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(path_to_checkpoint, use_fast=True)
        model = AutoModelForTokenClassification.from_pretrained(path_to_checkpoint)
        # Initialize the NER pipeline; device=-1 forces CPU so the app also
        # runs on hosts without a GPU.
        ner_pipeline = pipeline(
            "ner",
            model=model,
            tokenizer=tokenizer,
            device=-1,
        )
        return ner_pipeline
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        return None


def tag_sentence(sentence, entities):
    """Return *sentence* with each detected entity wrapped in an HTML span.

    Args:
        sentence: The original input text.
        entities: Pipeline output dicts carrying ``start``/``end`` character
            offsets, a label under ``entity_group`` or ``entity``, and a
            ``score``.

    Returns:
        An HTML string for ``st.markdown(..., unsafe_allow_html=True)``.
    """
    if not entities:
        return sentence

    # Walk entities left-to-right so the character offsets stay valid while
    # the sentence is rebuilt piecewise.
    sorted_entities = sorted(entities, key=lambda x: x['start'])

    tagged_sentence = ""
    last_idx = 0
    for entity in sorted_entities:
        # Untagged text preceding this entity.
        tagged_sentence += sentence[last_idx:entity['start']]
        entity_text = sentence[entity['start']:entity['end']]
        # Aggregated pipelines emit 'entity_group'; raw ones emit 'entity'.
        entity_label = entity['entity_group'] if 'entity_group' in entity else entity['entity']
        confidence = entity.get('score', 0)
        # Highlighted span: label in bold, then the matched text and score.
        # NOTE(review): markup reconstructed — the extracted source had lost
        # its HTML tags even though the docstring and unsafe_allow_html
        # rendering both require them.
        tagged_sentence += (
            f'<span style="background-color:#fde68a; border-radius:4px; '
            f'padding:1px 4px; margin:0 1px;">'
            f'<b>{entity_label}</b> {entity_text} ({confidence:.2f})</span>'
        )
        last_idx = entity['end']

    # Append whatever text follows the final entity.
    tagged_sentence += sentence[last_idx:]
    return tagged_sentence


@st.cache_data
def perform_ner(text, _pipeline):
    """Run the NER pipeline over *text* and prepare results for display.

    Args:
        text: Raw user input.
        _pipeline: The loaded HF pipeline. The leading underscore tells
            ``st.cache_data`` not to hash this unhashable argument.

    Returns:
        A 3-tuple ``(entities_by_type, tagged_sentence, raw_entities)``.
        On failure or a missing pipeline it returns ``({}, text, [])`` so the
        caller can always unpack three values.
    """
    # BUG FIX: this early return previously produced a 2-tuple ([], text),
    # which broke the caller's 3-value unpacking.
    if not _pipeline:
        return {}, text, []
    try:
        # Get entities from the pipeline.
        entities = _pipeline(text)

        # Group entities by label for the summary widgets.
        entities_by_type = defaultdict(list)
        for entity in entities:
            entity_type = entity.get('entity_group', entity.get('entity', 'Unknown'))
            entities_by_type[entity_type].append({
                'text': text[entity['start']:entity['end']],
                'confidence': round(entity['score'], 3),
                'start': entity['start'],
                'end': entity['end'],
            })

        # Build the highlighted sentence for rendering.
        tagged_sentence = tag_sentence(text, entities)
        return dict(entities_by_type), tagged_sentence, entities
    except Exception as e:
        st.error(f"Error during NER processing: {str(e)}")
        return {}, text, []


def _clear_input():
    """``on_click`` callback that resets the input box.

    BUG FIX: mutating ``st.session_state.input_text`` inline *after* the
    text_area with that key has been instantiated raises
    ``StreamlitAPIException``. A button callback runs at the start of the
    next rerun, before widgets are rebuilt, so the assignment is legal —
    and the callback also triggers the rerun, replacing the deprecated
    ``st.experimental_rerun()``.
    """
    st.session_state.input_text = ""


# ---------------------------------------------------------------------------
# Streamlit UI
# ---------------------------------------------------------------------------
st.set_page_config(
    page_title="CyNER 2.0",
    page_icon="🔐",
    layout="wide",
)

# Load the pipeline once (cached); bail out of the script if it failed.
ner_pipeline = load_model()
if not ner_pipeline:
    st.error("❌ Failed to load the model. Please refresh the page or contact support.")
    st.stop()

st.title("🔐 CyNER 2.0 - Cybersecurity Named Entity Recognition")
st.markdown("**Advanced NER for Cybersecurity Text Analysis using DeBERTa-v3**")
st.write("Enter cybersecurity-related text to identify and extract named entities.")

# Example texts the user can load with one click from the sidebar.
examples = {
    "Malware Analysis": (
        "The Zeus trojan was detected on the victim's Windows 10 system at IP "
        "address 192.168.1.100. The malware communicated with command and "
        "control server evil.example.com using port 8080."
    ),
    "Vulnerability Report": (
        "CVE-2021-44228 affects Apache Log4j versions 2.0 to 2.15.0. The "
        "vulnerability allows remote code execution through LDAP injection."
    ),
    "Incident Response": (
        "Suspicious network traffic detected from IP 203.0.113.1 attempting "
        "to access /admin/login.php on our web server nginx running on "
        "Ubuntu 20.04."
    ),
    "Phishing Attack": (
        "Users received emails from admin@secur3-bank.com asking them to "
        "update their credentials by clicking on "
        "https://phishing-site.malicious.com/login"
    ),
}

# Sidebar: example loaders. These run before the text_area is instantiated,
# so assigning to its session-state key here is allowed.
with st.sidebar:
    st.header("📝 Example Texts")
    st.write("Click to load example cybersecurity text:")
    for title, text in examples.items():
        if st.button(f"📋 {title}", key=f"example_{title}"):
            st.session_state.input_text = text

# BUG FIX: seed the widget's state exactly once instead of passing both
# value= and key= to st.text_area — that combination triggers Streamlit's
# "default value + Session State API" conflict. After seeding, the key
# alone drives the widget's value.
if "input_text" not in st.session_state:
    st.session_state.input_text = PLACEHOLDER_TEXT

# Main input
input_text = st.text_area(
    "**Input Text**",
    height=150,
    help="Paste any cybersecurity-related text to analyze",
    key='input_text',
)

col1, col2, col3 = st.columns([2, 1, 3])
with col1:
    analyze_button = st.button("🔍 Analyze Text", type="primary")
with col2:
    # Clearing happens in the callback (see _clear_input) to avoid the
    # post-instantiation session-state mutation exception.
    st.button("🗑️ Clear", on_click=_clear_input)

if analyze_button and ner_pipeline:
    if input_text.strip() and input_text != PLACEHOLDER_TEXT:
        with st.spinner("🤖 Processing text with CyNER 2.0..."):
            entities_dict, tagged_sentence, raw_entities = perform_ner(input_text, ner_pipeline)

        if entities_dict:
            st.success(
                f"✅ Analysis complete! "
                f"Found {sum(len(v) for v in entities_dict.values())} entities"
            )

            # Display results
            st.subheader("📊 Analysis Results")

            # Tagged visualization
            st.markdown("**🏷️ Tagged Entities:**")
            st.markdown(tagged_sentence, unsafe_allow_html=True)

            # Entity summary metrics — at most 4 columns, wrapping extra
            # entity types around.
            st.markdown("**📈 Entity Summary:**")
            if len(entities_dict) > 0:
                cols = st.columns(min(len(entities_dict), 4))
                for i, (entity_type, entities_list) in enumerate(entities_dict.items()):
                    with cols[i % len(cols)]:
                        st.metric(
                            # Strip BIO prefixes for a friendlier label.
                            label=entity_type.replace('B-', '').replace('I-', ''),
                            value=len(entities_list),
                        )

            # Detailed breakdown
            with st.expander("📋 Detailed Entity Breakdown", expanded=True):
                for entity_type, entities_list in entities_dict.items():
                    st.markdown(f"**{entity_type}:**")
                    for entity in entities_list:
                        st.markdown(
                            f"- `{entity['text']}` (confidence: {entity['confidence']})"
                        )

            # Raw data for developers
            with st.expander("🔧 Raw JSON Data", expanded=False):
                st.json(entities_dict)
        else:
            st.info(
                "ℹ️ No cybersecurity entities detected in the provided text. "
                "Try using text with security-related terms like IP addresses, "
                "malware names, CVEs, etc."
            )
    else:
        st.warning("⚠️ Please enter some text for analysis.")

# Footer
st.markdown("---")
st.markdown(
    """
    <div style="text-align: center;">
        <p>CyNER 2.0 - Cybersecurity Named Entity Recognition</p>
        <p>Model: PranavaKailash/CyNER-2.0-DeBERTa-v3-base | Built with Streamlit</p>
    </div>
    """,
    unsafe_allow_html=True,
)