File size: 4,760 Bytes
2b050c2
 
 
 
 
 
 
 
 
 
 
 
 
c2e63f8
2b050c2
 
 
 
c2e63f8
2b050c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2e8f87e
 
 
 
 
 
 
2b050c2
 
 
 
 
 
 
 
2e8f87e
 
 
 
 
 
 
 
 
2b050c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9604368
 
 
 
 
2b050c2
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
import os

from dotenv import load_dotenv

load_dotenv()
from openai import OpenAI

from environment import Action, VulnPatchEnv

# Strict Environment Variables required by the hackathon rubric
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")

if not HF_TOKEN:
    print("WARNING: HF_TOKEN is missing. API calls will fail.", flush=True)

client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)


def run_episode(task_name: str):
    env = VulnPatchEnv(task=task_name)
    obs = env.reset()

    # REQUIRED [START] line
    print(f"[START] task={task_name} env=vuln-patch-env model={MODEL_NAME}", flush=True)

    done = False
    rewards = []

    try:
        while not done:
            prompt = (
                "Task: Analyze the provided Python code snippet for Supply Chain compromises and securely patch it.\n"
                "If you need hints, you can use the Action 'run_scan' to run a static analyzer.\n"
                "When you identify the vulnerability, use 'submit_patch' and provide the entirely resolved codebase in 'patched_code'.\n"
                f"Observation: {obs.model_dump_json()}\n\n"
                "Available Action JSON Schemas:\n"
                "1. Gather info: {\"action_type\": \"run_scan\"}\n"
                "2. Submit fix: {\"action_type\": \"submit_patch\", \"patched_code\": \"<full patched file>\"}"
            )

            try:
                response = client.chat.completions.create(
                    model=MODEL_NAME,
                    messages=[
                        {
                            "role": "system",
                            "content": (
                                "You are an elite Software Composition Analysis (SCA) and Supply Chain Security Agent.\n"
                                "Your objective is to identify and auto-remediate critical supply chain vulnerabilities in Python packages.\n"
                                "You must be vigilant for hardcoded API tokens, insecure configuration deserialization (e.g., yaml.load), "
                                "and CI/CD command injections in build scripts.\n"
                                "Your proposed fixes MUST use secure functional equivalents, such as os.getenv, yaml.safe_load, "
                                "and isolated subprocess array execution.\n"
                                "Always output valid JSON."
                            ),
                        },
                        {"role": "user", "content": prompt},
                    ],
                    response_format={"type": "json_object"},
                    temperature=0.0,  # Deterministic LLM response
                    timeout=60,  # Prevent indefinite hang on slow API
                )
                raw_content = response.choices[0].message.content
                action_data = json.loads(raw_content)
                action = Action(**action_data)
                error_msg = "null"
            except Exception as e:
                action = Action(action_type="error", patched_code="")
                error_msg = str(e).replace("\n", " ")

            # Step the environment
            obs, reward_obj, done, info = env.step(action)
            reward = reward_obj.value
            rewards.append(reward)

            # Use environment's info.error if set, else fall back to LLM error, else null
            env_error = info.error if info.error else None
            step_error = env_error or (error_msg if error_msg != "null" else None)
            step_error_str = step_error if step_error else "null"

            # REQUIRED [STEP] line (no newlines, 2 decimal places, lowercase bools)
            action_safe_str = f"{action.action_type}()"
            done_str = "true" if done else "false"
            print(
                f"[STEP] step={env.step_count} action={action_safe_str} reward={reward:.2f} done={done_str} error={step_error_str}",
                flush=True,
            )

    finally:
        env.close()
        # REQUIRED [END] line — always emitted even on exception, score to 2 decimal places
        score = rewards[-1] if rewards else 0.01
        score = min(
            max(score, 0.01), 0.99
        )  # Strictly within (0, 1) — exclusive of 0 and 1

        success_str = "true" if score >= 0.8 else "false"
        rewards_str = ",".join([f"{r:.2f}" for r in rewards])
        print(
            f"[END] success={success_str} steps={env.step_count} score={score:.2f} rewards={rewards_str}",
            flush=True,
        )


if __name__ == "__main__":
    for t in ["easy", "medium", "hard"]:
        run_episode(t)