# LifeStack / core/reward.py
# Soham Banerjee
# deploy: pure lifestack with partitioned wisdom pool
# 77da5ce
import math
import copy
import json
import re
from core.life_state import LifeMetrics
from core.task import Task
def compute_reward(
    state_before: LifeMetrics,
    state_after: LifeMetrics,
    resources_used: dict,
    actions_taken: int,
    metric_changes: dict = None,
    completion: str = None,
    disruption_baseline: int = None,
    action_type: str = ""
) -> tuple[float, dict]:
    """
    Compute the reward for a single life step from metric deltas and resource usage.

    The base reward blends four scores:
        0.40 * outcome + 0.25 * containment + 0.20 * efficiency + 0.15 * preservation
    Penalties (critical floor, cascade spread, inaction, relationship collapse,
    plausibility) are then added and the total is clamped to [-1.0, 1.0].

    Format compliance and reasoning alignment are computed and reported in the
    breakdown only; they are not part of the returned reward.

    Args:
        state_before: The state at the start of the step. Must expose ``flatten()``
            returning a mapping keyed ``"<domain>.<submetric>"``.
        state_after: The state after actions and cascades (same keys as before).
        resources_used: Dict with keys 'time', 'money', 'energy'; missing keys
            count as 0. ``None`` is treated as an empty dict.
        actions_taken: Integer count of intentional actions performed.
        metric_changes: Metric changes claimed by the agent. Used for the
            plausibility check and as the fallback disruption baseline.
        completion: Raw model completion. Scored for format compliance and,
            when it is a JSON object, mined for a string ``"reasoning"`` field.
        disruption_baseline: Expected number of metrics affected by an action.
            Defaults to ``len(metric_changes)`` when given, otherwise 2.
        action_type: Action category forwarded to the reasoning-alignment check.

    Returns:
        tuple[float, dict]: (final_reward, breakdown_dict)
    """
    resources_used = resources_used or {}
    before_flat = state_before.flatten()
    after_flat = state_after.flatten()

    # 1. OUTCOME SCORE (weighted average of positive deltas)
    domain_weights = {
        "career": 1/6,
        "finances": 1/6,
        "relationships": 1/6,
        "physical_health": 1/6,
        "mental_wellbeing": 1/6,
        "time": 1/6
    }

    # Count sub-metrics per domain so each one gets an equal share of its domain weight.
    submetrics_per_domain = {}
    for key in before_flat:
        domain = key.split('.')[0]
        submetrics_per_domain[domain] = submetrics_per_domain.get(domain, 0) + 1

    outcome_score = 0.0
    for key, before_value in before_flat.items():
        delta = after_flat[key] - before_value
        if delta > 0:
            domain = key.split('.')[0]
            # Normalize delta by 100 (max possible increase is 100).
            weight = domain_weights[domain] / submetrics_per_domain[domain]
            outcome_score += (delta / 100.0) * weight

    # 2. CASCADE CONTAINMENT SCORE (share of metrics that did not get worse)
    worsened_count = sum(1 for key in before_flat if after_flat[key] < before_flat[key])
    total_metrics = len(before_flat)
    if total_metrics:
        cascade_containment_score = 1.0 - (worsened_count / total_metrics)
    else:
        # Nothing to worsen: treat an empty state as fully contained.
        cascade_containment_score = 1.0

    # 3. RESOURCE EFFICIENCY SCORE
    # Available: time 20, money 500, energy 100
    m_time = resources_used.get('time', 0.0) / 20.0
    m_money = resources_used.get('money', 0.0) / 500.0
    m_energy = resources_used.get('energy', 0.0) / 100.0
    # Average the three usage fractions, then clamp into [0, 1].
    resource_efficiency_score = 1.0 - ((m_time + m_money + m_energy) / 3.0)
    resource_efficiency_score = max(0.0, min(1.0, resource_efficiency_score))

    # 4. RELATIONSHIP PRESERVATION SCORE (sigmoid applied to the average delta)
    rel_keys = [key for key in before_flat if key.startswith('relationships.')]
    if rel_keys:
        avg_rel_before = sum(before_flat[key] for key in rel_keys) / len(rel_keys)
        avg_rel_after = sum(after_flat[key] for key in rel_keys) / len(rel_keys)
        delta_rel = avg_rel_after - avg_rel_before
    else:
        # No relationship metrics present: neutral delta (sigmoid gives 0.5).
        delta_rel = 0.0
    relationship_preservation_score = 1.0 / (1.0 + math.exp(-delta_rel / 10.0))

    # FINAL REWARD FORMULA
    base_reward = (
        (0.40 * outcome_score) +
        (0.25 * cascade_containment_score) +
        (0.20 * resource_efficiency_score) +
        (0.15 * relationship_preservation_score)
    )

    # PENALTIES
    penalties = 0.0
    fired = []

    # -0.50 if ANY metric is below 20 after the step
    if any(value < 20 for value in after_flat.values()):
        penalties -= 0.50
        fired.append("CRITICAL_FLOOR_VIOLATION")

    # -0.30 if more metrics worsened than the disruption baseline allows.
    # A baseline supplied by the caller is preferred over the fallback.
    if disruption_baseline is None:
        disruption_baseline = len(metric_changes) if metric_changes else 2
    if worsened_count > disruption_baseline:
        penalties -= 0.30
        fired.append("CASCADE_SPREAD_WIDER")

    # -0.40 if actions_taken == 0
    if actions_taken == 0:
        penalties -= 0.40
        fired.append("INACTION_PENALTY")

    # -0.15 if relationships domain average dropped more than 20 points
    if delta_rel < -20:
        penalties -= 0.15
        fired.append("RELATIONSHIP_COLLAPSE")

    # Plausibility penalty (claimed changes vs. resources spent)
    plaus = 0.0
    if metric_changes:
        plaus = reward_plausibility_check(metric_changes, resources_used)
        if plaus < 0:
            penalties += plaus
            fired.append("PLAUSIBILITY_VIOLATION")

    # Format compliance and reasoning extraction (best effort)
    comp_reward = 0.0
    reasoning = ""
    if completion:
        comp_reward = reward_format_compliance(completion)
        try:
            data = json.loads(completion)
        except (TypeError, ValueError, RecursionError):
            # Not plain JSON: leave reasoning empty rather than failing the step.
            data = None
        if isinstance(data, dict):
            candidate = data.get("reasoning", "")
            # Only a string can be scored; anything else would crash the
            # coherence check on .strip(), so it is treated as missing.
            if isinstance(candidate, str):
                reasoning = candidate

    # Reasoning alignment (tied to action_type)
    reasoning_score = reward_reasoning_coherence(reasoning, action_type=action_type)

    final_reward = max(-1.0, min(1.0, base_reward + penalties))
    breakdown = {
        "components": {
            "outcome": outcome_score,
            "containment": cascade_containment_score,
            "efficiency": resource_efficiency_score,
            "preservation": relationship_preservation_score,
            "format_compliance": comp_reward,
            "plausibility": plaus,
            "reasoning_alignment": reasoning_score
        },
        "base_reward": base_reward,
        "penalties_total": penalties,
        "penalties_fired": fired,
        "metrics_worsened": worsened_count,
        "rel_delta": delta_rel
    }
    return final_reward, breakdown
