Project-Polymath / envs /environment.py
Addyk24's picture
Initialized RL environment
92e2763
try:
from dotenv import load_dotenv
except ImportError:
def load_dotenv():
return False
import time
load_dotenv()
import logging
import os
# Optional LLM client libraries. Each import is guarded on its own so that a
# missing package only disables that one client, and both names are always
# bound (to None when unavailable) instead of raising NameError later.
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None
try:
    from groq import Groq
except ImportError:
    Groq = None
from envs.errors import EnvironmentDoneError
from models.schemas import ExpertState, WorkSpaceAction, WorkspaceObservation, WorkspaceState
from openenv.core import Environment
from prompter.system_prompt import SystemPrompt
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
import re
# Regexes (searched against lower-cased feedback text) that mark an expert's
# hidden constraint as discovered. Keys must match the expert names.
#
# The short acronym/amount patterns are anchored with \b so that unrelated
# text such as "250k", "I'm fairly" or "system failure" does not count as a
# discovery.
DISCOVERY_PATTERNS = {
    "Finance": [
        r"\b50\s*k\b",
        r"50,000",
        r"fifty thousand",
        r"budget cap",
        r"budget ceiling",
        r"hard cap",
        r"low[- ]five[- ]figure",
        r"mid[- ]five[- ]figure",
        r"five[- ]figure",
        r"under (?:the )?ceiling",
        r"under\s+\$?50k",
        r"below\s+\$?50k",
        r"sub-\$?50k",
    ],
    "Security": [
        r"biometric",
        r"\b2\s*fa\b",
        r"\bm\s*fa\b",
        r"two-factor",
        r"second factor",
        r"physiological",
    ],
    "UX": [
        r"single[ -]click",
        r"one[ -]click",
        r"one[ -]tap",
        r"single[ -]tap",
        # \u2011 is the non-breaking hyphen, which LLM output sometimes uses.
        r"single[\u2011-]tap",
        r"single[\u2011-]click",
        r"frictionless purchase",
        r"one decisive interaction",
    ],
}
def normalize_environment_mode(mode: str | None) -> str:
    """Map a user-supplied mode string onto a canonical environment mode.

    Matching ignores case and surrounding whitespace. ``None`` and the empty
    string resolve to ``"mock"``.

    Args:
        mode: Requested mode or alias (e.g. ``"live"``, ``"scripted"``).

    Returns:
        One of ``"mock"``, ``"easy"``, ``"medium"``, ``"hard"`` or ``"llm"``.

    Raises:
        ValueError: If ``mode`` is not a known mode or alias.
    """
    aliases = {
        "": "mock",
        # The canonical name itself must be accepted, otherwise a value
        # returned by this function cannot be fed back into it.
        "mock": "mock",
        "easy": "easy",
        "deterministic": "mock",
        "medium": "medium",
        "hard": "hard",
        "scripted": "mock",
        "llm": "llm",
        "live": "llm",
        "online": "llm",
        "remote": "llm",
        "api": "llm",
    }
    canonical = (mode or "").strip().lower()
    try:
        return aliases[canonical]
    except KeyError:
        raise ValueError(f"Unsupported environment mode: {mode}") from None
class WorkSpaceEnvironment(Environment):
def __init__(self, mode: str | None = None):
    """Configure the environment mode and, for LLM-backed modes, the client.

    Args:
        mode: Requested mode or alias. Falls back to the
            ``BASELINE_ENV_MODE`` environment variable, then to ``"easy"``.

    Raises:
        ValueError: If the requested mode is not supported.
        RuntimeError: If an LLM-backed mode is requested but the ``groq``
            package is not available.
    """
    self._state: WorkspaceState | None = None
    self.system_prompt = SystemPrompt()
    requested_mode = mode or os.getenv("BASELINE_ENV_MODE") or "easy"
    self.mode = normalize_environment_mode(requested_mode)
    # ENV_MODEL_NAME takes precedence over MODEL_NAME. This choice is not
    # overwritten below, so the environment-specific override is honoured
    # in the modes that actually call the model.
    self.env_model = (
        os.getenv("ENV_MODEL_NAME")
        or os.getenv("MODEL_NAME")
        or "llama-3.1-8b-instant"
    )
    self._env_client: object | None = None
    if self.mode in ("medium", "hard", "llm"):
        # Looked up through globals() so a failed optional import surfaces
        # as a clear error rather than a NameError.
        groq_cls = globals().get("Groq")
        if groq_cls is None:
            raise RuntimeError(
                f"The 'groq' package is required for environment mode '{self.mode}'."
            )
        self._env_client = groq_cls(api_key=os.getenv("GROQ_API_KEY"))
