|
|
import hashlib
import json
import logging
import os
import textwrap
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Dict, Any, Optional, Generator

import chromadb
from chromadb.config import Settings
from huggingface_hub import hf_hub_download
from sentence_transformers import SentenceTransformer

from llm_handler import CybersecurityLLM
|
|
|
|
|
# Configure the root logger at INFO level as an import-time side effect and
# create the module-level logger used by the classes in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SecurityTopic(Enum):
    """Closed set of topic categories for knowledge-base entries.

    Each member's value is the lowercase string written to the ``topic``
    metadata field (see ``SecurityKnowledge.to_dict``).
    """

    PHISHING = "phishing"
    PASSWORDS = "passwords"
    MALWARE = "malware"
    SOCIAL_ENGINEERING = "social_engineering"
    DATA_PROTECTION = "data_protection"
    NETWORK_SECURITY = "network_security"
    INCIDENT_RESPONSE = "incident_response"
    PHYSICAL_SECURITY = "physical_security"
    MOBILE_SECURITY = "mobile_security"
    CLOUD_SECURITY = "cloud_security"
    COMPLIANCE = "compliance"
    EMAIL_SECURITY = "email_security"
    RANSOMWARE = "ransomware"
    ZERO_TRUST = "zero_trust"
    SUPPLY_CHAIN = "supply_chain"