def compute_milestone_reward(milestones_achieved: list[str], task: Task) -> float:
    """
    Return the reward-weighted fraction of the task's milestones that were achieved.

    Each milestone contributes its ``reward`` when its ``id`` appears in
    ``milestones_achieved``. The result is capped at 1.0, and is 0.0 when the
    task has no milestones or their rewards sum to zero.
    """
    milestones = task.milestones
    if not milestones:
        return 0.0

    available = 0
    for milestone in milestones:
        available += milestone.reward
    if available == 0:
        return 0.0

    earned = 0
    for milestone in milestones:
        if milestone.id in milestones_achieved:
            earned += milestone.reward

    return min(1.0, earned / available)
def compute_task_completion_reward(success_conditions_met: list[bool], task: Task) -> float:
    """
    Return 1.0 when at least one success condition holds, otherwise 0.0.

    Success conditions are alternatives (e.g. a choice of routes), so a single
    satisfied condition completes the task. An empty or missing list scores 0.0.
    ``task`` is accepted for interface symmetry and is not read here.
    """
    if success_conditions_met and any(success_conditions_met):
        return 1.0
    return 0.0
def compute_replan_bonus(exo_events_seen: int, milestones_after_event: int) -> float:
    """
    Bonus for recovering after exogenous events.

    Half a point per milestone reached per event seen, capped at 1.0.
    Returns 0.0 when no exogenous event occurred.
    """
    if exo_events_seen == 0:
        return 0.0
    recovery_rate = milestones_after_event / exo_events_seen
    bonus = recovery_rate * 0.5
    return bonus if bonus < 1.0 else 1.0