def reset(self, topic="Draft the new Mobile App PRD") -> WorkspaceObservation:
    """Start a fresh episode and return the opening observation.

    Builds a new workspace state with the three experts (Finance, Security,
    UX), each holding one hidden constraint, and an empty chat history.

    Args:
        topic: Task description embedded in the opening system message.

    Returns:
        The turn-0 observation with zero reward and ``done=False``.
    """
    hidden_rules = (
        ("Finance", "Budget must not exceed $50k."),
        ("Security", "Must include biometric 2FA."),
        ("UX", "Checkout must be a single click."),
    )
    roster = {
        expert_name: ExpertState(name=expert_name, hidden_constraint=rule)
        for expert_name, rule in hidden_rules
    }
    self._state = WorkspaceState(experts=roster, chat_history=[])

    opening = f"SYSTEM: You are the PM. {topic}. Message the experts to gather requirements."
    return WorkspaceObservation(feedback=opening, current_turn=0, reward=0.0, done=False)
def state(self) -> WorkspaceState:
    """Return the current workspace state.

    Raises:
        RuntimeError: If ``reset()` has not been called yet. (A subclass of
            ``Exception``, so existing ``except Exception`` callers still
            work.)
    """
    # NOTE(review): the openenv base Environment may declare `state` as a
    # property - confirm whether callers expect `env.state` or `env.state()`.
    if self._state is None:
        raise RuntimeError("Call reset() first")
    return self._state
def step(self, action: WorkSpaceAction) -> WorkspaceObservation:
    """Advance the episode by one agent action.

    Reward rules:
      * ``easy`` mode: reward is the sum of newly discovered constraints.
        The episode ends once all constraints are discovered or the agent
        submits a final draft.
      * every other mode: ``submit_final`` ends the episode and is scored
        with the harmonic mean of the three per-expert final scores (0 if
        any score is 0). Any other action earns 10% of the discovery
        reward as a dense nudge.
    The penalty component is then added and the result is clamped at 0.
    The episode also ends when the turn limit is reached.

    Args:
        action: The agent action to apply.

    Returns:
        Observation carrying the feedback text, turn counter, reward
        (rounded to 3 decimals) and done flag.

    Raises:
        RuntimeError: If ``reset()`` has not been called.
        EnvironmentDoneError: If the episode has already terminated.
    """
    state = self._state
    if state is None:
        raise RuntimeError("Call reset() before step()")
    if state.is_done:
        raise EnvironmentDoneError("Episode already terminated.")

    state.turn_count += 1
    feedback_text, _ = self._get_expert_feedback(action)
    # Rewards are computed before the exchange is logged so that the
    # repeated-question check never compares a message with itself.
    component_rewards = self._calculate_multi_reward(action, feedback_text)
    state.chat_history.append({
        "agent": action.content,
        "world": feedback_text,
    })

    discovery_reward = (
        component_rewards["discovery_finance"]
        + component_rewards["discovery_security"]
        + component_rewards["discovery_ux"]
    )
    is_final = action.action_type == "submit_final"

    if self.mode == "easy":
        # Goal: discover all three constraints.
        total_reward = discovery_reward
        all_found = all(
            expert.constraint_discovered_by_agent for expert in state.experts.values()
        )
        if all_found or is_final:
            state.is_done = True
        if all_found:
            feedback_text += "\nSYSTEM: All constraints discovered. Task complete."
    elif is_final:
        # Goal: synthesis. This branch covers every non-easy mode
        # (including "mock"), so a final submission is always graded and
        # always terminates the episode.
        state.is_done = True
        scores = [
            component_rewards["final_finance"],
            component_rewards["final_security"],
            component_rewards["final_ux"],
        ]
        # Harmonic mean: a single failed constraint zeroes the reward.
        total_reward = (
            0.0 if any(s == 0 for s in scores) else 3 / sum(1 / s for s in scores)
        )
    else:
        # Dense discovery nudge (0.033 per constraint instead of 0.33).
        total_reward = discovery_reward * 0.1

    total_reward += component_rewards["penalty"]

    # Safety turn limit.
    if state.turn_count >= state.max_turns:
        state.is_done = True
        feedback_text += "\nSYSTEM: Turn limit reached."

    return WorkspaceObservation(
        feedback=feedback_text,
        current_turn=state.turn_count,
        reward=round(max(0.0, total_reward), 3),
        done=state.is_done,
    )
