# security-audit-env / inference.py
"""
Security Audit Environment β€” Baseline Inference Script
=======================================================
MANDATORY for hackathon submission.
Uses OpenAI Client to run an LLM agent against the security audit
environment. Reads API credentials from environment variables.
ENV VARS (required):
API_BASE_URL β€” The API endpoint for the LLM
MODEL_NAME β€” The model identifier to use
HF_TOKEN β€” Your Hugging Face / API key
"""
import json
import os
import re
import sys
import textwrap
from typing import Any, Dict, List, Optional
from openai import OpenAI
# --- ENV VARS ---
# OpenAI-compatible API endpoint; defaults to the Hugging Face router.
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
# Model identifier passed to the chat completions API.
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct")
# API key is mandatory; fail fast at import time so misconfiguration is obvious.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN is None:
    raise ValueError("HF_TOKEN environment variable is required")
# --- CONFIG ---
# Per-scenario step budget for the agent loop (fallback of 30 is applied in run_scenario).
SCENARIO_MAX_STEPS = {"easy": 25, "medium": 35, "hard": 45}
TEMPERATURE = 0.1  # low temperature to keep JSON action output stable
MAX_TOKENS = 1024  # completion token cap per LLM call
SCENARIOS = ["easy", "medium", "hard"]  # scenarios run by main(), in order
ENV_NAME = "security_audit_env"  # environment name emitted in the [START] log line
# --- SYSTEM PROMPT ---
# Fixed instructions sent as the system message on every LLM call: the JSON
# action schema, the phased audit workflow, and the scoring rules. The literal
# is runtime-visible text, so its content must not be reworded casually;
# dedent("""\ ... """).strip() just trims layout whitespace.
SYSTEM_PROMPT = textwrap.dedent("""\
You are a professional security auditor. You interact with the environment using JSON actions.
ACTIONS (respond with exactly ONE JSON object, no other text):
1. {"action_type": "use_tool", "tool_name": "TOOL", "arguments": {...}}
Tools: network_scan (target: CIDR), web_crawl (host: IP), test_injection (host, endpoint),
test_xss (host, endpoint), test_auth (host), test_config (host), test_crypto (host),
check_secrets (host, endpoint), vulnerability_scan (host), service_fingerprint (host)
2. {"action_type": "submit_finding", "arguments": {"title": "...", "host": "IP",
"type": "Vuln Type", "severity": "Critical|High|Medium|Low", "cvss_score": 9.8,
"cwe": "CWE-XXX", "owasp": "AXX:2021 - ...", "endpoint": "/path",
"evidence": "...", "remediation": "..."}}
3. {"action_type": "generate_report"} (call this when done to get your score)
STRICT WORKFLOW β€” follow this order, do NOT repeat steps:
Phase 1: network_scan the target CIDR (do this ONCE, never again)
Phase 2: web_crawl each discovered host (once per host)
Phase 3: For each endpoint found, run test_injection, test_xss, check_secrets.
For each host, run test_auth, test_config, test_crypto, vulnerability_scan.
Phase 4: For EVERY anomaly or issue in tool output, submit_finding with your assessment.
You MUST infer the vulnerability type, CWE, CVSS, and severity from the evidence.
Phase 5: generate_report
CRITICAL RULES:
- NEVER run network_scan or service_fingerprint more than once.
- After web_crawl, immediately start testing endpoints β€” do NOT re-scan.
- When tool output shows anomalies (unusual HTTP responses, errors, data leaks), ALWAYS submit a finding.
- You are scored on findings submitted, not on tools run. Running tools without submitting findings = 0 score.
""").strip()
def parse_action(response_text: str) -> Optional[Dict[str, Any]]:
    """Extract a JSON action object from the LLM's response.

    Handles raw JSON, Markdown-fenced JSON (``` / ```json), and a JSON
    object embedded in surrounding prose. Only dict results are returned
    (callers immediately call ``.get`` on the result), so bare lists or
    scalars are treated as unparseable.

    Args:
        response_text: Raw text returned by the model; may be empty/None.

    Returns:
        The decoded action dict, or None when no JSON object is found.
    """
    if not response_text:
        return None
    text = response_text.strip()
    # Strip Markdown code fences, with or without a language tag.
    # (The previous pattern only matched ```json, missing bare ``` fences.)
    text = re.sub(r"```(?:json)?\s*", "", text)
    text = re.sub(r"```\s*$", "", text)
    text = text.strip()
    try:
        parsed = json.loads(text)
        return parsed if isinstance(parsed, dict) else None
    except json.JSONDecodeError:
        pass
    # Fallback: find the first balanced top-level {...} span. A regex cannot
    # match arbitrarily nested JSON (submit_finding arguments nest two deep),
    # and braces inside string values must not affect the balance count.
    start = text.find("{")
    while start != -1:
        depth = 0
        in_string = False
        escaped = False
        for i in range(start, len(text)):
            ch = text[i]
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    try:
                        candidate = json.loads(text[start:i + 1])
                        if isinstance(candidate, dict):
                            return candidate
                    except json.JSONDecodeError:
                        pass
                    break
        # This span didn't decode; try the next opening brace.
        start = text.find("{", start + 1)
    return None
