cti_bench_processor / process_cti_bench_with_docs.py
tuandunghcmut's picture
Upload process_cti_bench_with_docs.py with huggingface_hub
d6060fa verified
#!/usr/bin/env python3
"""
Script to process CTI-bench TSV files into Hugging Face datasets with comprehensive README documentation.
"""
import pandas as pd
import os
from pathlib import Path
from datasets import Dataset
from huggingface_hub import HfApi, login
import argparse
import logging
import tempfile
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def generate_mcq_readme(dataset_size):
"""Generate README for Multiple Choice Questions dataset."""
return f"""# CTI-Bench: Multiple Choice Questions (MCQ)
## Dataset Description
This dataset contains **{dataset_size:,} multiple choice questions** focused on cybersecurity knowledge, particularly based on the MITRE ATT&CK framework. It's part of the CTI-Bench suite for evaluating Large Language Models on Cyber Threat Intelligence tasks.
## Dataset Structure
Each example contains:
- **url**: Source URL (typically MITRE ATT&CK technique pages)
- **question**: The cybersecurity question
- **option_a**: First multiple choice option
- **option_b**: Second multiple choice option
- **option_c**: Third multiple choice option
- **option_d**: Fourth multiple choice option
- **prompt**: Full prompt with instructions for the model
- **ground_truth**: Correct answer (A, B, C, or D)
- **task_type**: Always "multiple_choice_question"
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_mcq")
# Access a sample
sample = dataset['train'][0]
print(f"Question: {{sample['question']}}")
print(f"Options: A) {{sample['option_a']}}, B) {{sample['option_b']}}")
print(f"Answer: {{sample['ground_truth']}}")
```
## Example
**Question:** Which of the following mitigations involves preventing applications from running that haven't been downloaded from legitimate repositories?
**Options:**
- A) Audit
- B) Execution Prevention
- C) Operating System Configuration
- D) User Account Control
**Answer:** B
## Citation
If you use this dataset, please cite the original CTI-Bench paper:
```bibtex
@article{{ctibench2024,
title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
author={{[Authors]}},
journal={{NeurIPS 2024}},
year={{2024}}
}}
```
## Original Source
This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.
## Tasks
This dataset is designed for:
- ✅ Multiple choice question answering
- ✅ Cybersecurity knowledge evaluation
- ✅ MITRE ATT&CK framework understanding
- ✅ Model benchmarking on CTI tasks
"""
def generate_ate_readme(dataset_size):
"""Generate README for Attack Technique Extraction dataset."""
return f"""# CTI-Bench: Attack Technique Extraction (ATE)
## Dataset Description
This dataset contains **{dataset_size} examples** for extracting MITRE Enterprise attack technique IDs from malware and attack descriptions. It tests a model's ability to map cybersecurity descriptions to specific MITRE ATT&CK techniques.
## Dataset Structure
Each example contains:
- **url**: Source URL (typically MITRE software/malware pages)
- **platform**: Target platform (Enterprise, Mobile, etc.)
- **description**: Detailed description of the malware or attack technique
- **prompt**: Full instruction prompt with MITRE technique reference list
- **ground_truth**: Comma-separated list of main MITRE technique IDs (e.g., "T1071, T1573, T1083")
- **task_type**: Always "attack_technique_extraction"
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_ate")
# Access a sample
sample = dataset['train'][0]
print(f"Description: {{sample['description']}}")
print(f"MITRE Techniques: {{sample['ground_truth']}}")
```
## Example
**Description:** 3PARA RAT is a remote access tool (RAT) developed in C++ and associated with the group Putter Panda. It communicates with its command and control (C2) servers via HTTP, with commands encrypted using the DES algorithm in CBC mode...
**Expected Output:** T1071, T1573, T1083, T1070
## MITRE ATT&CK Techniques
The dataset covers techniques such as:
- **T1071**: Application Layer Protocol
- **T1573**: Encrypted Channel
- **T1083**: File and Directory Discovery
- **T1105**: Ingress Tool Transfer
- And many more...
## Citation
```bibtex
@article{{ctibench2024,
title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
author={{[Authors]}},
journal={{NeurIPS 2024}},
year={{2024}}
}}
```
## Original Source
This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.
## Tasks
This dataset is designed for:
- ✅ Named entity recognition (MITRE technique IDs)
- ✅ Information extraction from cybersecurity text
- ✅ MITRE ATT&CK framework mapping
- ✅ Threat intelligence analysis
"""
def generate_vsp_readme(dataset_size):
"""Generate README for Vulnerability Severity Prediction dataset."""
return f"""# CTI-Bench: Vulnerability Severity Prediction (VSP)
## Dataset Description
This dataset contains **{dataset_size:,} CVE descriptions** with corresponding CVSS v3.1 base scores. It evaluates a model's ability to assess vulnerability severity and generate proper CVSS vector strings.
## Dataset Structure
Each example contains:
- **url**: CVE URL (typically from nvd.nist.gov)
- **description**: CVE description detailing the vulnerability
- **prompt**: Full instruction prompt explaining CVSS v3.1 metrics
- **cvss_vector**: Ground truth CVSS v3.1 vector string (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H")
- **task_type**: Always "vulnerability_severity_prediction"
## CVSS v3.1 Metrics
The dataset covers all base metrics:
- **AV** (Attack Vector): Network (N), Adjacent (A), Local (L), Physical (P)
- **AC** (Attack Complexity): Low (L), High (H)
- **PR** (Privileges Required): None (N), Low (L), High (H)
- **UI** (User Interaction): None (N), Required (R)
- **S** (Scope): Unchanged (U), Changed (C)
- **C** (Confidentiality): None (N), Low (L), High (H)
- **I** (Integrity): None (N), Low (L), High (H)
- **A** (Availability): None (N), Low (L), High (H)
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_vsp")
# Access a sample
sample = dataset['train'][0]
print(f"CVE: {{sample['description']}}")
print(f"CVSS Vector: {{sample['cvss_vector']}}")
```
## Example
**CVE Description:** In the Linux kernel through 6.7.1, there is a use-after-free in cec_queue_msg_fh, related to drivers/media/cec/core/cec-adap.c...
**CVSS Vector:** CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:N/I:N/A:H
## Citation
```bibtex
@article{{ctibench2024,
title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
author={{[Authors]}},
journal={{NeurIPS 2024}},
year={{2024}}
}}
```
## Original Source
This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.
## Tasks
This dataset is designed for:
- ✅ Vulnerability severity assessment
- ✅ CVSS score calculation
- ✅ Risk analysis and prioritization
- ✅ Cybersecurity impact evaluation
"""
def generate_taa_readme(dataset_size):
"""Generate README for Threat Actor Attribution dataset."""
return f"""# CTI-Bench: Threat Actor Attribution (TAA)
## Dataset Description
This dataset contains **{dataset_size} examples** for threat actor attribution tasks. It evaluates a model's ability to identify and attribute cyber attacks to specific threat actors based on attack patterns, techniques, and indicators.
## Dataset Structure
Each example contains:
- **task_type**: Always "threat_actor_attribution"
- Additional fields vary based on the specific attribution task
- Common fields include threat descriptions, attack patterns, and attribution targets
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_taa")
# Access a sample
sample = dataset['train'][0]
print(f"Task: {{sample['task_type']}}")
```
## Attribution Categories
The dataset may cover attribution to:
- **APT Groups**: Advanced Persistent Threat organizations
- **Nation-State Actors**: Government-sponsored cyber units
- **Cybercriminal Organizations**: Profit-motivated threat groups
- **Hacktivist Groups**: Ideologically motivated actors
## Citation
```bibtex
@article{{ctibench2024,
title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
author={{[Authors]}},
journal={{NeurIPS 2024}},
year={{2024}}
}}
```
## Original Source
This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.
## Tasks
This dataset is designed for:
- ✅ Threat actor identification
- ✅ Attribution analysis
- ✅ Attack pattern recognition
- ✅ Intelligence correlation
"""
def generate_rcm_readme(dataset_size, variant=""):
"""Generate README for Reverse Cyber Mapping dataset."""
variant_text = f" ({variant})" if variant else ""
return f"""# CTI-Bench: Reverse Cyber Mapping (RCM){variant_text}
## Dataset Description
This dataset contains **{dataset_size:,} examples** for reverse cyber mapping tasks. It evaluates a model's ability to work backwards from observed indicators or effects to identify the underlying attack techniques, tools, or threat actors.
## Dataset Structure
Each example contains:
- **task_type**: Always "reverse_cyber_mapping"
- Additional fields vary based on the specific mapping task
- Common fields include indicators, observables, and mapping targets
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_rcm{'_2021' if '2021' in variant else ''}")
# Access a sample
sample = dataset['train'][0]
print(f"Task: {{sample['task_type']}}")
```
## Reverse Mapping Categories
The dataset may include mapping from:
- **Indicators of Compromise (IoCs)** → Attack techniques
- **Network signatures** → Malware families
- **Attack patterns** → Threat actors
- **Behavioral analysis** → MITRE techniques
## Citation
```bibtex
@article{{ctibench2024,
title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
author={{[Authors]}},
journal={{NeurIPS 2024}},
year={{2024}}
}}
```
## Original Source
This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.
## Tasks
This dataset is designed for:
- ✅ Reverse engineering of attack chains
- ✅ Indicator-to-technique mapping
- ✅ Threat hunting and investigation
- ✅ Forensic analysis
"""
def process_mcq_dataset(file_path):
"""Process Multiple Choice Questions dataset."""
logger.info(f"Processing MCQ dataset: {file_path}")
df = pd.read_csv(file_path, sep='\t')
# Clean and structure the data
processed_data = []
for _, row in df.iterrows():
processed_data.append({
'url': str(row['URL']) if pd.notna(row['URL']) else '',
'question': str(row['Question']) if pd.notna(row['Question']) else '',
'option_a': str(row['Option A']) if pd.notna(row['Option A']) else '',
'option_b': str(row['Option B']) if pd.notna(row['Option B']) else '',
'option_c': str(row['Option C']) if pd.notna(row['Option C']) else '',
'option_d': str(row['Option D']) if pd.notna(row['Option D']) else '',
'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '',
'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '',
'task_type': 'multiple_choice_question'
})
return Dataset.from_list(processed_data)
def process_ate_dataset(file_path):
"""Process Attack Technique Extraction dataset."""
logger.info(f"Processing ATE dataset: {file_path}")
df = pd.read_csv(file_path, sep='\t')
processed_data = []
for _, row in df.iterrows():
processed_data.append({
'url': str(row['URL']) if pd.notna(row['URL']) else '',
'platform': str(row['Platform']) if pd.notna(row['Platform']) else '',
'description': str(row['Description']) if pd.notna(row['Description']) else '',
'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '',
'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '',
'task_type': 'attack_technique_extraction'
})
return Dataset.from_list(processed_data)
def process_vsp_dataset(file_path):
"""Process Vulnerability Severity Prediction dataset."""
logger.info(f"Processing VSP dataset: {file_path}")
df = pd.read_csv(file_path, sep='\t')
processed_data = []
for _, row in df.iterrows():
processed_data.append({
'url': str(row['URL']) if pd.notna(row['URL']) else '',
'description': str(row['Description']) if pd.notna(row['Description']) else '',
'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '',
'cvss_vector': str(row['GT']) if pd.notna(row['GT']) else '',
'task_type': 'vulnerability_severity_prediction'
})
return Dataset.from_list(processed_data)
def process_taa_dataset(file_path):
"""Process Threat Actor Attribution dataset."""
logger.info(f"Processing TAA dataset: {file_path}")
# Read in chunks due to potential large size
chunk_list = []
chunk_size = 10000
for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size):
chunk_list.append(chunk)
df = pd.concat(chunk_list, ignore_index=True)
processed_data = []
for _, row in df.iterrows():
# Handle different possible column structures for TAA
data_entry = {'task_type': 'threat_actor_attribution'}
# Try to map common column names
for col in df.columns:
col_lower = col.lower()
if 'url' in col_lower:
data_entry['url'] = str(row[col]) if pd.notna(row[col]) else ''
elif 'description' in col_lower or 'text' in col_lower:
data_entry['description'] = str(row[col]) if pd.notna(row[col]) else ''
elif 'prompt' in col_lower:
data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else ''
elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower:
data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else ''
else:
# Include other columns as-is
data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else ''
processed_data.append(data_entry)
return Dataset.from_list(processed_data)
def process_rcm_dataset(file_path):
"""Process Reverse Cyber Mapping dataset."""
logger.info(f"Processing RCM dataset: {file_path}")
# Read in chunks due to potential large size
chunk_list = []
chunk_size = 10000
for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size):
chunk_list.append(chunk)
df = pd.concat(chunk_list, ignore_index=True)
processed_data = []
for _, row in df.iterrows():
data_entry = {'task_type': 'reverse_cyber_mapping'}
# Map columns dynamically
for col in df.columns:
col_lower = col.lower()
if 'url' in col_lower:
data_entry['url'] = str(row[col]) if pd.notna(row[col]) else ''
elif 'description' in col_lower or 'text' in col_lower:
data_entry['description'] = str(row[col]) if pd.notna(row[col]) else ''
elif 'prompt' in col_lower:
data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else ''
elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower:
data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else ''
else:
data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else ''
processed_data.append(data_entry)
return Dataset.from_list(processed_data)
def upload_dataset_to_hub_with_readme(dataset, dataset_name, username, readme_content, token=None):
"""Upload dataset to Hugging Face Hub with README."""
try:
logger.info(f"Uploading {dataset_name} to Hugging Face Hub...")
# First, push the dataset
dataset.push_to_hub(
repo_id=f"{username}/{dataset_name}",
token=token,
private=False
)
# Then upload the README file using HfApi
api = HfApi()
# Create a temporary README file
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(readme_content)
readme_path = f.name
try:
# Upload README file
api.upload_file(
path_or_fileobj=readme_path,
path_in_repo="README.md",
repo_id=f"{username}/{dataset_name}",
repo_type="dataset",
token=token
)
finally:
# Clean up temp file
os.unlink(readme_path)
logger.info(f"Successfully uploaded {dataset_name} with documentation to {username}/{dataset_name}")
return True
except Exception as e:
logger.error(f"Error uploading {dataset_name}: {str(e)}")
return False
def main():
parser = argparse.ArgumentParser(description='Process CTI-bench TSV files and upload to Hugging Face Hub with documentation')
parser.add_argument('--username', default='tuandunghcmut', help='Hugging Face username')
parser.add_argument('--token', help='Hugging Face token (optional if logged in via CLI)')
parser.add_argument('--data-dir', default='cti-bench/data', help='Directory containing TSV files')
args = parser.parse_args()
data_dir = Path(args.data_dir)
# Define file processors with README generators
file_processors = {
'cti-mcq.tsv': ('cti_bench_mcq', process_mcq_dataset, generate_mcq_readme),
'cti-ate.tsv': ('cti_bench_ate', process_ate_dataset, generate_ate_readme),
'cti-vsp.tsv': ('cti_bench_vsp', process_vsp_dataset, generate_vsp_readme),
'cti-taa.tsv': ('cti_bench_taa', process_taa_dataset, generate_taa_readme),
'cti-rcm.tsv': ('cti_bench_rcm', process_rcm_dataset, lambda size: generate_rcm_readme(size)),
'cti-rcm-2021.tsv': ('cti_bench_rcm_2021', process_rcm_dataset, lambda size: generate_rcm_readme(size, "2021")),
}
successful_uploads = []
failed_uploads = []
# Process each file
for filename, (dataset_name, processor_func, readme_generator) in file_processors.items():
file_path = data_dir / filename
if not file_path.exists():
logger.warning(f"File not found: {file_path}")
failed_uploads.append(filename)
continue
try:
logger.info(f"Processing {filename}...")
# Process the dataset
dataset = processor_func(file_path)
dataset_size = len(dataset)
logger.info(f"Created dataset with {dataset_size:,} entries")
# Generate README
readme_content = readme_generator(dataset_size)
# Upload to Hub with README
success = upload_dataset_to_hub_with_readme(
dataset, dataset_name, args.username, readme_content, args.token
)
if success:
successful_uploads.append(dataset_name)
logger.info(f"✅ Successfully processed and uploaded: {dataset_name}")
else:
failed_uploads.append(filename)
logger.error(f"❌ Failed to upload: {dataset_name}")
except Exception as e:
logger.error(f"❌ Error processing {filename}: {str(e)}")
failed_uploads.append(filename)
# Summary
logger.info(f"\n🎉 Processing complete!")
logger.info(f"✅ Successfully uploaded {len(successful_uploads)} datasets with documentation:")
for name in successful_uploads:
logger.info(f" - https://huggingface.co/datasets/{args.username}/{name}")
if failed_uploads:
logger.info(f"❌ Failed to process {len(failed_uploads)} files:")
for name in failed_uploads:
logger.info(f" - {name}")
logger.info(f"\nVisit https://huggingface.co/{args.username} to see your uploaded datasets with full documentation!")
if __name__ == "__main__":
main()