# Spaces: Sleeping
# (Looks like page-status residue from the web capture of this file; it is not part of the program.)
# --- Standard library --------------------------------------------------------
import logging
import os
import re
import time

# --- Optional third-party packages --------------------------------------------
try:
    from dotenv import load_dotenv
except ImportError:
    def load_dotenv(*args, **kwargs):
        """No-op stand-in used when python-dotenv is not installed."""
        return False

# Each SDK is guarded separately: a missing package must only disable its own
# client. With a shared try block, one failing import left the other name
# either undefined (NameError later) or wrongly reset to None.
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None

try:
    from groq import Groq
except ImportError:
    Groq = None

# Load .env before the project modules below are imported and before any
# os.getenv() lookup in this module.
load_dotenv()

# --- Project imports -----------------------------------------------------------
from envs.errors import EnvironmentDoneError
from models.schemas import ExpertState, WorkSpaceAction, WorkspaceObservation, WorkspaceState
from openenv.core import Environment
from prompter.system_prompt import SystemPrompt

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Regular expressions, keyed by expert name, that are searched (re.search) in
# the lower-cased expert feedback. A hit counts as the agent having surfaced
# that expert's hidden constraint.
#
# The short tokens are anchored with \b so they cannot fire inside unrelated
# text: without the anchors "m\s*fa" matches "from far", "2\s*fa" matches
# "2 failures" and "50\s*k" matches "150k" or "50 kinds".
DISCOVERY_PATTERNS = {
    "Finance": [
        r"\b50\s*k\b",
        r"\b50,000\b(?!,\d)",  # do not accept the prefix of 50,000,000
        r"fifty thousand",
        r"budget cap",
        r"budget ceiling",
        r"hard cap",
        r"low[- ]five[- ]figure",
        r"mid[- ]five[- ]figure",
        r"five[- ]figure",
        r"under (?:the )?ceiling",
        r"under\s+\$?50k",
        r"below\s+\$?50k",
        r"sub-\$?50k",
    ],
    "Security": [
        r"biometric",
        r"\b2\s*fa\b",
        r"\bm\s*fa\b",
        r"two-factor",
        r"second factor",
        r"physiological",
    ],
    "UX": [
        r"single[ -]click",
        r"one[ -]click",
        r"one[ -]tap",
        r"single[ -]tap",
        # \u2011 is the non-breaking hyphen, which LLM output sometimes uses.
        r"single[\u2011-]tap",
        r"single[\u2011-]click",
        r"frictionless purchase",
        r"one decisive interaction",
    ],
}
def normalize_environment_mode(mode: str | None) -> str:
    """Map a user-supplied mode name or alias to its canonical mode.

    Matching ignores case and surrounding whitespace. ``None`` and the empty
    string resolve to ``"mock"``.

    Args:
        mode: Requested mode or alias (e.g. ``"live"``, ``"scripted"``).

    Returns:
        One of ``"mock"``, ``"easy"``, ``"medium"``, ``"hard"`` or ``"llm"``.

    Raises:
        ValueError: If ``mode`` is not a known mode or alias.
    """
    aliases = {
        "": "mock",
        # "mock" is a value this function returns, so it must also be accepted
        # as input; otherwise re-normalizing a stored mode raises ValueError.
        "mock": "mock",
        "deterministic": "mock",
        "scripted": "mock",
        "easy": "easy",
        "medium": "medium",
        "hard": "hard",
        "llm": "llm",
        "live": "llm",
        "online": "llm",
        "remote": "llm",
        "api": "llm",
    }
    canonical = (mode or "").strip().lower()
    try:
        return aliases[canonical]
    except KeyError:
        raise ValueError(f"Unsupported environment mode: {mode}") from None
| class WorkSpaceEnvironment(Environment): | |
def __init__(self, mode: str | None = None):
    """Configure the environment for one difficulty mode.

    Args:
        mode: Requested mode or alias. When falsy, the ``BASELINE_ENV_MODE``
            environment variable is used, and ``"easy"`` after that.

    Raises:
        ValueError: If the requested mode is not a known mode or alias.
        RuntimeError: If an LLM-backed mode is requested but the ``groq``
            package could not be imported.
    """
    self._state: WorkspaceState | None = None
    self.system_prompt = SystemPrompt()

    requested_mode = mode or os.getenv("BASELINE_ENV_MODE") or "easy"
    self.mode = normalize_environment_mode(requested_mode)

    # ENV_MODEL_NAME wins over MODEL_NAME in every mode. Previously the
    # LLM-backed modes re-read MODEL_NAME only, silently discarding an
    # explicitly configured ENV_MODEL_NAME.
    self.env_model = (
        os.getenv("ENV_MODEL_NAME")
        or os.getenv("MODEL_NAME")
        or "llama-3.1-8b-instant"
    )

    # Only the LLM-backed modes need a remote client.
    self._env_client: object | None = None
    if self.mode in ("medium", "hard", "llm"):
        if Groq is None:
            raise RuntimeError(
                f"Environment mode '{self.mode}' needs the 'groq' package, "
                "which is not installed."
            )
        self._env_client = Groq(api_key=os.getenv("GROQ_API_KEY"))
