# CyberCoder-7B-v1 / evaluate_cybersec.py
# Repository page header: uploader moro72842; commit f6d6bab ("Upload evaluate_cybersec.py"), verified.
"""
Evaluation script for CyberCoder-7B-v1
Tests:
1. JSON structured output validity and accuracy
2. Cybersecurity knowledge (CVE analysis, vulnerability detection)
3. Code reasoning
Usage:
pip install transformers torch peft
python evaluate_cybersec.py
"""
import json
import os
import re
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from peft import PeftModel
# Hub repository id of the model under evaluation. main() first tries to load
# it directly as a text-generation pipeline; if that fails it is loaded as a
# PEFT adapter (and tokenizer source) on top of BASE_MODEL.
MODEL_ID = "moro72842/CyberCoder-7B-v1"
# Base checkpoint used only by the fallback path in main().
BASE_MODEL = "Qwen/Qwen2.5-Coder-7B-Instruct"
# Test cases for the structured-output suite. Each entry has:
#   name               - label printed while the test runs and stored in results
#   prompt             - user message sent to the model
#   required_json_keys - top-level keys that must be present in the parsed JSON
#
# Prompts use real newlines (and single backslashes in Windows paths) so the
# model sees properly formatted multi-line input rather than literal "\n" text.
JSON_OUTPUT_TESTS = [
    {
        "name": "CVE Analysis JSON",
        "prompt": (
            "Analyze CVE-2023-44487 (HTTP/2 Rapid Reset) and provide the analysis in JSON format "
            "with fields: cve_id, severity, cvss_score, attack_vector, affected_software, "
            "description, mitigation."
        ),
        "required_json_keys": [
            "cve_id",
            "severity",
            "cvss_score",
            "attack_vector",
            "affected_software",
            "description",
            "mitigation",
        ],
    },
    {
        "name": "Vulnerability Assessment JSON",
        "prompt": (
            "Analyze this code for vulnerabilities and output as JSON with schema "
            "{vulnerabilities: [{cwe_id, severity, description, fix}], risk_score}:\n\n"
            "```python\n"
            "import pickle\n"
            "import base64\n"
            "def load_user_data(encoded_data):\n"
            " data = base64.b64decode(encoded_data)\n"
            " return pickle.loads(data)\n"
            "```"
        ),
        "required_json_keys": ["vulnerabilities", "risk_score"],
    },
    {
        "name": "MITRE ATT&CK Mapping JSON",
        "prompt": (
            "Map the following observed behavior to MITRE ATT&CK framework and output as JSON:\n"
            "Observed: A process named 'chrome_update.exe' was seen creating a scheduled task "
            "that runs every 6 hours, connecting to a .onion address via Tor, and exfiltrating "
            "clipboard data.\n"
            "Schema: {tactics: [{tactic_id, name, technique_id, technique_name, evidence}], "
            "severity, confidence}"
        ),
        "required_json_keys": ["tactics", "severity", "confidence"],
    },
    {
        "name": "Network IDS Alert JSON",
        "prompt": (
            "Classify this network alert and output structured JSON:\n"
            "Alert: Multiple SYN packets from 10.0.0.5 to ports "
            "22,80,443,8080,3306,5432,6379,27017 on 192.168.1.100 within 2 seconds.\n"
            "Schema: {alert_type, severity, source_ip, target_ip, ports_scanned, "
            "attack_classification, mitre_technique, recommended_action}"
        ),
        # Only a subset of the schema fields is required for a pass.
        "required_json_keys": [
            "alert_type",
            "severity",
            "source_ip",
            "target_ip",
            "attack_classification",
        ],
    },
    {
        "name": "Malware IOC Report JSON",
        "prompt": (
            "Generate a structured IOC report in JSON for this sample:\n"
            "Hash: 5f2b14dc8a32c3e4b981293f4f5e7a12\n"
            "Connects to: evil-c2.darknet.xyz:8443\n"
            "Creates: %APPDATA%\\Microsoft\\svchost.dll\n"
            "Registry: HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run\\WinUpdate\n"
            "Schema: {ioc_report: {hashes: {md5}, network: [{indicator, type}], "
            "filesystem: [{path, action}], registry: [{key, value}], classification, "
            "recommendations: [str]}}"
        ),
        "required_json_keys": ["ioc_report"],
    },
]
# Free-text knowledge checks. Each entry carries the prompt sent to the model
# and the keywords looked for (case-insensitively, as substrings) in its answer.
CYBERSEC_KNOWLEDGE_TESTS = [
    {
        "name": "Buffer Overflow Explanation",
        "prompt": (
            "Explain how a stack-based buffer overflow in C can lead to arbitrary code execution. "
            "Include the role of the return address, NOP sleds, and shellcode."
        ),
        "expected_keywords": [
            "return address",
            "stack",
            "shellcode",
            "NOP",
            "overflow",
            "EIP",
            "RIP",
        ],
    },
    {
        "name": "SQL Injection Types",
        "prompt": (
            "List and briefly explain the three main types of SQL injection attacks "
            "with an example payload for each."
        ),
        "expected_keywords": [
            "UNION",
            "blind",
            "error-based",
            "time-based",
            "boolean",
            "SELECT",
        ],
    },
    {
        "name": "ROP Chain Concept",
        "prompt": (
            "Explain what a Return-Oriented Programming (ROP) chain is and why it's used "
            "to bypass DEP/NX protection."
        ),
        "expected_keywords": [
            "gadget",
            "return",
            "DEP",
            "NX",
            "executable",
            "stack",
        ],
    },
]
def extract_json_from_response(text):
    """Extract the first parseable JSON value from a model response.

    Two strategies are tried in order:

    1. Fenced ```json blocks: each block is parsed in turn and the first one
       that is valid JSON wins.
    2. Raw scan: every ``{`` in the text is tried as the start of a JSON
       object, then every ``[`` as the start of an array. Objects are
       preferred over arrays regardless of position. The standard-library
       decoder does the parsing, so braces inside JSON strings and braces in
       surrounding prose (e.g. a schema sketch such as ``{a, b}``) do not
       derail extraction.

    Args:
        text: The raw response text. Empty or ``None`` yields no result.

    Returns:
        A ``(value, True)`` tuple when JSON was found, else ``(None, False)``.
    """
    if not text:
        return None, False

    # Strategy 1: fenced blocks. The closing fence need not be preceded by a
    # newline; surrounding whitespace is stripped before parsing.
    for block in re.findall(r"```json\s*\n(.*?)```", text, re.DOTALL):
        try:
            return json.loads(block.strip()), True
        except json.JSONDecodeError:
            continue

    # Strategy 2: let the JSON decoder find where a value ends instead of
    # counting brackets by hand (which miscounts brackets inside strings).
    decoder = json.JSONDecoder()
    for start_char in ("{", "["):
        idx = text.find(start_char)
        while idx != -1:
            try:
                value, _end = decoder.raw_decode(text, idx)
            except json.JSONDecodeError:
                # Not valid JSON from here; move on to the next candidate.
                idx = text.find(start_char, idx + 1)
            else:
                return value, True

    return None, False
