File size: 5,017 Bytes
f7c3585
 
1dcaa96
 
 
f7c3585
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from typing import Tuple, Dict, Any, Optional
import copy
from models import Observation, Action, Email
from tasks import get_task, Task


class EmailOpsEnv:
    """Stateful mailbox environment driven through ``reset``/``state``/``step``.

    The environment keeps a working copy of the active task's emails, tracks
    which folder is being viewed and which email (if any) is currently open,
    and delegates scoring of every action to the active task's ``grade``
    method.
    """

    def __init__(self):
        # No task is active until reset() is called.
        self.task: Optional[Task] = None
        self.emails = []
        self.current_folder = "inbox"
        # Snapshot (deep copy) of the email the agent currently has open.
        self.opened_email: Optional[Email] = None
        self.last_action_status = "Environment initialized."
        self.metrics = {}

    def reset(self, task_name: str = "easy") -> Observation:
        """Load the task named ``task_name`` and return the first observation.

        The task's initial emails are deep-copied so that actions taken in
        this episode never mutate the objects handed out by the task.
        """
        self.task = get_task(task_name)
        self.emails = copy.deepcopy(self.task.get_initial_emails())
        self.current_folder = "inbox"
        self.opened_email = None
        self.last_action_status = f"Started task: {self.task.name} - {self.task.description}"
        self.metrics = self.task.initialize_metrics()
        return self.state()

    def state(self) -> Observation:
        """Build an observation of the current folder without changing state."""
        inbox_summary = [
            {
                "id": e.id,
                "sender": e.sender,
                "subject": e.subject,
                "is_read": e.is_read,
                "is_flagged": e.is_flagged,
            }
            for e in self.emails
            if e.folder == self.current_folder
        ]
        return Observation(
            inbox_summary=inbox_summary,
            current_folder=self.current_folder,
            opened_email=self.opened_email,
            last_action_status=self.last_action_status,
        )

    def step(self, action: Action) -> Tuple[Observation, float, bool, Dict[str, Any]]:
        """Apply ``action``, grade the result and return the new observation.

        A failed action does not raise: it is reported through
        ``last_action_status`` (prefixed with ``"Error:"``) and is still
        passed to the grader.

        Returns:
            ``(observation, score, is_done, metrics)`` where the last three
            values come from the task's ``grade`` result.

        Raises:
            RuntimeError: if ``reset()`` has not been called yet, so there is
                no task to grade against.
        """
        if self.task is None:
            # Fail before touching any state instead of crashing later with
            # an AttributeError on ``None.grade``.
            raise RuntimeError(
                "Environment has no active task; call reset() before step()."
            )

        self._apply_action(action)

        # Grade
        action_dict = action.model_dump()
        reward_info = self.task.grade(self.emails, self.metrics, action_dict)

        # In case grader modified metrics (e.g., reply tracking)
        self.metrics = reward_info.metrics

        return self.state(), reward_info.score, reward_info.is_done, reward_info.metrics

    def _apply_action(self, action: Action) -> None:
        """Run the handler for ``action.action_type`` and record its status."""
        handlers = {
            "open_email": self._open_email,
            "close_email": self._close_email,
            "move_email": self._move_email,
            "reply": self._reply,
            "delete_email": self._delete_email,
            "flag_email": self._flag_email,
            "submit": self._submit,
        }
        act_type = action.action_type
        handler = handlers.get(act_type)
        if handler is None:
            self.last_action_status = f"Error: Unknown action type {act_type}."
        else:
            self.last_action_status = handler(action)

    def _open_email(self, action: Action) -> str:
        """Open an email from the current folder and mark it as read."""
        email = self._get_email(action.email_id)
        if email is None:
            return f"Error: Email {action.email_id} not found."
        if email.folder != self.current_folder:
            return f"Error: Email {action.email_id} not in current folder."
        # Mark as read *before* taking the snapshot; otherwise the opened
        # copy would still claim to be unread while the summary says read.
        email.is_read = True
        self.opened_email = copy.deepcopy(email)
        return f"Opened email {action.email_id}."

    def _close_email(self, action: Action) -> str:
        """Close whatever email is open (a no-op if none is)."""
        self.opened_email = None
        return "Closed email."

    def _move_email(self, action: Action) -> str:
        """Move an email to ``action.folder_name``."""
        email = self._get_email(action.email_id)
        if email is None or not action.folder_name:
            return "Error: Invalid email or folder for move."
        email.folder = action.folder_name
        # Close it if we moved the currently opened email
        self._close_if_opened(email)
        return f"Moved email {action.email_id} to {action.folder_name}."

    def _reply(self, action: Action) -> str:
        """Acknowledge a reply; the mailbox itself is not modified here."""
        email = self._get_email(action.email_id)
        if email is None or not action.reply_body:
            return "Error: Invalid email or missing reply body."
        return f"Replied to {email.sender} with '{action.reply_body}'."

    def _delete_email(self, action: Action) -> str:
        """Remove an email from the mailbox entirely."""
        email = self._get_email(action.email_id)
        if email is None:
            return f"Error: Email {action.email_id} not found."
        self.emails.remove(email)
        self._close_if_opened(email)
        return f"Deleted email {action.email_id}."

    def _flag_email(self, action: Action) -> str:
        """Flag an email, keeping the opened snapshot consistent."""
        email = self._get_email(action.email_id)
        if email is None:
            return f"Error: Email {action.email_id} not found."
        email.is_flagged = True
        # ``opened_email`` is a separate copy, so mirror the flag onto it
        # when the flagged email is the one currently open.
        if self.opened_email is not None and self.opened_email.id == email.id:
            self.opened_email.is_flagged = True
        return f"Flagged email {action.email_id}."

    def _submit(self, action: Action) -> str:
        """Record that the agent submitted; the mailbox is left untouched."""
        return "Task submitted by agent."

    def _close_if_opened(self, email: Email) -> None:
        """Drop the opened snapshot if it refers to ``email``."""
        if self.opened_email is not None and self.opened_email.id == email.id:
            self.opened_email = None

    def _get_email(self, email_id: str) -> Optional[Email]:
        """Return the email whose ``id`` equals ``email_id``, or ``None``."""
        if not email_id:
            return None
        return next((e for e in self.emails if e.id == email_id), None)