"""Environment server implementation for the RevOps lead-handling tasks.

An episode walks the agent through a queue of leads. Each action on the
current lead yields a small shaping reward (or a penalty for an ordering
violation); once every lead has been routed or disqualified, the episode is
scored by the grader that matches the active task.
"""

import sys
import os
from typing import Any, Dict, Tuple, Optional
from uuid import uuid4

# Make sure models module can be imported
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

from models import RevOpsObservation, RevOpsAction, EnrichmentData
from server.data_generator import get_task_data, get_reps, get_icp_criteria
from server.graders import grader_easy, grader_medium, grader_hard

# Grader score of the most recently finished episode (any instance).
# NOTE(review): this is process-wide while SUPPORTS_CONCURRENT_SESSIONS is
# True, so concurrent sessions overwrite each other's value.
global_last_grader_score = None

# Task ids that may also be passed through ``episode_id`` on reset.
_KNOWN_TASK_IDS = ("task_easy", "task_medium", "task_hard")


class RevOpsEnvironment(Environment):
    """Lead-by-lead RevOps environment with per-task grading."""

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, **kwargs) -> None:
        """Create an environment with no active episode.

        ``kwargs`` are accepted for interface compatibility and ignored.
        """
        # NOTE(review): the base ``Environment.__init__`` is not invoked
        # here -- confirm that nothing it sets up is required.
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task_id = "task_easy"  # DEFAULT
        self.state_data: Dict[str, Any] = {}

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs,
    ) -> RevOpsObservation:
        """Start a new episode and return its first observation.

        Args:
            seed: Accepted for interface compatibility; not used.
            episode_id: Id for the new episode (a fresh UUID when omitted).
                If no ``task_id`` keyword is given and this value is one of
                the known task ids, it also selects the task.
            **kwargs: ``task_id`` selects the task; other keys are ignored.

        Returns:
            The observation for the first lead, with ``done=False`` and
            ``reward=0.0``.
        """
        self._state = State(episode_id=episode_id or str(uuid4()), step_count=0)

        task_id = kwargs.get("task_id")
        if not task_id and episode_id in _KNOWN_TASK_IDS:
            task_id = episode_id
        if task_id:
            self.task_id = task_id

        task_data = get_task_data(self.task_id)
        leads = task_data["leads"]
        self.state_data = {
            "leads": leads,
            "current_lead_index": 0,
            "enrichment_map": task_data["enrichment_map"],
            "crm": task_data["crm"],
            "reps": get_reps(),
            "icp": get_icp_criteria(),
            "action_history": [],
            "accumulated_reward": 0.0,
            "last_feedback": f"New episode started for {self.task_id}.",
            # Per-lead progress flags, keyed by lead id.
            "lead_states": {
                lead.id: {
                    "enriched": False,
                    "scored": False,
                    "crm_checked": False,
                    "merged": False,
                    "flagged": False,
                }
                for lead in leads
            },
            "grader_score": None,
        }

        obs = self._get_observation()
        obs.done = False
        obs.reward = 0.0
        return obs

    def _get_observation(self) -> RevOpsObservation:
        """Build the observation for the current lead.

        Enrichment data is only included once the lead has been enriched,
        and the CRM's ``existing_accounts`` / ``opportunities`` are blanked
        until the CRM has been checked for that lead. When the lead index is
        past the end of the queue, the last lead is shown with unenriched
        data and the full CRM. With no leads at all, a default-constructed
        observation is returned.
        """
        idx = self.state_data.get("current_lead_index", 0)
        leads = self.state_data.get("leads", [])
        if not leads:
            return RevOpsObservation()

        full_crm = self.state_data["crm"]
        if idx < len(leads):
            lead = leads[idx]
            lead_state = self.state_data["lead_states"][lead.id]
            if lead_state["enriched"]:
                enrichment = self.state_data["enrichment_map"][lead.id]
            else:
                enrichment = EnrichmentData(enriched=False)
            if lead_state["crm_checked"]:
                crm = full_crm
            else:
                # Hide CRM contents until the agent explicitly checks them.
                crm = full_crm.model_copy(
                    update={"existing_accounts": [], "opportunities": []}
                )
        else:
            lead = leads[-1]
            enrichment = EnrichmentData(enriched=False)
            crm = full_crm

        return RevOpsObservation(
            task_id=self.task_id,
            lead=lead,
            enrichment=enrichment,
            crm=crm,
            reps=self.state_data.get("reps", []),
            icp_criteria=self.state_data.get("icp", ""),
            sla_time_remaining_minutes=60,
            last_action_feedback=self.state_data.get("last_feedback", ""),
        )

    def _apply_action(
        self,
        action: RevOpsAction,
        current_lead: Any,
        lead_state: Dict[str, bool],
    ) -> Tuple[float, str]:
        """Apply ``action`` to the current lead.

        Mutates ``lead_state`` (and ``current_lead.score`` for score
        updates) and advances ``current_lead_index`` when the lead is routed
        or disqualified.

        Returns:
            ``(reward, feedback)`` for this single action.
        """
        action_type = action.action_type
        is_hard_cfo_lead = (
            self.task_id == "task_hard" and current_lead.id == "lead_3_cfo"
        )

        if action_type == "enrich_lead":
            if lead_state["enriched"]:
                return 0.0, "Lead already enriched."
            lead_state["enriched"] = True
            return 0.1, "Lead enriched successfully."

        if action_type == "check_crm":
            if lead_state["crm_checked"]:
                return 0.0, "CRM already checked."
            lead_state["crm_checked"] = True
            return 0.1, "CRM checked successfully."

        if action_type == "update_lead_score":
            if self.task_id == "task_medium" and not lead_state["enriched"]:
                return -0.2, "Violation: Scored before enrichment."
            current_lead.score = action.score
            lead_state["scored"] = True
            return 0.1, f"Lead score updated to {action.score}."

        if action_type == "merge_with_account":
            if is_hard_cfo_lead and not lead_state["crm_checked"]:
                return -0.5, "Violation: Merged without checking CRM first."
            lead_state["merged"] = True
            return 0.1, f"Merged with account {action.account_id}."

        if action_type == "flag_reengagement":
            lead_state["flagged"] = True
            return (
                0.1,
                f"Flagged as re-engagement for opportunity {action.opportunity_id}.",
            )

        if action_type == "route_to_rep":
            if is_hard_cfo_lead and not lead_state["crm_checked"]:
                # Penalised, but the lead stays current so it can be retried.
                return -0.4, "Fatal violation: Routed without checking CRM."
            self.state_data["current_lead_index"] += 1
            return 0.1, f"Lead routed to rep {action.rep_id}. Moving to next lead."

        if action_type == "disqualify":
            self.state_data["current_lead_index"] += 1
            return (
                0.1,
                f"Lead disqualified: {action.disqualification_reason}. Moving to next lead.",
            )

        # Unrecognised action: no reward. ``action_type`` may be an enum
        # member or a plain string, so do not assume ``.value`` exists.
        action_name = getattr(action_type, "value", action_type)
        return 0.0, f"Action {action_name} performed."

    def _grade_episode(self) -> float:
        """Return the grader score for the finished episode (0.0 if the task is unknown)."""
        history = self.state_data["action_history"]
        leads = self.state_data["leads"]
        if self.task_id == "task_easy":
            return grader_easy(history, leads)
        if self.task_id == "task_medium":
            return grader_medium(history, leads)
        if self.task_id == "task_hard":
            return grader_hard(history, leads)
        return 0.0

    def step(self, action: RevOpsAction, **kwargs) -> RevOpsObservation:  # type: ignore[override]
        """Apply one action to the current lead and return the new observation.

        If no episode has been started, one is started for the current task
        first. Calling ``step`` on a finished episode is a no-op that returns
        ``done=True`` with zero reward; such calls are not recorded in the
        action history or the step count.

        When the action completes the last lead, the task grader is run, its
        score is stored in the state, the observation metadata and the
        module-level ``global_last_grader_score``, and the step reward is
        adjusted by +0.5 (score >= 0.7) or -0.5 (otherwise).

        Args:
            action: The action to apply.
            **kwargs: Ignored.

        Returns:
            The observation after the action, with ``reward``, ``done`` and
            ``metadata`` (``message`` and, on the final step,
            ``grader_score``) filled in.
        """
        global global_last_grader_score

        # Stepping before any reset: lazily start an episode for this task.
        if "action_history" not in self.state_data:
            self.reset(task_id=self.task_id)

        leads = self.state_data["leads"]
        idx = self.state_data["current_lead_index"]

        # Guard first, so post-episode calls do not touch the graded history.
        if idx >= len(leads):
            obs = self._get_observation()
            obs.done = True
            obs.reward = 0.0
            obs.metadata = {"message": "Episode is already finished."}
            return obs

        self._state.step_count += 1
        self.state_data["action_history"].append(action)

        current_lead = leads[idx]
        lead_state = self.state_data["lead_states"][current_lead.id]
        reward, feedback = self._apply_action(action, current_lead, lead_state)

        # Must be stored before the observation is built, which reads it.
        self.state_data["last_feedback"] = feedback
        is_done = self.state_data["current_lead_index"] >= len(leads)

        obs = self._get_observation()
        info: Dict[str, Any] = {"message": feedback}

        if is_done:
            score = self._grade_episode()
            info["grader_score"] = score
            self.state_data["grader_score"] = score
            global_last_grader_score = score
            if score >= 0.7:
                reward += 0.5
            else:
                reward -= 0.5

        # Accumulate after the terminal adjustment so the running total
        # matches the sum of the rewards actually returned to the agent.
        self.state_data["accumulated_reward"] += reward

        obs.done = is_done
        obs.reward = reward
        obs.metadata = info
        return obs

    @property
    def state(self) -> State:
        """The episode ``State`` (episode id and step count)."""
        return self._state

    def get_full_state(self) -> Dict[str, Any]:
        """Return the internal episode dictionary (the live object, not a copy)."""
        return self.state_data