MMADS committed on
Commit
46439ca
·
1 Parent(s): f0c96e8

Test small updated version with multiple LLMs

Browse files
Files changed (1) hide show
  1. app.py +751 -251
app.py CHANGED
@@ -1,31 +1,23 @@
 
 
 
 
1
  import json
 
2
  import logging
3
- import gzip
4
- import os
5
- from collections import OrderedDict
6
- from datetime import datetime
7
- from io import BytesIO
8
- from typing import Dict
9
-
10
  import gradio as gr
11
  import pandas as pd
12
  import plotly.express as px
 
13
  import requests
14
 
15
- # Configure logging for the application
16
  logging.basicConfig(level=logging.INFO)
17
  logger = logging.getLogger(__name__)
18
 
19
- # --- Constants and Global Variables ---
20
-
21
- CURRENT_YEAR = datetime.now().year
22
- NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
23
-
24
- # In-memory LRU cache (by insertion order) to store DataFrames for recent years.
25
- CACHE_MAX_SIZE = 3
26
- DATAFRAME_CACHE: Dict[int, pd.DataFrame] = OrderedDict()
27
-
28
- # Profiles for tailoring LLM-generated summaries to different audiences
29
  AUDIENCE_PROFILES = {
30
  "Cybersecurity Professional": {
31
  "focus": "threat assessment, attack vectors, mitigation strategies, and security controls",
@@ -59,254 +51,762 @@ AUDIENCE_PROFILES = {
59
  }
60
  }
61
 
62
-
63
- # --- Data Fetching and Parsing ---
64
-
65
- def get_cve_dataframe(year: int) -> pd.DataFrame:
66
- """
67
- Downloads, parses, and caches the NVD feed for a specific year.
68
- It returns a pandas DataFrame. Caching is used to avoid repeated downloads.
69
- """
70
- if year in DATAFRAME_CACHE:
71
- logger.info(f"Cache hit for year {year}.")
72
- DATAFRAME_CACHE.move_to_end(year) # Mark as recently used
73
- return DATAFRAME_CACHE[year]
74
-
75
- logger.info(f"Cache miss. Downloading NVD data for year {year}.")
76
- url = NVD_BASE_URL.format(year=year)
77
-
78
- try:
79
- response = requests.get(url, timeout=30)
80
- response.raise_for_status()
81
-
82
- with gzip.GzipFile(fileobj=BytesIO(response.content)) as f:
83
- nvd_data = json.load(f)
84
-
85
- df = parse_cve_items(nvd_data)
86
-
87
- if len(DATAFRAME_CACHE) >= CACHE_MAX_SIZE:
88
- DATAFRAME_CACHE.popitem(last=False)
89
- DATAFRAME_CACHE[year] = df
90
- return df
91
-
92
- except requests.exceptions.HTTPError as e:
93
- logger.error(f"HTTP Error for {year}: {e}")
94
- raise gr.Error(f"Failed to download data for {year}. The feed may be unavailable.")
95
- except Exception as e:
96
- logger.error(f"Error processing feed for {year}: {e}")
97
- raise gr.Error(f"An unexpected error occurred: {str(e)}")
98
-
99
- def parse_cve_items(nvd_data: dict) -> pd.DataFrame:
100
- """
101
- Extracts vulnerability details from the raw NVD JSON data into a structured DataFrame.
102
- """
103
- rows = []
104
- for item in nvd_data.get("CVE_Items", []):
105
  try:
106
- cve_id = item.get("cve", {}).get("CVE_data_meta", {}).get("ID", "N/A")
107
- desc_data = item.get("cve", {}).get("description", {}).get("description_data", [])
108
- description = desc_data[0].get("value", "No description") if desc_data else "No description"
109
- published = item.get("publishedDate", "")
110
- base_score, severity, attack_vector = None, "N/A", "N/A"
111
-
112
- if "baseMetricV3" in item.get("impact", {}):
113
- impact_v3 = item["impact"]["baseMetricV3"]["cvssV3"]
114
- base_score = impact_v3.get("baseScore")
115
- severity = impact_v3.get("baseSeverity")
116
- attack_vector = impact_v3.get("attackVector")
117
- elif "baseMetricV2" in item.get("impact", {}):
118
- impact_v2 = item["impact"]["baseMetricV2"]
119
- base_score = impact_v2["cvssV2"].get("baseScore")
120
- severity = impact_v2.get("severity")
121
- attack_vector = impact_v2.get("accessVector")
122
-
123
- problem_types = item.get("cve", {}).get("problemtype", {}).get("problemtype_data", [])
124
- cwe_ids = [desc["value"] for pt in problem_types for desc in pt.get("description", []) if desc.get("value", "").startswith("CWE-")]
125
 
126
- rows.append({
127
- "CVE_ID": cve_id, "Description": description, "Published": published[:10],
128
- "Base_Score": base_score, "Severity": severity, "Attack_Vector": attack_vector,
129
- "CWE_IDs": ", ".join(cwe_ids) if cwe_ids else "N/A"
130
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  except Exception as e:
132
- cve_id_str = cve_id if 'cve_id' in locals() else "Unknown"
133
- logger.warning(f"Skipping malformed CVE item ({cve_id_str}): {e}")
134
- continue
135
-
136
- df = pd.DataFrame(rows)
137
- if "Base_Score" in df.columns:
138
- df["Base_Score"] = pd.to_numeric(df["Base_Score"], errors='coerce')
139
- return df
140
-
141
-
142
- # --- LLM Integration ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
- def generate_tailored_summary(cve_description: str, audience: str, hf_token: str) -> str:
145
  """
146
- Generates a tailored CVE summary using the Hugging Face Inference API.
 
 
 
 
 
 
 
 
 
147
  """
148
- if not hf_token:
149
- raise gr.Error("Hugging Face API token is not configured as a Space Secret.")
 
 
 
 
150
  if not cve_description or not audience:
151
- return "Please select a CVE and an audience first."
152
-
153
- api_url = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.2"
154
- headers = {"Authorization": f"Bearer {hf_token}"}
155
- profile = AUDIENCE_PROFILES.get(audience, {})
156
 
157
- prompt = f"""<s>[INST] You are an expert cybersecurity analyst. Your task is to rewrite the following technical CVE description into a concise, actionable summary for a specific professional audience.
158
-
159
- **Target Audience:** {audience}
160
- - **Focus:** {profile.get('focus', 'N/A')}
161
- - **Key Priorities:** {', '.join(profile.get('priorities', []))}
162
-
163
- **Original CVE Description:**
164
- ---
165
- {cve_description}
166
- ---
167
-
168
- Rewrite the description in a {profile.get('tone', 'professional')} tone, focusing on what matters most to this audience. Do not start with "As a [role]...". Directly provide the summary. [/INST]"""
169
-
170
- payload = {"inputs": prompt, "parameters": {"max_new_tokens": 256, "return_full_text": False}}
171
-
172
- try:
173
- response = requests.post(api_url, headers=headers, json=payload, timeout=45)
174
- if response.status_code != 200:
175
- error_message = response.json().get("error", "Unknown error")
176
- logger.error(f"Inference API Error: {error_message}")
177
- return f"Error from API: {error_message}. The model might be loading, please try again."
178
-
179
- return response.json()[0]['generated_text'].strip()
180
-
181
- except requests.exceptions.RequestException as e:
182
- logger.error(f"Request to Inference API failed: {e}")
183
- return f"Error: Could not connect to the Hugging Face API. {e}"
184
-
185
-
186
- # --- Analysis and Visualization ---
187
-
188
- def analyze_and_visualize(df: pd.DataFrame, severity: str, vector: str, search: str):
189
- """
190
- Filters the main DataFrame and generates all outputs: a filtered table,
191
- visualizations, and a summary markdown string.
192
- """
193
- if df is None or df.empty:
194
- return pd.DataFrame(), None, None, "### No Data Loaded"
195
-
196
- filtered_df = df.copy()
197
- if severity != "All":
198
- filtered_df = filtered_df[filtered_df["Severity"] == severity]
199
- if vector != "All":
200
- filtered_df = filtered_df[filtered_df["Attack_Vector"] == vector]
201
- if search:
202
- mask = (filtered_df["CVE_ID"].str.contains(search, case=False, na=False) |
203
- filtered_df["Description"].str.contains(search, case=False, na=False) |
204
- filtered_df["CWE_IDs"].str.contains(search, case=False, na=False))
205
- filtered_df = filtered_df[mask]
206
-
207
- return filtered_df, create_severity_chart(filtered_df), create_timeline_chart(filtered_df), create_summary_text(filtered_df)
208
-
209
- def create_severity_chart(df: pd.DataFrame):
210
- """Creates a bar chart for CVE severity distribution."""
211
- if df.empty: return None
212
- order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "N/A"]
213
- counts = df["Severity"].value_counts().reindex(order, fill_value=0)
214
- color_map = {"CRITICAL": "#8B0000", "HIGH": "#FF4500", "MEDIUM": "#FFA500", "LOW": "#FFD700", "N/A": "#D3D3D3"}
215
-
216
- fig = px.bar(counts, x=counts.index, y=counts.values, labels={"x": "Severity", "y": "Count"},
217
- title="CVE Severity Distribution", color=counts.index, color_discrete_map=color_map, text_auto=True)
218
- fig.update_layout(showlegend=False, xaxis={'categoryorder':'array', 'categoryarray':order})
219
- return fig
220
-
221
- def create_timeline_chart(df: pd.DataFrame):
222
- """Creates a line chart showing CVE publications over time."""
223
- if df.empty or 'Published' not in df.columns: return None
224
- df_copy = df.copy()
225
- df_copy["Date"] = pd.to_datetime(df_copy["Published"], errors='coerce')
226
- df_copy.dropna(subset=["Date"], inplace=True)
227
- if df_copy.empty: return None
228
-
229
- counts = df_copy.set_index("Date").resample('M').size()
230
- fig = px.line(x=counts.index, y=counts.values, labels={"x": "Month", "y": "Number of CVEs"},
231
- title="CVE Publications Timeline", markers=True)
232
- return fig
233
-
234
- def create_summary_text(df: pd.DataFrame) -> str:
235
- """Generates a markdown string with key statistics from the DataFrame."""
236
- if df.empty: return "### No results match your filter criteria."
237
- scores = df['Base_Score'].dropna()
238
- avg_score = f"{scores.mean():.2f}" if not scores.empty else 'N/A'
239
- return f"""### Summary Statistics
240
- - **Total CVEs Found:** {len(df):,}
241
- - **Critical:** {len(df[df['Severity'] == 'CRITICAL']):,}
242
- - **High:** {len(df[df['Severity'] == 'HIGH']):,}
243
- - **Average Base Score:** {avg_score}"""
244
-
245
-
246
- # --- Gradio UI and Event Logic ---
247
-
248
- def create_dashboard():
249
- """Builds the entire Gradio interface and defines event handling."""
250
- with gr.Blocks(theme=gr.themes.Soft(), title="CVE Dashboard") as dashboard:
251
- df_state = gr.State()
252
- selected_cve_description = gr.State("")
253
- hf_token_state = gr.State(os.environ.get("HF_TOKEN"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
 
255
- gr.Markdown("# CVE Dashboard: NVD Feed Analyzer")
256
- gr.Markdown("Explore CVE data from the National Vulnerability Database. **Note:** This demo uses deprecated NVD JSON feeds; a production app should use the NVD API 2.0.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
 
258
  with gr.Row():
259
  with gr.Column(scale=1):
260
- year_dd = gr.Dropdown(choices=list(range(2002, CURRENT_YEAR + 1))[::-1], value=CURRENT_YEAR - 1, label="1. Select Year")
261
- severity_dd = gr.Dropdown(choices=["All", "CRITICAL", "HIGH", "MEDIUM", "LOW"], value="All", label="2. Filter by Severity")
262
- vector_dd = gr.Dropdown(choices=["All", "NETWORK", "ADJACENT_NETWORK", "LOCAL", "PHYSICAL"], value="All", label="3. Filter by Attack Vector")
263
- search_tb = gr.Textbox(label="4. Search Keyword", placeholder="e.g., 'Log4j', 'CWE-79', ...")
264
- filter_btn = gr.Button("Apply Filters", variant="primary")
265
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
  with gr.Column(scale=3):
267
- summary_out = gr.Markdown()
 
268
  with gr.Tabs():
269
- with gr.TabItem("📊 Data Table"):
270
- table_out = gr.DataFrame(headers=["CVE_ID", "Severity", "Base_Score", "Description"], wrap=True, row_count=15, interactive=True)
271
- with gr.TabItem("📈 Severity Chart"):
272
- plot_severity_out = gr.Plot()
273
- with gr.TabItem("📉 Timeline Chart"):
274
- plot_timeline_out = gr.Plot()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
275
 
276
- with gr.Accordion("Tailored CVE Analysis (Select a row in the table above)", open=False) as llm_accordion:
277
- with gr.Row():
278
- with gr.Column(scale=2):
279
- original_desc_out = gr.Textbox(label="Full Original CVE Description", lines=8, interactive=False)
280
- with gr.Column(scale=1):
281
- audience_dd = gr.Dropdown(choices=list(AUDIENCE_PROFILES.keys()), label="Select Audience", value="Cybersecurity Professional")
282
- generate_btn = gr.Button("Generate Tailored Summary", variant="primary")
283
- summary_llm_out = gr.Markdown("*Your tailored summary will appear here...*")
284
-
285
- # --- Event Handling Logic ---
286
- def on_year_change(year):
287
- df = get_cve_dataframe(year)
288
- return df, *analyze_and_visualize(df, "All", "All", "")
289
-
290
- def on_select_cve(df: pd.DataFrame, evt: gr.SelectData):
291
- if evt.value is None: return "", "", gr.update(visible=False)
292
- full_description = df.iloc[evt.index[0]]["Description"]
293
- return full_description, full_description, gr.update(visible=True)
294
-
295
- filter_inputs = [df_state, severity_dd, vector_dd, search_tb]
296
- analysis_outputs = [table_out, plot_severity_out, plot_timeline_out, summary_out]
 
 
 
 
 
 
 
 
297
 
298
- year_dd.change(fn=on_year_change, inputs=[year_dd], outputs=[df_state] + analysis_outputs)
299
- dashboard.load(fn=on_year_change, inputs=[year_dd], outputs=[df_state] + analysis_outputs)
300
-
301
- for control in [severity_dd, vector_dd, filter_btn, search_tb]:
302
- event = control.click if isinstance(control, gr.Button) else (control.submit if isinstance(control, gr.Textbox) else control.change)
303
- event(fn=analyze_and_visualize, inputs=filter_inputs, outputs=analysis_outputs)
304
-
305
- table_out.select(fn=on_select_cve, inputs=[df_state], outputs=[selected_cve_description, original_desc_out, llm_accordion], show_progress="hidden")
306
- generate_btn.click(fn=generate_tailored_summary, inputs=[selected_cve_description, audience_dd, hf_token_state], outputs=[summary_llm_out])
307
-
308
- return dashboard
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
  if __name__ == "__main__":
311
- cve_dashboard = create_dashboard()
312
- cve_dashboard.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python
2
+ """CVE Dashboard - Real-time vulnerability monitoring with NVD API and LLM-powered audience customization."""
3
+
4
+ import os
5
  import json
6
+ import time
7
  import logging
8
+ from datetime import datetime, timedelta
9
+ from typing import List, Dict, Optional, Tuple
 
 
 
 
 
10
  import gradio as gr
11
  import pandas as pd
12
  import plotly.express as px
13
+ import plotly.graph_objects as go
14
  import requests
15
 
16
+ # Configure logging
17
  logging.basicConfig(level=logging.INFO)
18
  logger = logging.getLogger(__name__)
19
 
20
+ # Audience profiles for tailored CVE descriptions
 
 
 
 
 
 
 
 
 
21
  AUDIENCE_PROFILES = {
22
  "Cybersecurity Professional": {
23
  "focus": "threat assessment, attack vectors, mitigation strategies, and security controls",
 
51
  }
52
  }
53
 
54
+ class CVEDashboard:
55
+ """Main CVE Dashboard application class."""
56
+
57
+ def __init__(self):
58
+ """Initialize the CVE Dashboard."""
59
+ self.api_key = os.getenv('NVD_API_KEY')
60
+ self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
61
+ self.headers = {'apiKey': self.api_key} if self.api_key else {}
62
+ self.cache = {}
63
+ self.last_request_time = 0
64
+ self.rate_limit_delay = 0.7 if self.api_key else 6 # seconds between requests
65
+
66
+ # HuggingFace token - try environment first
67
+ self.hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')
68
+
69
+ def _rate_limit(self):
70
+ """Implement rate limiting for NVD API."""
71
+ current_time = time.time()
72
+ time_since_last = current_time - self.last_request_time
73
+ if time_since_last < self.rate_limit_delay:
74
+ time.sleep(self.rate_limit_delay - time_since_last)
75
+ self.last_request_time = time.time()
76
+
77
+ def fetch_cves(self,
78
+ days_back: int = 7,
79
+ keyword: Optional[str] = None,
80
+ severity: Optional[str] = None,
81
+ results_per_page: int = 50) -> Tuple[List[Dict], str]:
82
+ """
83
+ Fetch CVEs from NVD API.
84
+
85
+ Args:
86
+ days_back: Number of days to look back
87
+ keyword: Optional keyword to search
88
+ severity: Optional severity filter (LOW, MEDIUM, HIGH, CRITICAL)
89
+ results_per_page: Number of results per page (max 2000)
90
+
91
+ Returns:
92
+ Tuple of (list of CVEs, status message)
93
+ """
 
 
 
94
  try:
95
+ self._rate_limit()
96
+
97
+ end_date = datetime.now()
98
+ start_date = end_date - timedelta(days=days_back)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
+ params = {
101
+ 'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
102
+ 'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999'),
103
+ 'resultsPerPage': min(results_per_page, 2000)
104
+ }
105
+
106
+ if keyword:
107
+ params['keywordSearch'] = keyword
108
+
109
+ response = requests.get(
110
+ self.base_url,
111
+ headers=self.headers,
112
+ params=params,
113
+ timeout=30
114
+ )
115
+ response.raise_for_status()
116
+
117
+ data = response.json()
118
+ vulnerabilities = data.get('vulnerabilities', [])
119
+
120
+ # Process and filter CVEs
121
+ processed_cves = []
122
+ for vuln in vulnerabilities:
123
+ cve = self._process_cve(vuln.get('cve', {}))
124
+ if severity and cve['severity'] != severity:
125
+ continue
126
+ processed_cves.append(cve)
127
+
128
+ status = f"✓ Fetched {len(processed_cves)} CVEs from the last {days_back} days"
129
+ if keyword:
130
+ status += f" matching '{keyword}'"
131
+ if severity:
132
+ status += f" with {severity} severity"
133
+
134
+ return processed_cves, status
135
+
136
+ except requests.exceptions.RequestException as e:
137
+ return [], f"✗ API Error: {str(e)}"
138
  except Exception as e:
139
+ return [], f"✗ Error: {str(e)}"
140
+
141
+ def _process_cve(self, cve_data: Dict) -> Dict:
142
+ """Process raw CVE data into a structured format."""
143
+ cve_id = cve_data.get('id', 'Unknown')
144
+
145
+ # Extract description
146
+ descriptions = cve_data.get('descriptions', [])
147
+ description = next(
148
+ (d['value'] for d in descriptions if d.get('lang') == 'en'),
149
+ 'No description available'
150
+ )
151
+
152
+ # Extract CVSS metrics and severity
153
+ metrics = cve_data.get('metrics', {})
154
+ cvss_data = {}
155
+ severity = 'UNKNOWN'
156
+ score = 0.0
157
+
158
+ # Try CVSS 3.1 first, then 3.0, then 2.0
159
+ for cvss_version in ['cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2']:
160
+ if cvss_version in metrics and metrics[cvss_version]:
161
+ metric = metrics[cvss_version][0]
162
+ cvss_data = metric.get('cvssData', {})
163
+ score = cvss_data.get('baseScore', 0.0)
164
+ severity = cvss_data.get('baseSeverity', 'UNKNOWN')
165
+ break
166
+
167
+ # Extract references
168
+ references = cve_data.get('references', [])
169
+ ref_urls = [ref.get('url', '') for ref in references[:5]] # Limit to 5 refs
170
+
171
+ # Extract dates
172
+ published = cve_data.get('published', '')
173
+ modified = cve_data.get('lastModified', '')
174
+
175
+ return {
176
+ 'id': cve_id,
177
+ 'description': description, # Keep full description for LLM processing
178
+ 'display_description': description[:500] + '...' if len(description) > 500 else description,
179
+ 'severity': severity,
180
+ 'score': score,
181
+ 'published': published[:10] if published else 'Unknown',
182
+ 'modified': modified[:10] if modified else 'Unknown',
183
+ 'references': ref_urls,
184
+ 'cvss_version': cvss_data.get('version', 'Unknown'),
185
+ 'vector_string': cvss_data.get('vectorString', 'N/A')
186
+ }
187
+
188
+ def create_severity_chart(self, cves: List[Dict]) -> go.Figure:
189
+ """Create a pie chart of CVE severities."""
190
+ if not cves:
191
+ fig = go.Figure()
192
+ fig.add_annotation(text="No data available",
193
+ xref="paper", yref="paper",
194
+ x=0.5, y=0.5, showarrow=False)
195
+ return fig
196
+
197
+ severity_counts = pd.DataFrame(cves)['severity'].value_counts()
198
+
199
+ colors = {
200
+ 'CRITICAL': '#d32f2f',
201
+ 'HIGH': '#f57c00',
202
+ 'MEDIUM': '#fbc02d',
203
+ 'LOW': '#388e3c',
204
+ 'UNKNOWN': '#9e9e9e'
205
+ }
206
+
207
+ fig = px.pie(
208
+ values=severity_counts.values,
209
+ names=severity_counts.index,
210
+ title="CVE Distribution by Severity",
211
+ color=severity_counts.index,
212
+ color_discrete_map=colors
213
+ )
214
+
215
+ fig.update_traces(textposition='inside', textinfo='percent+label')
216
+ fig.update_layout(height=400)
217
+
218
+ return fig
219
+
220
+ def create_timeline_chart(self, cves: List[Dict]) -> go.Figure:
221
+ """Create a timeline chart of CVE publications."""
222
+ if not cves:
223
+ fig = go.Figure()
224
+ fig.add_annotation(text="No data available",
225
+ xref="paper", yref="paper",
226
+ x=0.5, y=0.5, showarrow=False)
227
+ return fig
228
+
229
+ df = pd.DataFrame(cves)
230
+ df['published'] = pd.to_datetime(df['published'])
231
+
232
+ # Group by date and severity
233
+ timeline_data = df.groupby([df['published'].dt.date, 'severity']).size().reset_index(name='count')
234
+
235
+ fig = px.bar(
236
+ timeline_data,
237
+ x='published',
238
+ y='count',
239
+ color='severity',
240
+ title="CVE Publications Timeline",
241
+ color_discrete_map={
242
+ 'CRITICAL': '#d32f2f',
243
+ 'HIGH': '#f57c00',
244
+ 'MEDIUM': '#fbc02d',
245
+ 'LOW': '#388e3c',
246
+ 'UNKNOWN': '#9e9e9e'
247
+ }
248
+ )
249
+
250
+ fig.update_layout(
251
+ xaxis_title="Publication Date",
252
+ yaxis_title="Number of CVEs",
253
+ height=400,
254
+ hovermode='x unified'
255
+ )
256
+
257
+ return fig
258
+
259
+ def create_score_distribution(self, cves: List[Dict]) -> go.Figure:
260
+ """Create a histogram of CVSS scores."""
261
+ if not cves:
262
+ fig = go.Figure()
263
+ fig.add_annotation(text="No data available",
264
+ xref="paper", yref="paper",
265
+ x=0.5, y=0.5, showarrow=False)
266
+ return fig
267
+
268
+ scores = [cve['score'] for cve in cves if cve['score'] > 0]
269
+
270
+ fig = go.Figure(data=[go.Histogram(
271
+ x=scores,
272
+ nbinsx=20,
273
+ marker_color='#1976d2'
274
+ )])
275
+
276
+ fig.update_layout(
277
+ title="CVSS Score Distribution",
278
+ xaxis_title="CVSS Score",
279
+ yaxis_title="Count",
280
+ height=400,
281
+ showlegend=False
282
+ )
283
+
284
+ # Add severity range annotations
285
+ fig.add_vrect(x0=0, x1=3.9, fillcolor="green", opacity=0.1, annotation_text="Low")
286
+ fig.add_vrect(x0=4, x1=6.9, fillcolor="yellow", opacity=0.1, annotation_text="Medium")
287
+ fig.add_vrect(x0=7, x1=8.9, fillcolor="orange", opacity=0.1, annotation_text="High")
288
+ fig.add_vrect(x0=9, x1=10, fillcolor="red", opacity=0.1, annotation_text="Critical")
289
+
290
+ return fig
291
+
292
+ def format_cve_table(self, cves: List[Dict]) -> pd.DataFrame:
293
+ """Format CVEs for display in a table."""
294
+ if not cves:
295
+ return pd.DataFrame()
296
+
297
+ df = pd.DataFrame(cves)
298
+
299
+ # Select and reorder columns
300
+ columns = ['id', 'severity', 'score', 'published', 'display_description']
301
+ df = df[columns]
302
+
303
+ # Format the dataframe
304
+ df = df.rename(columns={
305
+ 'id': 'CVE ID',
306
+ 'severity': 'Severity',
307
+ 'score': 'CVSS Score',
308
+ 'published': 'Published',
309
+ 'display_description': 'Description'
310
+ })
311
+
312
+ return df
313
+
314
+ def export_to_json(self, cves: List[Dict]) -> str:
315
+ """Export CVEs to JSON format."""
316
+ if not cves:
317
+ return "No data to export"
318
+
319
+ filename = f"cve_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
320
+ with open(filename, 'w') as f:
321
+ json.dump(cves, f, indent=2)
322
+
323
+ return f"✓ Exported {len(cves)} CVEs to {filename}"
324
+
325
+ def export_to_csv(self, cves: List[Dict]) -> str:
326
+ """Export CVEs to CSV format."""
327
+ if not cves:
328
+ return "No data to export"
329
+
330
+ df = self.format_cve_table(cves)
331
+ filename = f"cve_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
332
+ df.to_csv(filename, index=False)
333
+
334
+ return f"✓ Exported {len(cves)} CVEs to {filename}"
335
 
336
+ def generate_tailored_summary(cve_description: str, audience: str, hf_token: Optional[str] = None, max_retries: int = 2) -> str:
337
  """
338
+ Generates a tailored CVE summary using Apertus via HuggingFace Inference API.
339
+
340
+ Args:
341
+ cve_description: The original CVE description
342
+ audience: Target audience from AUDIENCE_PROFILES
343
+ hf_token: HuggingFace API token (optional if set as env var)
344
+ max_retries: Maximum number of retry attempts
345
+
346
+ Returns:
347
+ Tailored summary or error message
348
  """
349
+ # Use provided token or fall back to environment variable
350
+ token = hf_token or os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')
351
+
352
+ if not token:
353
+ return "❌ HuggingFace API token is required. Please set HF_TOKEN environment variable or enter your token."
354
+
355
  if not cve_description or not audience:
356
+ return "Please select a CVE and an audience first."
 
 
 
 
357
 
358
+ if audience not in AUDIENCE_PROFILES:
359
+ return f"❌ Unknown audience: {audience}"
360
+
361
+ # Try Apertus models with fallback options
362
+ models = [
363
+ "swiss-ai/Apertus-8B-Instruct-2509", # Primary choice - smaller, faster
364
+ "mistralai/Mistral-7B-Instruct-v0.3" # Fallback to original
365
+ ]
366
+
367
+ headers = {"Authorization": f"Bearer {token}"}
368
+ profile = AUDIENCE_PROFILES[audience]
369
+
370
+ # Apertus uses chat template format
371
+ messages = [
372
+ {
373
+ "role": "user",
374
+ "content": f"""You are an expert cybersecurity analyst. Rewrite this CVE description for a {audience}.
375
+
376
+ **Target Audience:** {audience}
377
+ **Focus:** {profile['focus']}
378
+ **Tone:** {profile['tone']}
379
+ **Key Priorities:** {', '.join(profile['priorities'])}
380
+
381
+ **CVE Description:**
382
+ {cve_description[:1200]}
383
+
384
+ Provide a concise, actionable summary (2-3 sentences) highlighting what matters most to this audience. Focus on practical implications and next steps."""
385
+ }
386
+ ]
387
+
388
+ for model in models:
389
+ api_url = f"https://api-inference.huggingface.co/models/{model}"
390
+
391
+ # Different payload structure for Apertus vs Mistral
392
+ if "Apertus" in model:
393
+ payload = {
394
+ "inputs": {
395
+ "messages": messages,
396
+ "max_tokens": 200,
397
+ "temperature": 0.8, # Recommended by Apertus docs
398
+ "top_p": 0.9 # Recommended by Apertus docs
399
+ }
400
+ }
401
+ else:
402
+ # Fallback to Mistral format
403
+ prompt = f"""<s>[INST] You are an expert cybersecurity analyst. Rewrite the following CVE description for a {audience}.
404
+
405
+ **Focus:** {profile['focus']}
406
+ **Tone:** {profile['tone']}
407
+
408
+ CVE: {cve_description[:1000]}
409
+
410
+ Provide a 2-3 sentence summary highlighting what matters most to this audience: [/INST]"""
411
+
412
+ payload = {
413
+ "inputs": prompt,
414
+ "parameters": {
415
+ "max_new_tokens": 150,
416
+ "temperature": 0.5,
417
+ "top_p": 0.9,
418
+ "do_sample": True,
419
+ "return_full_text": False,
420
+ "stop": ["\n\n"]
421
+ }
422
+ }
423
+
424
+ for attempt in range(max_retries):
425
+ try:
426
+ logger.info(f"Generating summary with {model} (attempt {attempt + 1})")
427
+
428
+ response = requests.post(api_url, headers=headers, json=payload, timeout=45)
429
+
430
+ if response.status_code == 200:
431
+ try:
432
+ result = response.json()
433
+
434
+ # Handle different response formats
435
+ summary = ""
436
+ if "Apertus" in model:
437
+ # Apertus response format
438
+ if isinstance(result, list) and len(result) > 0:
439
+ if "generated_text" in result[0]:
440
+ summary = result[0]["generated_text"]
441
+ elif "choices" in result[0] and len(result[0]["choices"]) > 0:
442
+ summary = result[0]["choices"][0].get("message", {}).get("content", "")
443
+ else:
444
+ # Mistral response format
445
+ if isinstance(result, list) and len(result) > 0:
446
+ summary = result[0].get('generated_text', '').strip()
447
+
448
+ if summary and len(summary) > 20:
449
+ logger.info(f"Successfully generated summary with {model}")
450
+ return f"**{audience} Summary (via {model.split('/')[-1]}):**\n\n{summary}"
451
+
452
+ except json.JSONDecodeError as e:
453
+ logger.warning(f"JSON decode error with {model}: {e}")
454
+ continue
455
+
456
+ elif response.status_code == 503:
457
+ logger.warning(f"Model {model} is loading, trying next model...")
458
+ break # Try next model
459
+
460
+ elif response.status_code == 429:
461
+ if attempt < max_retries - 1:
462
+ time.sleep(5)
463
+ continue
464
+ else:
465
+ break
466
+
467
+ else:
468
+ logger.warning(f"HTTP {response.status_code} with {model}")
469
+ break
470
+
471
+ except requests.exceptions.Timeout:
472
+ logger.warning(f"Timeout with {model}, trying next model...")
473
+ break
474
+
475
+ except requests.exceptions.RequestException as e:
476
+ logger.error(f"Request failed with {model}: {e}")
477
+ break
478
+
479
+ except Exception as e:
480
+ logger.error(f"Unexpected error with {model}: {e}")
481
+ break
482
+
483
+ return "⏳ AI models are currently busy. This can happen during peak usage. Please try again in a few minutes."
484
 
485
+ def create_interface():
486
+ """Create the Gradio interface."""
487
+ dashboard = CVEDashboard()
488
+
489
+ # Check if HF token is available in environment
490
+ has_env_token = bool(dashboard.hf_token)
491
+
492
+ with gr.Blocks(title="CVE Dashboard", theme=gr.themes.Soft()) as interface:
493
+ # State to store fetched CVEs
494
+ cve_state = gr.State([])
495
+
496
+ gr.Markdown(
497
+ """
498
+ # 🛡️ CVE Dashboard with AI-Powered Audience Customization
499
+ Real-time vulnerability monitoring using NIST National Vulnerability Database (NVD) with LLM-powered audience-specific summaries
500
+ """
501
+ )
502
 
503
  with gr.Row():
504
  with gr.Column(scale=1):
505
+ # Only show token input if not available in environment
506
+ if not has_env_token:
507
+ gr.Markdown("### 🔧 Configuration")
508
+ hf_token = gr.Textbox(
509
+ label="HuggingFace API Token",
510
+ placeholder="hf_...",
511
+ type="password",
512
+ info="Required for AI summaries. Get yours at https://huggingface.co/settings/tokens"
513
+ )
514
+ else:
515
+ gr.Markdown("### ✅ AI Ready")
516
+ gr.Markdown("HuggingFace token configured via environment variable")
517
+ hf_token = gr.State(dashboard.hf_token) # Hidden state
518
+
519
+ gr.Markdown("### 🔍 Search Parameters")
520
+
521
+ days_back = gr.Slider(
522
+ minimum=1,
523
+ maximum=30,
524
+ value=7,
525
+ step=1,
526
+ label="Days to Look Back"
527
+ )
528
+
529
+ keyword = gr.Textbox(
530
+ label="Keyword Search (Optional)",
531
+ placeholder="e.g., Apache, Linux, Microsoft"
532
+ )
533
+
534
+ severity_filter = gr.Dropdown(
535
+ choices=[None, "CRITICAL", "HIGH", "MEDIUM", "LOW"],
536
+ label="Severity Filter",
537
+ value=None
538
+ )
539
+
540
+ fetch_btn = gr.Button("🔍 Fetch CVEs", variant="primary")
541
+
542
+ gr.Markdown("### 📤 Export Options")
543
+ export_json_btn = gr.Button("📄 Export to JSON")
544
+ export_csv_btn = gr.Button("📊 Export to CSV")
545
+ export_status = gr.Textbox(label="Export Status", interactive=False)
546
+
547
  with gr.Column(scale=3):
548
+ status_text = gr.Textbox(label="Status", interactive=False)
549
+
550
  with gr.Tabs():
551
+ with gr.Tab("📊 Overview"):
552
+ with gr.Row():
553
+ severity_chart = gr.Plot(label="Severity Distribution")
554
+ timeline_chart = gr.Plot(label="Timeline")
555
+ score_chart = gr.Plot(label="CVSS Score Distribution")
556
+
557
+ with gr.Tab("📋 CVE List"):
558
+ cve_table = gr.DataFrame(
559
+ label="CVE Details",
560
+ wrap=True,
561
+ row_count=15
562
+ )
563
+
564
+ with gr.Tab("🤖 AI-Powered Summaries"):
565
+ gr.Markdown("### Generate Audience-Specific CVE Summaries")
566
+
567
+ if not has_env_token:
568
+ gr.Markdown("⚠️ **Note:** Enter your HuggingFace token in the Configuration section first")
569
+
570
+ with gr.Row():
571
+ with gr.Column():
572
+ cve_selector = gr.Dropdown(
573
+ label="Select CVE",
574
+ choices=[],
575
+ info="Choose a CVE from the fetched results"
576
+ )
577
+
578
+ audience_selector = gr.Dropdown(
579
+ label="Target Audience",
580
+ choices=list(AUDIENCE_PROFILES.keys()),
581
+ value="Cybersecurity Professional",
582
+ info="Select the professional perspective"
583
+ )
584
+
585
+ generate_btn = gr.Button("🧠 Generate AI Summary", variant="primary")
586
+
587
+ # Add status for generation
588
+ generation_status = gr.Textbox(
589
+ label="Generation Status",
590
+ value="Ready to generate summaries",
591
+ interactive=False
592
+ )
593
+
594
+ with gr.Column():
595
+ audience_info = gr.Markdown(
596
+ value="**Focus:** threat assessment, attack vectors, mitigation strategies, and security controls\n\n**Priorities:** exploitation methods, defensive measures, risk assessment, compliance implications"
597
+ )
598
+
599
+ original_description = gr.Textbox(
600
+ label="Original CVE Description",
601
+ lines=4,
602
+ interactive=False
603
+ )
604
+
605
+ tailored_summary = gr.Textbox(
606
+ label="AI-Generated Summary",
607
+ lines=6,
608
+ interactive=False,
609
+ placeholder="Select a CVE and audience, then click 'Generate AI Summary'"
610
+ )
611
+
612
+ with gr.Tab("ℹ️ About"):
613
+ gr.Markdown(
614
+ """
615
+ ### About this Dashboard
616
+
617
+ This dashboard provides real-time monitoring of Common Vulnerabilities and Exposures (CVEs)
618
+ from the NIST National Vulnerability Database with AI-powered audience customization.
619
+
620
+ **Features:**
621
+ - Search CVEs by date range and keywords
622
+ - Filter by severity levels
623
+ - Visualize CVE distributions and trends
624
+ - Export data to JSON or CSV formats
625
+ - **NEW:** AI-powered audience-specific summaries using multiple LLMs
626
+
627
+ **Supported Audiences:**
628
+ - **Cybersecurity Professional:** Focus on threats, attack vectors, and mitigation
629
+ - **Data Scientist:** Emphasis on data risks and model vulnerabilities
630
+ - **Data Engineer:** Infrastructure security and pipeline risks
631
+ - **Full-Stack Developer:** Code vulnerabilities and implementation fixes
632
+ - **Product Owner:** Business impact and prioritization guidance
633
+ - **Manager:** Executive summary with business implications
634
+
635
+ **Data Source:** [NIST NVD API](https://nvd.nist.gov/developers/vulnerabilities)
636
+
637
+ **AI Models:** Multiple HuggingFace models with fallback support
638
+
639
+ **Performance Optimizations:**
640
+ - Shorter timeouts for faster failure detection
641
+ - Multiple model fallback for reliability
642
+ - Optimized prompts for quicker responses
643
+
644
+ **Rate Limits:**
645
+ - NVD API: 5 requests/30s (without key), 50 requests/30s (with key)
646
+ - HuggingFace API: Varies by plan, includes cold start delays
647
+
648
+ **Severity Levels:**
649
+ - **CRITICAL** (9.0-10.0): Complete system compromise possible
650
+ - **HIGH** (7.0-8.9): Significant impact, immediate patching recommended
651
+ - **MEDIUM** (4.0-6.9): Moderate impact, plan for updates
652
+ - **LOW** (0.1-3.9): Minor impact, update in regular cycle
653
+ """
654
+ )
655
+
656
+ # Event handlers
657
def fetch_and_display(days, keyword_search, severity):
    """Query the dashboard for CVEs and build the value for every output widget.

    Args:
        days: Value of the "Days to Look Back" slider.
        keyword_search: Keyword textbox content; a blank value is sent as None.
        severity: Severity dropdown value; a blank value is sent as None.

    Returns:
        A 7-tuple in the order the event wiring expects: CVE list for the
        state, status message, table DataFrame, severity figure, timeline
        figure, score figure, and a refreshed CVE selector dropdown.
    """
    cves, status = dashboard.fetch_cves(
        days_back=days,
        keyword=keyword_search or None,
        severity=severity or None,
    )

    if not cves:
        # Nothing came back: every chart shares one annotated placeholder.
        placeholder = go.Figure()
        placeholder.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False,
        )
        return (
            [],  # reset the stored CVE list
            status,
            pd.DataFrame(),
            placeholder,
            placeholder,
            placeholder,
            gr.Dropdown(choices=[], value=None),  # clear the CVE selector
        )

    table = dashboard.format_cve_table(cves)
    charts = (
        dashboard.create_severity_chart(cves),
        dashboard.create_timeline_chart(cves),
        dashboard.create_score_distribution(cves),
    )

    # Selector labels look like "CVE-2024-1234 (HIGH, 7.5)".
    labels = [
        f"{item['id']} ({item['severity']}, {item['score']})" for item in cves
    ]
    preselected = labels[0] if labels else None

    return (
        cves,  # stored for the description/summary/export handlers
        status,
        table,
        *charts,
        gr.Dropdown(choices=labels, value=preselected),
    )
700
 
701
def update_audience_info(audience):
    """Return the markdown blurb describing the chosen audience profile.

    Args:
        audience: Key into ``AUDIENCE_PROFILES`` picked in the dropdown.

    Returns:
        A markdown string with the profile's focus and its priorities joined
        by commas, or a prompt asking for a selection when the audience is
        not a known profile.
    """
    try:
        profile = AUDIENCE_PROFILES[audience]
    except KeyError:
        return "Select an audience to see details"

    priorities = ', '.join(profile['priorities'])
    return f"**Focus:** {profile['focus']}\n\n**Priorities:** {priorities}"
708
+
709
def update_cve_description(selected_cve, cves):
    """Look up the original description for the CVE chosen in the selector.

    Args:
        selected_cve: Selector label such as "CVE-2024-1234 (HIGH, 7.5)".
        cves: List of CVE dicts, each with 'id' and 'description' keys.

    Returns:
        The matching description, an empty string when either argument is
        empty, or a "not found" message when no CVE has that ID.
    """
    if selected_cve and cves:
        # The ID is everything before the " (" that opens the severity suffix.
        wanted_id = selected_cve.partition(" (")[0]
        return next(
            (entry['description'] for entry in cves if entry['id'] == wanted_id),
            "CVE description not found",
        )
    return ""
723
+
724
def generate_summary_with_status(selected_cve, audience, token, cves):
    """Stream an audience-specific summary together with a status message.

    This is a generator: each ``yield`` emits a ``(summary_text, status_text)``
    pair, which the click handler routes to the summary textbox and the
    generation-status textbox. An interim "generating" pair is emitted before
    the call to ``generate_tailored_summary`` so the UI can show progress.

    Args:
        selected_cve: Selector label such as "CVE-2024-1234 (HIGH, 7.5)".
        audience: Name of the target audience profile.
        token: HuggingFace API token passed through to the summary generator.
        cves: List of CVE dicts, each with 'id' and 'description' keys.

    Yields:
        ``(summary_text, status_text)`` tuples.
    """
    if not selected_cve or not audience or not cves:
        # This must be a yield. A `return <value>` inside a generator only
        # sets StopIteration.value and emits nothing, so the message would
        # never reach the UI.
        yield "Please select a CVE and audience first.", "❌ Missing selection"
        return

    # The ID is everything before the " (" that opens the severity suffix.
    cve_id = selected_cve.split(" (")[0]

    for cve in cves:
        if cve['id'] == cve_id:
            # Show progress before the slow model call.
            yield "Generating AI summary... This may take 30-60 seconds.", "🔄 Generating..."

            summary = generate_tailored_summary(cve['description'], audience, token)

            # The summary's leading emoji decides which status is reported.
            if summary.startswith("❌"):
                yield summary, "❌ Generation failed"
            elif summary.startswith("⏳"):
                yield summary, "⏳ Models busy"
            else:
                yield summary, "✅ Summary generated"
            return

    yield "CVE not found", "❌ CVE not found"
749
+
750
+ # Wire up the event handlers
751
+ fetch_btn.click(
752
+ fn=fetch_and_display,
753
+ inputs=[days_back, keyword, severity_filter],
754
+ outputs=[cve_state, status_text, cve_table, severity_chart, timeline_chart, score_chart, cve_selector]
755
+ )
756
+
757
+ audience_selector.change(
758
+ fn=update_audience_info,
759
+ inputs=[audience_selector],
760
+ outputs=[audience_info]
761
+ )
762
+
763
+ cve_selector.change(
764
+ fn=update_cve_description,
765
+ inputs=[cve_selector, cve_state],
766
+ outputs=[original_description]
767
+ )
768
+
769
+ generate_btn.click(
770
+ fn=generate_summary_with_status,
771
+ inputs=[cve_selector, audience_selector, hf_token, cve_state],
772
+ outputs=[tailored_summary, generation_status]
773
+ )
774
+
775
+ export_json_btn.click(
776
+ fn=lambda cves: dashboard.export_to_json(cves),
777
+ inputs=[cve_state],
778
+ outputs=[export_status]
779
+ )
780
+
781
+ export_csv_btn.click(
782
+ fn=lambda cves: dashboard.export_to_csv(cves),
783
+ inputs=[cve_state],
784
+ outputs=[export_status]
785
+ )
786
+
787
+ # Load initial data
788
+ interface.load(
789
+ fn=fetch_and_display,
790
+ inputs=[days_back, keyword, severity_filter],
791
+ outputs=[cve_state, status_text, cve_table, severity_chart, timeline_chart, score_chart, cve_selector]
792
+ )
793
+
794
+ return interface
795
 
796
if __name__ == "__main__":
    # Report which optional credentials were found in the environment.
    nvd_key_present = bool(os.getenv('NVD_API_KEY'))
    if not nvd_key_present:
        print("⚠ No NVD API key found - Using lower rate limits")
        print("  Get a free API key at: https://nvd.nist.gov/developers/request-an-api-key")
    else:
        print("✓ NVD API key loaded - Higher rate limits enabled")

    # Either variable name is accepted for the HuggingFace token.
    hf_token_present = bool(os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN'))
    if not hf_token_present:
        print("⚠ No HuggingFace token found - Users will need to enter their own")
        print("  Get a free token at: https://huggingface.co/settings/tokens")
    else:
        print("✓ HuggingFace token loaded - AI summaries enabled")

    # Build the Gradio interface and start serving it.
    app = create_interface()
    app.launch()