"""ChromaDB-backed vector storage for CVE similarity and natural-language search."""
import chromadb
import os
class CVEVectorStorage:
    """Semantic vector store for CVE records, backed by ChromaDB.

    CVE descriptions (optionally enriched with an AI-generated summary) are
    embedded with a SentenceTransformer model and stored in a persistent
    ChromaDB collection, enabling CVE-to-CVE similarity lookup and free-text
    semantic search.

    If ChromaDB or the embedding model cannot be initialised, the instance
    degrades gracefully: ``self.enabled`` is False and every public method
    becomes a cheap no-op returning an empty result.
    """

    def __init__(self):
        """Open (or create) the persistent collection and load the embedding model."""
        try:
            # Initialise ChromaDB with persistent storage
            self.client = chromadb.PersistentClient(path="./data/chroma_db")
            self.collection = self.client.get_or_create_collection(name="cve_collection")
            # Lazy import: sentence_transformers is a heavy optional dependency,
            # so pulling it in here keeps module import cheap when it's absent.
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer('all-MiniLM-L6-v2')
            self.enabled = True
        except Exception as e:
            # Any failure (missing package, locked DB, model download error)
            # disables the feature instead of crashing the application.
            print(f"Vector storage disabled: {e}")
            self.enabled = False

    def add_cve(self, cve_id, cve_data, ai_summary=None):
        """Store a CVE in the vector database for similarity search.

        Args:
            cve_id: NVD identifier (e.g. "CVE-2021-44228"), used as the vector id.
            cve_data: NVD-style record containing a 'cve' dict with
                'descriptions' (list of {'value': ...}) and optionally 'published'.
            ai_summary: optional extra text appended to the description before
                embedding, to improve similarity matching for analysed CVEs.
        """
        if not self.enabled:
            return
        try:
            cve = cve_data['cve']
            description = cve['descriptions'][0]['value']
            # Combine description with AI summary for better search
            text = description
            if ai_summary:
                text += f" {ai_summary}"
            # Generate embedding vector
            embedding = self.model.encode(text).tolist()
            # Metadata kept small: date truncated to YYYY-MM-DD, description to 200 chars.
            metadata = {
                "cve_id": cve_id,
                "published": cve.get('published', '')[:10],
                "severity": self._get_severity(cve),
                "description": description[:200]
            }
            # upsert() overwrites any existing entry with the same id in a single
            # call — replaces the previous delete-then-add dance (and its bare
            # except) while still avoiding duplicates.
            self.collection.upsert(
                embeddings=[embedding],
                documents=[text],
                metadatas=[metadata],
                ids=[cve_id]
            )
        except Exception as e:
            print(f"Warning: Vector storage failed for {cve_id}: {e}")

    def find_similar_cves(self, cve_id, top_k=3, similarity_threshold=0.001):
        """Find CVEs similar to the given CVE ID using AI vector similarity.

        Args:
            cve_id: identifier of the target CVE; it must already be stored.
            top_k: maximum number of similar CVEs to return.
            similarity_threshold: minimum similarity score (1 - distance) to keep.

        Returns:
            A list of dicts with 'cve_id', 'similarity', 'severity' and
            'description' keys, excluding the target CVE itself; empty on any
            failure or when storage is disabled.
        """
        if not self.enabled:
            return []
        try:
            # Check how many CVEs we have in total
            count_result = self.collection.count()
            # If database is empty or has insufficient data, pre-populate with example CVEs
            if count_result < 3:  # Need at least 3 CVEs for good similarity demo
                self._populate_example_cves(cve_id)  # Pass target CVE for smart selection
                count_result = self.collection.count()
            # After population, check if we now have enough data
            if count_result < 2:
                return []
            # Get target CVE text for semantic comparison
            target_doc = self.collection.get(ids=[cve_id], include=['documents'])
            if not target_doc['documents']:
                return []
            target_text = target_doc['documents'][0]
            # Generate embedding for target text using AI model
            target_embedding = self.model.encode(target_text).tolist()
            # n_results is top_k + 1 because the query will match the target itself.
            results = self.collection.query(
                query_embeddings=[target_embedding],
                n_results=min(top_k + 1, count_result),
                include=['metadatas', 'distances']
            )
            # Format results, excluding the target CVE itself and applying threshold.
            # Guard both keys — the original only checked 'metadatas' and would
            # raise (and be swallowed) if 'distances' were missing.
            similar = []
            if results and results.get('metadatas') and results.get('distances'):
                for metadata, distance in zip(results['metadatas'][0], results['distances'][0]):
                    # Convert distance to a similarity score, clamped at 0.
                    similarity_score = max(0, 1 - distance)
                    if metadata['cve_id'] != cve_id and similarity_score >= similarity_threshold:
                        similar.append({
                            'cve_id': metadata['cve_id'],
                            'similarity': f"{similarity_score:.2f}",
                            'severity': metadata.get('severity', 'Unknown'),
                            'description': metadata.get('description', '')
                        })
            return similar[:top_k]
        except Exception as e:
            print(f"AI similarity search failed: {e}")
            return []

    def _populate_example_cves(self, target_cve_id=None):
        """Pre-populate database with ALL example CVEs for demonstration."""
        try:
            from api_client import fetch_cve_simple
            # Add ALL example CVEs that users might try
            all_example_cves = [
                'CVE-2021-44228',  # Log4Shell
                'CVE-2021-4104',   # Log4j 1.x
                'CVE-2022-22965',  # Spring4Shell
                'CVE-2020-1472',   # Zerologon
                'CVE-2021-34527',  # PrintNightmare
                'CVE-2020-0601',   # CurveBall
                'CVE-2021-45046',  # Log4j incomplete fix
                'CVE-2020-5421',   # Spring Framework
                'CVE-2021-36934',  # HiveNightmare
                'CVE-2022-21999',  # Windows Print Spooler
                'CVE-2020-1350',   # SIGRed (DNS vulnerability - similar to CurveBall in impact)
                'CVE-2019-0708'    # BlueKeep (Windows RDP - another Windows crypto-related)
            ]
            print("Pre-populating vector database with ALL example CVEs...")
            # Get existing CVE IDs to avoid duplicates
            existing_cves = set()
            try:
                existing = self.collection.get()
                if existing and 'metadatas' in existing:
                    existing_cves = {meta.get('cve_id') for meta in existing['metadatas'] if meta.get('cve_id')}
            except Exception:
                # Best-effort: an unreadable collection just means we may re-add entries.
                pass
            for cve_id in all_example_cves:
                if cve_id in existing_cves:
                    print(f" • {cve_id} already exists")
                    continue
                try:
                    cve_data = fetch_cve_simple(cve_id)
                    if cve_data:
                        # Add a basic summary to match format with analyzed CVEs
                        cve = cve_data['cve']
                        description = cve['descriptions'][0]['value']
                        basic_summary = f"Security vulnerability affecting {description.split()[0] if description else 'software'}. This is a critical vulnerability that requires immediate attention and proper security measures to mitigate potential risks."
                        self.add_cve(cve_id, cve_data, basic_summary)
                        print(f" • Added {cve_id}")
                except Exception as e:
                    print(f" • Failed to add {cve_id}: {e}")
                    continue
            print("Vector database ready with ALL examples!")
        except Exception as e:
            print(f"Failed to populate example CVEs: {e}")

    def search_by_text(self, query_text, top_k=5, similarity_threshold=0.3):
        """Search CVEs using natural language queries.

        Args:
            query_text: free-text query embedded with the same model as stored CVEs.
            top_k: maximum number of results to return.
            similarity_threshold: minimum similarity score (1 - distance) to keep.

        Returns:
            A list of result dicts (same shape as find_similar_cves); empty on
            failure or when storage is disabled.
        """
        if not self.enabled:
            return []
        try:
            # Check if we have any CVEs
            count_result = self.collection.count()
            if count_result == 0:
                return []
            # Generate embedding for the search query using AI model
            query_embedding = self.model.encode(query_text).tolist()
            # Over-fetch (top_k * 2) so threshold filtering still leaves enough hits.
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=min(top_k * 2, count_result),
                include=['metadatas', 'distances']
            )
            # Format search results with threshold filtering; guard both keys.
            search_results = []
            if results and results.get('metadatas') and results.get('distances'):
                for metadata, distance in zip(results['metadatas'][0], results['distances'][0]):
                    similarity_score = max(0, 1 - distance)
                    if similarity_score >= similarity_threshold:
                        search_results.append({
                            'cve_id': metadata['cve_id'],
                            'similarity': f"{similarity_score:.2f}",
                            'severity': metadata.get('severity', 'Unknown'),
                            'description': metadata.get('description', '')
                        })
            return search_results[:top_k]
        except Exception as e:
            print(f"AI semantic search failed: {e}")
            return []

    def _get_severity(self, cve):
        """Extract the CVSS base severity from an NVD CVE record.

        Prefers CVSS v3.1 (as before) and falls back to v3.0; returns
        "Unknown" when neither metric is present or the record is malformed.
        """
        metrics = cve.get('metrics') or {}
        for metric_key in ('cvssMetricV31', 'cvssMetricV30'):
            try:
                return metrics[metric_key][0]['cvssData']['baseSeverity']
            except (KeyError, IndexError, TypeError):
                continue
        return "Unknown"
# Global singleton for easy access. Note: constructed at import time, so the
# ChromaDB open / model load (or the disabled fallback) happens as soon as
# this module is imported.
vector_storage = CVEVectorStorage()