def _get_expert_feedback(self, action: WorkSpaceAction) -> tuple[str, float]:
    """Route an action to the experts and collect their replies.

    Returns:
        ``(feedback_text, internal_dense_reward)``. Expert replies are
        prefixed with the expert name; anything the environment itself says
        is prefixed with ``SYSTEM:``.
    """
    kind = action.action_type
    dense_reward = 0.0

    def consult(expert_name: str) -> tuple[str, float]:
        # Frustration is updated before the expert answers.
        self._update_frustration(expert_name, action)
        answer, gain = self.expert_response(expert_name, action.content)
        return f"{expert_name}: {answer}", gain

    if kind == "submit_final":
        return "SYSTEM: Final draft received for grading.", 0.0

    if kind == "propose_draft":
        lines = []
        for expert_name in self._state.experts:
            line, gain = consult(expert_name)
            lines.append(line)
            # Small reward for progress, but less than discovery.
            dense_reward += gain * 0.5
        return "\n".join(lines), dense_reward

    if kind != "message_expert":
        return f"SYSTEM: Invalid action_type '{action.action_type}'.", dense_reward

    target = action.target
    if target == "All":
        lines = []
        for expert_name in self._state.experts:
            line, gain = consult(expert_name)
            lines.append(line)
            dense_reward += gain
        return "\n\n".join(lines), dense_reward

    if target in self._state.experts:
        line, gain = consult(target)
        dense_reward += gain
        return line, dense_reward

    return f"SYSTEM: Unknown expert '{target}'.", dense_reward
def expert_response(self, expert_name: str, agent_message: str) -> tuple[str, float]:
    """Produce one expert's reply to the agent.

    The second tuple element is always ``0.0``: discovery reward is awarded
    (and the discovery flag flipped) in ``_calculate_multi_reward`` so the
    environment keeps a single source of truth for it.

    Raises:
        KeyError: If ``expert_name`` is not a known expert.
    """
    profile = self._state.experts[expert_name]
    reply = self._generate_expert_response(profile, expert_name, agent_message)
    return (reply, 0.0)
def harmonic_mean_reward(self, draft: str) -> float:
    """Score a draft as the harmonic mean of its per-expert grades.

    Every expert's hidden constraint is graded; if any grade is zero the
    result is ``0.0``. Otherwise the harmonic mean is rounded to three
    decimals.
    """
    grades = []
    for expert in self._state.experts.values():
        grades.append(
            self._grade_draft_against_constraint(draft, expert.hidden_constraint)
        )

    # One unmet constraint zeroes the whole score.
    if 0 in grades:
        return 0.0

    reciprocal_total = sum(1 / grade for grade in grades)
    return round(len(grades) / reciprocal_total, 3)