def compute_dead_end_penalty(routes_remaining: int) -> float:
    """Return -0.5 when no routes remain (count <= 0), otherwise 0.0."""
    if routes_remaining <= 0:
        return -0.5
    return 0.0
def compute_task_reward(
    state_before: LifeMetrics,
    state_after: LifeMetrics,
    resources_used: dict,
    actions_taken: int,
    milestones_achieved: list[str],
    success_conditions_met: list[bool],
    exo_events_seen: int,
    milestones_after_event: int,
    routes_remaining: int,
    rollback_used: bool,
    cascade_collapse: bool,
    task: Task,
    reasoning: str = "",
    completion: str = "",
    conflict_domain: str = "",
    step_count: int = 0,
    max_steps: int = 0,
    metric_changes: dict = None,
    cumulative_rel_delta: float = 0.0,
    action_type: str = ""
) -> tuple[float, dict]:
    """
    Compute the task-level reward by combining step-local scores with task progress.

    Weights of the base reward:
        milestone 0.35, completion 0.25, outcome 0.10, preservation 0.05,
        replan 0.10, efficiency 0.10, reasoning 0.05.
    Penalties: timeout, dead end, rollback (-0.1), cascade collapse (-0.3),
    inaction (-0.20), cumulative relationship erosion below -20 (-0.15).
    The total is clamped to [-1.0, 1.0].

    Only individual components of ``compute_reward`` are reused (outcome,
    efficiency, preservation); its blended reward and its own penalties do not
    feed into this score. The full local breakdown is returned for inspection.

    Args:
        state_before, state_after, resources_used, actions_taken, metric_changes,
        completion, action_type: forwarded to ``compute_reward``.
        milestones_achieved: Ids of milestones reached so far.
        success_conditions_met: One flag per alternative success condition.
        exo_events_seen: Number of exogenous events encountered.
        milestones_after_event: Milestones reached after such events.
        routes_remaining: Number of routes still available.
        rollback_used: Whether a rollback was used (penalized).
        cascade_collapse: Whether a cascade collapse occurred (penalized).
        task: Task providing ``milestones`` and, optionally, ``mutable_world``
            whose length is used as the disruption baseline.
        reasoning: Reasoning text scored for coherence with ``action_type``.
        conflict_domain: Currently not read by this function.
        step_count, max_steps: Used for the timeout check.
        cumulative_rel_delta: Episode-wide relationship delta.

    Returns:
        tuple[float, dict]: (final_reward, breakdown_dict)
    """
    # 1. Base local components (disruption baseline scaled from task metadata)
    mutable_world = getattr(task, 'mutable_world', None) if task else None
    d_baseline = len(mutable_world) if mutable_world is not None else None
    _, local_breakdown = compute_reward(
        state_before, state_after, resources_used, actions_taken,
        metric_changes=metric_changes, completion=completion,
        disruption_baseline=d_baseline, action_type=action_type,
    )
    local_components = local_breakdown["components"]

    # 2. Orchestrator components
    # Take individual components from the local breakdown instead of its blended
    # reward, so nothing is counted twice in the weighting below.
    outcome_score_local = local_components.get("outcome", 0.0)
    milestone_score = compute_milestone_reward(milestones_achieved, task)
    completion_score = compute_task_completion_reward(success_conditions_met, task)
    replan_score = compute_replan_bonus(exo_events_seen, milestones_after_event)
    efficiency_score = local_components.get("efficiency", 0.0)
    preservation_score = local_components.get("preservation", 0.0)
    reasoning_score = reward_reasoning_coherence(reasoning, action_type=action_type)

    # Specific failure cases
    task_done = bool(success_conditions_met) and any(success_conditions_met)
    timeout_pen = reward_timeout_check(step_count, max_steps, task_done)
    dead_end_pen = compute_dead_end_penalty(routes_remaining)

    # 3. Final weighting
    # Milestone 35%, Completion 25%, Outcome 10%, Preservation 5%,
    # Replan 10%, Efficiency 10%, Reasoning 5%
    base_reward = (
        (0.35 * milestone_score) +
        (0.25 * completion_score) +
        (0.10 * outcome_score_local) +
        (0.05 * preservation_score) +
        (0.10 * replan_score) +
        (0.10 * efficiency_score) +
        (0.05 * reasoning_score)
    )

    # 4. Penalties
    penalties = 0.0
    fired = []
    if timeout_pen < 0:
        penalties += timeout_pen
        fired.append("TIMEOUT")
    if dead_end_pen < 0:
        penalties += dead_end_pen
        fired.append("DEAD_END")
    if rollback_used:
        penalties += -0.1
        fired.append("ROLLBACK_USED")
    if cascade_collapse:
        penalties += -0.3
        fired.append("CASCADE_COLLAPSE")
    # Inaction is penalized directly at task level; the local reward's own
    # inaction penalty is not part of this score.
    if actions_taken == 0:
        penalties += -0.20
        fired.append("TASK_INACTION_PENALTY")
    # Cumulative relationship erosion across the episode
    if cumulative_rel_delta < -20:
        penalties += -0.15
        fired.append("CUMULATIVE_RELATIONSHIP_EROSION")

    final_reward = max(-1.0, min(1.0, base_reward + penalties))
    breakdown = {
        "components": {
            "local_metric_delta": outcome_score_local,
            "milestone": milestone_score,
            "completion": completion_score,
            "replan": replan_score,
            "efficiency": efficiency_score,
            # Reported so the weighted components reconcile with base_reward.
            "preservation": preservation_score,
            "reasoning": reasoning_score,
            "format_compliance": local_components.get("format_compliance", 0.0),
            "plausibility": local_components.get("plausibility", 0.0),
            "timeout_penalty": timeout_pen
        },
        "base_reward": base_reward,
        "penalties_total": penalties,
        "penalties_fired": fired,
        "local_breakdown": local_breakdown
    }
    return final_reward, breakdown