def build_prompt(step: int, observation: Any, history: List[str], max_steps: int = 30) -> str:
    """Compose the per-step user prompt for the LLM.

    Renders the latest observation (tool output, message, discovered hosts,
    findings count, steps remaining), the most recent actions taken, and a
    workflow-phase hint inferred from the action history.
    """
    lines: List[str] = [f"[Step {step}/{max_steps}]"]

    # Latest tool output, truncated so the prompt stays within budget.
    if hasattr(observation, "tool_output") and observation.tool_output:
        tool_output = observation.tool_output
        if len(tool_output) > 2000:
            tool_output = tool_output[:2000] + "\n... (truncated)"
        lines.append(f"\nTool Output:\n{tool_output}")

    if hasattr(observation, "message") and observation.message:
        lines.append(f"\nMessage: {observation.message}")

    hosts: List[str] = []
    if hasattr(observation, "discovered_hosts") and observation.discovered_hosts:
        hosts = observation.discovered_hosts
        lines.append(f"\nDiscovered Hosts: {', '.join(hosts)}")

    findings = 0
    if hasattr(observation, "findings_submitted"):
        findings = observation.findings_submitted
        lines.append(f"Findings Submitted: {findings}")

    if hasattr(observation, "steps_remaining"):
        lines.append(f"Steps Remaining: {observation.steps_remaining}")

    if history:
        lines.append("\nRecent Actions:\n" + "\n".join(history[-8:]))

    # Infer which workflow phase the agent is in from its past actions.
    scanned = any("network_scan" in entry for entry in history)
    crawled = any("web_crawl" in entry for entry in history)
    past = " ".join(history)
    tested = any(tool in past for tool in ("test_injection", "test_xss", "test_auth", "test_config"))

    if not scanned:
        lines.append("\n>> Phase 1: Run network_scan on the target CIDR now.")
    elif not crawled and hosts:
        lines.append(f"\n>> Phase 2: Run web_crawl on each host: {', '.join(hosts)}")
    elif crawled and not tested:
        lines.append("\n>> Phase 3: Test endpoints with test_injection, test_xss, test_auth, test_config, test_crypto, check_secrets, vulnerability_scan.")
    elif tested and findings == 0:
        lines.append("\n>> Phase 4: You MUST submit_finding for any anomalies detected. Review tool output and submit findings NOW.")
    elif step >= max_steps - 2:
        lines.append("\n>> Phase 5: Time is almost up. Run generate_report NOW.")

    lines.append("\nRespond with a single JSON action.")
    return "\n".join(lines)