def _calculate_multi_reward(
    self,
    action: WorkSpaceAction,
    feedback_text: str,
    patterns: dict[str, list[str]] | None = None,
) -> dict:
    """Compute the per-component rewards for one step.

    Side effect: flips ``constraint_discovered_by_agent`` on an expert the
    first time its constraint shows up in the feedback.

    Args:
        action: The agent action being scored.
        feedback_text: The text returned to the agent for this action.
        patterns: Optional mapping of expert name to discovery regexes.
            Defaults to the module-level ``DISCOVERY_PATTERNS``.

    Returns:
        Dict with ``discovery_*``, ``final_*`` and ``penalty`` entries.
    """
    r = {
        "discovery_finance": 0.0, "discovery_security": 0.0, "discovery_ux": 0.0,
        "final_finance": 0.0, "final_security": 0.0, "final_ux": 0.0,
        "penalty": 0.0,
    }
    if patterns is None:
        patterns = DISCOVERY_PATTERNS

    # 1. DISCOVERY (only granted once per expert).
    # SYSTEM messages echo agent-controlled strings (the unknown target or
    # action_type), so scanning them would let the agent trigger discovery
    # reward with its own text. Only expert replies are scanned.
    if not feedback_text.startswith("SYSTEM:"):
        text = feedback_text.lower()
        for name, expert_patterns in patterns.items():
            expert = self._state.experts[name]
            if expert.constraint_discovered_by_agent:
                continue
            if any(re.search(p, text) for p in expert_patterns):
                r[f"discovery_{name.lower()}"] = 0.33
                expert.constraint_discovered_by_agent = True

    # 2. FINAL SUBMISSION
    if action.action_type == "submit_final":
        for name, expert in self._state.experts.items():
            r[f"final_{name.lower()}"] = self._grade_draft_against_constraint(
                action.content,
                expert.hidden_constraint,
            )

    # 3. PENALTIES
    if action.action_type == "message_expert" and action.target == "All":
        # Broadcasting to every expert is discouraged.
        r["penalty"] -= 1.0 if self.mode == "easy" else 0.5
    elif action.action_type == "propose_draft" and action.target == "All":
        r["penalty"] -= 0.1 if self.mode in ["medium", "hard", "llm"] else 0.0
    if self._is_repeated_question(action.content, action.target or ""):
        r["penalty"] -= 0.4
    return r
def _grade_draft_against_constraint(self, draft: str, constraint: str) -> float:
    """Deterministically grade a draft against one hidden constraint.

    The grader is keyword based and binary. The constraint text selects
    which check applies (budget, biometric 2FA or single-click checkout)
    and the draft passes when it contains the matching keywords. Both the
    draft and the constraint are matched case-insensitively.

    Args:
        draft: The agent's draft text.
        constraint: The expert's hidden constraint text.

    Returns:
        ``1.0`` if the draft satisfies the constraint, otherwise ``0.0``.
    """
    text = draft.lower()
    # Lower-cased so that e.g. "Budget ..." is recognised by the "budget"
    # keyword instead of relying on "$50k" being present.
    rule = constraint.lower()

    def mentions_any(tokens) -> bool:
        return any(token in text for token in tokens)

    # Finance: an amount AND a limiting word must both appear.
    if "$50k" in rule or "budget" in rule:
        has_amount = mentions_any(("50k", "50,000", "fifty thousand"))
        has_limit = mentions_any(("under", "below", "not exceed", "cap", "ceiling"))
        if has_amount and has_limit:
            return 1.0

    # Security: biometric plus some multi-factor wording.
    if "biometric" in rule:
        if "biometric" in text and mentions_any(("2fa", "mfa", "two-factor", "multi-factor")):
            return 1.0

    # UX: single-click wording in the context of checkout.
    if "single click" in rule:
        one_click_terms = (
            "single-click", "one-click", "single click", "one click", "single-tap", "one-tap",
        )
        if mentions_any(one_click_terms) and "checkout" in text:
            return 1.0

    # NOTE(review): no LLM-based grading fallback is implemented. The old
    # placeholder was guarded by mode == "live", which never occurs because
    # "live" is normalised to "llm"; it has been removed.
    return 0.0