# Fields a completion must carry (non-null) to earn the full format score.
_FORMAT_REQUIRED_FIELDS = (
    "action_type", "target_domain", "metric_changes", "resource_cost", "reasoning",
)


def _grade_json_candidate(text):
    """Grade one candidate string: 1.0 or 0.5 if JSON can be parsed from it, else None."""
    payloads = [text]
    # Fallback: the widest {...} span, for JSON embedded in surrounding prose.
    embedded = re.search(r'\{.*\}', text, re.DOTALL)
    if embedded:
        payloads.append(embedded.group(0))
    for payload in payloads:
        try:
            data = json.loads(payload)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; deeply nested junk is also
            # treated as unparseable rather than crashing the scorer.
            continue
        if isinstance(data, dict) and all(
            data.get(field) is not None for field in _FORMAT_REQUIRED_FIELDS
        ):
            return 1.0
        return 0.5
    return None


def reward_format_compliance(completion: str) -> float:
    """
    Scores the completion based on its format (JSON validity and required fields).

    JSON may be given bare, inside a ```json fence, inside a plain ``` fence,
    or embedded in surrounding text.

    Returns:
        +1.0: Valid JSON object with all required fields present and non-null:
              action_type, target_domain, metric_changes, resource_cost, reasoning
        +0.5: Any other parseable JSON (including partial/incomplete dicts)
        -0.5: Invalid JSON / unparseable
        -1.0: Empty or very short (< 10 chars stripped) strings, or refusal content
    """
    if not completion or len(completion.strip()) < 10:
        return -1.0

    # Potential refusal indicators (matched anywhere in the completion)
    lowered = completion.lower()
    if any(marker in lowered for marker in ("i cannot", "i'm sorry", "as an ai")):
        return -1.0

    stripped = completion.strip()
    if "```json" in stripped:
        # Content of the last ```json block, up to its closing fence.
        candidates = [stripped.split("```json")[-1].split("```")[0].strip()]
    elif "```" in stripped:
        segments = stripped.split("```")
        # The trailing segment is tried first (it holds the payload when the
        # fence is never closed); the segment right after the opening fence is
        # the payload of a properly closed block, which the trailing segment
        # misses because it is whatever follows the closing fence.
        candidates = [segments[-1].strip()]
        fenced = segments[1].strip()
        if fenced not in candidates:
            candidates.append(fenced)
    else:
        candidates = [stripped]

    for candidate in candidates:
        score = _grade_json_candidate(candidate)
        if score is not None:
            return score
    return -0.5
