#!/usr/bin/env python3 """ Script to process CTI-bench TSV files into Hugging Face datasets with comprehensive README documentation. """ import pandas as pd import os from pathlib import Path from datasets import Dataset from huggingface_hub import HfApi, login import argparse import logging import tempfile # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def generate_mcq_readme(dataset_size): """Generate README for Multiple Choice Questions dataset.""" return f"""# CTI-Bench: Multiple Choice Questions (MCQ) ## Dataset Description This dataset contains **{dataset_size:,} multiple choice questions** focused on cybersecurity knowledge, particularly based on the MITRE ATT&CK framework. It's part of the CTI-Bench suite for evaluating Large Language Models on Cyber Threat Intelligence tasks. ## Dataset Structure Each example contains: - **url**: Source URL (typically MITRE ATT&CK technique pages) - **question**: The cybersecurity question - **option_a**: First multiple choice option - **option_b**: Second multiple choice option - **option_c**: Third multiple choice option - **option_d**: Fourth multiple choice option - **prompt**: Full prompt with instructions for the model - **ground_truth**: Correct answer (A, B, C, or D) - **task_type**: Always "multiple_choice_question" ## Usage ```python from datasets import load_dataset # Load the dataset dataset = load_dataset("tuandunghcmut/cti_bench_mcq") # Access a sample sample = dataset['train'][0] print(f"Question: {{sample['question']}}") print(f"Options: A) {{sample['option_a']}}, B) {{sample['option_b']}}") print(f"Answer: {{sample['ground_truth']}}") ``` ## Example **Question:** Which of the following mitigations involves preventing applications from running that haven't been downloaded from legitimate repositories? **Options:** - A) Audit - B) Execution Prevention - C) Operating System Configuration - D) User Account Control **Answer:** B ## Citation If you use this dataset, please cite the original CTI-Bench paper: ```bibtex @article{{ctibench2024, title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, author={{[Authors]}}, journal={{NeurIPS 2024}}, year={{2024}} }} ``` ## Original Source This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. ## Tasks This dataset is designed for: - ✅ Multiple choice question answering - ✅ Cybersecurity knowledge evaluation - ✅ MITRE ATT&CK framework understanding - ✅ Model benchmarking on CTI tasks """ def generate_ate_readme(dataset_size): """Generate README for Attack Technique Extraction dataset.""" return f"""# CTI-Bench: Attack Technique Extraction (ATE) ## Dataset Description This dataset contains **{dataset_size} examples** for extracting MITRE Enterprise attack technique IDs from malware and attack descriptions. It tests a model's ability to map cybersecurity descriptions to specific MITRE ATT&CK techniques. ## Dataset Structure Each example contains: - **url**: Source URL (typically MITRE software/malware pages) - **platform**: Target platform (Enterprise, Mobile, etc.) - **description**: Detailed description of the malware or attack technique - **prompt**: Full instruction prompt with MITRE technique reference list - **ground_truth**: Comma-separated list of main MITRE technique IDs (e.g., "T1071, T1573, T1083") - **task_type**: Always "attack_technique_extraction" ## Usage ```python from datasets import load_dataset # Load the dataset dataset = load_dataset("tuandunghcmut/cti_bench_ate") # Access a sample sample = dataset['train'][0] print(f"Description: {{sample['description']}}") print(f"MITRE Techniques: {{sample['ground_truth']}}") ``` ## Example **Description:** 3PARA RAT is a remote access tool (RAT) developed in C++ and associated with the group Putter Panda. It communicates with its command and control (C2) servers via HTTP, with commands encrypted using the DES algorithm in CBC mode... **Expected Output:** T1071, T1573, T1083, T1070 ## MITRE ATT&CK Techniques The dataset covers techniques such as: - **T1071**: Application Layer Protocol - **T1573**: Encrypted Channel - **T1083**: File and Directory Discovery - **T1105**: Ingress Tool Transfer - And many more... ## Citation ```bibtex @article{{ctibench2024, title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, author={{[Authors]}}, journal={{NeurIPS 2024}}, year={{2024}} }} ``` ## Original Source This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. ## Tasks This dataset is designed for: - ✅ Named entity recognition (MITRE technique IDs) - ✅ Information extraction from cybersecurity text - ✅ MITRE ATT&CK framework mapping - ✅ Threat intelligence analysis """ def generate_vsp_readme(dataset_size): """Generate README for Vulnerability Severity Prediction dataset.""" return f"""# CTI-Bench: Vulnerability Severity Prediction (VSP) ## Dataset Description This dataset contains **{dataset_size:,} CVE descriptions** with corresponding CVSS v3.1 base scores. It evaluates a model's ability to assess vulnerability severity and generate proper CVSS vector strings. ## Dataset Structure Each example contains: - **url**: CVE URL (typically from nvd.nist.gov) - **description**: CVE description detailing the vulnerability - **prompt**: Full instruction prompt explaining CVSS v3.1 metrics - **cvss_vector**: Ground truth CVSS v3.1 vector string (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") - **task_type**: Always "vulnerability_severity_prediction" ## CVSS v3.1 Metrics The dataset covers all base metrics: - **AV** (Attack Vector): Network (N), Adjacent (A), Local (L), Physical (P) - **AC** (Attack Complexity): Low (L), High (H) - **PR** (Privileges Required): None (N), Low (L), High (H) - **UI** (User Interaction): None (N), Required (R) - **S** (Scope): Unchanged (U), Changed (C) - **C** (Confidentiality): None (N), Low (L), High (H) - **I** (Integrity): None (N), Low (L), High (H) - **A** (Availability): None (N), Low (L), High (H) ## Usage ```python from datasets import load_dataset # Load the dataset dataset = load_dataset("tuandunghcmut/cti_bench_vsp") # Access a sample sample = dataset['train'][0] print(f"CVE: {{sample['description']}}") print(f"CVSS Vector: {{sample['cvss_vector']}}") ``` ## Example **CVE Description:** In the Linux kernel through 6.7.1, there is a use-after-free in cec_queue_msg_fh, related to drivers/media/cec/core/cec-adap.c... **CVSS Vector:** CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:N/I:N/A:H ## Citation ```bibtex @article{{ctibench2024, title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, author={{[Authors]}}, journal={{NeurIPS 2024}}, year={{2024}} }} ``` ## Original Source This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. ## Tasks This dataset is designed for: - ✅ Vulnerability severity assessment - ✅ CVSS score calculation - ✅ Risk analysis and prioritization - ✅ Cybersecurity impact evaluation """ def generate_taa_readme(dataset_size): """Generate README for Threat Actor Attribution dataset.""" return f"""# CTI-Bench: Threat Actor Attribution (TAA) ## Dataset Description This dataset contains **{dataset_size} examples** for threat actor attribution tasks. It evaluates a model's ability to identify and attribute cyber attacks to specific threat actors based on attack patterns, techniques, and indicators. ## Dataset Structure Each example contains: - **task_type**: Always "threat_actor_attribution" - Additional fields vary based on the specific attribution task - Common fields include threat descriptions, attack patterns, and attribution targets ## Usage ```python from datasets import load_dataset # Load the dataset dataset = load_dataset("tuandunghcmut/cti_bench_taa") # Access a sample sample = dataset['train'][0] print(f"Task: {{sample['task_type']}}") ``` ## Attribution Categories The dataset may cover attribution to: - **APT Groups**: Advanced Persistent Threat organizations - **Nation-State Actors**: Government-sponsored cyber units - **Cybercriminal Organizations**: Profit-motivated threat groups - **Hacktivist Groups**: Ideologically motivated actors ## Citation ```bibtex @article{{ctibench2024, title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, author={{[Authors]}}, journal={{NeurIPS 2024}}, year={{2024}} }} ``` ## Original Source This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. ## Tasks This dataset is designed for: - ✅ Threat actor identification - ✅ Attribution analysis - ✅ Attack pattern recognition - ✅ Intelligence correlation """ def generate_rcm_readme(dataset_size, variant=""): """Generate README for Reverse Cyber Mapping dataset.""" variant_text = f" ({variant})" if variant else "" return f"""# CTI-Bench: Reverse Cyber Mapping (RCM){variant_text} ## Dataset Description This dataset contains **{dataset_size:,} examples** for reverse cyber mapping tasks. It evaluates a model's ability to work backwards from observed indicators or effects to identify the underlying attack techniques, tools, or threat actors. ## Dataset Structure Each example contains: - **task_type**: Always "reverse_cyber_mapping" - Additional fields vary based on the specific mapping task - Common fields include indicators, observables, and mapping targets ## Usage ```python from datasets import load_dataset # Load the dataset dataset = load_dataset("tuandunghcmut/cti_bench_rcm{'_2021' if '2021' in variant else ''}") # Access a sample sample = dataset['train'][0] print(f"Task: {{sample['task_type']}}") ``` ## Reverse Mapping Categories The dataset may include mapping from: - **Indicators of Compromise (IoCs)** → Attack techniques - **Network signatures** → Malware families - **Attack patterns** → Threat actors - **Behavioral analysis** → MITRE techniques ## Citation ```bibtex @article{{ctibench2024, title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}}, author={{[Authors]}}, journal={{NeurIPS 2024}}, year={{2024}} }} ``` ## Original Source This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms. ## Tasks This dataset is designed for: - ✅ Reverse engineering of attack chains - ✅ Indicator-to-technique mapping - ✅ Threat hunting and investigation - ✅ Forensic analysis """ def process_mcq_dataset(file_path): """Process Multiple Choice Questions dataset.""" logger.info(f"Processing MCQ dataset: {file_path}") df = pd.read_csv(file_path, sep='\t') # Clean and structure the data processed_data = [] for _, row in df.iterrows(): processed_data.append({ 'url': str(row['URL']) if pd.notna(row['URL']) else '', 'question': str(row['Question']) if pd.notna(row['Question']) else '', 'option_a': str(row['Option A']) if pd.notna(row['Option A']) else '', 'option_b': str(row['Option B']) if pd.notna(row['Option B']) else '', 'option_c': str(row['Option C']) if pd.notna(row['Option C']) else '', 'option_d': str(row['Option D']) if pd.notna(row['Option D']) else '', 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', 'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '', 'task_type': 'multiple_choice_question' }) return Dataset.from_list(processed_data) def process_ate_dataset(file_path): """Process Attack Technique Extraction dataset.""" logger.info(f"Processing ATE dataset: {file_path}") df = pd.read_csv(file_path, sep='\t') processed_data = [] for _, row in df.iterrows(): processed_data.append({ 'url': str(row['URL']) if pd.notna(row['URL']) else '', 'platform': str(row['Platform']) if pd.notna(row['Platform']) else '', 'description': str(row['Description']) if pd.notna(row['Description']) else '', 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', 'ground_truth': str(row['GT']) if pd.notna(row['GT']) else '', 'task_type': 'attack_technique_extraction' }) return Dataset.from_list(processed_data) def process_vsp_dataset(file_path): """Process Vulnerability Severity Prediction dataset.""" logger.info(f"Processing VSP dataset: {file_path}") df = pd.read_csv(file_path, sep='\t') processed_data = [] for _, row in df.iterrows(): processed_data.append({ 'url': str(row['URL']) if pd.notna(row['URL']) else '', 'description': str(row['Description']) if pd.notna(row['Description']) else '', 'prompt': str(row['Prompt']) if pd.notna(row['Prompt']) else '', 'cvss_vector': str(row['GT']) if pd.notna(row['GT']) else '', 'task_type': 'vulnerability_severity_prediction' }) return Dataset.from_list(processed_data) def process_taa_dataset(file_path): """Process Threat Actor Attribution dataset.""" logger.info(f"Processing TAA dataset: {file_path}") # Read in chunks due to potential large size chunk_list = [] chunk_size = 10000 for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size): chunk_list.append(chunk) df = pd.concat(chunk_list, ignore_index=True) processed_data = [] for _, row in df.iterrows(): # Handle different possible column structures for TAA data_entry = {'task_type': 'threat_actor_attribution'} # Try to map common column names for col in df.columns: col_lower = col.lower() if 'url' in col_lower: data_entry['url'] = str(row[col]) if pd.notna(row[col]) else '' elif 'description' in col_lower or 'text' in col_lower: data_entry['description'] = str(row[col]) if pd.notna(row[col]) else '' elif 'prompt' in col_lower: data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else '' elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower: data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else '' else: # Include other columns as-is data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else '' processed_data.append(data_entry) return Dataset.from_list(processed_data) def process_rcm_dataset(file_path): """Process Reverse Cyber Mapping dataset.""" logger.info(f"Processing RCM dataset: {file_path}") # Read in chunks due to potential large size chunk_list = [] chunk_size = 10000 for chunk in pd.read_csv(file_path, sep='\t', chunksize=chunk_size): chunk_list.append(chunk) df = pd.concat(chunk_list, ignore_index=True) processed_data = [] for _, row in df.iterrows(): data_entry = {'task_type': 'reverse_cyber_mapping'} # Map columns dynamically for col in df.columns: col_lower = col.lower() if 'url' in col_lower: data_entry['url'] = str(row[col]) if pd.notna(row[col]) else '' elif 'description' in col_lower or 'text' in col_lower: data_entry['description'] = str(row[col]) if pd.notna(row[col]) else '' elif 'prompt' in col_lower: data_entry['prompt'] = str(row[col]) if pd.notna(row[col]) else '' elif col == 'GT' or 'ground' in col_lower or 'truth' in col_lower: data_entry['ground_truth'] = str(row[col]) if pd.notna(row[col]) else '' else: data_entry[col.lower().replace(' ', '_')] = str(row[col]) if pd.notna(row[col]) else '' processed_data.append(data_entry) return Dataset.from_list(processed_data) def upload_dataset_to_hub_with_readme(dataset, dataset_name, username, readme_content, token=None): """Upload dataset to Hugging Face Hub with README.""" try: logger.info(f"Uploading {dataset_name} to Hugging Face Hub...") # First, push the dataset dataset.push_to_hub( repo_id=f"{username}/{dataset_name}", token=token, private=False ) # Then upload the README file using HfApi api = HfApi() # Create a temporary README file with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(readme_content) readme_path = f.name try: # Upload README file api.upload_file( path_or_fileobj=readme_path, path_in_repo="README.md", repo_id=f"{username}/{dataset_name}", repo_type="dataset", token=token ) finally: # Clean up temp file os.unlink(readme_path) logger.info(f"Successfully uploaded {dataset_name} with documentation to {username}/{dataset_name}") return True except Exception as e: logger.error(f"Error uploading {dataset_name}: {str(e)}") return False def main(): parser = argparse.ArgumentParser(description='Process CTI-bench TSV files and upload to Hugging Face Hub with documentation') parser.add_argument('--username', default='tuandunghcmut', help='Hugging Face username') parser.add_argument('--token', help='Hugging Face token (optional if logged in via CLI)') parser.add_argument('--data-dir', default='cti-bench/data', help='Directory containing TSV files') args = parser.parse_args() data_dir = Path(args.data_dir) # Define file processors with README generators file_processors = { 'cti-mcq.tsv': ('cti_bench_mcq', process_mcq_dataset, generate_mcq_readme), 'cti-ate.tsv': ('cti_bench_ate', process_ate_dataset, generate_ate_readme), 'cti-vsp.tsv': ('cti_bench_vsp', process_vsp_dataset, generate_vsp_readme), 'cti-taa.tsv': ('cti_bench_taa', process_taa_dataset, generate_taa_readme), 'cti-rcm.tsv': ('cti_bench_rcm', process_rcm_dataset, lambda size: generate_rcm_readme(size)), 'cti-rcm-2021.tsv': ('cti_bench_rcm_2021', process_rcm_dataset, lambda size: generate_rcm_readme(size, "2021")), } successful_uploads = [] failed_uploads = [] # Process each file for filename, (dataset_name, processor_func, readme_generator) in file_processors.items(): file_path = data_dir / filename if not file_path.exists(): logger.warning(f"File not found: {file_path}") failed_uploads.append(filename) continue try: logger.info(f"Processing {filename}...") # Process the dataset dataset = processor_func(file_path) dataset_size = len(dataset) logger.info(f"Created dataset with {dataset_size:,} entries") # Generate README readme_content = readme_generator(dataset_size) # Upload to Hub with README success = upload_dataset_to_hub_with_readme( dataset, dataset_name, args.username, readme_content, args.token ) if success: successful_uploads.append(dataset_name) logger.info(f"✅ Successfully processed and uploaded: {dataset_name}") else: failed_uploads.append(filename) logger.error(f"❌ Failed to upload: {dataset_name}") except Exception as e: logger.error(f"❌ Error processing {filename}: {str(e)}") failed_uploads.append(filename) # Summary logger.info(f"\n🎉 Processing complete!") logger.info(f"✅ Successfully uploaded {len(successful_uploads)} datasets with documentation:") for name in successful_uploads: logger.info(f" - https://huggingface.co/datasets/{args.username}/{name}") if failed_uploads: logger.info(f"❌ Failed to process {len(failed_uploads)} files:") for name in failed_uploads: logger.info(f" - {name}") logger.info(f"\nVisit https://huggingface.co/{args.username} to see your uploaded datasets with full documentation!") if __name__ == "__main__": main()