Spaces:
Sleeping
Sleeping
File size: 11,321 Bytes
81aa69d 6c591d0 81aa69d 6c591d0 81aa69d 6c591d0 81aa69d 6c591d0 81aa69d 562c0a4 81aa69d 7acbefe 81aa69d 6c591d0 81aa69d 3e4c834 6c591d0 81aa69d 3e4c834 6c591d0 81aa69d 6c591d0 81aa69d 7acbefe 81aa69d 562c0a4 81aa69d 7acbefe 81aa69d 562c0a4 81aa69d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 | """
Customer Support Ticket Resolution Environment.
A production-ready OpenEnv environment that simulates real-world
customer support workflows. Agents learn to handle tickets ranging
from simple FAQs to complex, multi-step escalations with angry customers.
Implements the standard OpenEnv interface:
- reset(task_id) β initial SupportObservation
- step(action) β (observation, reward, done, info)
- state() β SupportState
"""
import logging
import sys
import os
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4
# Ensure project root is on the path so sibling modules resolve
_project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from models import (
CustomerMessage,
CustomerSentiment,
Difficulty,
RewardBreakdown,
StepResult,
SupportAction,
SupportObservation,
SupportState,
TicketCategory,
TicketInfo,
TicketPriority,
TicketStatus,
safe_score,
)
from grader import grade_response
from tasks import TASKS, TASK_IDS, get_task
logger = logging.getLogger(__name__)
class CustomerSupportEnvironment:
"""
OpenEnv-compatible environment for customer support ticket resolution.
Each episode = one customer support ticket.
The agent interacts by sending SupportAction responses, and receives
SupportObservation with updated ticket state and conversation history.
"""
def __init__(self):
self._state: Optional[SupportState] = None
self._task: Optional[Dict[str, Any]] = None
self._ticket: Optional[TicketInfo] = None
self._conversation: List[CustomerMessage] = []
self._current_message: str = ""
self._follow_up_index: int = 0
self._cumulative_reward: float = 0.0
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# reset()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def reset(
self,
task_id: Optional[str] = None,
seed: Optional[int] = None,
**kwargs: Any,
) -> SupportObservation:
"""
Reset the environment to a new episode.
Args:
task_id: Which task to load. Defaults to "easy_faq".
seed: Optional random seed (unused, tasks are deterministic).
Returns:
Initial SupportObservation with the first customer message.
"""
task_id = task_id or "easy_faq"
task = get_task(task_id)
# Build ticket info from task definition
ticket_dict = task["ticket"]
self._ticket = TicketInfo(**ticket_dict)
# Initialize state
self._state = SupportState(
episode_id=str(uuid4()),
task_id=task_id,
step_count=0,
max_steps=task["max_steps"],
done=False,
cumulative_reward=0.0,
reward_history=[],
ticket_status=TicketStatus.OPEN,
resolution_achieved=False,
)
# Initialize conversation with the customer's first message
self._task = task
self._current_message = task["initial_message"]
self._follow_up_index = 0
self._cumulative_reward = 0.0
self._conversation = [
CustomerMessage(
role="customer",
content=task["initial_message"],
timestamp=0,
)
]
return self._build_observation()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# step()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def step(
self,
action: SupportAction,
**kwargs: Any,
) -> Tuple[SupportObservation, float, bool, Dict[str, Any]]:
"""
Execute one step in the environment.
Args:
action: The agent's response (SupportAction).
Returns:
Tuple of (observation, reward, done, info).
reward is ALWAYS in strict (0, 1).
"""
if self._state is None or self._state.done:
raise RuntimeError(
"Environment not initialized or episode already done. Call reset() first."
)
assert self._task is not None, "Task not set. Call reset() first."
assert self._ticket is not None, "Ticket not set. Call reset() first."
# Increment step
self._state.step_count += 1
# Record agent message in history
self._conversation.append(
CustomerMessage(
role="agent",
content=action.response_text,
timestamp=self._state.step_count,
)
)
# Grade the response
reward_breakdown = grade_response(
response=action.response_text,
grading_rubric=self._task["grading_rubric"],
ticket_info=self._task["ticket"],
conversation_history=[m.model_dump() for m in self._conversation],
action_type=action.action_type,
step_count=self._state.step_count,
max_steps=self._state.max_steps,
)
# Clamp step reward to strict (0, 1) β safe_score guarantees this
step_reward = safe_score(reward_breakdown.total)
logger.info(
f"[ENV] step: raw_total={reward_breakdown.total:.6f} "
f"step_reward={step_reward:.6f}"
)
self._cumulative_reward += step_reward
self._state.cumulative_reward = self._cumulative_reward
self._state.reward_history.append(reward_breakdown)
# Handle action type
if action.action_type == "resolve":
self._state.ticket_status = TicketStatus.RESOLVED
self._state.resolution_achieved = True
self._state.done = True
elif action.action_type == "escalate":
self._state.ticket_status = TicketStatus.ESCALATED
else:
self._state.ticket_status = TicketStatus.IN_PROGRESS
# Check if max steps reached
if self._state.step_count >= self._state.max_steps:
self._state.done = True
# If not done, queue next customer message (follow-up or acknowledgement)
if not self._state.done:
follow_ups = self._task.get("follow_up_messages", [])
if self._follow_up_index < len(follow_ups):
next_msg = follow_ups[self._follow_up_index]
self._follow_up_index += 1
else:
next_msg = self._generate_contextual_reply(action)
self._current_message = next_msg
self._conversation.append(
CustomerMessage(
role="customer",
content=next_msg,
timestamp=self._state.step_count,
)
)
# Compute average reward β clamped to strict (0, 1)
avg_reward = safe_score(self._cumulative_reward / self._state.step_count)
# Build info dict β all scores strictly in (0, 1)
# Clamp every numeric score in reward_breakdown before exposing
rb_dict = reward_breakdown.model_dump()
for key in ["correctness", "tone", "completeness", "efficiency", "total"]:
if key in rb_dict:
rb_dict[key] = safe_score(rb_dict[key])
info = {
"reward_breakdown": rb_dict,
"step_reward": step_reward,
"cumulative_reward": safe_score(self._cumulative_reward / self._state.step_count),
"average_reward": avg_reward,
"steps_taken": self._state.step_count,
"task_id": self._state.task_id,
"resolution_achieved": self._state.resolution_achieved,
}
obs = self._build_observation()
return obs, step_reward, self._state.done, info
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# state()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def state(self) -> SupportState:
"""Return the current internal state."""
if self._state is None:
return SupportState(
episode_id="not_initialized",
task_id="none",
step_count=0,
max_steps=0,
done=True,
cumulative_reward=0.0,
)
return self._state
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Private helpers
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _build_observation(self) -> SupportObservation:
"""Construct the current observation."""
assert self._state is not None
assert self._task is not None
assert self._ticket is not None
return SupportObservation(
ticket=self._ticket,
conversation_history=list(self._conversation),
current_message=self._current_message,
policy_context=self._task.get("policy_context", ""),
task_id=self._state.task_id,
difficulty=self._task["difficulty"],
max_steps=self._state.max_steps,
steps_remaining=self._state.max_steps - self._state.step_count,
done=self._state.done,
reward=safe_score(self._cumulative_reward / max(self._state.step_count, 1)),
)
def _generate_contextual_reply(self, action: SupportAction) -> str:
"""Generate a contextual customer follow-up based on agent's response quality."""
assert self._state is not None
last_reward = self._state.reward_history[-1] if self._state.reward_history else None
if last_reward and last_reward.total >= 0.7:
return (
"Thank you for that information. That's helpful. "
"Is there anything else I should know?"
)
elif last_reward and last_reward.total >= 0.4:
return (
"Hmm, I appreciate the response but I'm not sure that fully "
"addresses my concern. Can you clarify?"
)
else:
return (
"I don't think you've answered my question. "
"Can you please look into this more carefully?"
)
|