def reset(self, topic="Draft the new Mobile App PRD") -> WorkspaceObservation:
    """Begin a new episode and return the opening observation.

    Three experts (Finance, Security, UX) are created, each with a fixed
    hidden constraint, and the chat history starts empty.

    Args:
        topic: Task description interpolated into the opening system message.

    Returns:
        An observation at turn 0 with zero reward and ``done=False``.
    """
    hidden_constraints = (
        ("Finance", "Budget must not exceed $50k."),
        ("Security", "Must include biometric 2FA."),
        ("UX", "Checkout must be a single click."),
    )
    roster = {
        expert_name: ExpertState(name=expert_name, hidden_constraint=constraint)
        for expert_name, constraint in hidden_constraints
    }
    self._state = WorkspaceState(experts=roster, chat_history=[])

    opening = f"SYSTEM: You are the PM. {topic}. Message the experts to gather requirements."
    return WorkspaceObservation(
        feedback=opening,
        current_turn=0,
        reward=0.0,
        done=False,
    )
def state(self) -> WorkspaceState:
    """Return the current episode state.

    Raises:
        RuntimeError: If ``reset()`` has not been called yet. RuntimeError is
            a subclass of Exception, so callers catching Exception still work.
    """
    if self._state is None:
        raise RuntimeError("Call reset() first")
    return self._state
def step(self, action: WorkSpaceAction) -> WorkspaceObservation:
    """Apply one agent action, score it, and return the resulting observation.

    The turn counter is incremented, expert feedback is generated, component
    rewards are computed, and only then is the exchange appended to the chat
    history, so repeat detection sees earlier turns only.

    Reward by mode:
        * ``"easy"``: the sum of the new discovery rewards. The episode ends
          once every expert's constraint is discovered, or on
          ``submit_final``.
        * ``"mock"``, ``"medium"``, ``"hard"``, ``"llm"``: ``submit_final``
          ends the episode and is scored with the harmonic mean of the three
          per-expert draft grades (0.0 if any grade is 0). Any other action
          earns the discovery rewards scaled by 0.1.

    The penalty component is added afterwards. The reported reward is clamped
    at 0 and rounded to three decimals. The episode also ends when the turn
    limit is reached.

    Raises:
        RuntimeError: If ``reset()`` has not been called.
        EnvironmentDoneError: If the episode has already terminated.
    """
    if self._state is None:
        raise RuntimeError("Call reset() before step()")
    if self._state.is_done:
        raise EnvironmentDoneError("Episode already terminated.")

    state = self._state
    state.turn_count += 1

    feedback_text, _ = self._get_expert_feedback(action)
    component_rewards = self._calculate_multi_reward(action, feedback_text)
    state.chat_history.append({
        "agent": action.content,
        "world": feedback_text,
    })

    discovery_total = (
        component_rewards["discovery_finance"]
        + component_rewards["discovery_security"]
        + component_rewards["discovery_ux"]
    )
    is_final = action.action_type == "submit_final"
    total_reward = 0.0

    if self.mode == "easy":
        # Goal: discover all three constraints; reward is the sum of NEW discoveries.
        total_reward = discovery_total
        all_found = all(
            expert.constraint_discovered_by_agent for expert in state.experts.values()
        )
        if all_found or is_final:
            state.is_done = True
        if all_found:
            feedback_text += "\nSYSTEM: All constraints discovered. Task complete."
    elif self.mode in ("mock", "medium", "hard", "llm"):
        # Goal: synthesis. "mock" is included so that a scripted episode is
        # graded and terminated on submit_final like the LLM-backed modes;
        # previously it matched no branch and only ended at the turn limit.
        if is_final:
            state.is_done = True
            scores = [
                component_rewards["final_finance"],
                component_rewards["final_security"],
                component_rewards["final_ux"],
            ]
            # Harmonic mean: one unmet constraint zeroes the whole reward.
            if any(score == 0 for score in scores):
                total_reward = 0.0
            else:
                total_reward = len(scores) / sum(1 / score for score in scores)
        else:
            # Dense discovery "nudges" (0.033 instead of 0.33).
            total_reward = discovery_total * 0.1

    total_reward += component_rewards["penalty"]

    # Safety turn limit.
    if state.turn_count >= state.max_turns:
        state.is_done = True
        feedback_text += "\nSYSTEM: Turn limit reached."

    return WorkspaceObservation(
        feedback=feedback_text,
        current_turn=state.turn_count,
        reward=round(max(0, total_reward), 3),
        done=state.is_done,
    )
