"""
Evaluation script for CyberCoder-7B-v1.

Tests:
    1. JSON structured output validity and accuracy
    2. Cybersecurity knowledge (CVE analysis, vulnerability detection)
    3. Code reasoning (listed as a goal; main() currently runs only suites 1 and 2)

Usage:
    pip install transformers torch peft
    python evaluate_cybersec.py
"""


import json
import os
import re

import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline


# Identifier of the fine-tuned model under evaluation; main() passes it to the
# transformers pipeline and, on fallback, to PeftModel/AutoTokenizer.
MODEL_ID = "moro72842/CyberCoder-7B-v1"
# Base checkpoint that main() loads first when MODEL_ID has to be applied on top
# of it through PeftModel.
BASE_MODEL = "Qwen/Qwen2.5-Coder-7B-Instruct"


# Prompts for the structured-output suite. Each entry carries:
#   name               - label printed during the run and stored in the results
#   prompt             - user message sent to the model
#   required_json_keys - top-level keys the parsed JSON object must contain
# Newlines inside prompts are real "\n" characters and Windows paths use a single
# backslash separator, so the model sees multi-line text rather than escape codes.
JSON_OUTPUT_TESTS = [
    {
        "name": "CVE Analysis JSON",
        "prompt": (
            "Analyze CVE-2023-44487 (HTTP/2 Rapid Reset) and provide the analysis "
            "in JSON format with fields: cve_id, severity, cvss_score, attack_vector, "
            "affected_software, description, mitigation."
        ),
        "required_json_keys": [
            "cve_id",
            "severity",
            "cvss_score",
            "attack_vector",
            "affected_software",
            "description",
            "mitigation",
        ],
    },
    {
        "name": "Vulnerability Assessment JSON",
        "prompt": (
            "Analyze this code for vulnerabilities and output as JSON with schema "
            "{vulnerabilities: [{cwe_id, severity, description, fix}], risk_score}:\n\n"
            "```python\n"
            "import pickle\n"
            "import base64\n"
            "def load_user_data(encoded_data):\n"
            " data = base64.b64decode(encoded_data)\n"
            " return pickle.loads(data)\n"
            "```"
        ),
        "required_json_keys": ["vulnerabilities", "risk_score"],
    },
    {
        "name": "MITRE ATT&CK Mapping JSON",
        "prompt": (
            "Map the following observed behavior to MITRE ATT&CK framework and output as JSON:\n"
            "Observed: A process named 'chrome_update.exe' was seen creating a scheduled task "
            "that runs every 6 hours, connecting to a .onion address via Tor, and exfiltrating "
            "clipboard data.\n"
            "Schema: {tactics: [{tactic_id, name, technique_id, technique_name, evidence}], "
            "severity, confidence}"
        ),
        "required_json_keys": ["tactics", "severity", "confidence"],
    },
    {
        "name": "Network IDS Alert JSON",
        "prompt": (
            "Classify this network alert and output structured JSON:\n"
            "Alert: Multiple SYN packets from 10.0.0.5 to ports "
            "22,80,443,8080,3306,5432,6379,27017 on 192.168.1.100 within 2 seconds.\n"
            "Schema: {alert_type, severity, source_ip, target_ip, ports_scanned, "
            "attack_classification, mitre_technique, recommended_action}"
        ),
        "required_json_keys": [
            "alert_type",
            "severity",
            "source_ip",
            "target_ip",
            "attack_classification",
        ],
    },
    {
        "name": "Malware IOC Report JSON",
        "prompt": (
            "Generate a structured IOC report in JSON for this sample:\n"
            "Hash: 5f2b14dc8a32c3e4b981293f4f5e7a12\n"
            "Connects to: evil-c2.darknet.xyz:8443\n"
            "Creates: %APPDATA%\\Microsoft\\svchost.dll\n"
            "Registry: HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run\\WinUpdate\n"
            "Schema: {ioc_report: {hashes: {md5}, network: [{indicator, type}], "
            "filesystem: [{path, action}], registry: [{key, value}], classification, "
            "recommendations: [str]}}"
        ),
        "required_json_keys": ["ioc_report"],
    },
]


