Spaces:
Running
Running
| """GTM Strategy Optimizer — OpenEnv Environment implementation.""" | |
| from __future__ import annotations | |
| import uuid | |
| from typing import Any, Optional | |
| from openenv.core.env_server import Environment | |
| from models import ( | |
| ChannelMetrics, | |
| ExperimentResult, | |
| FunnelMetrics, | |
| GTMAction, | |
| GTMObservation, | |
| GTMState, | |
| SegmentMetrics, | |
| ) | |
| from server.simulation import EXPERIMENT_TYPES, MESSAGING_DIMS, PRICING_ACTIONS | |
| from server.tasks import create_simulator, get_task, TASKS | |
class GTMEnvironment(Environment):
    """OpenEnv environment simulating Go-To-Market strategy optimization.

    Each episode represents a product launch lifecycle. The agent makes weekly
    decisions about budget allocation, customer targeting, messaging,
    experiments, and pricing to maximize revenue under uncertainty.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, **kwargs: Any):
        """Create an environment with no active episode.

        ``reset()`` must be called before ``step()``; until then there is no
        simulator and no task definition.
        """
        super().__init__(**kwargs)
        self._state = GTMState()
        self._sim = None
        self._task_def = None
        # Final grader score of every finished episode, keyed by episode_id.
        self._grader_scores: dict[str, float] = {}

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: str = "channel_optimizer",
        **kwargs: Any,
    ) -> GTMObservation:
        """Start a new GTM episode for the given task.

        Args:
            seed: Forwarded to ``create_simulator`` for the new simulator.
            episode_id: Identifier for the episode; a random UUID4 string is
                generated when omitted or empty.
            task_id: Key of the task to load via ``get_task``.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The week-0 observation: empty metrics, the task's available
            channels/segments/experiments/pricing actions, and a welcome
            message.
        """
        task_def = get_task(task_id)
        self._task_def = task_def
        self._sim = create_simulator(task_id, seed=seed)

        self._state = GTMState(
            episode_id=episode_id or str(uuid.uuid4()),
            step_count=0,
            task_id=task_id,
            difficulty=task_def.difficulty,
            true_brand_strength=50.0,
            true_market_demand=1.0,
            total_revenue=0.0,
            total_spend=0.0,
            total_conversions=0,
            compliance_violations=0,
            experiments_run=0,
            useful_experiments=0,
        )

        s = self._sim.state
        channels = list(self._sim.channels.keys())
        segments = list(self._sim.segments.keys())

        return GTMObservation(
            done=False,
            reward=None,
            week=0,
            total_weeks=s.total_weeks,
            budget_remaining=s.budget_remaining,
            weekly_budget=s.weekly_budget,
            channel_metrics={ch: ChannelMetrics() for ch in channels},
            funnel=FunnelMetrics(),
            segment_performance={seg: SegmentMetrics() for seg in segments},
            experiment_result=None,
            brand_score=50.0,
            total_revenue=0.0,
            total_conversions=0,
            average_cac=0.0,
            available_channels=channels,
            available_segments=segments,
            available_experiments=task_def.available_experiments,
            available_pricing_actions=task_def.available_pricing_actions,
            messaging_dimensions=MESSAGING_DIMS,
            message=self._initial_message(task_def),
        )

    def step(
        self,
        action: GTMAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> GTMObservation:
        """Execute one week of GTM activity.

        Args:
            action: The agent's decisions for this week. An experiment or
                pricing action that the current task does not offer is
                replaced by ``None`` before reaching the simulator.
            timeout_s: Accepted for interface compatibility; unused here.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The observation after the simulated week, including the per-step
            reward (rounded to 4 decimals) and a human-readable message.

        Raises:
            RuntimeError: If ``reset()`` has not been called yet.
        """
        if self._sim is None:
            raise RuntimeError("Must call reset() before step()")

        task_def = self._task_def
        self._state.step_count += 1

        # Only forward optional actions the task actually allows.
        experiment = (
            action.experiment
            if action.experiment in task_def.available_experiments
            else None
        )
        pricing_action = (
            action.pricing_action
            if action.pricing_action in task_def.available_pricing_actions
            else None
        )

        # Run simulation step
        result = self._sim.step(
            budget_allocation=action.budget_allocation,
            segment_targeting=action.segment_targeting,
            messaging=action.messaging,
            experiment=experiment,
            pricing_action=pricing_action,
        )
        s = self._sim.state
        done = self._sim.is_done

        # Mirror the simulator's state into the environment state.
        self._state.true_brand_strength = s.brand_strength
        self._state.true_market_demand = s.market_demand
        self._state.total_revenue = s.total_revenue
        self._state.total_spend = s.total_spend
        self._state.total_conversions = s.total_conversions
        self._state.compliance_violations = s.compliance_violations
        self._state.experiments_run = s.experiments_run
        self._state.useful_experiments = s.useful_experiments

        # Compute step reward (partial progress signal)
        reward = self._compute_reward(result, s)

        # Grade the episode exactly once when it ends; the same value is
        # stored for get_grader_score() and reported in the final message.
        grader_score: Optional[float] = None
        if done:
            grader_score = task_def.grader(s)
            self._grader_scores[self._state.episode_id] = grader_score

        # Build observation
        channel_metrics = {
            ch: ChannelMetrics(**m) for ch, m in result["channel_metrics"].items()
        }
        funnel = FunnelMetrics(**result["funnel"])
        segment_perf = {
            seg: SegmentMetrics(**m)
            for seg, m in result["segment_performance"].items()
        }
        exp_result = None
        if result["experiment_result"]:
            exp_result = ExperimentResult(**result["experiment_result"])

        # Guard the denominator so zero conversions does not divide by zero.
        avg_cac = s.total_spend / max(s.total_conversions, 1)

        return GTMObservation(
            done=done,
            reward=round(reward, 4),
            week=s.week,
            total_weeks=s.total_weeks,
            budget_remaining=round(s.budget_remaining, 2),
            weekly_budget=round(s.weekly_budget, 2),
            channel_metrics=channel_metrics,
            funnel=funnel,
            segment_performance=segment_perf,
            experiment_result=exp_result,
            brand_score=result["brand_score_observed"],
            total_revenue=round(s.total_revenue, 2),
            total_conversions=s.total_conversions,
            average_cac=round(avg_cac, 2),
            available_channels=list(self._sim.channels.keys()),
            available_segments=list(self._sim.segments.keys()),
            available_experiments=task_def.available_experiments,
            available_pricing_actions=task_def.available_pricing_actions,
            messaging_dimensions=MESSAGING_DIMS,
            message=self._step_message(result, s, done, grader_score=grader_score),
        )

    # NOTE(review): the OpenEnv ``Environment`` base class may declare
    # ``state`` as a property -- confirm against the installed version before
    # changing this; it is kept as a plain method so existing ``env.state()``
    # callers keep working.
    def state(self) -> GTMState:
        """Return the environment's current internal state object."""
        return self._state

    def get_grader_score(self, episode_id: str) -> Optional[float]:
        """Get the grader score for a completed episode.

        Returns ``None`` when no finished episode with that id was recorded
        by this environment instance.
        """
        return self._grader_scores.get(episode_id)

    # ── Private helpers ────────────────────────────────────────────

    def _compute_reward(self, result: dict, s: Any) -> float:
        """Per-step reward with partial progress signal.

        Sums a revenue term (capped at 0.5), an efficiency term (capped at
        0.2) and a brand term (0.15 * brand_strength / 100), subtracts waste
        and compliance penalties, and clamps the total to [-1.0, 1.0].

        Args:
            result: The simulator's step result; reads ``weekly_revenue`` and
                the per-channel dicts under ``channel_metrics``.
            s: The simulator state; reads ``brand_strength`` and
                ``compliance_violations``.
        """
        weekly_rev = result["weekly_revenue"]
        per_channel = result["channel_metrics"].values()
        target_weekly = self._task_def.revenue_target / self._task_def.total_weeks

        # revenue component (0-0.5); the floor of 1.0 avoids dividing by a
        # zero or tiny weekly target.
        rev_reward = min(0.5, 0.5 * weekly_rev / max(target_weekly, 1.0))

        # efficiency bonus (0-0.2): reaches its cap at a 3x weekly ROI.
        weekly_spend = sum(m.get("spend", 0.0) for m in per_channel)
        if weekly_spend > 0:
            roi = weekly_rev / weekly_spend
            eff_reward = min(0.2, 0.2 * roi / 3.0)
        else:
            eff_reward = 0.0

        # brand maintenance (0-0.15)
        brand_reward = 0.15 * (s.brand_strength / 100.0)

        # penalties: 0.05 for each channel that spent more than 100 without a
        # single conversion, plus 0.1 per compliance violation held in ``s``.
        waste_penalty = 0.0
        for m in per_channel:
            if m.get("spend", 0) > 100 and m.get("conversions", 0) == 0:
                waste_penalty += 0.05
        compliance_penalty = s.compliance_violations * 0.1

        reward = (
            rev_reward + eff_reward + brand_reward
            - waste_penalty - compliance_penalty
        )
        return max(-1.0, min(1.0, reward))

    def _initial_message(self, task_def: Any) -> str:
        """Build the welcome text shown in the week-0 observation."""
        channels = ", ".join(c.name for c in task_def.channels)
        segments = ", ".join(seg.name for seg in task_def.segments)
        return (
            f"Welcome to the GTM Strategy Optimizer — Task: {task_def.name} ({task_def.difficulty})\n"
            f"\n"
            f"{task_def.description}\n"
            f"\n"
            f"Duration: {task_def.total_weeks} weeks | Budget: ${task_def.total_budget:,.0f} "
            f"(${task_def.total_budget / task_def.total_weeks:,.0f}/week)\n"
            f"Channels: {channels}\n"
            f"Segments: {segments}\n"
            f"Product price: ${task_def.product.base_price:.0f}\n"
            f"\n"
            f"Allocate your budget wisely across channels and segments. "
            f"Craft messaging that resonates with your target customers. "
            f"Maximize revenue while building brand strength."
        )

    def _step_message(
        self,
        result: dict,
        s: Any,
        done: bool,
        grader_score: Optional[float] = None,
    ) -> str:
        """Build the human-readable summary for one simulated week.

        Args:
            result: The simulator's step result.
            s: The simulator state after the step.
            done: Whether the episode has ended.
            grader_score: Already-computed final score to report when
                ``done`` is true. If omitted, the task grader is invoked.

        Returns:
            A single line joined with " | " while the episode is running, or
            a newline-joined block ending with the final score once done.
        """
        weekly_rev = result["weekly_revenue"]
        parts = [
            f"Week {s.week}/{s.total_weeks} | Revenue this week: ${weekly_rev:,.0f}",
            (
                f"Cumulative: ${s.total_revenue:,.0f} revenue, "
                f"{s.total_conversions} conversions, "
                f"${s.budget_remaining:,.0f} budget remaining"
            ),
            f"Brand health: {result['brand_score_observed']:.0f}/100",
        ]

        er = result["experiment_result"]
        if er:
            parts.append(f"Experiment result: {er['recommendation']}")

        if not done:
            return " | ".join(parts)

        # Reuse the caller's score so the grader is not run a second time.
        if grader_score is None:
            grader_score = self._task_def.grader(s)
        parts.append(f"\nEpisode complete! Final grader score: {grader_score:.4f}")
        return "\n".join(parts)