| def _get_expert_feedback(self, action: WorkSpaceAction) -> tuple[str, float]: | |
| """ | |
| Executes the expert logic based on action type. | |
| Returns: (feedback_text, internal_dense_reward) | |
| """ | |
| all_feedback = [] | |
| total_internal_reward = 0.0 | |
| if action.action_type == "message_expert": | |
| target = action.target | |
| if target == "All": | |
| for name in self._state.experts: | |
| self._update_frustration(name, action) | |
| resp, reward = self.expert_response(name, action.content) | |
| all_feedback.append(f"{name}: {resp}") | |
| total_internal_reward += reward | |
| feedback_text = "\n\n".join(all_feedback) | |
| elif target in self._state.experts: | |
| self._update_frustration(target, action) | |
| resp, reward = self.expert_response(target, action.content) | |
| feedback_text = f"{target}: {resp}" | |
| total_internal_reward += reward | |
| else: | |
| feedback_text = f"SYSTEM: Unknown expert '{target}'." | |
| elif action.action_type == "propose_draft": | |
| for name in self._state.experts: | |
| self._update_frustration(name, action) | |
| resp, reward = self.expert_response(name, action.content) | |
| all_feedback.append(f"{name}: {resp}") | |
| # Small reward for progress, but less than discovery | |
| total_internal_reward += (reward * 0.5) | |
| feedback_text = "\n".join(all_feedback) | |
| elif action.action_type == "submit_final": | |
| feedback_text = "SYSTEM: Final draft received for grading." | |
| total_internal_reward = 0.0 | |
| else: | |
| feedback_text = f"SYSTEM: Invalid action_type '{action.action_type}'." | |
| return feedback_text, total_internal_reward | |
def expert_response(self, expert_name: str, agent_message: str) -> tuple[str, float]:
    """Return the named expert's reply together with a reward of 0.0.

    The reward slot is always 0.0 here: discovery is awarded, and the
    discovered flag flipped, in ``_calculate_multi_reward`` so the
    environment keeps a single source of truth for easy-mode reward.

    Raises:
        KeyError: If ``expert_name`` is not one of the current experts.
    """
    reply = self._generate_expert_response(
        self._state.experts[expert_name],
        expert_name,
        agent_message,
    )
    return reply, 0.0
def harmonic_mean_reward(self, draft: str) -> float:
    """Score a draft as the harmonic mean of its per-expert grades.

    Each expert's hidden constraint is graded with
    ``_grade_draft_against_constraint``. The harmonic mean is used so that a
    single failed constraint (grade 0) zeroes the whole reward.

    Args:
        draft: The draft text to grade.

    Returns:
        The harmonic mean rounded to three decimals; 0.0 when any grade is 0
        or when there are no experts to grade against.
    """
    scores = [
        self._grade_draft_against_constraint(draft, expert.hidden_constraint)
        for expert in self._state.experts.values()
    ]
    # The `not scores` guard avoids 0 / 0 when the expert set is empty.
    if not scores or any(score == 0 for score in scores):
        return 0.0
    harmonic = len(scores) / sum(1 / score for score in scores)
    return round(harmonic, 3)
