Spaces:
Runtime error
Runtime error
File size: 5,017 Bytes
f7c3585 1dcaa96 f7c3585 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | from typing import Tuple, Dict, Any, Optional
import copy
from models import Observation, Action, Email
from tasks import get_task, Task
class EmailOpsEnv:
def __init__(self):
self.task: Optional[Task] = None
self.emails = []
self.current_folder = "inbox"
self.opened_email: Optional[Email] = None
self.last_action_status = "Environment initialized."
self.metrics = {}
def reset(self, task_name: str = "easy") -> Observation:
self.task = get_task(task_name)
self.emails = copy.deepcopy(self.task.get_initial_emails())
self.current_folder = "inbox"
self.opened_email = None
self.last_action_status = f"Started task: {self.task.name} - {self.task.description}"
self.metrics = self.task.initialize_metrics()
return self.state()
def state(self) -> Observation:
inbox_summary = []
for e in self.emails:
if e.folder == self.current_folder:
inbox_summary.append({
"id": e.id,
"sender": e.sender,
"subject": e.subject,
"is_read": e.is_read,
"is_flagged": e.is_flagged
})
return Observation(
inbox_summary=inbox_summary,
current_folder=self.current_folder,
opened_email=self.opened_email,
last_action_status=self.last_action_status
)
def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]:
self.last_action_status = "Action completed successfully."
# Execute action
act_type = action.action_type
if act_type == "open_email":
email = self._get_email(action.email_id)
if email:
if email.folder != self.current_folder:
self.last_action_status = f"Error: Email {action.email_id} not in current folder."
else:
self.opened_email = copy.deepcopy(email)
email.is_read = True # mark as read automatically when opened
self.last_action_status = f"Opened email {action.email_id}."
else:
self.last_action_status = f"Error: Email {action.email_id} not found."
elif act_type == "close_email":
self.opened_email = None
self.last_action_status = "Closed email."
elif act_type == "move_email":
email = self._get_email(action.email_id)
if email and action.folder_name:
email.folder = action.folder_name
# Close it if we moved the currently opened email
if self.opened_email and self.opened_email.id == email.id:
self.opened_email = None
self.last_action_status = f"Moved email {action.email_id} to {action.folder_name}."
else:
self.last_action_status = f"Error: Invalid email or folder for move."
elif act_type == "reply":
email = self._get_email(action.email_id)
if email and action.reply_body:
self.last_action_status = f"Replied to {email.sender} with '{action.reply_body}'."
else:
self.last_action_status = "Error: Invalid email or missing reply body."
elif act_type == "delete_email":
email = self._get_email(action.email_id)
if email:
self.emails.remove(email)
if self.opened_email and self.opened_email.id == email.id:
self.opened_email = None
self.last_action_status = f"Deleted email {action.email_id}."
else:
self.last_action_status = f"Error: Email {action.email_id} not found."
elif act_type == "flag_email":
email = self._get_email(action.email_id)
if email:
email.is_flagged = True
self.last_action_status = f"Flagged email {action.email_id}."
else:
self.last_action_status = f"Error: Email {action.email_id} not found."
elif act_type == "submit":
self.last_action_status = "Task submitted by agent."
else:
self.last_action_status = f"Error: Unknown action type {act_type}."
# Grade
action_dict = action.model_dump()
reward_info = self.task.grade(self.emails, self.metrics, action_dict)
# In case grader modified metrics (e.g., reply tracking)
self.metrics = reward_info.metrics
return self.state(), reward_info.score, reward_info.is_done, reward_info.metrics
def _get_email(self, email_id: str) -> Optional[Email]:
if not email_id:
return None
return next((e for e in self.emails if e.id == email_id), None)
|