Flickinshots's picture
Deploy Project Epsilon Space bundle
200a73b verified
from __future__ import annotations
from src.executive_assistant.graders import grade_easy, grade_hard, grade_medium
from src.executive_assistant.models import (
AssistantAction,
EmailDetail,
EmailSummary,
FileSearchResult,
TaskReward,
WorkspaceObservation,
)
from src.executive_assistant.seeds import TASK_SEEDS
from src.executive_assistant.workspace import MockWorkspace
class ExecutiveAssistantEnv:
def __init__(self, task_name: str = "easy_deadline_extraction") -> None:
self.task_name = task_name
self.workspace = MockWorkspace()
self.last_action_status = "environment initialized"
self.current_email: EmailDetail | None = None
self.search_results: list[FileSearchResult] = []
self.step_count = 0
self.max_steps = 12
def reset(self) -> WorkspaceObservation:
self.workspace = MockWorkspace()
seed = TASK_SEEDS[self.task_name]
self.workspace.seed(seed.get("emails", []), seed.get("files", []))
self.last_action_status = f"scenario reset: {self.task_name}"
self.current_email = None
self.search_results = []
self.step_count = 0
return self.observe()
def state(self) -> dict[str, object]:
return {
"task_name": self.task_name,
"step_count": self.step_count,
"max_steps": self.max_steps,
"last_action_status": self.last_action_status,
"current_email": self.current_email.model_dump() if self.current_email else None,
"search_results": [result.model_dump() for result in self.search_results],
"observation": self.observe().model_dump(),
"workspace": self.workspace.snapshot(),
}
def observe(self) -> WorkspaceObservation:
unread = [
EmailSummary(
id=row["id"],
sender=row["sender"],
subject=row["subject"],
snippet=row["snippet"],
)
for row in self.workspace.get_unread_emails()
]
todos = [row["task_name"] for row in self.workspace.list_todos()]
recent_actions = [
f"{row['action_type']}: {row['status']}"
for row in reversed(self.workspace.list_recent_actions(limit=6))
]
return WorkspaceObservation(
current_time="2026-04-04T10:00:00Z",
unread_emails=unread,
active_todos=todos,
last_action_status=self.last_action_status,
current_email=self.current_email,
search_results=self.search_results,
action_history=recent_actions,
)
def step(self, action: AssistantAction) -> tuple[WorkspaceObservation, TaskReward, bool, dict[str, object]]:
self.step_count += 1
if action.action_type == "read_email" and action.target_id is not None:
row = self.workspace.read_email(action.target_id)
self.current_email = EmailDetail(**dict(row)) if row else None
self.last_action_status = "email read" if row else "email not found"
elif action.action_type == "reply" and action.target_id is not None and action.payload:
self.last_action_status = self.workspace.send_reply(action.target_id, action.payload)
elif (
action.action_type == "forward"
and action.target_id is not None
and action.secondary_payload
):
self.last_action_status = self.workspace.forward_email(
action.target_id,
action.secondary_payload,
action.payload,
)
elif action.action_type == "add_todo" and action.payload:
self.last_action_status = self.workspace.create_todo(
task_name=action.payload,
deadline_date=action.secondary_payload,
context=(
f"Created from email {self.current_email.id}: {self.current_email.subject}"
if self.current_email
else f"Created from task {self.task_name}"
),
)
elif action.action_type == "archive" and action.target_id is not None:
self.last_action_status = self.workspace.archive_email(action.target_id)
elif action.action_type == "search_files" and action.payload:
results = self.workspace.search_documents(action.payload)
self.search_results = [
FileSearchResult(
id=row["id"],
filename=row["filename"],
snippet=row["content_text"][:160],
)
for row in results
]
self.last_action_status = f"search returned {len(results)} file(s)"
else:
self.last_action_status = "invalid action payload"
observation = self.observe()
reward = self.grade()
if self.step_count >= self.max_steps and not reward.is_done:
reward = TaskReward(
step_reward=reward.step_reward,
total_score=reward.total_score,
is_done=True,
reasoning=f"{reward.reasoning}; terminated at step budget",
)
done = reward.is_done
info = {
"task_name": self.task_name,
"step_count": self.step_count,
"max_steps": self.max_steps,
"status": self.last_action_status,
"reasoning": reward.reasoning,
"total_score": reward.total_score,
"state": self.state(),
}
return observation, reward, done, info
def grade(self) -> TaskReward:
if self.task_name == "easy_deadline_extraction":
return grade_easy(self.workspace)
if self.task_name == "medium_triage_and_negotiation":
return grade_medium(self.workspace)
if self.task_name == "hard_rag_reply":
return grade_hard(self.workspace)
return TaskReward(reasoning="No grader configured")