def _calculate_multi_reward(self, action: WorkSpaceAction, feedback_text: str) -> dict:
    """Compute the per-component rewards and penalty for one action.

    Side effect: an expert whose discovery pattern appears in the feedback
    has ``constraint_discovered_by_agent`` set to True, so each discovery is
    rewarded at most once per episode.

    Args:
        action: The agent action being scored.
        feedback_text: Expert/system feedback produced for this action.

    Returns:
        A dict with ``discovery_*`` (0.33 per new discovery), ``final_*``
        (draft grades, filled only on ``submit_final``) and ``penalty``
        (zero or negative) entries.
    """
    rewards = {
        "discovery_finance": 0.0, "discovery_security": 0.0, "discovery_ux": 0.0,
        "final_finance": 0.0, "final_security": 0.0, "final_ux": 0.0,
        "penalty": 0.0,
    }
    experts = self._state.experts

    # 1. DISCOVERY (only granted if not already discovered).
    text = feedback_text.lower()
    for name, patterns in DISCOVERY_PATTERNS.items():
        expert = experts[name]
        if expert.constraint_discovered_by_agent:
            continue
        if any(re.search(pattern, text) for pattern in patterns):
            rewards[f"discovery_{name.lower()}"] = 0.33
            expert.constraint_discovered_by_agent = True

    # 2. FINAL SUBMISSION.
    if action.action_type == "submit_final":
        for name, expert in experts.items():
            rewards[f"final_{name.lower()}"] = self._grade_draft_against_constraint(
                action.content,
                expert.hidden_constraint,
            )

    # 3. PENALTIES.
    is_broadcast = action.target == "All"
    if action.action_type == "message_expert" and is_broadcast:
        rewards["penalty"] -= 1.0 if self.mode == "easy" else 0.5
    elif action.action_type == "propose_draft" and is_broadcast:
        rewards["penalty"] -= 0.1 if self.mode in ["medium", "hard", "llm"] else 0.0

    # Repeat detection selects history entries by looking for the expert's
    # name in the recorded feedback. Feedback lines are prefixed with real
    # expert names, never with "All", so a broadcast has to be checked
    # against each expert; otherwise repeated broadcasts are never detected.
    if is_broadcast:
        repeated = any(
            self._is_repeated_question(action.content, name) for name in experts
        )
    else:
        repeated = self._is_repeated_question(action.content, action.target or "")
    if repeated:
        rewards["penalty"] -= 0.4  # doubled repeat penalty

    return rewards
