methanol_apc / server /tasks.py
glitchfilter's picture
Upload folder using huggingface_hub
f15113b verified
"""
Task definitions and graders for the Methanol APC Environment.
Four tasks with increasing difficulty, each with a deterministic grader
that returns a score in [0.0, 1.0].
Tasks
-----
1. startup (Easy) — Ramp reactor from idle to operating temperature
2. optimization (Medium) — Maximize profit at steady state
3. disturbance_rejection (Hard) — Handle cooling system failure
4. long_horizon_production (Expert) — Catalyst-aware marathon production
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List
from .reactor_sim import ReactorState, EMERGENCY_SHUTDOWN_TEMP
# ---------------------------------------------------------------------------
# Base task
# ---------------------------------------------------------------------------
@dataclass
class TaskConfig:
"""Configuration for a single task."""
name: str
max_steps: int
initial_temperature: float = 250.0
initial_pressure: float = 50.0
initial_feed_h2: float = 4.0
initial_feed_co: float = 2.0
initial_cooling_flow: float = 50.0
initial_cooling_temp: float = 25.0
initial_compressor: float = 40.0
initial_catalyst: float = 1.0
# Disturbance schedule: {step: {field: value}}
disturbances: Dict[int, Dict[str, float]] = field(default_factory=lambda: {})
# Operation mode: "steady_state" | "periodic" | "batch"
operation_mode: str = "steady_state"
# For periodic mode: demand cycle period (steps)
demand_period: int = 50
# For batch mode: target production (kg)
batch_target_kg: float = 0.0
# ---------------------------------------------------------------------------
# Task definitions
# ---------------------------------------------------------------------------
STARTUP_TASK = TaskConfig(
name="startup",
max_steps=50,
initial_temperature=150.0,
initial_pressure=30.0,
initial_feed_h2=0.0,
initial_feed_co=0.0,
initial_cooling_flow=20.0,
initial_compressor=20.0,
)
OPTIMIZATION_TASK = TaskConfig(
name="optimization",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
)
DISTURBANCE_TASK = TaskConfig(
name="disturbance_rejection",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
# At step 25: cooling water temp rises from 25 -> 45 degC
# (cooling tower failure — reduces cooling capacity, pushes toward runaway)
disturbances={25: {"cooling_water_temp": 45.0}},
)
LONG_HORIZON_TASK = TaskConfig(
name="long_horizon_production",
max_steps=500,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
operation_mode="batch",
batch_target_kg=5000.0, # produce 5000 kg methanol
)
TASKS: Dict[str, TaskConfig] = {
"startup": STARTUP_TASK,
"optimization": OPTIMIZATION_TASK,
"disturbance_rejection": DISTURBANCE_TASK,
"long_horizon_production": LONG_HORIZON_TASK,
}
# ---------------------------------------------------------------------------
# NEW TASKS — 8 additional scenarios for increased difficulty range
# ---------------------------------------------------------------------------
# Easy-Medium: Emergency recovery — start near shutdown, cool down safely
EMERGENCY_RECOVERY_TASK = TaskConfig(
name="emergency_recovery",
max_steps=80,
initial_temperature=290.0, # dangerously close to 300C shutdown
initial_pressure=70.0,
initial_feed_h2=6.0,
initial_feed_co=3.0,
initial_cooling_flow=40.0,
initial_compressor=60.0,
)
# Medium: Feed composition upset — H2/CO ratio shifts at step 30
FEED_UPSET_TASK = TaskConfig(
name="feed_composition_upset",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
# At step 30: simulate upstream reformer fluctuation
# Agent must compensate by adjusting feed rates
disturbances={30: {"cooling_water_temp": 25.0}}, # placeholder — actual feed upset handled in env
)
# Medium: Cost minimization — fixed production target, minimize opex
COST_MINIMIZATION_TASK = TaskConfig(
name="cost_minimization",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
)
# Hard: Pressure loss — compressor drops 40% at step 20
PRESSURE_LOSS_TASK = TaskConfig(
name="pressure_loss",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
# At step 20: compressor output drops
disturbances={20: {"cooling_water_temp": 25.0}}, # placeholder
)
# Hard: Day-night cycle — cooling water temp oscillates
DAY_NIGHT_TASK = TaskConfig(
name="day_night_cycle",
max_steps=150,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
# Cooling water temp changes every 25 steps: 25->35->25->35->25->35
disturbances={
25: {"cooling_water_temp": 35.0},
50: {"cooling_water_temp": 25.0},
75: {"cooling_water_temp": 35.0},
100: {"cooling_water_temp": 25.0},
125: {"cooling_water_temp": 35.0},
},
operation_mode="periodic",
demand_period=50,
)
# Hard: Catalyst degradation — start with aged catalyst
AGED_CATALYST_TASK = TaskConfig(
name="aged_catalyst",
max_steps=100,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
initial_catalyst=0.4, # severely aged catalyst
)
# Expert: Multi-disturbance — cascading failures
MULTI_DISTURBANCE_TASK = TaskConfig(
name="multi_disturbance",
max_steps=150,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
# Cascading failures: cooling at 25, then worse at 50
disturbances={
25: {"cooling_water_temp": 35.0},
50: {"cooling_water_temp": 45.0},
},
)
# Expert: Maximum yield challenge — produce as much as possible in 200 steps
MAX_YIELD_TASK = TaskConfig(
name="maximum_yield",
max_steps=200,
initial_temperature=250.0,
initial_pressure=60.0,
initial_feed_h2=4.0,
initial_feed_co=2.0,
initial_cooling_flow=50.0,
initial_compressor=50.0,
)
# Register all new tasks
TASKS.update({
"emergency_recovery": EMERGENCY_RECOVERY_TASK,
"feed_composition_upset": FEED_UPSET_TASK,
"cost_minimization": COST_MINIMIZATION_TASK,
"pressure_loss": PRESSURE_LOSS_TASK,
"day_night_cycle": DAY_NIGHT_TASK,
"aged_catalyst": AGED_CATALYST_TASK,
"multi_disturbance": MULTI_DISTURBANCE_TASK,
"maximum_yield": MAX_YIELD_TASK,
})
# ---------------------------------------------------------------------------
# Graders — each returns float in (0.0, 1.0) strictly, deterministic
# ---------------------------------------------------------------------------
def _clamp_score(score: float) -> float:
"""Map score in [0, 1] to strictly (0, 1) using centered sigmoid.
sigmoid(k*(x - 0.5)) centers the S-curve so that:
0.0 -> ~0.02 (bad stays clearly bad)
0.5 -> 0.50 (midpoint preserved)
1.0 -> ~0.98 (good stays clearly good)
k=10 gives wide spread; final affine scales to [0.01, 0.99].
"""
import math
mapped = 1.0 / (1.0 + math.exp(-10.0 * (score - 0.5)))
return 0.01 + 0.98 * mapped # scale to (0.01, 0.99)
def grade_startup(trajectory: List[ReactorState]) -> float:
"""Grade the startup task.
Score based on:
- Did the reactor reach 250 degC?
- How much overshoot above 250 degC?
- Was there an emergency shutdown?
"""
if not trajectory:
return 0.0
target = 250.0
max_temp = max(s.temperature for s in trajectory)
final_temp = trajectory[-1].temperature
shutdown = any(s.emergency_shutdown for s in trajectory)
if shutdown:
return 0.0
# Did we reach target?
reached = any(s.temperature >= target - 5.0 for s in trajectory)
if not reached:
# Partial credit for getting close
return 0.1 * min(max_temp / target, 1.0)
# Overshoot penalty
overshoot = max(0.0, max_temp - target)
if overshoot > 20.0:
return 0.1
score = 1.0 - (overshoot / 20.0)
# Stability bonus: final temp should be near target
final_error = abs(final_temp - target)
if final_error < 5.0:
score = min(1.0, score + 0.1)
return max(0.0, min(1.0, score))
def grade_optimization(trajectory: List[ReactorState]) -> float:
"""Grade the optimization task.
Score = normalized cumulative profit relative to baseline/theoretical range.
"""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
total_profit = trajectory[-1].cumulative_profit
# Baseline: conservative operation yields ~$5 over 100 steps
# Theoretical max: aggressive-but-safe yields ~$25 over 100 steps
baseline_profit = 5.0
max_profit = 25.0
if shutdown:
# Still give partial credit for profit earned before shutdown
score = 0.2 * max(0.0, total_profit / max_profit)
return max(0.0, min(1.0, score))
score = (total_profit - baseline_profit) / max(max_profit - baseline_profit, 1e-6)
return max(0.0, min(1.0, score))
def grade_disturbance(trajectory: List[ReactorState]) -> float:
"""Grade the disturbance rejection task.
50% for survival (no shutdown), 50% for maintained production.
"""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
survival_score = 0.0 if shutdown else 0.5
# Production after disturbance (step 25+)
post_disturbance = [s for s in trajectory if s.time_step >= 25]
if not post_disturbance:
return survival_score
production_after = sum(
max(0.0, post_disturbance[i].methanol_produced -
(post_disturbance[i - 1].methanol_produced if i > 0 else
post_disturbance[0].methanol_produced))
for i in range(1, len(post_disturbance))
)
# Expected production at steady state over 75 steps: ~12 kg
expected = 12.0
yield_score = min(0.5, 0.5 * production_after / max(expected, 1e-6))
return max(0.0, min(1.0, survival_score + yield_score))
def grade_long_horizon(trajectory: List[ReactorState]) -> float:
"""Grade the long-horizon production task.
Target: produce 50,000 kg of methanol.
Score based on production achieved and catalyst health.
"""
if not trajectory:
return 0.0
target = 50_000.0
final = trajectory[-1]
production = final.methanol_produced
catalyst = final.catalyst_health
shutdown = any(s.emergency_shutdown for s in trajectory)
steps = final.time_step
if shutdown:
return 0.1 * min(production / target, 1.0)
if catalyst <= 0.01:
# Catalyst destroyed — heavy penalty
return 0.1 * min(production / target, 1.0)
if production >= target:
# Reached target — score by speed
score = 1.0 - (steps / 500.0)
return max(0.3, min(1.0, score))
# Didn't reach target — partial credit
return 0.3 * min(production / target, 1.0)
def _clamped_grader(fn):
"""Wrap a grader to ensure score is strictly in (0, 1)."""
def wrapper(trajectory):
return _clamp_score(fn(trajectory))
return wrapper
GRADERS = {
"startup": _clamped_grader(grade_startup),
"optimization": _clamped_grader(grade_optimization),
"disturbance_rejection": _clamped_grader(grade_disturbance),
"long_horizon_production": _clamped_grader(grade_long_horizon),
}
# ---------------------------------------------------------------------------
# Graders for new tasks — reuse patterns from existing graders
# ---------------------------------------------------------------------------
def grade_emergency_recovery(trajectory: List[ReactorState]) -> float:
"""Grade emergency recovery: cool down from 290C without shutdown."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
if shutdown:
return 0.0
final_temp = trajectory[-1].temperature
# Score based on how close to target 250C and how quickly
if final_temp > 270:
return 0.2 # still too hot
temp_score = 0.5 * max(0.0, 1.0 - abs(final_temp - 250.0) / 40.0)
# Production bonus
production = trajectory[-1].methanol_produced
prod_score = 0.5 * min(1.0, production / 200.0)
return temp_score + prod_score
def grade_feed_upset(trajectory: List[ReactorState]) -> float:
"""Grade feed composition upset: maintain production through ratio change."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
if shutdown:
return 0.1
profit = trajectory[-1].cumulative_profit
return min(1.0, max(0.0, profit / 20.0))
def grade_cost_minimization(trajectory: List[ReactorState]) -> float:
"""Grade cost minimization: maximize profit efficiency (profit per unit feed)."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
profit = trajectory[-1].cumulative_profit
production = trajectory[-1].methanol_produced
if shutdown or production < 10.0:
return 0.1
# Profit per kg of methanol produced
efficiency = profit / max(production, 1.0)
return min(1.0, max(0.0, efficiency / 0.5)) # ~$0.50/kg is excellent
def grade_pressure_loss(trajectory: List[ReactorState]) -> float:
"""Grade pressure loss: maintain production after compressor drops."""
return grade_disturbance(trajectory) # same scoring as disturbance rejection
def grade_day_night(trajectory: List[ReactorState]) -> float:
"""Grade day-night cycle: maintain stable production through oscillating cooling."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
if shutdown:
return 0.1
# Stability: low temperature variance
temps = [s.temperature for s in trajectory]
mean_temp = sum(temps) / len(temps)
variance = sum((t - mean_temp) ** 2 for t in temps) / len(temps)
stability_score = 0.5 * max(0.0, 1.0 - variance / 100.0)
# Production
production = trajectory[-1].methanol_produced
prod_score = 0.5 * min(1.0, production / 500.0)
return stability_score + prod_score
def grade_aged_catalyst(trajectory: List[ReactorState]) -> float:
"""Grade aged catalyst: maximize production with degraded catalyst."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
if shutdown:
return 0.1
production = trajectory[-1].methanol_produced
catalyst_preserved = trajectory[-1].catalyst_health
# With aged catalyst (start at 0.4), getting any production is good
prod_score = 0.6 * min(1.0, production / 200.0)
cat_score = 0.4 * (catalyst_preserved / 0.4) # relative preservation
return min(1.0, prod_score + cat_score)
def grade_multi_disturbance(trajectory: List[ReactorState]) -> float:
"""Grade multi-disturbance: survive cascading failures."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
survival = 0.0 if shutdown else 0.4
# Production after second disturbance (step 50+)
post = [s for s in trajectory if s.time_step >= 50]
if not post:
return survival
production_after = post[-1].methanol_produced - (post[0].methanol_produced if post else 0)
yield_score = 0.6 * min(1.0, production_after / 300.0)
return survival + yield_score
def grade_max_yield(trajectory: List[ReactorState]) -> float:
"""Grade maximum yield: total methanol produced in 200 steps."""
if not trajectory:
return 0.0
shutdown = any(s.emergency_shutdown for s in trajectory)
production = trajectory[-1].methanol_produced
if shutdown:
return 0.1 * min(1.0, production / 1000.0)
# 1000 kg in 200 steps is excellent
return min(1.0, production / 1000.0)
# Register new graders
GRADERS.update({
"emergency_recovery": _clamped_grader(grade_emergency_recovery),
"feed_composition_upset": _clamped_grader(grade_feed_upset),
"cost_minimization": _clamped_grader(grade_cost_minimization),
"pressure_loss": _clamped_grader(grade_pressure_loss),
"day_night_cycle": _clamped_grader(grade_day_night),
"aged_catalyst": _clamped_grader(grade_aged_catalyst),
"multi_disturbance": _clamped_grader(grade_multi_disturbance),
"maximum_yield": _clamped_grader(grade_max_yield),
})
# ---------------------------------------------------------------------------
# Step reward computation (dense, per-step)
# ---------------------------------------------------------------------------
def compute_step_reward(
prev: ReactorState,
curr: ReactorState,
task: TaskConfig,
) -> float:
"""Compute dense per-step reward.
Six components normalized to roughly [-1, +1]:
1. profit_reward: normalized step profit
2. safety_reward: distance from safety limits
3. stability_reward: low temperature variance
4. catalyst_reward: catalyst health preservation
5. task_progress_reward: task-specific progress signal
6. shutdown_penalty: -1.0 if emergency shutdown
"""
if curr.emergency_shutdown:
import math
mapped = 1.0 / (1.0 + math.exp(-3.0 * (-1.0))) # raw = -1.0
return 0.01 + 0.98 * mapped # ≈ 0.06
# 1. Profit reward (0 to +0.4)
profit_reward = max(-0.2, min(0.4, curr.profit_this_step / 0.5))
# 2. Safety reward: distance from 300 degC limit (-0.3 to +0.2)
temp_margin = (EMERGENCY_SHUTDOWN_TEMP - curr.temperature) / EMERGENCY_SHUTDOWN_TEMP
if curr.temperature > 280:
safety_reward = -0.3 * (curr.temperature - 280) / 20.0
elif curr.temperature > 270:
safety_reward = -0.1
else:
safety_reward = 0.1 * temp_margin
# 3. Stability reward: low temperature change (+0.0 to +0.1)
temp_change = abs(curr.temperature - prev.temperature)
stability_reward = 0.1 * max(0.0, 1.0 - temp_change / 5.0)
# 4. Catalyst reward (+0.0 to +0.1)
catalyst_reward = 0.1 * curr.catalyst_health
# 5. Task-specific progress
progress_reward = 0.0
if task.name == "startup":
target = 250.0
dist_now = abs(curr.temperature - target)
dist_prev = abs(prev.temperature - target)
if dist_now < dist_prev:
progress_reward = 0.2 * (dist_prev - dist_now) / target
elif curr.temperature > target + 5:
progress_reward = -0.1
elif task.name == "optimization":
progress_reward = 0.2 * max(0.0, min(1.0, curr.profit_this_step / 0.3))
elif task.name == "disturbance_rejection":
# Reward stability after disturbance
if curr.time_step > 25:
progress_reward = 0.2 * max(0.0, 1.0 - temp_change / 3.0)
else:
progress_reward = 0.1 * max(0.0, curr.profit_this_step / 0.3)
elif task.name == "long_horizon_production":
# Reward production rate while preserving catalyst
production_rate = curr.methanol_produced - prev.methanol_produced
progress_reward = 0.15 * min(1.0, production_rate / 0.2)
progress_reward += 0.05 * curr.catalyst_health
total = profit_reward + safety_reward + stability_reward + catalyst_reward + progress_reward
# Sigmoid mapping: preserves relative signal in (0.01, 0.99)
import math
mapped = 1.0 / (1.0 + math.exp(-3.0 * total))
return 0.01 + 0.98 * mapped