| |
| """ |
| Script to process CTI-bench TSV files into Hugging Face datasets with comprehensive README documentation. |
| """ |
|
|
| import pandas as pd |
| import os |
| from pathlib import Path |
| from datasets import Dataset |
| from huggingface_hub import HfApi, login |
| import argparse |
| import logging |
| import tempfile |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| def generate_mcq_readme(dataset_size): |
| """Generate README for Multiple Choice Questions dataset.""" |
| return f"""# CTI-Bench: Multiple Choice Questions (MCQ) |
| |
| ## Dataset Description |
| |
| This dataset contains **{dataset_size:,} multiple choice questions** focused on cybersecurity knowledge, particularly based on the MITRE ATT&CK framework. It's part of the CTI-Bench suite for evaluating Large Language Models on Cyber Threat Intelligence tasks. |
| |
| ## Dataset Structure |
| |
| Each example contains: |
| - **url**: Source URL (typically MITRE ATT&CK technique pages) |
| - **question**: The cybersecurity question |
| - **option_a**: First multiple choice option |
| - **option_b**: Second multiple choice option |
| - **option_c**: Third multiple choice option |
| - **option_d**: Fourth multiple choice option |
| - **prompt**: Full prompt with instructions for the model |
| - **ground_truth**: Correct answer (A, B, C, or D) |
| - **task_type**: Always "multiple_choice_question" |
| |
| ## Usage |
| |
| ```python |
| from datasets import load_dataset |
| |
| # Load the dataset |
| dataset = load_dataset("tuandunghcmut/cti_bench_mcq") |
| |
| # Access a sample |
| sample = dataset['train'][0] |
| print(f"Question: {{sample['question']}}") |
| print(f"Options: A) {{sample['option_a']}}, B) {{sample['option_b']}}") |
| print(f"Answer: {{sample['ground_truth']}}") |
| ``` |
| |
| ## Example |
| |
| **Question:** Which of the following mitigations involves preventing applications from running that haven't been downloaded from legitimate repositories? |
| |
| **Options:** |
| - A) Audit |
| - B) Execution Prevention |
| - C) Operating System Configuration |
| - D) User Account Control |
| |
| **Answer:** B |
| |
| ## Citation |
| |
| If you use this dataset, please cite the original CTI-Bench paper: |
| |
| ```bibtex |
| @article{{ctibench2024, |
| title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, |
| author={{[Authors]}}, |
| journal={{NeurIPS 2024}}, |
| year={{2024}} |
| }} |
| ``` |
| |
| ## Original Source |
| |
| This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. |
| |
| ## Tasks |
| |
| This dataset is designed for: |
| - β
Multiple choice question answering |
| - β
Cybersecurity knowledge evaluation |
| - β
MITRE ATT&CK framework understanding |
| - β
Model benchmarking on CTI tasks |
| """ |
|
|
| def generate_ate_readme(dataset_size): |
| """Generate README for Attack Technique Extraction dataset.""" |
| return f"""# CTI-Bench: Attack Technique Extraction (ATE) |
| |
| ## Dataset Description |
| |
| This dataset contains **{dataset_size} examples** for extracting MITRE Enterprise attack technique IDs from malware and attack descriptions. It tests a model's ability to map cybersecurity descriptions to specific MITRE ATT&CK techniques. |
| |
| ## Dataset Structure |
| |
| Each example contains: |
| - **url**: Source URL (typically MITRE software/malware pages) |
| - **platform**: Target platform (Enterprise, Mobile, etc.) |
| - **description**: Detailed description of the malware or attack technique |
| - **prompt**: Full instruction prompt with MITRE technique reference list |
| - **ground_truth**: Comma-separated list of main MITRE technique IDs (e.g., "T1071, T1573, T1083") |
| - **task_type**: Always "attack_technique_extraction" |
| |
| ## Usage |
| |
| ```python |
| from datasets import load_dataset |
| |
| # Load the dataset |
| dataset = load_dataset("tuandunghcmut/cti_bench_ate") |
| |
| # Access a sample |
| sample = dataset['train'][0] |
| print(f"Description: {{sample['description']}}") |
| print(f"MITRE Techniques: {{sample['ground_truth']}}") |
| ``` |
| |
| ## Example |
| |
| **Description:** 3PARA RAT is a remote access tool (RAT) developed in C++ and associated with the group Putter Panda. It communicates with its command and control (C2) servers via HTTP, with commands encrypted using the DES algorithm in CBC mode... |
| |
| **Expected Output:** T1071, T1573, T1083, T1070 |
| |
| ## MITRE ATT&CK Techniques |
| |
| The dataset covers techniques such as: |
| - **T1071**: Application Layer Protocol |
| - **T1573**: Encrypted Channel |
| - **T1083**: File and Directory Discovery |
| - **T1105**: Ingress Tool Transfer |
| - And many more... |
| |
| ## Citation |
| |
| ```bibtex |
| @article{{ctibench2024, |
| title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, |
| author={{[Authors]}}, |
| journal={{NeurIPS 2024}}, |
| year={{2024}} |
| }} |
| ``` |
| |
| ## Original Source |
| |
| This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. |
| |
| ## Tasks |
| |
| This dataset is designed for: |
| - β
Named entity recognition (MITRE technique IDs) |
| - β
Information extraction from cybersecurity text |
| - β
MITRE ATT&CK framework mapping |
| - β
Threat intelligence analysis |
| """ |
|
|
| def generate_vsp_readme(dataset_size): |
| """Generate README for Vulnerability Severity Prediction dataset.""" |
| return f"""# CTI-Bench: Vulnerability Severity Prediction (VSP) |
| |
| ## Dataset Description |
| |
| This dataset contains **{dataset_size:,} CVE descriptions** with corresponding CVSS v3.1 base scores. It evaluates a model's ability to assess vulnerability severity and generate proper CVSS vector strings. |
| |
| ## Dataset Structure |
| |
| Each example contains: |
| - **url**: CVE URL (typically from nvd.nist.gov) |
| - **description**: CVE description detailing the vulnerability |
| - **prompt**: Full instruction prompt explaining CVSS v3.1 metrics |
| - **cvss_vector**: Ground truth CVSS v3.1 vector string (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") |
| - **task_type**: Always "vulnerability_severity_prediction" |
| |
| ## CVSS v3.1 Metrics |
| |
| The dataset covers all base metrics: |
| - **AV** (Attack Vector): Network (N), Adjacent (A), Local (L), Physical (P) |
| - **AC** (Attack Complexity): Low (L), High (H) |
| - **PR** (Privileges Required): None (N), Low (L), High (H) |
| - **UI** (User Interaction): None (N), Required (R) |
| - **S** (Scope): Unchanged (U), Changed (C) |
| - **C** (Confidentiality): None (N), Low (L), High (H) |
| - **I** (Integrity): None (N), Low (L), High (H) |
| - **A** (Availability): None (N), Low (L), High (H) |
| |
| ## Usage |
| |
| ```python |
| from datasets import load_dataset |
| |
| # Load the dataset |
| dataset = load_dataset("tuandunghcmut/cti_bench_vsp") |
| |
| # Access a sample |
| sample = dataset['train'][0] |
| print(f"CVE: {{sample['description']}}") |
| print(f"CVSS Vector: {{sample['cvss_vector']}}") |
| ``` |
| |
| ## Example |
| |
| **CVE Description:** In the Linux kernel through 6.7.1, there is a use-after-free in cec_queue_msg_fh, related to drivers/media/cec/core/cec-adap.c... |
| |
| **CVSS Vector:** CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:N/I:N/A:H |
| |
| ## Citation |
| |
| ```bibtex |
| @article{{ctibench2024, |
| title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, |
| author={{[Authors]}}, |
| journal={{NeurIPS 2024}}, |
| year={{2024}} |
| }} |
| ``` |
| |
| ## Original Source |
| |
| This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. |
| |
| ## Tasks |
| |
| This dataset is designed for: |
| - β
Vulnerability severity assessment |
| - β
CVSS score calculation |
| - β
Risk analysis and prioritization |
| - β
Cybersecurity impact evaluation |
| """ |
|
|
| def generate_taa_readme(dataset_size): |
| """Generate README for Threat Actor Attribution dataset.""" |
| return f"""# CTI-Bench: Threat Actor Attribution (TAA) |
| |
| ## Dataset Description |
| |
| This dataset contains **{dataset_size} examples** for threat actor attribution tasks. It evaluates a model's ability to identify and attribute cyber attacks to specific threat actors based on attack patterns, techniques, and indicators. |
| |
| ## Dataset Structure |
| |
| Each example contains: |
| - **task_type**: Always "threat_actor_attribution" |
| - Additional fields vary based on the specific attribution task |
| - Common fields include threat descriptions, attack patterns, and attribution targets |
| |
| ## Usage |
| |
| ```python |
| from datasets import load_dataset |
| |
| # Load the dataset |
| dataset = load_dataset("tuandunghcmut/cti_bench_taa") |
| |
| # Access a sample |
| sample = dataset['train'][0] |
| print(f"Task: {{sample['task_type']}}") |
| ``` |
| |
| ## Attribution Categories |
| |
| The dataset may cover attribution to: |
| - **APT Groups**: Advanced Persistent Threat organizations |
| - **Nation-State Actors**: Government-sponsored cyber units |
| - **Cybercriminal Organizations**: Profit-motivated threat groups |
| - **Hacktivist Groups**: Ideologically motivated actors |
| |
| ## Citation |
| |
| ```bibtex |
| @article{{ctibench2024, |
| title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, |
| author={{[Authors]}}, |
| journal={{NeurIPS 2024}}, |
| year={{2024}} |
| }} |
| ``` |
| |
| ## Original Source |
| |
| This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. |
| |
| ## Tasks |
| |
| This dataset is designed for: |
| - β
Threat actor identification |
| - β
Attribution analysis |
| - β
Attack pattern recognition |
| - β
Intelligence correlation |
| """ |
|
|
| def generate_rcm_readme(dataset_size, variant=""): |
| """Generate README for Reverse Cyber Mapping dataset.""" |
| variant_text = f" ({variant})" if variant else "" |
| return f"""# CTI-Bench: Reverse Cyber Mapping (RCM){variant_text} |
| |
| ## Dataset Description |
| |
| This dataset contains **{dataset_size:,} examples** for reverse cyber mapping tasks. It evaluates a model's ability to work backwards from observed indicators or effects to identify the underlying attack techniques, tools, or threat actors. |
| |
| ## Dataset Structure |
| |
| Each example contains: |
| - **task_type**: Always "reverse_cyber_mapping" |
| - Additional fields vary based on the specific mapping task |
| - Common fields include indicators, observables, and mapping targets |
| |
| ## Usage |
| |
| ```python |
| from datasets import load_dataset |
| |
| # Load the dataset |
| dataset = load_dataset("tuandunghcmut/cti_bench_rcm{'_2021' if '2021' in variant else ''}") |
| |
| # Access a sample |
| sample = dataset['train'][0] |
| print(f"Task: {{sample['task_type']}}") |
| ``` |
| |
| ## Reverse Mapping Categories |
| |
| The dataset may include mapping from: |
| - **Indicators of Compromise (IoCs)** β Attack techniques |
| - **Network signatures** β Malware families |
| - **Attack patterns** β Threat actors |
| - **Behavioral analysis** β MITRE techniques |
| |
| ## Citation |
| |
| ```bibtex |
| @article{{ctibench2024, |
| title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, |
| author={{[Authors]}}, |
| journal={{NeurIPS 2024}}, |
| year={{2024}} |
| }} |
| ``` |
| |
| ## Original Source |
| |
| This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. |
| |
| ## Tasks |
| |
| This dataset is designed for: |
| - β
Reverse engineering of attack chains |
| - β
Indicator-to-technique mapping |
| - β
Threat hunting and investigation |
| - β
Forensic analysis |
| """ |
|
|
| def process_mcq_dataset(file_path): |
| """Process Multiple Choice Questions dataset.""" |
| logger.info(f"Processing MCQ dataset: {file_path}") |
| |
| df = pd.read_csv(file_path, sep='\t') |
| |
| |
| processed_data = [] |
| for _, row in df.iterrows(): |
| processed_data.append({ |
| 'url': str(row['URL']) if pd.notna(row['URL']) else '', |
| 'question': str(row['Question']) if pd.notna(row['Question']) else '', |
| 'option_a': str(row['Option A']) if pd.notna(row['Option A']) else '', |
| 'option_b': str(row['Option B']) if pd.notna(row['Option B']) else '', |
| 'option_c': str(row['Option C']) if pd.notna(row['Option C']) else '', |
| 'option_d': str(row['Option D']) if pd.notna(row['Option D']) else '', |
| 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', |
| 'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '', |
| 'task_type': 'multiple_choice_question' |
| }) |
| |
| return Dataset.from_list(processed_data) |
|
|
| def process_ate_dataset(file_path): |
| """Process Attack Technique Extraction dataset.""" |
| logger.info(f"Processing ATE dataset: {file_path}") |
| |
| df = pd.read_csv(file_path, sep='\t') |
| |
| processed_data = [] |
| for _, row in df.iterrows(): |
| processed_data.append({ |
| 'url': str(row['URL']) if pd.notna(row['URL']) else '', |
| 'platform': str(row['Platform']) if pd.notna(row['Platform']) else '', |
| 'description': str(row['Description']) if pd.notna(row['Description']) else '', |
| 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', |
| 'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '', |
| 'task_type': 'attack_technique_extraction' |
| }) |
| |
| return Dataset.from_list(processed_data) |
|
|
| def process_vsp_dataset(file_path): |
| """Process Vulnerability Severity Prediction dataset.""" |
| logger.info(f"Processing VSP dataset: {file_path}") |
| |
| df = pd.read_csv(file_path, sep='\t') |
| |
| processed_data = [] |
| for _, row in df.iterrows(): |
| processed_data.append({ |
| 'url': str(row['URL']) if pd.notna(row['URL']) else '', |
| 'description': str(row['Description']) if pd.notna(row['Description']) else '', |
| 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', |
| 'cvss_vector': str(row['GT']) if pd.notna(row['GT']) else '', |
| 'task_type': 'vulnerability_severity_prediction' |
| }) |
| |
| return Dataset.from_list(processed_data) |
|
|
| def process_taa_dataset(file_path): |
| """Process Threat Actor Attribution dataset.""" |
| logger.info(f"Processing TAA dataset: {file_path}") |
| |
| |
| chunk_list = [] |
| chunk_size = 10000 |
| |
| for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size): |
| chunk_list.append(chunk) |
| |
| df = pd.concat(chunk_list, ignore_index=True) |
| |
| processed_data = [] |
| for _, row in df.iterrows(): |
| |
| data_entry = {'task_type': 'threat_actor_attribution'} |
| |
| |
| for col in df.columns: |
| col_lower = col.lower() |
| if 'url' in col_lower: |
| data_entry['url'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif 'description' in col_lower or 'text' in col_lower: |
| data_entry['description'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif 'prompt' in col_lower: |
| data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower: |
| data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else '' |
| else: |
| |
| data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else '' |
| |
| processed_data.append(data_entry) |
| |
| return Dataset.from_list(processed_data) |
|
|
| def process_rcm_dataset(file_path): |
| """Process Reverse Cyber Mapping dataset.""" |
| logger.info(f"Processing RCM dataset: {file_path}") |
| |
| |
| chunk_list = [] |
| chunk_size = 10000 |
| |
| for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size): |
| chunk_list.append(chunk) |
| |
| df = pd.concat(chunk_list, ignore_index=True) |
| |
| processed_data = [] |
| for _, row in df.iterrows(): |
| data_entry = {'task_type': 'reverse_cyber_mapping'} |
| |
| |
| for col in df.columns: |
| col_lower = col.lower() |
| if 'url' in col_lower: |
| data_entry['url'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif 'description' in col_lower or 'text' in col_lower: |
| data_entry['description'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif 'prompt' in col_lower: |
| data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else '' |
| elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower: |
| data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else '' |
| else: |
| data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else '' |
| |
| processed_data.append(data_entry) |
| |
| return Dataset.from_list(processed_data) |
|
|
| def upload_dataset_to_hub_with_readme(dataset, dataset_name, username, readme_content, token=None): |
| """Upload dataset to Hugging Face Hub with README.""" |
| try: |
| logger.info(f"Uploading {dataset_name} to Hugging Face Hub...") |
| |
| |
| dataset.push_to_hub( |
| repo_id=f"{username}/{dataset_name}", |
| token=token, |
| private=False |
| ) |
| |
| |
| api = HfApi() |
| |
| |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: |
| f.write(readme_content) |
| readme_path = f.name |
| |
| try: |
| |
| api.upload_file( |
| path_or_fileobj=readme_path, |
| path_in_repo="README.md", |
| repo_id=f"{username}/{dataset_name}", |
| repo_type="dataset", |
| token=token |
| ) |
| finally: |
| |
| os.unlink(readme_path) |
| |
| logger.info(f"Successfully uploaded {dataset_name} with documentation to {username}/{dataset_name}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Error uploading {dataset_name}: {str(e)}") |
| return False |
|
|
| def main(): |
| parser = argparse.ArgumentParser(description='Process CTI-bench TSV files and upload to Hugging Face Hub with documentation') |
| parser.add_argument('--username', default='tuandunghcmut', help='Hugging Face username') |
| parser.add_argument('--token', help='Hugging Face token (optional if logged in via CLI)') |
| parser.add_argument('--data-dir', default='cti-bench/data', help='Directory containing TSV files') |
| |
| args = parser.parse_args() |
| |
| data_dir = Path(args.data_dir) |
| |
| |
| file_processors = { |
| 'cti-mcq.tsv': ('cti_bench_mcq', process_mcq_dataset, generate_mcq_readme), |
| 'cti-ate.tsv': ('cti_bench_ate', process_ate_dataset, generate_ate_readme), |
| 'cti-vsp.tsv': ('cti_bench_vsp', process_vsp_dataset, generate_vsp_readme), |
| 'cti-taa.tsv': ('cti_bench_taa', process_taa_dataset, generate_taa_readme), |
| 'cti-rcm.tsv': ('cti_bench_rcm', process_rcm_dataset, lambda size: generate_rcm_readme(size)), |
| 'cti-rcm-2021.tsv': ('cti_bench_rcm_2021', process_rcm_dataset, lambda size: generate_rcm_readme(size, "2021")), |
| } |
| |
| successful_uploads = [] |
| failed_uploads = [] |
| |
| |
| for filename, (dataset_name, processor_func, readme_generator) in file_processors.items(): |
| file_path = data_dir / filename |
| |
| if not file_path.exists(): |
| logger.warning(f"File not found: {file_path}") |
| failed_uploads.append(filename) |
| continue |
| |
| try: |
| logger.info(f"Processing {filename}...") |
| |
| |
| dataset = processor_func(file_path) |
| dataset_size = len(dataset) |
| logger.info(f"Created dataset with {dataset_size:,} entries") |
| |
| |
| readme_content = readme_generator(dataset_size) |
| |
| |
| success = upload_dataset_to_hub_with_readme( |
| dataset, dataset_name, args.username, readme_content, args.token |
| ) |
| |
| if success: |
| successful_uploads.append(dataset_name) |
| logger.info(f"β
Successfully processed and uploaded: {dataset_name}") |
| else: |
| failed_uploads.append(filename) |
| logger.error(f"β Failed to upload: {dataset_name}") |
| |
| except Exception as e: |
| logger.error(f"β Error processing {filename}: {str(e)}") |
| failed_uploads.append(filename) |
| |
| |
| logger.info(f"\nπ Processing complete!") |
| logger.info(f"β
Successfully uploaded {len(successful_uploads)} datasets with documentation:") |
| for name in successful_uploads: |
| logger.info(f" - https://huggingface.co/datasets/{args.username}/{name}") |
| |
| if failed_uploads: |
| logger.info(f"β Failed to process {len(failed_uploads)} files:") |
| for name in failed_uploads: |
| logger.info(f" - {name}") |
| |
| logger.info(f"\nVisit https://huggingface.co/{args.username} to see your uploaded datasets with full documentation!") |
|
|
| if __name__ == "__main__": |
| main() |
|
|