| def _grade_draft_against_constraint(self, draft: str, constraint: str) -> float: | |
| # DETERMINISTIC VERIFIER (The "Smack It" Fix) | |
| text = draft.lower() | |
| # Finance Check | |
| if "$50k" in constraint or "budget" in constraint: | |
| mentions_amount = any( | |
| x in text | |
| for x in [ | |
| "50k", | |
| "$50k", | |
| "50,000", | |
| "$50,000", | |
| "fifty thousand", | |
| "sub-$50k", | |
| "sub 50k", | |
| ] | |
| ) | |
| mentions_limit = any( | |
| token in text | |
| for token in [ | |
| "under", | |
| "below", | |
| "at or below", | |
| "not exceed", | |
| "cap", | |
| "ceiling", | |
| "budget cap", | |
| ] | |
| ) | |
| if mentions_amount and mentions_limit: | |
| return 1.0 | |
| # Security Check | |
| if "biometric" in constraint: | |
| if "biometric" in text and any( | |
| token in text for token in ("2fa", "mfa", "two-factor", "multi-factor") | |
| ): | |
| return 1.0 | |
| # UX Check | |
| if "single click" in constraint: | |
| if any( | |
| token in text | |
| for token in ("single-click", "one-click", "single click", "one click", "single-tap", "one-tap") | |
| ) and "checkout" in text: | |
| return 1.0 | |
| # Fallback to LLM grading ONLY in live mode | |
| if self.mode == "live": | |
| # (Your existing LLM grader logic here) | |
| pass | |
| return 0.0 | |
| def _update_frustration(self, expert_name: str, action: WorkSpaceAction): | |
| expert = self._state.experts[expert_name] | |
| repeated_question = self._is_repeated_question(action.content, expert_name) | |
| if repeated_question: | |
| expert.frustration_level = min(10.0, expert.frustration_level + 1.0) | |
| if expert.frustration_level >= 5.0 and not expert.constraint_shifted: | |
| expert.hidden_constraint += " Also requires board approval." | |
| expert.constraint_shifted = True | |
| def _call_llm(self, prompt: str, max_tokens: int = 300) -> str: | |
| if self._env_client is None: | |
| raise RuntimeError("Environment client is not configured for llm mode.") | |
| time.sleep(4.0) | |
| try: | |
| response = self._env_client.chat.completions.create( | |
| model=self.env_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.7, | |
| max_tokens=max_tokens, | |
| ) | |
| return response.choices[0].message.content.strip() | |
| except Exception as exc: | |
| logger.error(f"Environment LLM Error: {exc}") | |
| raise | |
| def _generate_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str: | |
| # If in EASY mode, don't even call Groq. Use pure string templates. | |
| if self.mode == "easy": | |
| responses = { | |
| "Finance": "The budget cap is $50k. Don't go over it.", | |
| "Security": "We require biometric 2FA. No exceptions.", | |
| "UX": "The checkout must be a single-click flow." | |
| } | |
| return responses.get(expert_name, "I have no requirements.") | |
| # Medium and Live still use the LLM | |
| prompt = self.system_prompt.get_expert_prompt(expert, expert_name, agent_message) | |
| return self._call_llm(prompt, max_tokens=300) | |
| def _mock_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str: | |
| draft_score = self._mock_grade_constraint(agent_message, expert.hidden_constraint) | |
| lower_message = agent_message.lower() | |
| is_question = "?" in agent_message or any( | |
| token in lower_message for token in ("please", "could you", "can you", "what", "which", "how") | |
| ) | |
| if expert_name == "Finance": | |
| if is_question: | |
| response = ( | |
| "We need the initial release budget capped at or below $50k. " | |
| "Please keep the scope lean and prioritize the highest-ROI features." | |
| ) | |
| elif draft_score >= 0.9: | |
| response = ( | |
| "This draft respects the sub-$50k budget and keeps scope disciplined. " | |
| "From a finance perspective, the release plan looks viable." | |
| ) | |
| else: | |
| response = ( | |
| "I still need the PRD to explicitly cap the first release budget at $50k or less. " | |
| "Right now the financial guardrails are too vague." | |
| ) | |
| elif expert_name == "Security": | |
| if is_question: | |
| response = ( | |
| "Passwords alone will not be enough for this app. " | |
| "We need biometric 2FA for sign-in and other sensitive actions." | |
| ) | |
| elif draft_score >= 0.9: | |
| response = ( | |
| "The draft now captures biometric 2FA clearly, which addresses our baseline security requirement. " | |
| "That is the level of control we need." | |
| ) | |
| else: | |
| response = ( | |
| "The PRD still needs to call out biometric 2FA explicitly. " | |
| "Without that requirement, the security posture is incomplete." | |
| ) | |
| else: | |
| if is_question: | |
| response = ( | |
| "Checkout has to feel immediate for the user. " | |
| "The flow should support a true single-click checkout with minimal friction." | |
| ) | |
| elif draft_score >= 0.9: | |
| response = ( | |
| "This draft captures the single-click checkout requirement well. " | |
| "The flow now feels appropriately low-friction." | |
| ) | |
| else: | |
| response = ( | |
| "I still need the PRD to commit to a single-click checkout experience. " | |
| "The current draft leaves too much friction in the funnel." | |
| ) | |
| if expert.constraint_shifted: | |
| response += " Any change of this size would also need board approval." | |
| return response | |
| def _mock_grade_constraint(self, draft: str, constraint: str) -> float: | |
| text = draft.lower() | |
| checks = [] | |
| if "$50k" in constraint: | |
| checks.append( | |
| any(token in text for token in ("$50k", "50k", "under 50k", "below 50k", "budget cap")) | |
| and "budget" in text | |
| ) | |
| if "biometric 2FA" in constraint: | |
| checks.append( | |
| "biometric" in text and any(token in text for token in ("2fa", "two-factor", "mfa", "multi-factor")) | |
| ) | |
| if "single click" in constraint: | |
| checks.append( | |
| any(token in text for token in ("single click", "single-click", "one click", "one-click")) | |
| and "checkout" in text | |
| ) | |
| if "board approval" in constraint.lower(): | |
| checks.append("board approval" in text) | |
| if not checks: | |
| return 0.0 | |
| satisfied = sum(1 for check in checks if check) | |
| return round(satisfied / len(checks), 3) | |
| def _constraint_mentioned(self, response: str, constraint: str) -> bool: | |
| constraint_keywords = constraint.lower().split() | |
| stopwords = {"must", "the", "a", "an", "is", "be", "and", "or", "not", "to", "in"} | |
| keywords = [word for word in constraint_keywords if word not in stopwords] | |
| response_lower = response.lower() | |
| matches = sum(1 for keyword in keywords if keyword in response_lower) | |
| return matches >= max(1, len(keywords) // 2) | |
| def _is_repeated_question(self, content: str, expert_name: str) -> bool: | |
| previous = [ | |
| history["agent"] for history in self._state.chat_history if expert_name in history.get("world", "") | |
| ] | |
| if not previous: | |
| return False | |
| content_words = set(content.lower().split()) | |
| for prev in previous: | |
| prev_words = set(prev.lower().split()) | |
| if not content_words: | |
| continue | |
| overlap = len(content_words & prev_words) / len(content_words) | |
| if overlap > 0.7: | |
| return True | |
| return False | |