def _update_frustration(self, expert_name: str, action: WorkSpaceAction):
    """Raise an expert's frustration when the agent repeats itself.

    Each repeated question adds 1.0 to the expert's frustration level
    (capped at 10.0). The first time the level reaches 5.0 the expert's
    hidden constraint is extended with a board-approval requirement.
    """
    expert = self._state.experts[expert_name]
    if not self._is_repeated_question(action.content, expert_name):
        return

    expert.frustration_level = min(10.0, expert.frustration_level + 1.0)

    # The constraint shifts at most once per episode.
    if expert.constraint_shifted or expert.frustration_level < 5.0:
        return
    expert.hidden_constraint += " Also requires board approval."
    expert.constraint_shifted = True
def _call_llm(self, prompt: str, max_tokens: int = 300, delay: float = 4.0) -> str:
    """Send a single-turn prompt to the environment's LLM client.

    Args:
        prompt: User message content.
        max_tokens: Completion token limit.
        delay: Seconds to sleep before the request (simple client-side
            throttle). Pass ``0`` to disable.

    Returns:
        The stripped completion text, or ``""`` when the model returns no
        content.

    Raises:
        RuntimeError: If no client is configured for this mode.
        Exception: Any error raised by the client is logged and re-raised.
    """
    if self._env_client is None:
        raise RuntimeError("Environment client is not configured for llm mode.")
    if delay > 0:
        time.sleep(delay)
    try:
        response = self._env_client.chat.completions.create(
            model=self.env_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=max_tokens,
        )
        content = response.choices[0].message.content
    except Exception as exc:
        logger.error("Environment LLM Error: %s", exc)
        raise
    # `content` can be None (e.g. an empty completion); treat it as "".
    return (content or "").strip()
def _generate_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str:
    """Produce an expert's reply using the backend for the current mode.

    * ``easy``: fixed string templates that state the constraint outright.
    * ``mock``: the scripted, offline ``_mock_expert_response``.
    * anything else: the LLM, prompted via ``system_prompt``.
    """
    if self.mode == "easy":
        canned = {
            "Finance": "The budget cap is $50k. Don't go over it.",
            "Security": "We require biometric 2FA. No exceptions.",
            "UX": "The checkout must be a single-click flow.",
        }
        return canned.get(expert_name, "I have no requirements.")

    if self.mode == "mock":
        # No LLM client is created for mock mode, so it must never reach
        # _call_llm; use the scripted responses instead.
        return self._mock_expert_response(expert, expert_name, agent_message)

    prompt = self.system_prompt.get_expert_prompt(expert, expert_name, agent_message)
    return self._call_llm(prompt, max_tokens=300)
def _mock_expert_response(self, expert: ExpertState, expert_name: str, agent_message: str) -> str:
    """Return a scripted expert reply without calling an LLM.

    Each expert has three canned replies: one for questions, one for a
    draft that satisfies the constraint (mock grade >= 0.9) and one for a
    draft that does not. Any name other than Finance or Security uses the
    UX script. A board-approval sentence is appended once the expert's
    constraint has shifted.
    """
    # (reply to a question, reply to a passing draft, reply to a failing draft)
    scripts = {
        "Finance": (
            "We need the initial release budget capped at or below $50k. "
            "Please keep the scope lean and prioritize the highest-ROI features.",
            "This draft respects the sub-$50k budget and keeps scope disciplined. "
            "From a finance perspective, the release plan looks viable.",
            "I still need the PRD to explicitly cap the first release budget at $50k or less. "
            "Right now the financial guardrails are too vague.",
        ),
        "Security": (
            "Passwords alone will not be enough for this app. "
            "We need biometric 2FA for sign-in and other sensitive actions.",
            "The draft now captures biometric 2FA clearly, which addresses our baseline security requirement. "
            "That is the level of control we need.",
            "The PRD still needs to call out biometric 2FA explicitly. "
            "Without that requirement, the security posture is incomplete.",
        ),
    }
    ux_script = (
        "Checkout has to feel immediate for the user. "
        "The flow should support a true single-click checkout with minimal friction.",
        "This draft captures the single-click checkout requirement well. "
        "The flow now feels appropriately low-friction.",
        "I still need the PRD to commit to a single-click checkout experience. "
        "The current draft leaves too much friction in the funnel.",
    )

    grade = self._mock_grade_constraint(agent_message, expert.hidden_constraint)
    lowered = agent_message.lower()
    question_cues = ("please", "could you", "can you", "what", "which", "how")
    looks_like_question = "?" in agent_message or any(cue in lowered for cue in question_cues)

    on_question, on_pass, on_fail = scripts.get(expert_name, ux_script)
    if looks_like_question:
        reply = on_question
    elif grade >= 0.9:
        reply = on_pass
    else:
        reply = on_fail

    if expert.constraint_shifted:
        reply += " Any change of this size would also need board approval."
    return reply