# Prompts for the knowledge suite. Each entry carries:
#   name              - label printed during the run and stored in the results
#   prompt            - user message sent to the model
#   expected_keywords - terms looked for (case-insensitively) in the reply; the
#                       score is the fraction of these terms that were found
CYBERSEC_KNOWLEDGE_TESTS = [
    {
        "name": "Buffer Overflow Explanation",
        "prompt": (
            "Explain how a stack-based buffer overflow in C can lead to arbitrary code "
            "execution. Include the role of the return address, NOP sleds, and shellcode."
        ),
        "expected_keywords": [
            "return address",
            "stack",
            "shellcode",
            "NOP",
            "overflow",
            "EIP",
            "RIP",
        ],
    },
    {
        "name": "SQL Injection Types",
        "prompt": (
            "List and briefly explain the three main types of SQL injection attacks "
            "with an example payload for each."
        ),
        "expected_keywords": ["UNION", "blind", "error-based", "time-based", "boolean", "SELECT"],
    },
    {
        "name": "ROP Chain Concept",
        "prompt": (
            "Explain what a Return-Oriented Programming (ROP) chain is and why it's used "
            "to bypass DEP/NX protection."
        ),
        "expected_keywords": ["gadget", "return", "DEP", "NX", "executable", "stack"],
    },
]


def extract_json_from_response(text):
    """Extract a JSON value embedded in a model response.

    Lookup order:
      1. The first ```json fenced block, if it parses.
      2. The JSON object starting at the first ``{`` in the text.
      3. The JSON array starting at the first ``[`` in the text.

    Args:
        text: Raw response text. Empty or non-string input yields no match.

    Returns:
        ``(parsed, True)`` when a JSON value was decoded, otherwise
        ``(None, False)``.
    """
    if not isinstance(text, str) or not text:
        return None, False

    # Single backslashes: in a raw string "\\s" would match a literal backslash
    # and the fence would never be recognised.
    fenced_blocks = re.findall(r'```json\s*\n(.*?)\n```', text, re.DOTALL)
    if fenced_blocks:
        try:
            return json.loads(fenced_blocks[0]), True
        except json.JSONDecodeError:
            pass  # fall through to the bare-JSON scan below

    # raw_decode parses one complete JSON value and ignores trailing text. Unlike
    # counting braces by hand, it handles brackets that occur inside strings.
    decoder = json.JSONDecoder()
    for opener in ('{', '['):
        start = text.find(opener)
        if start == -1:
            continue
        try:
            value, _consumed = decoder.raw_decode(text[start:])
        except json.JSONDecodeError:
            continue
        return value, True
    return None, False


def evaluate_json_output(pipe, tests):
    """Run the structured-output suite and record JSON validity per test.

    Args:
        pipe: Callable invoked as ``pipe(messages, max_new_tokens=..., ...)``.
            Its result is read as ``result[0]["generated_text"][-1]["content"]``.
        tests: Iterable of dicts with ``name``, ``prompt`` and
            ``required_json_keys`` entries.

    Returns:
        A list with one dict per test. Successful runs contain ``name``,
        ``json_valid``, ``has_required_keys`` and ``response_length``; runs that
        raised contain ``name``, ``json_valid=False``,
        ``has_required_keys=False`` and ``error``.
    """
    results = []
    for test in tests:
        print(f"\n Testing: {test['name']}")
        messages = [
            {
                "role": "system",
                "content": "You are a cybersecurity expert. Include reasoning before JSON output. Wrap JSON in ```json blocks.",
            },
            {"role": "user", "content": test["prompt"]},
        ]
        try:
            response = pipe(messages, max_new_tokens=2048, temperature=0.1, do_sample=True)
            text = response[0]["generated_text"][-1]["content"]
            parsed, is_valid = extract_json_from_response(text)
            # Required keys are only checked on a non-empty top-level JSON object.
            has_keys = False
            if parsed and isinstance(parsed, dict):
                has_keys = all(key in parsed for key in test["required_json_keys"])
            results.append(
                {
                    "name": test["name"],
                    "json_valid": is_valid,
                    "has_required_keys": has_keys,
                    "response_length": len(text),
                }
            )
            status = "PASS" if (is_valid and has_keys) else "FAIL"
            print(f" {status} JSON valid: {is_valid}, Required keys: {has_keys}")
        except Exception as e:
            # Best effort: a failure on one prompt is recorded and the suite continues.
            print(f" FAIL Error: {e}")
            results.append(
                {
                    "name": test["name"],
                    "json_valid": False,
                    "has_required_keys": False,
                    "error": str(e),
                }
            )
    return results


