""" Evaluation script for CyberCoder-7B-v1 Tests: 1. JSON structured output validity and accuracy 2. Cybersecurity knowledge (CVE analysis, vulnerability detection) 3. Code reasoning Usage: pip install transformers torch peft python evaluate_cybersec.py """ import json import os import re import torch from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from peft import PeftModel MODEL_ID = "moro72842/CyberCoder-7B-v1" BASE_MODEL = "Qwen/Qwen2.5-Coder-7B-Instruct" JSON_OUTPUT_TESTS = [ { "name": "CVE Analysis JSON", "prompt": "Analyze CVE-2023-44487 (HTTP/2 Rapid Reset) and provide the analysis in JSON format with fields: cve_id, severity, cvss_score, attack_vector, affected_software, description, mitigation.", "required_json_keys": ["cve_id", "severity", "cvss_score", "attack_vector", "affected_software", "description", "mitigation"], }, { "name": "Vulnerability Assessment JSON", "prompt": "Analyze this code for vulnerabilities and output as JSON with schema {vulnerabilities: [{cwe_id, severity, description, fix}], risk_score}:\\n\\n```python\\nimport pickle\\nimport base64\\ndef load_user_data(encoded_data):\\n data = base64.b64decode(encoded_data)\\n return pickle.loads(data)\\n```", "required_json_keys": ["vulnerabilities", "risk_score"], }, { "name": "MITRE ATT&CK Mapping JSON", "prompt": "Map the following observed behavior to MITRE ATT&CK framework and output as JSON:\\nObserved: A process named 'chrome_update.exe' was seen creating a scheduled task that runs every 6 hours, connecting to a .onion address via Tor, and exfiltrating clipboard data.\\nSchema: {tactics: [{tactic_id, name, technique_id, technique_name, evidence}], severity, confidence}", "required_json_keys": ["tactics", "severity", "confidence"], }, { "name": "Network IDS Alert JSON", "prompt": "Classify this network alert and output structured JSON:\\nAlert: Multiple SYN packets from 10.0.0.5 to ports 22,80,443,8080,3306,5432,6379,27017 on 192.168.1.100 within 2 seconds.\\nSchema: {alert_type, severity, source_ip, target_ip, ports_scanned, attack_classification, mitre_technique, recommended_action}", "required_json_keys": ["alert_type", "severity", "source_ip", "target_ip", "attack_classification"], }, { "name": "Malware IOC Report JSON", "prompt": "Generate a structured IOC report in JSON for this sample:\\nHash: 5f2b14dc8a32c3e4b981293f4f5e7a12\\nConnects to: evil-c2.darknet.xyz:8443\\nCreates: %APPDATA%\\\\Microsoft\\\\svchost.dll\\nRegistry: HKCU\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run\\\\WinUpdate\\nSchema: {ioc_report: {hashes: {md5}, network: [{indicator, type}], filesystem: [{path, action}], registry: [{key, value}], classification, recommendations: [str]}}", "required_json_keys": ["ioc_report"], }, ] CYBERSEC_KNOWLEDGE_TESTS = [ { "name": "Buffer Overflow Explanation", "prompt": "Explain how a stack-based buffer overflow in C can lead to arbitrary code execution. Include the role of the return address, NOP sleds, and shellcode.", "expected_keywords": ["return address", "stack", "shellcode", "NOP", "overflow", "EIP", "RIP"], }, { "name": "SQL Injection Types", "prompt": "List and briefly explain the three main types of SQL injection attacks with an example payload for each.", "expected_keywords": ["UNION", "blind", "error-based", "time-based", "boolean", "SELECT"], }, { "name": "ROP Chain Concept", "prompt": "Explain what a Return-Oriented Programming (ROP) chain is and why it's used to bypass DEP/NX protection.", "expected_keywords": ["gadget", "return", "DEP", "NX", "executable", "stack"], }, ] def extract_json_from_response(text): json_blocks = re.findall(r'```json\\s*\\n(.*?)\\n```', text, re.DOTALL) if json_blocks: try: return json.loads(json_blocks[0]), True except json.JSONDecodeError: pass for start_char in ['{', '[']: idx = text.find(start_char) if idx != -1: depth = 0 end_char = '}' if start_char == '{' else ']' for i in range(idx, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(text[idx:i+1]), True except json.JSONDecodeError: break return None, False def evaluate_json_output(pipe, tests): results = [] for test in tests: print(f"\\n Testing: {test['name']}") messages = [ {"role": "system", "content": "You are a cybersecurity expert. Include reasoning before JSON output. Wrap JSON in ```json blocks."}, {"role": "user", "content": test["prompt"]} ] try: response = pipe(messages, max_new_tokens=2048, temperature=0.1, do_sample=True) text = response[0]["generated_text"][-1]["content"] parsed, is_valid = extract_json_from_response(text) has_keys = False if parsed and isinstance(parsed, dict): has_keys = all(k in parsed for k in test["required_json_keys"]) result = {"name": test["name"], "json_valid": is_valid, "has_required_keys": has_keys, "response_length": len(text)} results.append(result) status = "PASS" if (is_valid and has_keys) else "FAIL" print(f" {status} JSON valid: {is_valid}, Required keys: {has_keys}") except Exception as e: print(f" FAIL Error: {e}") results.append({"name": test["name"], "json_valid": False, "has_required_keys": False, "error": str(e)}) return results def evaluate_cybersec_knowledge(pipe, tests): results = [] for test in tests: print(f"\\n Testing: {test['name']}") messages = [ {"role": "system", "content": "You are a cybersecurity expert specializing in penetration testing and exploit development."}, {"role": "user", "content": test["prompt"]} ] try: response = pipe(messages, max_new_tokens=1024, temperature=0.1, do_sample=True) text = response[0]["generated_text"][-1]["content"].lower() found = [kw for kw in test["expected_keywords"] if kw.lower() in text] score = len(found) / len(test["expected_keywords"]) result = {"name": test["name"], "keyword_score": score, "found": found, "missing": [kw for kw in test["expected_keywords"] if kw.lower() not in text]} results.append(result) print(f" Score: {score:.1%} ({len(found)}/{len(test['expected_keywords'])})") except Exception as e: print(f" Error: {e}") results.append({"name": test["name"], "keyword_score": 0, "error": str(e)}) return results def main(): print("=" * 60) print("CYBERCODER-7B EVALUATION") print("=" * 60) print("\\nLoading model...") try: pipe = pipeline("text-generation", model=MODEL_ID, torch_dtype=torch.bfloat16, device_map="auto") except: base_model = AutoModelForCausalLM.from_pretrained(BASE_MODEL, torch_dtype=torch.bfloat16, device_map="auto") model = PeftModel.from_pretrained(base_model, MODEL_ID) tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) pipe = pipeline("text-generation", model=model, tokenizer=tokenizer) print("\\n1. JSON STRUCTURED OUTPUT") json_results = evaluate_json_output(pipe, JSON_OUTPUT_TESTS) json_valid = sum(1 for r in json_results if r["json_valid"]) / len(json_results) json_keys = sum(1 for r in json_results if r["has_required_keys"]) / len(json_results) print("\\n2. CYBERSECURITY KNOWLEDGE") cyber_results = evaluate_cybersec_knowledge(pipe, CYBERSEC_KNOWLEDGE_TESTS) avg_cyber = sum(r["keyword_score"] for r in cyber_results) / len(cyber_results) print(f"\\nSUMMARY: JSON={json_valid:.0%}, Schema={json_keys:.0%}, Cyber={avg_cyber:.0%}") results = {"json_output": {"validity": json_valid, "schema": json_keys, "details": json_results}, "cyber_knowledge": {"avg_score": avg_cyber, "details": cyber_results}} with open("evaluation_results.json", "w") as f: json.dump(results, f, indent=2) if __name__ == "__main__": main()