File size: 4,336 Bytes
38c9982
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200a73b
38c9982
 
 
 
 
200a73b
38c9982
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Protocol

from src.executive_assistant.env import ExecutiveAssistantEnv
from src.executive_assistant.models import AssistantAction, PolicyDecision, TaskReward, WorkspaceObservation


class AssistantPolicy(Protocol):
    """Structural interface for anything that can pick the next assistant action.

    Any object exposing a compatible ``choose_action`` method satisfies this
    protocol; no inheritance is required.
    """

    def choose_action(self, task_name: str, observation: WorkspaceObservation) -> PolicyDecision:
        """Return the decision to take for ``task_name`` given ``observation``.

        ``EpisodeRunner`` reads ``.action`` and ``.reasoning`` from the
        returned decision.
        """
        ...


@dataclass(frozen=True)
class EpisodeStepRecord:
    """Immutable record of a single policy step, as built by ``EpisodeRunner.advance``."""

    # Environment step counter (``env.step_count``) read after the action ran.
    step_index: int
    # Reasoning text attached to the policy's decision.
    reasoning: str
    # ``model_dump()`` of the action that was executed.
    action: dict[str, object]
    # ``model_dump()`` of the observation returned by the step (i.e. after the action).
    observation: dict[str, object]
    # ``info["state"]["workspace"]`` as returned by ``env.step``.
    snapshot: dict[str, object]
    # ``model_dump()`` of the reward returned by the step.
    reward: dict[str, object]
    # ``last_action_status`` of the observation returned by the step.
    status: str


@dataclass(frozen=True)
class EpisodeTrace:
    """Immutable summary of one episode: its step records plus the final outcome."""

    task_name: str
    policy_name: str
    steps: list[EpisodeStepRecord]
    final_score: float
    completed: bool
    termination_reason: str

    def to_dict(self) -> dict[str, object]:
        """Return the trace as a plain dict, expanding every step via ``asdict``.

        Keys appear in field declaration order.
        """
        step_dicts = list(map(asdict, self.steps))
        return dict(
            task_name=self.task_name,
            policy_name=self.policy_name,
            steps=step_dicts,
            final_score=self.final_score,
            completed=self.completed,
            termination_reason=self.termination_reason,
        )


class EpisodeRunner:
    """Drive a policy through one ``ExecutiveAssistantEnv`` episode, recording each step."""

    def __init__(self, policy: AssistantPolicy, max_steps: int = 12) -> None:
        """Store the policy to query and the step budget handed to each environment."""
        self.max_steps = max_steps
        self.policy = policy

    def initialize(self, task_name: str) -> tuple[ExecutiveAssistantEnv, WorkspaceObservation]:
        """Create the environment for ``task_name`` and return it with its reset observation."""
        environment = ExecutiveAssistantEnv(task_name=task_name)
        # The step budget is assigned on the environment before it is reset.
        environment.max_steps = self.max_steps
        return environment, environment.reset()

    def advance(
        self,
        task_name: str,
        env: ExecutiveAssistantEnv,
        observation: WorkspaceObservation,
    ) -> tuple[PolicyDecision, WorkspaceObservation, TaskReward, EpisodeStepRecord]:
        """
        Perform a single policy/environment exchange.

        The policy is asked for a decision on ``observation``; the decision's
        action is passed to ``env.step``; the outcome is captured in an
        ``EpisodeStepRecord``.

        Returns the decision, the observation produced by the step, the step
        reward, and the record, in that order.
        """
        decision = self.policy.choose_action(task_name, observation)
        # The `done` flag from the step is not used here; `run` relies on
        # `reward.is_done` to decide when to stop.
        next_observation, reward, _done, info = env.step(decision.action)
        record_fields = {
            "step_index": env.step_count,
            "reasoning": decision.reasoning,
            "action": decision.action.model_dump(),
            "observation": next_observation.model_dump(),
            "snapshot": info["state"]["workspace"],
            "reward": reward.model_dump(),
            "status": next_observation.last_action_status,
        }
        return decision, next_observation, reward, EpisodeStepRecord(**record_fields)

    def run(self, task_name: str) -> EpisodeTrace:
        """
        Run a complete episode for ``task_name`` and return its trace.

        After initializing the environment, ``advance`` is called repeatedly,
        feeding each resulting observation into the next call, until a step's
        reward reports ``is_done``. The runner itself does not count steps;
        stopping depends entirely on that flag.

        In the returned trace, ``final_score`` and ``termination_reason`` come
        from the last reward, ``completed`` is true when that score is at
        least 1.0, and ``policy_name`` is the policy's class name.
        """
        env, observation = self.initialize(task_name)
        history: list[EpisodeStepRecord] = []

        finished = False
        while not finished:
            _, observation, reward, record = self.advance(task_name, env, observation)
            history.append(record)
            finished = reward.is_done

        # The loop body always runs at least once, so `reward` is bound here.
        return EpisodeTrace(
            task_name=task_name,
            policy_name=type(self.policy).__name__,
            steps=history,
            final_score=reward.total_score,
            completed=reward.total_score >= 1.0,
            termination_reason=reward.reasoning,
        )


def run_policy_suite(
    policy: AssistantPolicy,
    task_names: list[str],
    max_steps: int = 12,
) -> dict[str, EpisodeTrace]:
    """Run ``policy`` on each task in ``task_names`` and return traces keyed by task name.

    A single ``EpisodeRunner`` configured with ``max_steps`` is reused for
    every task. Tasks run in the order given; a repeated name is run again and
    its later trace replaces the earlier one.
    """
    suite_runner = EpisodeRunner(policy=policy, max_steps=max_steps)
    traces_by_task: dict[str, EpisodeTrace] = {}
    for name in task_names:
        traces_by_task[name] = suite_runner.run(name)
    return traces_by_task


def export_traces_jsonl(
    traces: list[EpisodeTrace] | dict[str, EpisodeTrace],
    output_path: str | Path,
) -> Path:
    """Write episode traces to ``output_path`` as JSON Lines, one trace per line.

    Args:
        traces: Traces to serialize via their ``to_dict`` method. A list (or
            any other iterable) is written in iteration order. A dict keyed by
            task name, such as the result of ``run_policy_suite``, is also
            accepted: its values are written in insertion order and its keys
            are not written separately.
        output_path: Destination file. Missing parent directories are created
            and an existing file is overwritten.

    Returns:
        The destination as a ``Path``. With no traces the file is written empty.
    """
    # Iterating a dict yields its string keys, which have no ``to_dict``;
    # serialize the trace values instead.
    if isinstance(traces, dict):
        traces = list(traces.values())

    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [json.dumps(trace.to_dict()) for trace in traces]
    # json.dumps escapes non-ASCII by default; the explicit encoding keeps the
    # file bytes independent of the platform's locale setting.
    path.write_text("".join(f"{line}\n" for line in lines), encoding="utf-8")
    return path