|
|
|
|
|
|
|
|
@dataclass
class SecurityKnowledge:
    """One knowledge-base entry: a titled block of guidance plus its metadata."""

    topic: SecurityTopic
    title: str
    content: str
    keywords: List[str]
    severity: str
    last_updated: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry as a flat dictionary of scalar values.

        The topic is reduced to its enum value and the keyword list is
        serialized to a JSON string. When ``last_updated`` is empty, the
        current local time in ISO-8601 format is used instead.
        """
        if self.last_updated:
            timestamp = self.last_updated
        else:
            timestamp = datetime.now().isoformat()

        encoded_keywords = json.dumps(self.keywords)
        return dict(
            topic=self.topic.value,
            title=self.title,
            content=self.content,
            keywords=encoded_keywords,
            severity=self.severity,
            last_updated=timestamp,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CybersecurityKnowledgeBase:
    """Persistent, embedding-searchable store of cybersecurity guidance.

    Documents live in a ChromaDB collection and are embedded with a
    SentenceTransformer model. On first use (empty collection) a built-in set
    of core knowledge items is loaded.
    """

    # Name of the ChromaDB collection holding all knowledge documents.
    COLLECTION_NAME = "cybersecurity_knowledge"

    # Severity labels ordered from least to most severe; used by ``search``
    # to expand a minimum severity into the list of acceptable labels.
    SEVERITY_LEVELS = ("low", "medium", "high", "critical")

    # Number of core items embedded and inserted per ``collection.add`` call.
    _CORE_BATCH_SIZE = 10

    def __init__(self,
                 persist_directory: str = "./knowledge_db",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        """
        Initialize knowledge base with vector database

        Args:
            persist_directory: Directory to persist ChromaDB
            embedding_model: Sentence transformer model for embeddings
        """
        logger.info("Initializing knowledge base at %s", persist_directory)

        os.makedirs(persist_directory, exist_ok=True)

        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True
            )
        )

        # get_or_create_collection replaces the former get/bare-except/create
        # sequence, which also swallowed KeyboardInterrupt and unrelated
        # storage errors.
        self.collection = self.client.get_or_create_collection(
            name=self.COLLECTION_NAME,
            metadata={"description": "Cybersecurity best practices and knowledge"}
        )
        logger.info("Knowledge collection ready with %d documents", self.collection.count())

        logger.info("Loading embedding model: %s", embedding_model)
        self.embedder = SentenceTransformer(embedding_model)

        # Seed the built-in knowledge only when the collection is empty.
        if self.collection.count() == 0:
            logger.info("Loading core cybersecurity knowledge...")
            self._load_core_knowledge()

        self.stats = {
            "total_documents": self.collection.count(),
            "queries_processed": 0,
            "last_updated": datetime.now().isoformat()
        }

    @staticmethod
    def _core_knowledge_items() -> List["SecurityKnowledge"]:
        """Return the built-in knowledge entries loaded into an empty collection."""
        return [
            SecurityKnowledge(
                topic=SecurityTopic.PHISHING,
                title="Comprehensive Phishing Detection Guide",
                content=textwrap.dedent("""
                    IDENTIFYING PHISHING EMAILS - Complete Guide:

                    Red Flags to Watch For:
                    • Generic greetings: "Dear Customer" instead of your actual name
                    • Urgency tactics: "Act now or your account will be closed!"
                    • Grammar/spelling errors: Professional companies proofread their emails
                    • Mismatched sender: Display name doesn't match email address
                    • Suspicious links: Hover to see if URL matches claimed sender
                    • Unexpected attachments: Especially .zip, .exe, .scr, .vbs files
                    • Requests for sensitive info: Legitimate companies don't ask for passwords via email
                    • Too good to be true: "You've won $1 million!"
                    • Emotional manipulation: Fear, greed, curiosity, sympathy

                    How to Verify Suspicious Emails:
                    1. Check sender's email address carefully (not just display name)
                    2. Hover over links WITHOUT clicking to preview destination
                    3. Look for HTTPS and correct domain in links
                    4. Contact company directly through official channels (not email links)
                    5. Check for personalization - legitimate emails often include account numbers
                    6. Verify with IT security team when in doubt

                    What to Do If You Receive Phishing:
                    1. Don't click links or download attachments
                    2. Don't reply or provide any information
                    3. Report to IT security immediately
                    4. Forward to anti-phishing team if available
                    5. Delete the email after reporting
                    6. Warn colleagues if it's widespread

                    If You Clicked a Phishing Link:
                    1. Disconnect from network immediately
                    2. Change passwords from a different device
                    3. Report to IT security IMMEDIATELY
                    4. Run antivirus scan
                    5. Monitor accounts for suspicious activity
                    6. Enable MFA on all accounts if not already done
                    """),
                keywords=["phishing", "email", "scam", "suspicious", "link", "attachment", "spear phishing", "whaling",
                          "BEC"],
                severity="critical"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.PASSWORDS,
                title="Password Security Best Practices",
                content=textwrap.dedent("""
                    CREATING STRONG PASSWORDS:

                    Requirements for Strong Passwords:
                    • Minimum 12-16 characters (longer is better)
                    • Mix of uppercase and lowercase letters
                    • Include numbers and special characters (!@#$%^&*)
                    • Avoid dictionary words and personal information
                    • Unique for every account - never reuse passwords
                    • Consider passphrases: 'Coffee@7Makes$Me!Happy2024'
                    • Avoid patterns: Password1, Password2, etc.
                    • Don't use keyboard patterns: qwerty, 123456

                    Password Management Best Practices:
                    • Use a reputable password manager (Bitwarden, 1Password, LastPass)
                    • Enable two-factor authentication (2FA) everywhere possible
                    • Use authenticator apps over SMS when possible
                    • Never share passwords via email, chat, or phone
                    • Change passwords immediately if breach suspected
                    • Don't write passwords on sticky notes
                    • Use different passwords for work and personal accounts
                    • Consider using hardware keys for critical accounts

                    Multi-Factor Authentication (MFA):
                    • Something you know (password)
                    • Something you have (phone, token)
                    • Something you are (biometric)

                    Password Manager Benefits:
                    • Generate random, unique passwords
                    • Securely store all passwords
                    • Auto-fill credentials safely
                    • Sync across devices
                    • Alert you to breaches
                    • Share passwords securely when needed

                    Common Password Mistakes:
                    • Using personal information (birthdate, pet names)
                    • Reusing passwords across sites
                    • Sharing passwords with others
                    • Using simple substitutions (P@ssw0rd)
                    • Not updating default passwords
                    • Ignoring breach notifications
                    """),
                keywords=["password", "authentication", "2FA", "MFA", "login", "credentials", "passphrase",
                          "password manager"],
                severity="critical"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.MALWARE,
                title="Malware Prevention and Response",
                content=textwrap.dedent("""
                    MALWARE PREVENTION STRATEGIES:

                    Prevention Best Practices:
                    • Keep OS and all software updated with latest patches
                    • Use reputable antivirus with real-time protection
                    • Enable Windows Defender or equivalent
                    • Download software only from official sources
                    • Verify digital signatures on downloads
                    • Scan USB drives before opening files
                    • Disable macros in Office documents from unknown sources
                    • Use application sandboxing when possible
                    • Regular backups following 3-2-1 rule
                    • Keep UAC (User Account Control) enabled

                    Types of Malware:
                    • Viruses: Self-replicating, attaches to files
                    • Worms: Self-spreading through networks
                    • Trojans: Disguised as legitimate software
                    • Ransomware: Encrypts files for ransom
                    • Spyware: Steals information secretly
                    • Adware: Displays unwanted advertisements
                    • Rootkits: Hides presence from system
                    • Keyloggers: Records keystrokes
                    • Cryptominers: Uses resources to mine cryptocurrency

                    Warning Signs of Infection:
                    • Computer running unusually slow
                    • Frequent crashes or blue screens
                    • Programs starting automatically
                    • Browser homepage changed
                    • New toolbars or extensions
                    • Excessive pop-ups
                    • Files encrypted with ransom note
                    • Unusual network activity
                    • Disabled security software
                    • Missing or modified files

                    If Infected - Immediate Steps:
                    1. Disconnect from all networks (WiFi, Ethernet)
                    2. Enter Safe Mode if possible
                    3. Run full antivirus scan
                    4. Use additional malware removal tools (Malwarebytes)
                    5. Check for system restore points
                    6. Contact IT security team immediately
                    7. Change all passwords from clean device
                    8. Monitor financial accounts
                    9. Consider complete system reinstall for severe infections
                    """),
                keywords=["malware", "virus", "ransomware", "trojan", "antivirus", "infection", "worm", "spyware"],
                severity="critical"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.SOCIAL_ENGINEERING,
                title="Social Engineering Defense Strategies",
                content=textwrap.dedent("""
                    DEFENDING AGAINST SOCIAL ENGINEERING:

                    Common Social Engineering Tactics:
                    • Pretexting: Creating fake scenarios to steal information
                    • Baiting: Offering something enticing (USB drives, downloads)
                    • Quid pro quo: Offering service for information
                    • Tailgating: Following into secure areas
                    • Vishing: Voice phishing via phone
                    • Smishing: SMS/text message phishing
                    • Watering hole: Compromising frequently visited websites
                    • Dumpster diving: Searching trash for information
                    • Shoulder surfing: Looking over shoulder for passwords

                    Red Flags to Recognize:
                    • Unsolicited contact asking for information
                    • Urgency without verification
                    • Requests to bypass normal procedures
                    • Appeals to authority without proof
                    • Offers that seem too good to be true
                    • Requests for passwords or sensitive data
                    • Emotional manipulation (fear, greed, sympathy)
                    • Name dropping without context
                    • Resistance to verification

                    Defense Strategies:
                    • Always verify identity before sharing information
                    • Use callback numbers from official sources
                    • Be suspicious of unsolicited contacts
                    • Never give passwords over phone/email
                    • Question unusual requests, even from "colleagues"
                    • Report suspicious behavior immediately
                    • Trust but verify - confirm through separate channel
                    • Be aware of information you share publicly
                    • Secure physical documents and screens
                    • Educate family about work-related scams

                    Verification Techniques:
                    • Call back on known number
                    • Check employee directory
                    • Verify with manager
                    • Ask for employee ID
                    • Request email confirmation
                    • Check digital signatures
                    • Verify through IT security
                    """),
                keywords=["social engineering", "pretexting", "vishing", "smishing", "manipulation", "tailgating",
                          "phishing"],
                severity="high"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.NETWORK_SECURITY,
                title="Network and WiFi Security Guide",
                content=textwrap.dedent("""
                    NETWORK SECURITY BEST PRACTICES:

                    Home WiFi Security:
                    • Change default router admin credentials immediately
                    • Use WPA3 encryption (WPA2 minimum)
                    • Create strong WiFi password (20+ characters)
                    • Change default network name (SSID)
                    • Disable WPS (WiFi Protected Setup)
                    • Keep router firmware updated monthly
                    • Use guest network for visitors and IoT devices
                    • Disable remote management unless necessary
                    • Turn off SSID broadcast if practical
                    • Use MAC address filtering for added security
                    • Position router centrally to minimize external signal
                    • Regular reboot router (monthly)

                    Public WiFi Safety:
                    • Avoid accessing sensitive accounts
                    • Always use VPN for all connections
                    • Verify network name with venue staff
                    • Turn off automatic WiFi connection
                    • Forget network after use
                    • Never accept certificate warnings
                    • Disable file sharing
                    • Use cellular data for sensitive tasks
                    • Keep firewall enabled
                    • Use HTTPS websites only

                    VPN Best Practices:
                    • Use company-approved VPN only
                    • Connect before accessing any resources
                    • Keep VPN client updated
                    • Report connection issues immediately
                    • Don't use free/public VPN services
                    • Verify VPN is active before working

                    Network Hygiene:
                    • Regular network scans for unknown devices
                    • Monitor bandwidth usage
                    • Check for unauthorized access points
                    • Secure all network equipment physically
                    • Document network configuration
                    • Regular security audits
                    """),
                keywords=["wifi", "network", "router", "VPN", "encryption", "WPA3", "public wifi", "wireless"],
                severity="high"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.INCIDENT_RESPONSE,
                title="Security Incident Response Procedures",
                content=textwrap.dedent("""
                    SECURITY INCIDENT RESPONSE GUIDE:

                    IMMEDIATE RESPONSE STEPS:
                    1. STOP - Don't try to fix it yourself
                    2. DISCONNECT - Unplug network cable or disable WiFi
                    3. DOCUMENT - Write down:
                    - What happened
                    - When it occurred
                    - What you were doing
                    - Error messages
                    - Unusual behavior observed
                    4. REPORT - Contact IT security immediately
                    5. PRESERVE - Don't delete anything, take screenshots
                    6. WAIT - For IT security instructions

                    Types of Incidents Requiring Immediate Reporting:
                    • Clicked suspicious link or attachment
                    • Entered credentials on suspicious site
                    • Lost device with company data
                    • Suspicious computer behavior
                    • Unauthorized access attempts
                    • Data breach or leak discovered
                    • Ransomware infection
                    • Physical security breach
                    • Stolen credentials
                    • Suspicious phone calls asking for info

                    Information to Provide:
                    • Your name and contact information
                    • Time and date of incident
                    • Affected systems/accounts
                    • Description of what happened
                    • Actions taken so far
                    • Any error messages (exact wording)
                    • Screenshots if possible
                    • Anyone else who might be affected

                    DO NOT:
                    • Try to fix it yourself
                    • Delete or modify evidence
                    • Inform unauthorized people
                    • Post about it on social media
                    • Continue using affected systems
                    • Pay ransoms

                    Contact Information:
                    IT Security Hotline: [Organization specific]
                    Email: security@[organization]
                    After hours: [Emergency contact]
                    """),
                keywords=["incident", "breach", "response", "report", "emergency", "compromise", "security incident"],
                severity="critical"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.DATA_PROTECTION,
                title="Data Protection and Privacy Guide",
                content=textwrap.dedent("""
                    DATA PROTECTION BEST PRACTICES:

                    Data Classification:
                    • Public: Can be freely shared
                    • Internal: Within organization only
                    • Confidential: Specific authorized individuals
                    • Restricted: Highest sensitivity, strict controls

                    Handling Sensitive Data:
                    • Encrypt files before sharing externally
                    • Use approved file sharing platforms only
                    • Never use personal email for work data
                    • Implement clean desk policy
                    • Lock computer when stepping away (Win+L or Cmd+Ctrl+Q)
                    • Use privacy screens in public spaces
                    • Shred physical documents with sensitive info
                    • Secure disposal of electronic media
                    • Don't discuss sensitive info in public
                    • Be aware of smart speakers/devices

                    Encryption Best Practices:
                    • Use full disk encryption (BitLocker, FileVault)
                    • Encrypt removable media
                    • Use encrypted communication channels
                    • Encrypt email with sensitive data
                    • Password protect sensitive documents
                    • Use enterprise encryption tools
                    • Store encryption keys securely

                    Data Backup Practices:
                    • Follow 3-2-1 rule:
                    - 3 copies of important data
                    - 2 different storage media
                    - 1 offsite backup
                    • Test restore procedures regularly
                    • Encrypt backup drives
                    • Store backups securely
                    • Automate where possible
                    • Document what's backed up
                    • Verify backup integrity

                    Privacy Considerations:
                    • Minimize data collection
                    • Only share need-to-know basis
                    • Regular data audits
                    • Respect retention policies
                    • Secure data destruction
                    • GDPR/CCPA compliance
                    """),
                keywords=["data", "encryption", "backup", "confidential", "sensitive", "GDPR", "privacy",
                          "classification"],
                severity="high"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.MOBILE_SECURITY,
                title="Mobile Device Security Guidelines",
                content=textwrap.dedent("""
                    MOBILE DEVICE SECURITY:

                    Device Security Settings:
                    • Enable screen lock (PIN, password, biometric)
                    • Set auto-lock to 1-2 minutes
                    • Keep OS and apps updated automatically
                    • Download apps only from official stores
                    • Review app permissions carefully
                    • Enable remote wipe capability
                    • Use Find My Device features
                    • Encrypt device storage
                    • Disable Bluetooth when not needed
                    • Turn off WiFi auto-connect
                    • Disable Siri/Assistant on lock screen

                    BYOD (Bring Your Own Device) Security:
                    • Separate work and personal data
                    • Use MDM if required by company
                    • Install company security apps
                    • Follow company mobile policy
                    • Report lost/stolen immediately
                    • Don't jailbreak/root devices
                    • Use company VPN for work
                    • Regular security updates

                    Mobile Threats:
                    • Malicious apps
                    • Unsecured WiFi
                    • SMiShing (SMS phishing)
                    • Bluetooth attacks
                    • Physical theft
                    • Shoulder surfing
                    • Juice jacking (USB charging)
                    • SIM swapping

                    Safe Mobile Practices:
                    • Avoid public WiFi for sensitive tasks
                    • Use VPN when on public networks
                    • Don't click links in text messages
                    • Be cautious with QR codes
                    • Use official app stores only
                    • Keep personal info private
                    • Regular app permission audits
                    • Backup device regularly
                    • Use mobile antivirus
                    • Avoid charging at public USB ports
                    """),
                keywords=["mobile", "smartphone", "tablet", "BYOD", "iOS", "Android", "app security", "MDM"],
                severity="medium"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.RANSOMWARE,
                title="Ransomware Prevention and Response",
                content=textwrap.dedent("""
                    RANSOMWARE PROTECTION GUIDE:

                    Prevention Strategies:
                    • Regular automated backups (tested restores)
                    • Keep all software patched and updated
                    • Email filtering and sandboxing
                    • Disable macros by default
                    • User training on phishing
                    • Network segmentation
                    • Principle of least privilege
                    • Application whitelisting
                    • Endpoint detection and response (EDR)

                    If Ransomware Strikes:
                    1. Immediately disconnect from network
                    2. Power off if actively encrypting
                    3. Report to IT security immediately
                    4. Do NOT pay ransom
                    5. Preserve evidence for investigation
                    6. Check for decryption tools
                    7. Restore from clean backups
                    8. Rebuild affected systems
                    9. Investigate root cause
                    10. Implement lessons learned

                    Warning Signs:
                    • Files with strange extensions
                    • Cannot open documents
                    • Ransom notes in folders
                    • Slow computer performance
                    • Renamed files
                    • Wallpaper changed to ransom message

                    Recovery Planning:
                    • Maintain offline backups
                    • Test restore procedures
                    • Document critical systems
                    • Incident response plan
                    • Communication plan
                    • Legal/law enforcement contacts
                    """),
                keywords=["ransomware", "encryption", "ransom", "backup", "recovery", "bitcoin", "crypto"],
                severity="critical"
            ),

            SecurityKnowledge(
                topic=SecurityTopic.CLOUD_SECURITY,
                title="Cloud Services Security",
                content=textwrap.dedent("""
                    CLOUD SECURITY BEST PRACTICES:

                    Account Security:
                    • Use strong, unique passwords
                    • Enable MFA on all cloud accounts
                    • Regular access reviews
                    • Monitor for unusual activity
                    • Use SSO where available
                    • Secure API keys and tokens

                    Data Protection in Cloud:
                    • Understand shared responsibility model
                    • Encrypt data at rest and in transit
                    • Use cloud provider encryption
                    • Control data residency
                    • Regular security audits
                    • Implement DLP policies

                    Safe Cloud Usage:
                    • Only use approved cloud services
                    • Read terms of service
                    • Understand data ownership
                    • Configure privacy settings
                    • Regular permission reviews
                    • Monitor shared links
                    • Set expiration on shares
                    • Audit access logs

                    Common Cloud Risks:
                    • Misconfigured storage buckets
                    • Excessive permissions
                    • Shadow IT
                    • Account takeover
                    • Data leakage
                    • Compliance violations
                    • Insider threats
                    • API vulnerabilities
                    """),
                keywords=["cloud", "SaaS", "AWS", "Azure", "Google Cloud", "OneDrive", "Dropbox", "Office 365"],
                severity="high"
            )
        ]

    def _load_core_knowledge(self):
        """Load comprehensive cybersecurity knowledge"""
        knowledge_items = self._core_knowledge_items()
        batch_size = self._CORE_BATCH_SIZE

        for start in range(0, len(knowledge_items), batch_size):
            batch = knowledge_items[start:start + batch_size]
            documents = [item.content for item in batch]

            # Encode the whole batch in a single model call instead of one
            # call per document.
            embeddings = self.embedder.encode(documents).tolist()

            # IDs are derived from topic, title and content length, so the
            # same core item always maps to the same ID.
            ids = [
                hashlib.md5(
                    f"{item.topic.value}_{item.title}_{len(item.content)}".encode()
                ).hexdigest()
                for item in batch
            ]

            self.collection.add(
                embeddings=embeddings,
                documents=documents,
                metadatas=[item.to_dict() for item in batch],
                ids=ids
            )

            logger.info("Added batch %d of knowledge items", start // batch_size + 1)

        logger.info("Successfully loaded %d knowledge items", len(knowledge_items))

    def search(self,
               query: str,
               k: int = 3,
               filter_topic: Optional[str] = None,
               min_severity: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for relevant security information

        Args:
            query: User's question
            k: Number of results to return
            filter_topic: Optional topic filter
            min_severity: Minimum severity level filter (case-insensitive;
                one of ``SEVERITY_LEVELS``)

        Returns:
            List of relevant documents with metadata. Each entry has the keys
            ``content``, ``topic``, ``title``, ``severity``, ``keywords`` and
            ``relevance_score`` (computed as ``1 - distance / 2``).

        Raises:
            ValueError: If ``min_severity`` is not a known severity level.
        """
        self.stats["queries_processed"] += 1

        # Build the metadata conditions first so invalid input fails before
        # the (comparatively expensive) embedding call.
        conditions: List[Dict[str, Any]] = []
        if filter_topic:
            conditions.append({"topic": filter_topic})
        if min_severity:
            level = min_severity.lower()
            if level not in self.SEVERITY_LEVELS:
                raise ValueError(
                    f"Unknown severity {min_severity!r}; expected one of {list(self.SEVERITY_LEVELS)}"
                )
            first_allowed = self.SEVERITY_LEVELS.index(level)
            conditions.append(
                {"severity": {"$in": list(self.SEVERITY_LEVELS[first_allowed:])}}
            )

        query_embedding = self.embedder.encode(query).tolist()

        query_kwargs: Dict[str, Any] = {
            "query_embeddings": [query_embedding],
            "n_results": k,
        }
        if len(conditions) == 1:
            query_kwargs["where"] = conditions[0]
        elif conditions:
            # Chroma accepts only one operator per ``where`` dict, so several
            # conditions have to be combined explicitly with ``$and``.
            query_kwargs["where"] = {"$and": conditions}

        results = self.collection.query(**query_kwargs)

        formatted_results = []
        if results['documents'] and results['documents'][0]:
            for doc, metadata, distance in zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
            ):
                metadata = metadata or {}  # documents stored without metadata
                formatted_results.append({
                    'content': doc,
                    'topic': metadata.get('topic', 'unknown'),
                    'title': metadata.get('title', 'Untitled'),
                    'severity': metadata.get('severity', 'medium'),
                    'keywords': json.loads(metadata.get('keywords', '[]')),
                    # NOTE(review): this mapping presumes distances in [0, 2];
                    # confirm against the collection's distance metric.
                    'relevance_score': 1 - (distance / 2)
                })

        return formatted_results

    def add_custom_knowledge(self,
                             content: str,
                             topic: str,
                             title: str,
                             keywords: List[str],
                             severity: str = "medium") -> bool:
        """
        Add custom security knowledge to the database

        Args:
            content: Knowledge content
            topic: Topic category
            title: Title of the knowledge
            keywords: Related keywords
            severity: Severity level

        Returns:
            Success status: True when the document was stored, False when any
            error occurred (the error is logged, not raised).
        """
        try:
            embedding = self.embedder.encode(content).tolist()

            # One timestamp serves both the ID salt and the metadata field.
            timestamp = datetime.now().isoformat()
            doc_id = hashlib.md5(
                f"{topic}_{title}_{len(content)}_{timestamp}".encode()
            ).hexdigest()

            self.collection.add(
                embeddings=[embedding],
                documents=[content],
                metadatas=[{
                    "topic": topic,
                    "title": title,
                    "keywords": json.dumps(keywords),
                    "severity": severity,
                    "last_updated": timestamp,
                    "custom": True
                }],
                ids=[doc_id]
            )

            self.stats["total_documents"] = self.collection.count()
            logger.info("Added custom knowledge: %s", title)
            return True

        except Exception as e:
            # Deliberate best-effort: callers get a boolean, not an exception.
            logger.error("Failed to add custom knowledge: %s", e)
            return False

    def get_statistics(self) -> Dict[str, Any]:
        """Get knowledge base statistics

        Refreshes the document count and access time, recomputes the
        per-topic and per-severity document counts, and returns the
        instance's ``stats`` dictionary (the same object on every call).
        """
        self.stats["total_documents"] = self.collection.count()
        self.stats["last_accessed"] = datetime.now().isoformat()

        # Only metadata is needed here, so skip fetching document bodies.
        all_metadata = self.collection.get(include=["metadatas"])['metadatas'] or []

        topic_counts = Counter()
        severity_counts = Counter()
        for metadata in all_metadata:
            metadata = metadata or {}  # tolerate documents without metadata
            topic_counts[metadata.get('topic', 'unknown')] += 1
            severity_counts[metadata.get('severity', 'unknown')] += 1

        self.stats["topic_distribution"] = dict(topic_counts)
        self.stats["severity_distribution"] = dict(severity_counts)

        return self.stats

    def export_knowledge(self, output_file: str = "knowledge_export.json") -> bool:
        """Export all knowledge to JSON file

        Args:
            output_file: Path of the JSON file to write.

        Returns:
            True on success, False when any error occurred (the error is
            logged, not raised).
        """
        try:
            all_data = self.collection.get()

            export_data = {
                "exported_at": datetime.now().isoformat(),
                "total_documents": len(all_data['ids']),
                "documents": [
                    {"id": doc_id, "content": doc, "metadata": metadata}
                    for doc, metadata, doc_id in zip(
                        all_data['documents'],
                        all_data['metadatas'],
                        all_data['ids']
                    )
                ]
            }

            # Explicit encoding keeps the output independent of the platform
            # default.
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2)

            logger.info("Exported knowledge to %s", output_file)
            return True

        except Exception as e:
            logger.error("Failed to export knowledge: %s", e)
            return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RAGCybersecurityLLM(CybersecurityLLM):
    """CybersecurityLLM extended with retrieval-augmented generation (RAG).

    Before generating, the user's question is looked up in a
    ``CybersecurityKnowledgeBase``; sufficiently relevant documents are
    injected into the prompt as context.

    NOTE(review): ``self.llm``, ``self.system_prompt``, ``self.stop_tokens``,
    ``self.model_info`` and ``self.format_prompt`` are used here but not
    defined in this class - presumably provided by ``CybersecurityLLM``.
    """

    # Prompt used when knowledge-base context is available. Placeholders:
    # {system}, {context}, {user}.
    RAG_PROMPT_TEMPLATE = (
        "<|system|>\n"
        "{system}\n"
        "\n"
        "You have access to a comprehensive cybersecurity knowledge base. "
        "Use the provided context to give accurate, detailed answers. "
        "If the context doesn't contain relevant information, use your general "
        "knowledge but indicate when you're doing so.\n"
        "<|end|>\n"
        "<|user|>\n"
        "Context from knowledge base:\n"
        "{context}\n"
        "\n"
        "User Question: {user}\n"
        "<|end|>\n"
        "<|assistant|>"
    )

    def __init__(self,
                 repo_id: str = "daskalos-apps/phi4-cybersec-Q4_K_M",
                 filename: str = "phi4-mini-instruct-Q4_K_M.gguf",
                 local_dir: str = "./models",
                 knowledge_dir: str = "./knowledge_db",
                 force_download: bool = False):
        """
        Initialize LLM with RAG capabilities

        Args:
            repo_id: Hugging Face repository ID
            filename: Model filename
            local_dir: Local cache directory
            knowledge_dir: Knowledge base directory
            force_download: Force model re-download
        """
        super().__init__(repo_id, filename, local_dir, force_download)

        logger.info("Initializing RAG knowledge base...")
        self.knowledge_base = CybersecurityKnowledgeBase(persist_directory=knowledge_dir)

        # Kept as an instance attribute so existing code that reads or
        # overrides ``rag_prompt_template`` keeps working.
        self.rag_prompt_template = self.RAG_PROMPT_TEMPLATE

    @staticmethod
    def _excerpt(text: str, limit: int) -> str:
        """Return ``text`` cut to ``limit`` characters, with '...' only if cut."""
        if len(text) <= limit:
            return text
        return f"{text[:limit]}..."

    def _retrieve_documents(self,
                            prompt: str,
                            k_documents: int,
                            min_relevance: float) -> List[Dict[str, Any]]:
        """Search the knowledge base and keep hits scoring at least ``min_relevance``."""
        candidates = self.knowledge_base.search(prompt, k=k_documents)
        return [
            doc for doc in candidates
            if doc.get('relevance_score', 0) >= min_relevance
        ]

    def _build_prompt(self, prompt: str, context: Optional[str]) -> str:
        """Return the RAG prompt when ``context`` is non-empty, else the plain prompt."""
        if context:
            return self.rag_prompt_template.format(
                system=self.system_prompt,
                context=context,
                user=prompt
            )
        return self.format_prompt(prompt)

    def _sampling_kwargs(self, max_tokens: int) -> Dict[str, Any]:
        """Return the generation settings shared by the blocking and streaming paths."""
        return {
            "max_tokens": max_tokens,
            "temperature": 0.7,
            "top_p": 0.95,
            "top_k": 40,
            "repeat_penalty": 1.1,
            "stop": self.stop_tokens,
            "echo": False,
        }

    def generate_with_rag(self,
                          prompt: str,
                          max_tokens: int = 512,
                          use_rag: bool = True,
                          k_documents: int = 3,
                          min_relevance: float = 0.5) -> Dict[str, Any]:
        """
        Generate response with RAG enhancement

        Args:
            prompt: User's question
            max_tokens: Maximum response length
            use_rag: Whether to use RAG
            k_documents: Number of documents to retrieve
            min_relevance: Minimum relevance threshold

        Returns:
            Response with metadata and sources. On success the dictionary has
            the keys ``response``, ``tokens_used``, ``model``, ``sources`` and
            ``rag_used``; if generation raises, it has ``response`` (an
            apology message), ``error``, ``sources`` (empty) and ``rag_used``
            (False).
        """
        context = None
        sources = []

        if use_rag:
            logger.info("Searching knowledge base for: %s...", prompt[:50])
            relevant_docs = self._retrieve_documents(prompt, k_documents, min_relevance)

            if relevant_docs:
                context_parts = []
                for i, doc in enumerate(relevant_docs, 1):
                    context_parts.append(
                        f"[Source {i}: {doc['title']} - Severity: {doc['severity']}]\n"
                        f"{self._excerpt(doc['content'], 1000)}"
                    )
                    sources.append({
                        "title": doc['title'],
                        "topic": doc['topic'],
                        "severity": doc['severity'],
                        "relevance": doc['relevance_score']
                    })

                context = "\n\n".join(context_parts)
                logger.info("Found %d relevant documents", len(relevant_docs))
            else:
                logger.info("No highly relevant documents found")

        full_prompt = self._build_prompt(prompt, context)

        try:
            response = self.llm(full_prompt, **self._sampling_kwargs(max_tokens))

            text = response['choices'][0]['text'].strip()

            return {
                "response": text,
                "tokens_used": response['usage']['total_tokens'],
                "model": self.model_info['repo_id'],
                "sources": sources,
                "rag_used": bool(context)
            }

        except Exception as e:
            logger.error("Generation error: %s", e)
            return {
                "response": "I apologize, but I encountered an error. Please try rephrasing your question.",
                "error": str(e),
                "sources": [],
                "rag_used": False
            }

    def generate_stream_with_rag(self,
                                 prompt: str,
                                 max_tokens: int = 512,
                                 use_rag: bool = True,
                                 k_documents: int = 3,
                                 min_relevance: float = 0.5) -> Generator[str, None, None]:
        """Stream response with RAG enhancement

        Args:
            prompt: User's question
            max_tokens: Maximum response length
            use_rag: Whether to use RAG
            k_documents: Number of documents to retrieve
            min_relevance: Minimum relevance threshold, applied the same way
                as in ``generate_with_rag`` so both paths use the same
                context-selection rule.

        Yields:
            Non-empty text fragments as they are produced.
        """
        context = None
        if use_rag:
            relevant_docs = self._retrieve_documents(prompt, k_documents, min_relevance)
            if relevant_docs:
                context = "\n\n".join(
                    f"{doc['title']}: {doc['content'][:500]}" for doc in relevant_docs
                )

        full_prompt = self._build_prompt(prompt, context)

        stream = self.llm(
            full_prompt,
            stream=True,
            **self._sampling_kwargs(max_tokens)
        )

        for output in stream:
            token = output['choices'][0].get('text', '')
            if token:
                yield token

    def add_knowledge(self,
                      content: str,
                      topic: str,
                      title: str,
                      keywords: List[str],
                      severity: str = "medium") -> bool:
        """Add new knowledge to the RAG system

        Args:
            content: Knowledge content
            topic: Topic category
            title: Title of the knowledge
            keywords: Related keywords
            severity: Severity level stored with the document

        Returns:
            The success flag returned by
            ``CybersecurityKnowledgeBase.add_custom_knowledge``.
        """
        return self.knowledge_base.add_custom_knowledge(
            content=content,
            topic=topic,
            title=title,
            keywords=keywords,
            severity=severity
        )

    def get_knowledge_stats(self) -> Dict[str, Any]:
        """Get knowledge base statistics"""
        return self.knowledge_base.get_statistics()
|
|
|