"""Workspace environment in which a PM agent gathers requirements from experts.

The agent messages three simulated experts (Finance, Security, UX), each of
whom holds one hidden constraint, and is rewarded for surfacing those
constraints and for submitting a final draft that satisfies them.
"""

import logging
import os
import re
import time

try:
    from dotenv import load_dotenv
except ImportError:
    def load_dotenv(*args, **kwargs):
        """Fallback used when python-dotenv is not installed; loads nothing."""
        return False

# Load .env before the remaining imports so that anything they read from the
# process environment at import time already sees the configured values.
load_dotenv()

# The LLM SDKs are optional: deterministic modes never touch them.  Each one
# is guarded separately so a missing `openai` does not hide an installed
# `groq` (and vice versa).
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None

try:
    from groq import Groq
except ImportError:
    Groq = None

from envs.errors import EnvironmentDoneError
from models.schemas import ExpertState, WorkSpaceAction, WorkspaceObservation, WorkspaceState
from openenv.core import Environment
from prompter.system_prompt import SystemPrompt

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Regexes searched (case-insensitively, via lower-casing) in expert feedback
# to decide whether an expert's hidden constraint has been revealed.
DISCOVERY_PATTERNS = {
    "Finance": [
        r"50\s*k",
        r"50,000",
        r"fifty thousand",
        r"budget cap",
        r"budget ceiling",
        r"hard cap",
        r"low[- ]five[- ]figure",
        r"mid[- ]five[- ]figure",
        r"five[- ]figure",
        r"under (?:the )?ceiling",
        r"under\s+\$?50k",
        r"below\s+\$?50k",
        r"sub-\$?50k",
    ],
    "Security": [
        r"biometric",
        r"2\s*fa",
        # Word boundaries stop this from firing inside ordinary text such as
        # "from far" or "platform fails".
        r"\bm\s*fa\b",
        r"two-factor",
        r"second factor",
        r"physiological",
    ],
    "UX": [
        r"single[ -]click",
        r"one[ -]click",
        r"one[ -]tap",
        r"single[ -]tap",
        r"single[\u2011-]tap",
        r"single[\u2011-]click",
        r"frictionless purchase",
        r"one decisive interaction",
    ],
}

# Modes whose expert replies come from the remote LLM.
_LLM_MODES = ("medium", "hard", "llm")
_DEFAULT_ENV_MODEL = "llama-3.1-8b-instant"
# Pause before every LLM request (crude client-side rate limiting).
_LLM_CALL_DELAY_SECONDS = 4.0


def normalize_environment_mode(mode: str | None) -> str:
    """Map a user-supplied mode name onto one of the canonical modes.

    Args:
        mode: Raw mode string (any case, surrounding whitespace allowed) or
            ``None``.

    Returns:
        One of ``"mock"``, ``"easy"``, ``"medium"``, ``"hard"`` or ``"llm"``.

    Raises:
        ValueError: If ``mode`` is not a recognised name or alias.
    """
    canonical = (mode or "").strip().lower()
    aliases = {
        "": "mock",
        # The canonical name must map to itself so normalisation is idempotent.
        "mock": "mock",
        "easy": "easy",
        "deterministic": "mock",
        "medium": "medium",
        "hard": "hard",
        "scripted": "mock",
        "llm": "llm",
        "live": "llm",
        "online": "llm",
        "remote": "llm",
        "api": "llm",
    }
    if canonical not in aliases:
        raise ValueError(f"Unsupported environment mode: {mode}")
    return aliases[canonical]


def _harmonic_mean(scores: list[float]) -> float:
    """Return the harmonic mean of ``scores``; 0.0 if empty or any score is 0."""
    if not scores or any(score == 0 for score in scores):
        return 0.0
    return len(scores) / sum(1 / score for score in scores)


