File size: 6,062 Bytes
38c9982
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200a73b
 
 
 
 
 
 
 
 
 
 
 
38c9982
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200a73b
38c9982
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200a73b
 
 
 
 
 
 
 
 
 
 
38c9982
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import annotations

from src.executive_assistant.graders import grade_easy, grade_hard, grade_medium
from src.executive_assistant.models import (
    AssistantAction,
    EmailDetail,
    EmailSummary,
    FileSearchResult,
    TaskReward,
    WorkspaceObservation,
)
from src.executive_assistant.seeds import TASK_SEEDS
from src.executive_assistant.workspace import MockWorkspace


class ExecutiveAssistantEnv:
    """Episode driver for the executive-assistant tasks.

    The environment owns a ``MockWorkspace`` seeded from ``TASK_SEEDS`` and
    exposes ``reset`` / ``step`` / ``observe`` / ``state`` / ``grade``. Each
    ``step`` applies one ``AssistantAction`` to the workspace, re-grades the
    workspace, and ends the episode when the grader reports completion or the
    step budget is used up.
    """

    # Value reported as ``current_time`` in every observation. It is a fixed
    # literal rather than a wall-clock read; override on a subclass to change it.
    CURRENT_TIME = "2026-04-04T10:00:00Z"

    def __init__(
        self,
        task_name: str = "easy_deadline_extraction",
        *,
        max_steps: int = 12,
    ) -> None:
        """Create an environment for ``task_name``.

        The workspace created here is empty; call ``reset`` to load the
        task's seed data before stepping.

        Args:
            task_name: Key used by ``reset`` to look up ``TASK_SEEDS`` and by
                ``grade`` to pick a grader.
            max_steps: Number of ``step`` calls after which the episode is
                forcibly marked done.

        Raises:
            ValueError: If ``max_steps`` is less than 1.
        """
        # Validate before building any state so a bad budget fails cleanly.
        if max_steps < 1:
            raise ValueError(f"max_steps must be at least 1, got {max_steps}")
        self.task_name = task_name
        self.workspace = MockWorkspace()
        self.last_action_status = "environment initialized"
        self.current_email: EmailDetail | None = None
        self.search_results: list[FileSearchResult] = []
        self.step_count = 0
        self.max_steps = max_steps

    def reset(self) -> WorkspaceObservation:
        """Start a fresh episode and return its first observation.

        Replaces the workspace with a new ``MockWorkspace`` seeded with the
        task's ``"emails"`` and ``"files"`` entries (each defaulting to an
        empty list), and clears the opened email, search results and step
        counter.

        Raises:
            KeyError: If ``self.task_name`` has no entry in ``TASK_SEEDS``.
        """
        # Resolve the seed first: an unknown task must not discard the
        # current workspace and leave the environment half-reset.
        try:
            seed = TASK_SEEDS[self.task_name]
        except KeyError:
            available = ", ".join(sorted(map(str, TASK_SEEDS)))
            raise KeyError(
                f"unknown task {self.task_name!r}; available tasks: {available}"
            ) from None

        self.workspace = MockWorkspace()
        self.workspace.seed(seed.get("emails", []), seed.get("files", []))
        self.last_action_status = f"scenario reset: {self.task_name}"
        self.current_email = None
        self.search_results = []
        self.step_count = 0
        return self.observe()

    def state(self) -> dict[str, object]:
        """Return a dictionary snapshot of the environment.

        Includes the task name, step counters, last action status, the
        currently opened email (or ``None``), the latest search results, a
        freshly built observation, and ``self.workspace.snapshot()``.
        """
        return {
            "task_name": self.task_name,
            "step_count": self.step_count,
            "max_steps": self.max_steps,
            "last_action_status": self.last_action_status,
            "current_email": self.current_email.model_dump() if self.current_email else None,
            "search_results": [result.model_dump() for result in self.search_results],
            "observation": self.observe().model_dump(),
            "workspace": self.workspace.snapshot(),
        }

    def observe(self) -> WorkspaceObservation:
        """Build the observation for the current workspace contents.

        Collects unread-email summaries, the names of the listed todos and
        up to six recent actions formatted as ``"<action_type>: <status>"``,
        together with the environment's own status fields.
        """
        unread = [
            EmailSummary(
                id=row["id"],
                sender=row["sender"],
                subject=row["subject"],
                snippet=row["snippet"],
            )
            for row in self.workspace.get_unread_emails()
        ]
        todos = [row["task_name"] for row in self.workspace.list_todos()]
        # The workspace's ordering is flipped here; presumably it returns
        # newest-first and the history should read oldest-first -- confirm
        # against MockWorkspace.list_recent_actions.
        recent_actions = [
            f"{row['action_type']}: {row['status']}"
            for row in reversed(self.workspace.list_recent_actions(limit=6))
        ]
        return WorkspaceObservation(
            current_time=self.CURRENT_TIME,
            unread_emails=unread,
            active_todos=todos,
            last_action_status=self.last_action_status,
            current_email=self.current_email,
            search_results=self.search_results,
            action_history=recent_actions,
        )

    def step(self, action: AssistantAction) -> tuple[WorkspaceObservation, TaskReward, bool, dict[str, object]]:
        """Apply ``action``, grade the workspace and report the outcome.

        The step counter is incremented before the action is examined, so
        rejected actions consume budget as well.

        Returns:
            ``(observation, reward, done, info)``. ``done`` mirrors
            ``reward.is_done``. ``info`` carries the task name, step
            counters, last status, grader reasoning, total score and the
            full ``state()`` snapshot.
        """
        self.step_count += 1
        self._apply_action(action)

        observation = self.observe()
        reward = self.grade()
        if self.step_count >= self.max_steps and not reward.is_done:
            # Budget exhausted: keep the grader's scores but force
            # termination and record why in the reasoning text.
            reward = TaskReward(
                step_reward=reward.step_reward,
                total_score=reward.total_score,
                is_done=True,
                reasoning=f"{reward.reasoning}; terminated at step budget",
            )
        done = reward.is_done
        info = {
            "task_name": self.task_name,
            "step_count": self.step_count,
            "max_steps": self.max_steps,
            "status": self.last_action_status,
            "reasoning": reward.reasoning,
            "total_score": reward.total_score,
            "state": self.state(),
        }
        return observation, reward, done, info

    def _apply_action(self, action: AssistantAction) -> None:
        """Run one action against the workspace and update the status fields.

        Each action type needs specific fields; ``payload`` and
        ``secondary_payload`` are tested for truthiness, so empty strings
        count as missing:

        * ``read_email`` / ``archive``: ``target_id``
        * ``reply``: ``target_id`` and ``payload``
        * ``forward``: ``target_id`` and ``secondary_payload``
        * ``add_todo`` / ``search_files``: ``payload``

        Anything else, including an unknown action type, only sets
        ``last_action_status`` to ``"invalid action payload"``.
        """
        if action.action_type == "read_email" and action.target_id is not None:
            row = self.workspace.read_email(action.target_id)
            # A falsy row means the lookup found nothing; clear the opened email.
            self.current_email = EmailDetail(**dict(row)) if row else None
            self.last_action_status = "email read" if row else "email not found"
        elif action.action_type == "reply" and action.target_id is not None and action.payload:
            self.last_action_status = self.workspace.send_reply(action.target_id, action.payload)
        elif (
            action.action_type == "forward"
            and action.target_id is not None
            and action.secondary_payload
        ):
            # ``payload`` is optional here and is passed through as given.
            self.last_action_status = self.workspace.forward_email(
                action.target_id,
                action.secondary_payload,
                action.payload,
            )
        elif action.action_type == "add_todo" and action.payload:
            # Record where the todo came from: the opened email if there is
            # one, otherwise the task itself.
            self.last_action_status = self.workspace.create_todo(
                task_name=action.payload,
                deadline_date=action.secondary_payload,
                context=(
                    f"Created from email {self.current_email.id}: {self.current_email.subject}"
                    if self.current_email
                    else f"Created from task {self.task_name}"
                ),
            )
        elif action.action_type == "archive" and action.target_id is not None:
            self.last_action_status = self.workspace.archive_email(action.target_id)
        elif action.action_type == "search_files" and action.payload:
            results = self.workspace.search_documents(action.payload)
            # Each hit is reduced to the first 160 characters of its text.
            self.search_results = [
                FileSearchResult(
                    id=row["id"],
                    filename=row["filename"],
                    snippet=row["content_text"][:160],
                )
                for row in results
            ]
            self.last_action_status = f"search returned {len(results)} file(s)"
        else:
            self.last_action_status = "invalid action payload"

    def grade(self) -> TaskReward:
        """Score the workspace with the grader registered for the task.

        Returns a ``TaskReward`` whose reasoning is ``"No grader configured"``
        when ``self.task_name`` matches none of the known tasks.
        """
        if self.task_name == "easy_deadline_extraction":
            return grade_easy(self.workspace)
        if self.task_name == "medium_triage_and_negotiation":
            return grade_medium(self.workspace)
        if self.task_name == "hard_rag_reply":
            return grade_hard(self.workspace)
        return TaskReward(reasoning="No grader configured")