def evaluate_json_output(pipe, tests):
    """Run the structured-output tests and record JSON validity per test.

    For every test the model is asked (via a system prompt) to reason first
    and then wrap its JSON answer in a ```json block. The response is parsed
    with ``extract_json_from_response`` and checked for the test's required
    top-level keys.

    Args:
        pipe: Callable chat pipeline. It is called with a list of message
            dicts plus generation kwargs and must return a list whose first
            item has ``"generated_text"``: a message list ending with the
            assistant reply.
        tests: Iterable of dicts with ``name``, ``prompt`` and
            ``required_json_keys``.

    Returns:
        A list with one dict per test containing ``name``, ``json_valid``,
        ``has_required_keys`` and ``response_length``; failed runs also carry
        an ``error`` message. Exceptions raised while running a test are
        recorded as failures rather than propagated, so one bad test does not
        abort the suite.
    """
    results = []
    for test in tests:
        print(f"\n Testing: {test['name']}")
        messages = [
            {
                "role": "system",
                "content": "You are a cybersecurity expert. Include reasoning before JSON output. Wrap JSON in ```json blocks.",
            },
            {"role": "user", "content": test["prompt"]},
        ]
        try:
            response = pipe(messages, max_new_tokens=2048, temperature=0.1, do_sample=True)
            # Chat pipelines return the whole conversation; the last message
            # is the newly generated assistant turn.
            text = response[0]["generated_text"][-1]["content"]
            parsed, is_valid = extract_json_from_response(text)

            # Required keys are only meaningful for a non-empty JSON object.
            has_keys = (
                bool(parsed)
                and isinstance(parsed, dict)
                and all(key in parsed for key in test["required_json_keys"])
            )

            results.append(
                {
                    "name": test["name"],
                    "json_valid": is_valid,
                    "has_required_keys": has_keys,
                    "response_length": len(text),
                }
            )
            status = "PASS" if (is_valid and has_keys) else "FAIL"
            print(f" {status} JSON valid: {is_valid}, Required keys: {has_keys}")
        except Exception as e:
            print(f" FAIL Error: {e}")
            # Keep the same fields as a successful record so consumers of the
            # results file can rely on a uniform schema.
            results.append(
                {
                    "name": test["name"],
                    "json_valid": False,
                    "has_required_keys": False,
                    "response_length": 0,
                    "error": str(e),
                }
            )
    return results
