harshraj22/croprl-workspace / code /market_engine.py
harshraj22's picture
download
raw
16.2 kB
"""
MarketEngine — Supply-demand price clearing and hype crop cycle for multi-agent CropRL.
Two sub-systems:
1. MarketEngine: queues sell orders within a month, then resolves them all at
month-end using a demand-response model so that collective sell volume
drives down the clearing price.
2. HypeEngine: manages the four-phase (DORMANT → BUILDING → PEAK → COLLAPSING)
hype cycle for specialty crops (Matcha, Quinoa, Turmeric). Integrates with
MarketEngine.resolve_month() to trigger early collapse on over-supply.
"""
from __future__ import annotations
import math
from typing import Dict, List, Optional, Tuple
import numpy as np
from cropRL.config import EnvConfig, MultiAgentConfig
from cropRL.enums import CropType, HypePhase
from cropRL.models import HypeCropStatus
# Hype crop indices
HYPE_CROP_TYPES = (
CropType.MATCHA,
CropType.QUINOA,
CropType.TURMERIC,
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Hype Engine
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class _HypeCrop:
"""Internal state for a single hype crop's demand cycle."""
# Phase transition constants (from the plan)
HYPE_BUILD_RATE = 0.08 # hype_level increase per month in BUILDING
HYPE_COLLAPSE_RATE = 0.20 # hype_level decrease per month in COLLAPSING
HYPE_BUILD_THRESHOLD = 0.9 # hype_level at which BUILDING → PEAK
PEAK_MIN_MONTHS = 2
PEAK_MAX_MONTHS = 4
def __init__(
self,
crop_type: int,
crop_name: str,
max_hype_premium: float,
rng: np.random.Generator,
) -> None:
self.crop_type = crop_type
self.crop_name = crop_name
self.max_hype_premium = max_hype_premium
self._rng = rng
self.hype_level: float = 0.0
self.phase: HypePhase = HypePhase.DORMANT
self.months_in_phase: int = 0
self._peak_duration: int = self.PEAK_MIN_MONTHS # set on PEAK entry
def tick(self, trigger_prob: float, collapse_threshold: float, sold_volume: float, market_capacity: float) -> None:
"""Advance the hype cycle by one month."""
self.months_in_phase += 1
if self.phase == HypePhase.DORMANT:
# Random trigger to BUILDING phase
if self._rng.random() < trigger_prob:
self._enter_building()
elif self.phase == HypePhase.BUILDING:
self.hype_level = min(1.0, self.hype_level + self.HYPE_BUILD_RATE)
if self.hype_level >= self.HYPE_BUILD_THRESHOLD:
self._enter_peak()
elif self.phase == HypePhase.PEAK:
self.hype_level = 1.0
# Early collapse trigger: over-supply
over_supplied = (
market_capacity > 0
and sold_volume > market_capacity * collapse_threshold
)
# Timeout trigger: peak duration exceeded
timed_out = self.months_in_phase >= self._peak_duration
if over_supplied or timed_out:
self._enter_collapsing()
elif self.phase == HypePhase.COLLAPSING:
self.hype_level = max(0.0, self.hype_level - self.HYPE_COLLAPSE_RATE)
if self.hype_level <= 0.0:
self._enter_dormant()
def get_hype_mult(self) -> float:
"""
Return the price multiplier for this crop given current hype_level.
hype_mult = 1.0 + hype_level × (max_hype_premium − 1.0)
"""
return 1.0 + self.hype_level * (self.max_hype_premium - 1.0)
def status(self) -> HypeCropStatus:
return HypeCropStatus(
crop_type=self.crop_type,
crop_name=self.crop_name,
hype_level=round(self.hype_level, 4),
phase=self.phase,
months_in_phase=self.months_in_phase,
)
# ── Phase transitions ──────────────────────────────────────
def _enter_building(self) -> None:
self.phase = HypePhase.BUILDING
self.hype_level = 0.05 # small nudge to signal start
self.months_in_phase = 0
def _enter_peak(self) -> None:
self.phase = HypePhase.PEAK
self.hype_level = 1.0
self.months_in_phase = 0
self._peak_duration = int(self._rng.integers(
self.PEAK_MIN_MONTHS, self.PEAK_MAX_MONTHS + 1
))
def _enter_collapsing(self) -> None:
self.phase = HypePhase.COLLAPSING
self.months_in_phase = 0
def _enter_dormant(self) -> None:
self.phase = HypePhase.DORMANT
self.hype_level = 0.0
self.months_in_phase = 0
class HypeEngine:
"""
Manages hype cycles for all hype crops.
"""
def __init__(
self,
ma_config: MultiAgentConfig,
env_config: EnvConfig,
rng: np.random.Generator,
) -> None:
self._cfg = ma_config
self._env_cfg = env_config
self._crops: Dict[int, _HypeCrop] = {}
for ct in HYPE_CROP_TYPES:
premium = ma_config.max_hype_premium.get(int(ct), 2.0)
self._crops[int(ct)] = _HypeCrop(
crop_type=int(ct),
crop_name=env_config.crop_names[int(ct)],
max_hype_premium=premium,
rng=rng,
)
def tick_month(
self, sold_volumes: Dict[int, float]
) -> List[HypeCropStatus]:
"""
Advance all hype cycles by one month.
Parameters
----------
sold_volumes : dict
Total volume sold per crop this month (crop_type_int → tons).
Returns
-------
list of HypeCropStatus
"""
statuses = []
for ct_int, crop in self._crops.items():
vol = sold_volumes.get(ct_int, 0.0)
cap = self._cfg.market_capacity.get(ct_int, 1.0)
crop.tick(
trigger_prob=self._cfg.hype_trigger_prob,
collapse_threshold=self._cfg.hype_collapse_supply_threshold,
sold_volume=vol,
market_capacity=cap,
)
statuses.append(crop.status())
return statuses
def get_hype_mult(self, crop_type: int) -> float:
"""Return the current hype price multiplier for a crop (1.0 if non-hype)."""
if crop_type not in self._crops:
return 1.0
return self._crops[crop_type].get_hype_mult()
def statuses(self) -> List[HypeCropStatus]:
return [c.status() for c in self._crops.values()]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Market Engine
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class _SellOrder:
"""A pending sell order queued during the month."""
__slots__ = ("agent_id", "crop_type", "volume", "is_inventory")
def __init__(
self,
agent_id: int,
crop_type: int,
volume: float,
is_inventory: bool = False,
) -> None:
self.agent_id = agent_id
self.crop_type = crop_type
self.volume = volume
self.is_inventory = is_inventory
class MarketEngine:
"""
Shared market clearing engine for the multi-agent environment.
Responsibilities:
- Holds the authoritative ``realised_prices`` dict (post-supply-shock).
- Queues sell orders during the month via ``queue_sell()``.
- Resolves all queued orders at month-end via ``resolve_month()``,
applying demand-response to produce clearing prices.
- Manages base price generation (random walk from existing dynamics).
"""
def __init__(
self,
ma_config: MultiAgentConfig,
env_config: EnvConfig,
rng: np.random.Generator,
) -> None:
self._ma_cfg = ma_config
self._env_cfg = env_config
self._rng = rng
# Base prices (the mean-reverting random walk target)
# Initialised to config base prices; updated each month
self._base_prices: List[float] = list(env_config.base_market_prices)
# Realised prices after supply-demand adjustment (updated at resolve_month)
# Indexed 0..num_crop_types-1 (0 = Fallow, always 0)
self.realised_prices: List[float] = list(env_config.base_market_prices)
# Last month's realised prices (exposed in observation)
self.last_month_realised_prices: Tuple[float, ...] = tuple(
env_config.base_market_prices[1:]
)
# Pending sell orders this month
self._pending_orders: List[_SellOrder] = []
# Hype engine
self._hype_engine: Optional[HypeEngine] = None
if ma_config.enable_hype_crops:
self._hype_engine = HypeEngine(ma_config, env_config, rng)
# Previous tick's prices for autocorrelation random walk
self._prev_prices: Optional[Tuple[float, ...]] = None
# ──────────────────────────────────────────────────────────────
# Sell Queue
# ──────────────────────────────────────────────────────────────
def queue_sell(
self,
agent_id: int,
crop_type: int,
volume: float,
is_inventory: bool = False,
) -> None:
"""
Queue a sell order for the current month.
All queued orders are cleared at ``resolve_month()``.
"""
if volume > 0:
self._pending_orders.append(
_SellOrder(agent_id, crop_type, volume, is_inventory)
)
# ──────────────────────────────────────────────────────────────
# Month Resolution
# ──────────────────────────────────────────────────────────────
def resolve_month(self, month: int) -> Dict[int, float]:
"""
Resolve all pending sell orders for this month.
Steps:
1. Aggregate total sold volume per crop.
2. Compute demand_response(V, crop) for each crop.
3. Compute realised_price = base × seasonal × hype × demand_response.
4. Credit each sell order with realised_price × volume.
5. Tick hype cycles with actual sold volumes.
Returns
-------
dict
agent_id → total revenue earned this month from sells.
"""
# Aggregate volumes
total_volume: Dict[int, float] = {}
for order in self._pending_orders:
total_volume[order.crop_type] = (
total_volume.get(order.crop_type, 0.0) + order.volume
)
# Compute demand response & realised prices per crop
clearing_prices: Dict[int, float] = {}
for crop_idx in range(1, self._env_cfg.num_crop_types):
base = self._base_prices[crop_idx]
vol = total_volume.get(crop_idx, 0.0)
demand_resp = self._demand_response(vol, crop_idx)
hype_mult = (
self._hype_engine.get_hype_mult(crop_idx)
if self._hype_engine
else 1.0
)
clearing_prices[crop_idx] = base * demand_resp * hype_mult
self.realised_prices[crop_idx] = clearing_prices[crop_idx]
# Credit revenues per agent
revenues: Dict[int, float] = {}
for order in self._pending_orders:
price = clearing_prices.get(order.crop_type, 0.0)
rev = order.volume * price
revenues[order.agent_id] = revenues.get(order.agent_id, 0.0) + rev
# Snapshot last_month_realised_prices (All 6 crops)
self.last_month_realised_prices = tuple(
self.realised_prices[i] for i in range(1, self._env_cfg.num_crop_types)
)
# Tick hype engine (uses actual sold volumes)
if self._hype_engine:
self._hype_engine.tick_month(total_volume)
# Clear pending orders for next month
self._pending_orders = []
return revenues
def generate_base_prices(
self,
month: int,
inflated_base_prices: Optional[List[float]] = None,
) -> List[float]:
"""
Generate new base prices using the existing mean-reverting random walk.
This replaces the per-agent call to dynamics.generate_market_prices()
for the *shared* commodity baseline.
Returns the new base prices list (indexed 0..num_crops-1).
"""
from cropRL.dynamics import generate_market_prices
base = inflated_base_prices or self._base_prices
# The existing dynamics function generates prices for all crops except Fallow.
generated_prices = generate_market_prices(
month=month,
config=self._env_cfg,
rng=self._rng,
prev_prices=self._prev_prices,
effective_base_prices=tuple(base),
)
new_prices = list(base) # copy
for i in range(1, self._env_cfg.num_crop_types):
new_prices[i] = generated_prices[i - 1]
# Hype crops 4-6 get small random walk independently
for hc in range(4, self._env_cfg.num_crop_types):
hc_base = base[hc]
noise = float(self._rng.normal(0.0, self._env_cfg.market_price_sigma))
noise = float(np.clip(noise, -3 * self._env_cfg.market_price_sigma,
3 * self._env_cfg.market_price_sigma))
floor = hc_base * self._env_cfg.price_min_multiplier
ceiling = hc_base * self._env_cfg.price_max_multiplier
if self._prev_prices is not None and len(self._prev_prices) > hc - 1:
prev = self._prev_prices[hc - 1]
target = hc_base
drift = self._env_cfg.price_reversion_speed * (target - prev) / max(target, 1.0)
new_prices[hc] = float(np.clip(prev * (1.0 + drift + noise), floor, ceiling))
else:
new_prices[hc] = float(np.clip(hc_base * (1.0 + noise), floor, ceiling))
self._base_prices = new_prices
self._prev_prices = tuple(new_prices[1:]) # for next month's autocorrelation
return new_prices
def hype_statuses(self) -> List[HypeCropStatus]:
"""Return current hype status for all hype crops."""
if self._hype_engine:
return self._hype_engine.statuses()
return []
# ──────────────────────────────────────────────────────────────
# Internal helpers
# ──────────────────────────────────────────────────────────────
def _demand_response(self, volume: float, crop_type: int) -> float:
"""
Compute the demand-response multiplier for a given sold volume.
demand_response(V) = max(price_floor_mult,
1 - price_impact_coeff × (V / market_capacity))
"""
capacity = self._ma_cfg.market_capacity.get(crop_type, 50.0)
if capacity <= 0:
return 1.0
raw = 1.0 - self._ma_cfg.price_impact_coeff * (volume / capacity)
return max(self._ma_cfg.price_floor_mult, raw)

Xet Storage Details

Size:
16.2 kB
·
Xet hash:
5e490de696f142c40584b0f352edabe4f3bd908e20e2cdb3a0880c3edfe88b04

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.