def _mentions_keyword(text, keyword):
    """Return True when *keyword* occurs in *text* as a whole term, ignoring case.

    A trailing "s"/"es" is tolerated so "gadget" still matches "gadgets". Plain
    substring tests are avoided because short keywords such as "RIP" or "NOP"
    otherwise match inside unrelated words ("description", "synopsis").
    """
    pattern = r"(?<!\w)" + re.escape(keyword) + r"(?:e?s)?(?!\w)"
    return re.search(pattern, text, re.IGNORECASE) is not None


def evaluate_cybersec_knowledge(pipe, tests):
    """Run the knowledge suite and score each reply by keyword coverage.

    Args:
        pipe: Callable invoked as ``pipe(messages, max_new_tokens=..., ...)``.
            Its result is read as ``result[0]["generated_text"][-1]["content"]``.
        tests: Iterable of dicts with ``name``, ``prompt`` and
            ``expected_keywords`` entries.

    Returns:
        A list with one dict per test. Successful runs contain ``name``,
        ``keyword_score`` (fraction of expected keywords found, 0.0 when the
        keyword list is empty), ``found`` and ``missing``; runs that raised
        contain ``name``, ``keyword_score=0`` and ``error``.
    """
    results = []
    for test in tests:
        print(f"\n Testing: {test['name']}")
        messages = [
            {
                "role": "system",
                "content": "You are a cybersecurity expert specializing in penetration testing and exploit development.",
            },
            {"role": "user", "content": test["prompt"]},
        ]
        try:
            response = pipe(messages, max_new_tokens=1024, temperature=0.1, do_sample=True)
            text = response[0]["generated_text"][-1]["content"]
            keywords = test["expected_keywords"]
            # Partition the keywords in a single pass, keeping their original order.
            found, missing = [], []
            for keyword in keywords:
                (found if _mentions_keyword(text, keyword) else missing).append(keyword)
            score = len(found) / len(keywords) if keywords else 0.0
            results.append(
                {"name": test["name"], "keyword_score": score, "found": found, "missing": missing}
            )
            print(f" Score: {score:.1%} ({len(found)}/{len(keywords)})")
        except Exception as e:
            # Best effort: a failure on one prompt is recorded and the suite continues.
            print(f" Error: {e}")
            results.append({"name": test["name"], "keyword_score": 0, "error": str(e)})
    return results


def main():
    """Load the model, run both evaluation suites and save the scores.

    ``MODEL_ID`` is first loaded directly as a text-generation pipeline. If that
    raises, ``BASE_MODEL`` is loaded and ``MODEL_ID`` is applied on top of it
    through ``PeftModel``. A summary line is printed and the aggregated results
    are written to ``evaluation_results.json`` in the current working directory.
    """
    print("=" * 60)
    print("CYBERCODER-7B EVALUATION")
    print("=" * 60)

    print("\nLoading model...")
    try:
        pipe = pipeline("text-generation", model=MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto")
    except Exception as exc:
        # Not a bare except: Ctrl-C / SystemExit must still abort the run, and the
        # reason for the fallback is worth showing.
        print(f"Direct load failed ({exc}); loading {MODEL_ID} as an adapter on {BASE_MODEL}...")
        base_model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, torch_dtype=torch.bfloat16, device_map="auto")
        model = PeftModel.from_pretrained(base_model, MODEL_ID)
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

    print("\n1. JSON STRUCTURED OUTPUT")
    json_results = evaluate_json_output(pipe, JSON_OUTPUT_TESTS)
    json_total = len(json_results)
    # Guard the averages so an empty suite reports 0 instead of dividing by zero.
    json_valid = (sum(1 for r in json_results if r["json_valid"]) / json_total) if json_total else 0.0
    json_keys = (sum(1 for r in json_results if r["has_required_keys"]) / json_total) if json_total else 0.0

    print("\n2. CYBERSECURITY KNOWLEDGE")
    cyber_results = evaluate_cybersec_knowledge(pipe, CYBERSEC_KNOWLEDGE_TESTS)
    cyber_total = len(cyber_results)
    avg_cyber = (sum(r["keyword_score"] for r in cyber_results) / cyber_total) if cyber_total else 0.0

    print(f"\nSUMMARY: JSON={json_valid:.0%}, Schema={json_keys:.0%}, Cyber={avg_cyber:.0%}")

    results = {
        "json_output": {"validity": json_valid, "schema": json_keys, "details": json_results},
        "cyber_knowledge": {"avg_score": avg_cyber, "details": cyber_results},
    }
    # Explicit encoding keeps the output independent of the platform default.
    with open("evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)


# Run the evaluation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()