def run_scenario(client: OpenAI, scenario_id: str, env_url: str) -> float:
    """Run the agent on one scenario and return the final score.

    Drives a reset/step loop against the environment server, asking the LLM
    for one JSON action per step. Emits the mandatory [START]/[STEP]/[END]
    stdout lines regardless of how the episode ends.

    Args:
        client: OpenAI-compatible chat client used for every LLM call.
        scenario_id: Scenario key ("easy"/"medium"/"hard"); selects the step budget.
        env_url: Base URL of the running security-audit environment server.

    Returns:
        The scenario's final score (0.0 on any failure).
    """
    # Imported lazily so this module can be imported without the env package installed.
    from security_audit_env import SecurityAuditEnv, SecurityAuditAction
    max_steps = SCENARIO_MAX_STEPS.get(scenario_id, 30)
    print(f"\n{'='*60}")
    print(f"Running scenario: {scenario_id} (max {max_steps} steps)")
    print(f"{'='*60}")
    # --- MANDATORY STDOUT: [START] ---
    print(f"[START] task={scenario_id} env={ENV_NAME} model={MODEL_NAME}", flush=True)
    all_rewards: List[float] = []
    final_score = 0.0
    total_steps = 0
    success = False
    last_error: Optional[str] = None
    try:
        with SecurityAuditEnv(base_url=env_url).sync() as env:
            result = env.reset(scenario_id=scenario_id)
            observation = result.observation
            history: List[str] = []
            for step in range(1, max_steps + 1):
                if result.done:
                    break
                prompt = build_prompt(step, observation, history, max_steps=max_steps)
                messages = [
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ]
                last_error = None
                try:
                    completion = client.chat.completions.create(
                        model=MODEL_NAME,
                        messages=messages,
                        temperature=TEMPERATURE,
                        max_tokens=MAX_TOKENS,
                        stream=False,
                    )
                    response_text = completion.choices[0].message.content or ""
                except Exception as exc:
                    # LLM call failed: record the error and fall back to a harmless
                    # action so the episode can continue instead of aborting.
                    last_error = str(exc)
                    response_text = '{"action_type": "list_tools"}'
                action_dict = parse_action(response_text)
                if not action_dict:
                    last_error = "Could not parse LLM response as JSON"
                    action_dict = {"action_type": "list_tools"}
                action_type = action_dict.get("action_type", "list_tools")
                tool_name = action_dict.get("tool_name")
                arguments = action_dict.get("arguments", {})
                # Human-readable label for logs/history, e.g. "use_tool(network_scan)".
                action_str = action_type
                if tool_name:
                    action_str += f"({tool_name})"
                try:
                    action = SecurityAuditAction(
                        action_type=action_type,
                        tool_name=tool_name,
                        arguments=arguments,
                    )
                    result = env.step(action)
                    observation = result.observation
                    last_error = None
                except Exception as exc:
                    # Environment step failed: emit a zero-reward [STEP] line and
                    # abort the episode (the for-else report fallback is skipped).
                    last_error = str(exc)
                    reward = 0.0
                    all_rewards.append(reward)
                    total_steps = step
                    # --- MANDATORY STDOUT: [STEP] ---
                    error_str = last_error.replace("\n", " ") if last_error else "null"
                    print(f"[STEP] step={step} action={action_str} reward={reward:.2f} done=false error={error_str}", flush=True)
                    break
                reward = result.reward or 0.0
                all_rewards.append(reward)
                total_steps = step
                history.append(f"Step {step}: {action_str} β†’ reward {reward:+.2f}")
                # --- MANDATORY STDOUT: [STEP] ---
                done_str = "true" if result.done else "false"
                error_str = last_error.replace("\n", " ") if last_error else "null"
                print(f"[STEP] step={step} action={action_str} reward={reward:.2f} done={done_str} error={error_str}", flush=True)
                if result.done:
                    # Episode finished: pull the graded score from observation metadata,
                    # falling back to the last step reward when no grade is present.
                    grades = getattr(observation, "metadata", {}) or {}
                    grades = grades.get("grades", {})
                    final_score = grades.get("final_score", reward)
                    success = final_score > 0
                    break
            else:
                # Didn't finish β€” force report generation
                # (for-else: runs only when the loop exhausted max_steps without break).
                try:
                    action = SecurityAuditAction(action_type="generate_report")
                    result = env.step(action)
                    reward = result.reward or 0.0
                    all_rewards.append(reward)
                    total_steps += 1
                    done_str = "true" if result.done else "false"
                    print(f"[STEP] step={total_steps} action=generate_report reward={reward:.2f} done={done_str} error=null", flush=True)
                    grades = getattr(result.observation, "metadata", {}) or {}
                    grades = grades.get("grades", {})
                    final_score = grades.get("final_score", 0.0)
                    success = final_score > 0
                except Exception as exc:
                    final_score = 0.0
                    last_error = str(exc)
    except Exception as exc:
        # Catch-all boundary so the mandatory [END] line below is always emitted.
        last_error = str(exc)
    finally:
        # --- MANDATORY STDOUT: [END] (always emitted, even on exception) ---
        rewards_str = ",".join(f"{r:.2f}" for r in all_rewards)
        success_str = "true" if success else "false"
        print(f"[END] success={success_str} steps={total_steps} score={final_score:.2f} rewards={rewards_str}", flush=True)
    return final_score
def main():
    """Entry point: run the baseline agent on every scenario and print a score table."""
    print("Security Audit Environment β€” Baseline Inference")
    print(f"API: {API_BASE_URL}")
    print(f"Model: {MODEL_NAME}")

    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    target_url = os.getenv("ENV_URL", "http://localhost:8000")

    scores: Dict[str, float] = {}
    for sid in SCENARIOS:
        try:
            scores[sid] = run_scenario(client, sid, target_url)
        except Exception as exc:
            # One failing scenario must not stop the rest of the run.
            print(f" ERROR on {sid}: {exc}")
            scores[sid] = 0.0

    divider = "=" * 60
    print(f"\n{divider}")
    print("BASELINE SCORES")
    print(divider)
    for name, value in scores.items():
        print(f" {name:10s}: {value:.4f}")
    average = sum(scores.values()) / len(scores) if scores else 0.0
    print(f" {'average':10s}: {average:.4f}")
    print(divider)


if __name__ == "__main__":
    main()