def reward_plausibility_check(metric_changes: dict, resource_cost: dict) -> float:
    """
    Anti-gaming check: penalize large claimed metric changes bought with little or no cost.

    The claim is the sum of absolute metric changes. Cost is the sum of each
    resource normalized by its budget (time/20, money/500, energy/100).

    Returns:
        -0.30 when the cost is empty/all-zero and the claim exceeds 3.0,
              or when claim/cost exceeds 150;
        -0.10 when claim/cost exceeds 80;
         0.0  otherwise.
    """
    claimed = sum(map(abs, metric_changes.values()))

    # Free (or missing) cost: only a trivial claim is believable.
    is_free = not resource_cost or all(amount == 0 for amount in resource_cost.values())
    if is_free:
        return -0.30 if claimed > 3.0 else 0.0

    budgets = (("time", 20.0), ("money", 500.0), ("energy", 100.0))
    spent = sum(resource_cost.get(name, 0.0) / cap for name, cap in budgets)

    # Floor the denominator so near-zero spending yields a huge, finite ratio.
    efficiency = claimed / max(0.01, spent)
    for threshold, penalty in ((150, -0.30), (80, -0.10)):
        if efficiency > threshold:
            return penalty
    return 0.0
def reward_timeout_check(step_count: int, max_steps: int, done: bool) -> float:
    """
    Penalizes episodes that end by reaching the step limit without being resolved.

    Args:
        step_count: Steps taken so far.
        max_steps: Step limit. A non-positive (or None) value means no limit is
            configured, so no timeout can occur.
        done: Whether the episode has been resolved.

    Returns:
        -0.20 when a limit is set, it has been reached, and the episode is not
        done; 0.0 otherwise.
    """
    # Without this guard, ``step_count >= 0`` is always true and every
    # unresolved step would be reported as a timeout when no limit is given.
    if max_steps is None or max_steps <= 0:
        return 0.0
    if step_count >= max_steps and not done:
        return -0.20
    return 0.0