class WorkSpaceEnvironment(Environment):
    """Environment where an agent interviews experts and drafts a PRD."""

    def __init__(self, mode: str | None = None):
        """Configure the environment.

        Args:
            mode: Environment mode or alias.  Falls back to the
                ``BASELINE_ENV_MODE`` environment variable, then ``"easy"``.

        Raises:
            ValueError: If the resolved mode is not supported.
            ImportError: If an LLM-backed mode is requested but the ``groq``
                package is not installed.
        """
        self._state: WorkspaceState | None = None
        self.system_prompt = SystemPrompt()
        requested_mode = mode or os.getenv("BASELINE_ENV_MODE") or "easy"
        self.mode = normalize_environment_mode(requested_mode)
        # ENV_MODEL_NAME takes precedence over MODEL_NAME in every mode.
        self.env_model = (
            os.getenv("ENV_MODEL_NAME")
            or os.getenv("MODEL_NAME")
            or _DEFAULT_ENV_MODEL
        )
        self._env_client: object | None = None
        if self.mode in _LLM_MODES:
            if Groq is None:
                raise ImportError(
                    f"The 'groq' package is required for environment mode '{self.mode}'."
                )
            # Groq is the only backend wired up; an OpenAI-compatible client
            # (base_url / api_key / timeout / max_retries) was sketched here
            # previously but never enabled.
            self._env_client = Groq(api_key=os.getenv("GROQ_API_KEY"))

    def reset(self, topic: str = "Draft the new Mobile App PRD") -> WorkspaceObservation:
        """Start a new episode with fresh experts and an empty chat history.

        Args:
            topic: Task description embedded in the opening system message.

        Returns:
            The initial observation (turn 0, zero reward, not done).
        """
        experts = {
            "Finance": ExpertState(name="Finance", hidden_constraint="Budget must not exceed $50k."),
            "Security": ExpertState(name="Security", hidden_constraint="Must include biometric 2FA."),
            "UX": ExpertState(name="UX", hidden_constraint="Checkout must be a single click."),
        }
        self._state = WorkspaceState(experts=experts, chat_history=[])
        return WorkspaceObservation(
            feedback=f"SYSTEM: You are the PM. {topic}. Message the experts to gather requirements.",
            current_turn=0,
            reward=0.0,
            done=False,
        )

    def state(self) -> WorkspaceState:
        """Return the current episode state.

        Raises:
            RuntimeError: If ``reset()`` has not been called yet.
        """
        if self._state is None:
            raise RuntimeError("Call reset() first")
        return self._state

    def step(self, action: WorkSpaceAction) -> WorkspaceObservation:
        """Apply one agent action and return the resulting observation.

        Args:
            action: The agent's action (message, draft proposal or final
                submission).

        Returns:
            Observation carrying the expert/system feedback, the turn number,
            the reward (clamped at zero, rounded to 3 decimals) and the done
            flag.

        Raises:
            RuntimeError: If ``reset()`` has not been called.
            EnvironmentDoneError: If the episode has already terminated.
        """
        if self._state is None:
            raise RuntimeError("Call reset() before step()")
        if self._state.is_done:
            raise EnvironmentDoneError("Episode already terminated.")

        self._state.turn_count += 1
        feedback_text, _ = self._get_expert_feedback(action)
        # Rewards are computed before the exchange is appended so that the
        # repeat-question check only compares against earlier turns.
        component_rewards = self._calculate_multi_reward(action, feedback_text)
        self._state.chat_history.append({
            "agent": action.content,
            "world": feedback_text,
        })

        discovery_total = (
            component_rewards["discovery_finance"]
            + component_rewards["discovery_security"]
            + component_rewards["discovery_ux"]
        )
        is_final = action.action_type == "submit_final"

        # NOTE(review): "mock" mode has no reward/termination branch here, so
        # such an episode only ends at the turn limit — confirm the intended
        # scoring for that mode.
        total_reward = 0.0
        if self.mode == "easy":
            # Goal: discover all three constraints; reward is the sum of NEW
            # discoveries made this turn.
            total_reward = discovery_total
            all_found = all(
                expert.constraint_discovered_by_agent
                for expert in self._state.experts.values()
            )
            if all_found or is_final:
                self._state.is_done = True
                if all_found:
                    feedback_text += "\nSYSTEM: All constraints discovered. Task complete."
        elif self.mode in _LLM_MODES:
            # Goal: synthesis.
            if is_final:
                self._state.is_done = True
                # Harmonic mean: failing any single constraint zeroes the score.
                total_reward = _harmonic_mean([
                    component_rewards["final_finance"],
                    component_rewards["final_security"],
                    component_rewards["final_ux"],
                ])
            else:
                # Dense discovery 'nudges' (0.033 instead of 0.33).
                total_reward = discovery_total * 0.1

        total_reward += component_rewards["penalty"]

        # Safety turn limit.
        if self._state.turn_count >= self._state.max_turns:
            self._state.is_done = True
            feedback_text += "\nSYSTEM: Turn limit reached."

        return WorkspaceObservation(
            feedback=feedback_text,
            current_turn=self._state.turn_count,
            reward=round(max(0.0, total_reward), 3),
            done=self._state.is_done,
        )

    def _get_expert_feedback(self, action: WorkSpaceAction) -> tuple[str, float]:
        """Run the expert logic for ``action``.

        Returns:
            ``(feedback_text, internal_dense_reward)``.
        """
        all_feedback = []
        total_internal_reward = 0.0

        if action.action_type == "message_expert":
            target = action.target
            if target == "All":
                for name in self._state.experts:
                    self._update_frustration(name, action)
                    resp, reward = self.expert_response(name, action.content)
                    all_feedback.append(f"{name}: {resp}")
                    total_internal_reward += reward
                feedback_text = "\n\n".join(all_feedback)
            elif target in self._state.experts:
                self._update_frustration(target, action)
                resp, reward = self.expert_response(target, action.content)
                feedback_text = f"{target}: {resp}"
                total_internal_reward += reward
            else:
                feedback_text = f"SYSTEM: Unknown expert '{target}'."
        elif action.action_type == "propose_draft":
            # A draft is always shown to every expert.
            for name in self._state.experts:
                self._update_frustration(name, action)
                resp, reward = self.expert_response(name, action.content)
                all_feedback.append(f"{name}: {resp}")
                # Small reward for progress, but less than discovery.
                total_internal_reward += reward * 0.5
            feedback_text = "\n".join(all_feedback)
        elif action.action_type == "submit_final":
            feedback_text = "SYSTEM: Final draft received for grading."
            total_internal_reward = 0.0
        else:
            feedback_text = f"SYSTEM: Invalid action_type '{action.action_type}'."

        return feedback_text, total_internal_reward

    def expert_response(self, expert_name: str, agent_message: str) -> tuple[str, float]:
        """Return ``(reply, reward)`` for one expert; the reward is always 0.0.

        Discovery state is awarded and flipped in ``_calculate_multi_reward``
        so the environment has a single source of truth for easy-mode reward.
        """
        expert = self._state.experts[expert_name]
        response = self._generate_expert_response(expert, expert_name, agent_message)
        return response, 0.0

    def harmonic_mean_reward(self, draft: str) -> float:
        """Grade ``draft`` against every expert and return the harmonic mean.

        Returns:
            The harmonic mean of the per-expert scores rounded to 3 decimals,
            or 0.0 if any expert's constraint is unmet.
        """
        scores = [
            self._grade_draft_against_constraint(draft, expert.hidden_constraint)
            for expert in self._state.experts.values()
        ]
        return round(_harmonic_mean(scores), 3)

    def _calculate_multi_reward(self, action: WorkSpaceAction, feedback_text: str) -> dict:
        """Compute the per-component rewards and penalty for this turn.

        Side effect: marks an expert's constraint as discovered the first time
        a discovery pattern appears in ``feedback_text``.

        Returns:
            Dict with ``discovery_*``, ``final_*`` and ``penalty`` entries.
        """
        r = {
            "discovery_finance": 0.0,
            "discovery_security": 0.0,
            "discovery_ux": 0.0,
            "final_finance": 0.0,
            "final_security": 0.0,
            "final_ux": 0.0,
            "penalty": 0.0,
        }

        # 1. DISCOVERY (only granted if NOT already discovered).
        text = feedback_text.lower()
        for name, patterns in DISCOVERY_PATTERNS.items():
            expert = self._state.experts[name]
            if expert.constraint_discovered_by_agent:
                continue
            if any(re.search(pattern, text) for pattern in patterns):
                r[f"discovery_{name.lower()}"] = 0.33
                expert.constraint_discovered_by_agent = True  # flip the bit

        # 2. FINAL SUBMISSION
        if action.action_type == "submit_final":
            for name, expert in self._state.experts.items():
                r[f"final_{name.lower()}"] = self._grade_draft_against_constraint(
                    action.content,
                    expert.hidden_constraint,
                )

        # 3. PENALTIES
        if action.action_type == "message_expert" and action.target == "All":
            r["penalty"] -= 1.0 if self.mode == "easy" else 0.5
        elif action.action_type == "propose_draft" and action.target == "All":
            r["penalty"] -= 0.1 if self.mode in _LLM_MODES else 0.0

        if self._is_repeated_question(action.content, action.target or ""):
            r["penalty"] -= 0.4  # Doubled the repeat penalty

        return r

    def _grade_draft_against_constraint(self, draft: str, constraint: str) -> float:
        """Deterministically check whether ``draft`` satisfies ``constraint``.

        Returns:
            1.0 when the keyword check for the constraint's category passes,
            otherwise 0.0.
        """
        text = (draft or "").lower()

        # Finance check
        if "$50k" in constraint or "budget" in constraint:
            mentions_amount = any(
                token in text
                for token in (
                    "50k",
                    "$50k",
                    "50,000",
                    "$50,000",
                    "fifty thousand",
                    "sub-$50k",
                    "sub 50k",
                )
            )
            mentions_limit = any(
                token in text
                for token in (
                    "under",
                    "below",
                    "at or below",
                    "not exceed",
                    "cap",
                    "ceiling",
                    "budget cap",
                )
            )
            if mentions_amount and mentions_limit:
                return 1.0

        # Security check
        if "biometric" in constraint:
            if "biometric" in text and any(
                token in text for token in ("2fa", "mfa", "two-factor", "multi-factor")
            ):
                return 1.0

        # UX check
        if "single click" in constraint:
            if any(
                token in text
                for token in (
                    "single-click",
                    "one-click",
                    "single click",
                    "one click",
                    "single-tap",
                    "one-tap",
                )
            ) and "checkout" in text:
                return 1.0

        # An LLM-based fallback grader was planned for live mode but is not
        # implemented; grading is purely keyword-based in every mode.
        return 0.0

    def _update_frustration(self, expert_name: str, action: WorkSpaceAction):
        """Raise an expert's frustration on repeated questions.

        Once frustration reaches 5.0 the expert's constraint is extended with
        a board-approval requirement (at most once per episode).
        """
        expert = self._state.experts[expert_name]
        if self._is_repeated_question(action.content, expert_name):
            expert.frustration_level = min(10.0, expert.frustration_level + 1.0)

        if expert.frustration_level >= 5.0 and not expert.constraint_shifted:
            expert.hidden_constraint += " Also requires board approval."
            expert.constraint_shifted = True

    def _call_llm(self, prompt: str, max_tokens: int = 300) -> str:
        """Send ``prompt`` to the configured chat model and return its reply.

        Args:
            prompt: User message content.
            max_tokens: Completion length limit passed to the API.

        Returns:
            The stripped reply text ("" if the API returned no content).

        Raises:
            RuntimeError: If no client is configured.
            Exception: Any error raised by the client is logged and re-raised.
        """
        if self._env_client is None:
            raise RuntimeError("Environment client is not configured for llm mode.")

        time.sleep(_LLM_CALL_DELAY_SECONDS)
        try:
            response = self._env_client.chat.completions.create(
                model=self.env_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=max_tokens,
            )
        except Exception as exc:
            logger.exception("Environment LLM Error: %s", exc)
            raise
        # `content` may be None (e.g. an empty completion); treat it as "".
        return (response.choices[0].message.content or "").strip()

    def _generate_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str:
        """Produce one expert's reply according to the environment mode."""
        # EASY mode never calls the LLM; it uses fixed string templates.
        if self.mode == "easy":
            responses = {
                "Finance": "The budget cap is $50k. Don't go over it.",
                "Security": "We require biometric 2FA. No exceptions.",
                "UX": "The checkout must be a single-click flow.",
            }
            return responses.get(expert_name, "I have no requirements.")

        # MOCK mode has no client configured, so use the scripted responder
        # instead of failing inside _call_llm.
        if self.mode == "mock":
            return self._mock_expert_response(expert, expert_name, agent_message)

        # Medium, hard and live modes use the LLM.
        prompt = self.system_prompt.get_expert_prompt(expert, expert_name, agent_message)
        return self._call_llm(prompt, max_tokens=300)

    def _mock_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str:
        """Return a scripted reply for ``expert_name``.

        The reply depends on whether the message looks like a question and on
        how well the message, read as a draft, satisfies the expert's
        constraint.
        """
        draft_score = self._mock_grade_constraint(agent_message, expert.hidden_constraint)
        lower_message = agent_message.lower()
        is_question = "?" in agent_message or any(
            token in lower_message
            for token in ("please", "could you", "can you", "what", "which", "how")
        )

        if expert_name == "Finance":
            if is_question:
                response = (
                    "We need the initial release budget capped at or below $50k. "
                    "Please keep the scope lean and prioritize the highest-ROI features."
                )
            elif draft_score >= 0.9:
                response = (
                    "This draft respects the sub-$50k budget and keeps scope disciplined. "
                    "From a finance perspective, the release plan looks viable."
                )
            else:
                response = (
                    "I still need the PRD to explicitly cap the first release budget at $50k or less. "
                    "Right now the financial guardrails are too vague."
                )
        elif expert_name == "Security":
            if is_question:
                response = (
                    "Passwords alone will not be enough for this app. "
                    "We need biometric 2FA for sign-in and other sensitive actions."
                )
            elif draft_score >= 0.9:
                response = (
                    "The draft now captures biometric 2FA clearly, which addresses our baseline security requirement. "
                    "That is the level of control we need."
                )
            else:
                response = (
                    "The PRD still needs to call out biometric 2FA explicitly. "
                    "Without that requirement, the security posture is incomplete."
                )
        else:
            if is_question:
                response = (
                    "Checkout has to feel immediate for the user. "
                    "The flow should support a true single-click checkout with minimal friction."
                )
            elif draft_score >= 0.9:
                response = (
                    "This draft captures the single-click checkout requirement well. "
                    "The flow now feels appropriately low-friction."
                )
            else:
                response = (
                    "I still need the PRD to commit to a single-click checkout experience. "
                    "The current draft leaves too much friction in the funnel."
                )

        if expert.constraint_shifted:
            response += " Any change of this size would also need board approval."

        return response

    def _mock_grade_constraint(self, draft: str, constraint: str) -> float:
        """Return the fraction of applicable keyword checks that ``draft`` passes.

        Returns 0.0 when no check applies to ``constraint``.
        """
        text = draft.lower()
        checks = []

        if "$50k" in constraint:
            checks.append(
                any(
                    token in text
                    for token in ("$50k", "50k", "under 50k", "below 50k", "budget cap")
                )
                and "budget" in text
            )
        if "biometric 2FA" in constraint:
            checks.append(
                "biometric" in text
                and any(token in text for token in ("2fa", "two-factor", "mfa", "multi-factor"))
            )
        if "single click" in constraint:
            checks.append(
                any(
                    token in text
                    for token in ("single click", "single-click", "one click", "one-click")
                )
                and "checkout" in text
            )
        if "board approval" in constraint.lower():
            checks.append("board approval" in text)

        if not checks:
            return 0.0

        satisfied = sum(1 for check in checks if check)
        return round(satisfied / len(checks), 3)

    def _constraint_mentioned(self, response: str, constraint: str) -> bool:
        """Return True if at least half of the constraint's keywords (minimum
        one) occur as substrings of ``response``."""
        stopwords = {"must", "the", "a", "an", "is", "be", "and", "or", "not", "to", "in"}
        keywords = [word for word in constraint.lower().split() if word not in stopwords]
        response_lower = response.lower()
        matches = sum(1 for keyword in keywords if keyword in response_lower)
        return matches >= max(1, len(keywords) // 2)

    def _is_repeated_question(self, content: str, expert_name: str) -> bool:
        """Return True if ``content`` largely repeats an earlier message.

        Only earlier turns whose recorded feedback contains ``expert_name``
        are considered.  A message counts as repeated when more than 70% of
        its distinct words already appeared in one such earlier message.
        """
        content_words = set((content or "").lower().split())
        if not content_words:
            return False

        previous = [
            history["agent"]
            for history in self._state.chat_history
            if expert_name in history.get("world", "")
        ]
        for prev in previous:
            prev_words = set((prev or "").lower().split())
            overlap = len(content_words & prev_words) / len(content_words)
            if overlap > 0.7:
                return True
        return False