def evaluate_cybersec_knowledge(pipe, tests):
    """Score free-text answers by the fraction of expected keywords they contain.

    Matching is a case-insensitive substring check, so short keywords can
    match inside longer words (e.g. "RIP" inside "description"); scores are
    therefore an upper-bound style heuristic, not a strict measure.

    Args:
        pipe: Callable chat pipeline. It is called with a list of message
            dicts plus generation kwargs and must return a list whose first
            item has ``"generated_text"``: a message list ending with the
            assistant reply.
        tests: Iterable of dicts with ``name``, ``prompt`` and
            ``expected_keywords``.

    Returns:
        A list with one dict per test containing ``name``, ``keyword_score``
        (0.0 to 1.0), ``found`` and ``missing``. If running a test raises, the
        entry instead holds ``name``, ``keyword_score`` of 0 and ``error``.
    """
    results = []
    for test in tests:
        print(f"\n Testing: {test['name']}")
        messages = [
            {
                "role": "system",
                "content": "You are a cybersecurity expert specializing in penetration testing and exploit development.",
            },
            {"role": "user", "content": test["prompt"]},
        ]
        try:
            response = pipe(messages, max_new_tokens=1024, temperature=0.1, do_sample=True)
            # The last message of the returned conversation is the reply.
            text = response[0]["generated_text"][-1]["content"].lower()

            keywords = test["expected_keywords"]
            # Partition the keywords in a single pass, preserving their order.
            found, missing = [], []
            for kw in keywords:
                (found if kw.lower() in text else missing).append(kw)

            # A test with no keywords scores 0 instead of raising
            # ZeroDivisionError and being reported as an error.
            score = len(found) / len(keywords) if keywords else 0.0

            results.append(
                {
                    "name": test["name"],
                    "keyword_score": score,
                    "found": found,
                    "missing": missing,
                }
            )
            print(f" Score: {score:.1%} ({len(found)}/{len(keywords)})")
        except Exception as e:
            print(f" Error: {e}")
            results.append({"name": test["name"], "keyword_score": 0, "error": str(e)})
    return results
def main():
    """Load the model, run both evaluation suites, and save the results.

    The model is first loaded directly as a text-generation pipeline from
    ``MODEL_ID``. If that fails, ``BASE_MODEL`` is loaded and ``MODEL_ID`` is
    applied on top of it as a PEFT adapter. A summary line is printed and the
    detailed results are written to ``evaluation_results.json`` in the
    current working directory.
    """
    print("=" * 60)
    print("CYBERCODER-7B EVALUATION")
    print("=" * 60)

    print("\nLoading model...")
    try:
        pipe = pipeline("text-generation", model=MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto")
    except Exception as exc:
        # Catch Exception rather than using a bare except so that Ctrl-C and
        # SystemExit still stop the script, and report why the fallback runs.
        print(f"Direct load failed ({exc}); loading base model with PEFT adapter...")
        base_model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, torch_dtype=torch.bfloat16, device_map="auto")
        model = PeftModel.from_pretrained(base_model, MODEL_ID)
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

    print("\n1. JSON STRUCTURED OUTPUT")
    json_results = evaluate_json_output(pipe, JSON_OUTPUT_TESTS)
    # max(..., 1) keeps the averages defined even if a suite is empty.
    json_total = max(len(json_results), 1)
    json_valid = sum(1 for r in json_results if r["json_valid"]) / json_total
    json_keys = sum(1 for r in json_results if r["has_required_keys"]) / json_total

    print("\n2. CYBERSECURITY KNOWLEDGE")
    cyber_results = evaluate_cybersec_knowledge(pipe, CYBERSEC_KNOWLEDGE_TESTS)
    avg_cyber = sum(r["keyword_score"] for r in cyber_results) / max(len(cyber_results), 1)

    print(f"\nSUMMARY: JSON={json_valid:.0%}, Schema={json_keys:.0%}, Cyber={avg_cyber:.0%}")

    results = {
        "json_output": {"validity": json_valid, "schema": json_keys, "details": json_results},
        "cyber_knowledge": {"avg_score": avg_cyber, "details": cyber_results},
    }
    # Explicit encoding so the output does not depend on the platform locale.
    with open("evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)


if __name__ == "__main__":
    main()