| import sys |
| import os |
| from typing import Any, Dict, Tuple, Optional |
| from uuid import uuid4 |
|
|
| |
# Append the parent of this file's directory to sys.path so the project-local
# imports below (``models``, ``server.*``) can be resolved from there.
# Must stay above those imports.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
| from openenv.core.env_server.interfaces import Environment |
| from openenv.core.env_server.types import State |
| from models import RevOpsObservation, RevOpsAction, EnrichmentData |
| from server.data_generator import get_task_data, get_reps, get_icp_criteria |
| from server.graders import grader_easy, grader_medium, grader_hard |
|
|
# Grader score of the most recently finished episode, written by
# RevOpsEnvironment.step() when an episode ends; None until then.
# NOTE(review): this is module-level, so it is shared by every environment
# instance/session in the process even though concurrent sessions are enabled.
global_last_grader_score: Optional[float] = None
|
|
class RevOpsEnvironment(Environment):
    """Lead-processing RevOps environment with shaped per-step rewards.

    An episode walks the leads returned by ``get_task_data(task_id)`` in order.
    While a lead is current, the agent may enrich it, check the CRM, score it,
    merge it with an account or flag it for re-engagement. ``route_to_rep`` and
    ``disqualify`` finish the lead and advance to the next one.

    Once every lead is finished, the task's grader scores the full action
    history. A terminal bonus (score >= 0.7) or penalty of 0.5 is then added to
    the final step's reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Task ids that ``reset`` also accepts through its ``episode_id`` argument.
    _KNOWN_TASK_IDS = ("task_easy", "task_medium", "task_hard")
    # Grader run once every lead of the episode has been routed/disqualified.
    _GRADERS = {
        "task_easy": grader_easy,
        "task_medium": grader_medium,
        "task_hard": grader_hard,
    }
    # Grader score at or above which the terminal reward is a bonus, not a penalty.
    _PASS_THRESHOLD = 0.7
    # Magnitude of the terminal bonus/penalty.
    _TERMINAL_REWARD = 0.5

    def __init__(self, **kwargs):
        """Create an environment that defaults to ``task_easy``.

        ``kwargs`` are accepted for interface compatibility and ignored.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task_id = "task_easy"
        # Filled by reset(); step() calls reset() lazily while it is still empty.
        self.state_data = {}

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs) -> RevOpsObservation:
        """Start a new episode and return its first observation.

        Args:
            seed: Accepted for interface compatibility; not used.
            episode_id: Episode id to record. When it equals a known task id
                and no ``task_id`` kwarg is given, it also selects the task.
            **kwargs: ``task_id`` selects the task. Without one, the previous
                task is kept.

        Returns:
            Observation for the first lead, with ``done=False`` and ``reward=0.0``.
        """
        self._state = State(episode_id=episode_id or str(uuid4()), step_count=0)

        task_id = kwargs.get("task_id")
        if not task_id and episode_id in self._KNOWN_TASK_IDS:
            task_id = episode_id
        if task_id:
            self.task_id = task_id

        task_data = get_task_data(self.task_id)
        leads = task_data["leads"]

        self.state_data = {
            "leads": leads,
            "current_lead_index": 0,
            "enrichment_map": task_data["enrichment_map"],
            "crm": task_data["crm"],
            "reps": get_reps(),
            "icp": get_icp_criteria(),
            "action_history": [],
            "accumulated_reward": 0.0,
            "last_feedback": f"New episode started for {self.task_id}.",
            # Per-lead progress flags that gate what the observation reveals.
            "lead_states": {
                lead.id: {"enriched": False, "scored": False, "crm_checked": False, "merged": False, "flagged": False}
                for lead in leads
            },
            "grader_score": None,
        }

        obs = self._get_observation()
        obs.done = False
        obs.reward = 0.0
        return obs

    def _get_observation(self) -> RevOpsObservation:
        """Build the observation for the current lead.

        Enrichment data is hidden until the lead is enriched. CRM accounts and
        opportunities are blanked until the CRM has been checked for it. After
        the last lead, the last lead is shown with the full CRM and no
        enrichment. With no leads loaded, a default observation is returned.
        """
        idx = self.state_data.get("current_lead_index", 0)
        leads = self.state_data.get("leads", [])
        if not leads:
            return RevOpsObservation()

        if idx < len(leads):
            lead = leads[idx]
            lead_state = self.state_data["lead_states"][lead.id]
            if lead_state["enriched"]:
                enrichment = self.state_data["enrichment_map"][lead.id]
            else:
                enrichment = EnrichmentData(enriched=False)
            if lead_state["crm_checked"]:
                crm = self.state_data["crm"]
            else:
                crm = self.state_data["crm"].model_copy(update={"existing_accounts": [], "opportunities": []})
        else:
            lead = leads[-1]
            enrichment = EnrichmentData(enriched=False)
            crm = self.state_data["crm"]

        return RevOpsObservation(
            task_id=self.task_id,
            lead=lead,
            enrichment=enrichment,
            crm=crm,
            reps=self.state_data.get("reps", []),
            icp_criteria=self.state_data.get("icp", ""),
            sla_time_remaining_minutes=60,
            last_action_feedback=self.state_data.get("last_feedback", ""),
        )

    def step(self, action: RevOpsAction, **kwargs) -> RevOpsObservation:
        """Apply ``action`` to the current lead and return the next observation.

        The step counter and action history are updated even when the episode
        has already finished. In that case a ``done`` observation with zero
        reward is returned.

        Returns:
            Observation with ``done``, ``reward`` and ``metadata`` set.
            ``metadata`` always has ``"message"``. It also has
            ``"grader_score"`` on the step that finishes the episode.
        """
        if "action_history" not in self.state_data:
            self.reset(task_id=self.task_id)

        self._state.step_count += 1
        self.state_data["action_history"].append(action)

        leads = self.state_data["leads"]
        idx = self.state_data["current_lead_index"]
        if idx >= len(leads):
            obs = self._get_observation()
            obs.done = True
            obs.reward = 0.0
            obs.metadata = {"message": "Episode is already finished."}
            return obs

        current_lead = leads[idx]
        lead_state = self.state_data["lead_states"][current_lead.id]
        reward, feedback = self._apply_action(action, current_lead, lead_state)
        self.state_data["last_feedback"] = feedback

        is_done = self.state_data["current_lead_index"] >= len(leads)
        obs = self._get_observation()

        info = {"message": feedback}
        if is_done:
            score = self._grade_episode()
            info["grader_score"] = score
            if score >= self._PASS_THRESHOLD:
                reward += self._TERMINAL_REWARD
            else:
                reward -= self._TERMINAL_REWARD

        # Accumulate only after the terminal adjustment, so accumulated_reward
        # equals the sum of the rewards actually returned by step().
        self.state_data["accumulated_reward"] += reward

        obs.done = is_done
        obs.reward = reward
        obs.metadata = info
        return obs

    def _apply_action(self, action: RevOpsAction, lead: Any, lead_state: Dict[str, bool]) -> Tuple[float, str]:
        """Apply one action to the current lead and return ``(reward, feedback)``.

        Mutates ``lead_state`` (and ``lead.score`` for scoring). It also
        advances ``current_lead_index`` for ``route_to_rep`` and ``disqualify``.
        """
        action_type = action.action_type
        # In task_hard, lead "lead_3_cfo" must have its CRM checked before
        # merging or routing.
        is_hard_cfo_lead = self.task_id == "task_hard" and lead.id == "lead_3_cfo"

        if action_type == "enrich_lead":
            if lead_state["enriched"]:
                return 0.0, "Lead already enriched."
            lead_state["enriched"] = True
            return 0.1, "Lead enriched successfully."

        if action_type == "check_crm":
            if lead_state["crm_checked"]:
                return 0.0, "CRM already checked."
            lead_state["crm_checked"] = True
            return 0.1, "CRM checked successfully."

        if action_type == "update_lead_score":
            if self.task_id == "task_medium" and not lead_state["enriched"]:
                return -0.2, "Violation: Scored before enrichment."
            # NOTE(review): mutates the Lead object from get_task_data(); if
            # that function returns shared/cached objects, scores would leak
            # across episodes and sessions -- confirm it builds fresh ones.
            lead.score = action.score
            lead_state["scored"] = True
            return 0.1, f"Lead score updated to {action.score}."

        if action_type == "merge_with_account":
            if is_hard_cfo_lead and not lead_state["crm_checked"]:
                return -0.5, "Violation: Merged without checking CRM first."
            lead_state["merged"] = True
            return 0.1, f"Merged with account {action.account_id}."

        if action_type == "flag_reengagement":
            lead_state["flagged"] = True
            return 0.1, f"Flagged as re-engagement for opportunity {action.opportunity_id}."

        if action_type == "route_to_rep":
            if is_hard_cfo_lead and not lead_state["crm_checked"]:
                # The lead stays current, so the agent can still check the CRM.
                return -0.4, "Fatal violation: Routed without checking CRM."
            self.state_data["current_lead_index"] += 1
            return 0.1, f"Lead routed to rep {action.rep_id}. Moving to next lead."

        if action_type == "disqualify":
            self.state_data["current_lead_index"] += 1
            return 0.1, f"Lead disqualified: {action.disqualification_reason}. Moving to next lead."

        # action_type is compared against plain strings above, so it may be a
        # str or a str-valued enum; only dereference .value when it exists.
        action_name = getattr(action_type, "value", action_type)
        return 0.0, f"Action {action_name} performed."

    def _grade_episode(self) -> float:
        """Run the current task's grader, record the score and return it.

        Unknown task ids score 0.0. The score is stored in
        ``state_data["grader_score"]`` and in the module-level
        ``global_last_grader_score``.
        """
        global global_last_grader_score

        grader = self._GRADERS.get(self.task_id)
        if grader is None:
            score = 0.0
        else:
            score = grader(self.state_data["action_history"], self.state_data["leads"])

        self.state_data["grader_score"] = score
        # NOTE(review): process-wide value; concurrent sessions overwrite each other.
        global_last_grader_score = score
        return score

    @property
    def state(self) -> State:
        """Episode metadata: episode id and step count."""
        return self._state

    def get_full_state(self) -> Dict[str, Any]:
        """Return the internal episode-state dict itself (not a copy)."""
        return self.state_data
|
|