File size: 1,853 Bytes
8c391c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from __future__ import annotations

from typing import Dict

try:
    from core.client_types import StepResult
    from core.env_client import EnvClient
except ImportError:
    from openenv.core.client_types import StepResult
    from openenv.core.env_client import EnvClient

try:
    from .models import CodeSecurityAction, CodeSecurityObservation, CodeSecurityState
except ImportError:
    from models import CodeSecurityAction, CodeSecurityObservation, CodeSecurityState


class CodeSecurityAuditorEnv(
    EnvClient[CodeSecurityAction, CodeSecurityObservation, CodeSecurityState]
):
    """Client for the Code Security Auditor environment server.

    Translates between the typed models (``CodeSecurityAction``,
    ``CodeSecurityObservation``, ``CodeSecurityState``) and plain
    dictionaries. The three methods below are presumably the hooks that
    ``EnvClient`` calls around each request -- confirm against the base
    class.
    """

    def _step_payload(self, action: CodeSecurityAction) -> dict:
        """Convert ``action`` into a dictionary payload.

        ``action_type``, ``confidence``, ``evidence`` and ``summary`` are
        always present in the result. ``filename``, ``line_start``,
        ``line_end``, ``vuln_type`` and ``severity`` are added only when the
        corresponding attribute is not ``None``.
        """
        body = dict(
            action_type=action.action_type,
            confidence=action.confidence,
            evidence=action.evidence,
            summary=action.summary,
        )
        # Optional fields are left out entirely instead of being sent as None.
        optional_fields = (
            "filename",
            "line_start",
            "line_end",
            "vuln_type",
            "severity",
        )
        for field_name in optional_fields:
            value = getattr(action, field_name)
            if value is not None:
                body[field_name] = value
        return body

    def _parse_result(self, payload: Dict) -> StepResult[CodeSecurityObservation]:
        """Build a ``StepResult`` from a response ``payload``.

        The ``"observation"`` entry (an empty mapping when the key is absent)
        is expanded into ``CodeSecurityObservation`` keyword arguments.
        ``"reward"`` is passed through unchanged (``None`` when absent) and
        ``"done"`` is coerced to ``bool``, defaulting to ``False``.
        """
        observation_fields = payload.get("observation", {})
        parsed_observation = CodeSecurityObservation(**observation_fields)
        reward = payload.get("reward")
        finished = bool(payload.get("done", False))
        return StepResult(
            observation=parsed_observation,
            reward=reward,
            done=finished,
        )

    def _parse_state(self, payload: Dict) -> CodeSecurityState:
        """Build a ``CodeSecurityState`` using ``payload`` as keyword arguments."""
        state = CodeSecurityState(**payload)
        return state