def reward_reasoning_coherence(reasoning: str, action_type: str = "") -> float:
    """
    Score how well the reasoning text supports the chosen action.

    Scoring:
        -0.20 outright when the reasoning is missing or shorter than 20
              characters after stripping.
        +0.05 when it contains at least one logical connector.
        +0.10 when ``action_type`` is a known category and the text mentions
              one of that category's keywords; -0.20 when it mentions none.
    Unknown or empty ``action_type`` skips the alignment step. Matching is
    substring-based on the lower-cased text. The result is clamped to
    [-0.30, 0.30].
    """
    if not reasoning or len(reasoning.strip()) < 20:
        return -0.20  # too little text to evaluate

    text = reasoning.lower()
    total = 0.0

    # 1. Structure: causal connectors indicate an argument, not a list of facts.
    for connector in ("because", "since", "therefore", "due to", "resulting in", "consequently"):
        if connector in text:
            total += 0.05
            break

    # 2. Alignment: the reasoning must mention vocabulary of the chosen action.
    keywords_by_action = {
        "spend": ["cost", "price", "expensive", "money", "budget", "finance"],
        "rest": ["energy", "sleep", "exhaustion", "recharge", "break"],
        "communicate": ["talk", "discuss", "speak", "message", "call", "explain"],
        "delegate": ["hand off", "assign", "help", "junior", "colleague"],
        "negotiate": ["bargain", "trade", "deal", "terms"],
        "deprioritize": ["later", "postpone", "unimportant", "drop"],
        "reschedule": ["reschedule", "delay", "postpone", "move", "time", "calendar", "slot"],
        "execute": ["route", "plan", "action", "implement", "complete", "resolve", "execute"],
    }
    expected = keywords_by_action.get(action_type) if action_type else None
    if expected is not None:
        if any(word in text for word in expected):
            total += 0.10
        else:
            total -= 0.20

    return max(-0.30, min(0.30, total))
def _shift_submetrics(state, amount, prefix=""):
    """Add ``amount`` to every flattened sub-metric of ``state`` whose key starts with ``prefix``."""
    for key in state.flatten().keys():
        if not key.startswith(prefix):
            continue
        domain_name, sub_name = key.split('.')
        domain_obj = getattr(state, domain_name)
        setattr(domain_obj, sub_name, getattr(domain_obj, sub_name) + amount)


def main():
    """Run three smoke scenarios through compute_reward and print the results."""

    def report(title, reward, breakdown):
        # Same three-line output for every scenario.
        print(title)
        print(f"Reward: {reward:.4f}")
        print(f"Breakdown: {breakdown}")

    print("--- TESTING REWARD SYSTEM ---")

    # Baseline: default-constructed metrics (the original note says these
    # default to 70 — not verifiable from this file).
    state_start = LifeMetrics()

    # 1. PERFECT ACTION: every sub-metric improves by 10 points
    state_perfect = copy.deepcopy(state_start)
    _shift_submetrics(state_perfect, 10)
    reward_p, break_p = compute_reward(
        state_start, state_perfect, {"time": 2, "money": 50, "energy": 10}, actions_taken=5
    )
    report("\n[SCENARIO 1: PERFECT ACTION]", reward_p, break_p)

    # 2. BAD ACTION: relationships drop by 30 points, everything else unchanged
    state_bad = copy.deepcopy(state_start)
    _shift_submetrics(state_bad, -30, prefix='relationships.')
    reward_b, break_b = compute_reward(
        state_start, state_bad, {"time": 10, "money": 300, "energy": 80}, actions_taken=1
    )
    report("\n[SCENARIO 2: BAD ACTION (Relationships Tank)]", reward_b, break_b)

    # 3. INACTION: nothing changes and no resources are spent
    state_nothing = copy.deepcopy(state_start)
    reward_n, break_n = compute_reward(state_start, state_nothing, {}, actions_taken=0)
    report("\n[SCENARIO 3: INACTION]", reward_n, break_n)


if __name__ == "__main__":
    main()