def _mock_grade_constraint(self, draft: str, constraint: str) -> float:
    """Grade a draft as the fraction of applicable keyword checks it passes.

    The constraint text decides which checks apply (budget, biometric 2FA,
    single-click checkout, board approval). Returns ``0.0`` when no check
    applies, otherwise passed/applicable rounded to three decimals.
    """
    text = draft.lower()

    def has_any(*tokens: str) -> bool:
        return any(token in text for token in tokens)

    verdicts = []
    if "$50k" in constraint:
        verdicts.append(
            has_any("$50k", "50k", "under 50k", "below 50k", "budget cap") and "budget" in text
        )
    if "biometric 2FA" in constraint:
        verdicts.append(
            "biometric" in text and has_any("2fa", "two-factor", "mfa", "multi-factor")
        )
    if "single click" in constraint:
        verdicts.append(
            has_any("single click", "single-click", "one click", "one-click") and "checkout" in text
        )
    if "board approval" in constraint.lower():
        verdicts.append("board approval" in text)

    if not verdicts:
        return 0.0
    return round(verdicts.count(True) / len(verdicts), 3)
def _constraint_mentioned(self, response: str, constraint: str) -> bool:
    """Heuristically decide whether a response mentions a constraint.

    The constraint is split into lower-cased keywords (stopwords removed,
    surrounding punctuation stripped). The response counts as mentioning
    the constraint when at least half of the keywords (and at least one)
    occur in it as substrings.
    """
    stopwords = {"must", "the", "a", "an", "is", "be", "and", "or", "not", "to", "in"}
    keywords = []
    for raw_word in constraint.lower().split():
        # Strip sentence punctuation so "$50k." or "click." can match
        # "$50k" / "click" in the response.
        word = raw_word.strip(".,;:!?\"'()")
        if word and word not in stopwords:
            keywords.append(word)
    if not keywords:
        return False

    response_lower = response.lower()
    matches = sum(1 for keyword in keywords if keyword in response_lower)
    return matches >= max(1, len(keywords) // 2)
def _is_repeated_question(self, content: str, expert_name: str) -> bool:
    """Tell whether ``content`` largely repeats an earlier agent message.

    Only earlier turns whose world response contains ``expert_name`` are
    considered. A message is a repeat when more than 70% of its distinct
    lower-cased words also appear in one of those earlier messages.
    """
    earlier_messages = [
        turn["agent"]
        for turn in self._state.chat_history
        if expert_name in turn.get("world", "")
    ]
    current_words = set(content.lower().split())

    def mostly_contained_in(past_message: str) -> bool:
        past_words = set(past_message.lower().split())
        if not current_words:
            # An empty message has no words to overlap with.
            return False
        shared = len(current_words & past_words)
        return shared / len(current_words) > 0.7

    return any(mostly_contained_in(past) for past in earlier_messages)