Daksh C Jain
Complete rebuild β€” professional DQN green logistics platform
852f739
"""
GreenCityEnv β€” Urban logistics grid environment.
State: [agent_y, agent_x] β€” 2-D position on grid
Action: Discrete(4) β€” Up / Down / Left / Right
Reward: negative carbon cost per step; +20 on delivery
"""
from __future__ import annotations
import numpy as np
import gymnasium as gym
from gymnasium import spaces
VEHICLE_BASE_COST = {"Diesel": 1.0, "EV": 0.2}
CONGESTION_MULT = 4.0
DELIVERY_BONUS = 20.0
PRESETS = {
"πŸ™οΈ Downtown Rush": {
"size": 7,
"congestion": [[2,2],[2,3],[3,2],[4,4],[4,5]],
"start": [0, 0],
"goal": [6, 6],
"description": "Dense urban core β€” heavy congestion cluster in the middle.",
},
"🏭 Industrial Port": {
"size": 8,
"congestion": [[1,6],[2,5],[2,6],[3,6],[6,1],[6,2]],
"start": [0, 0],
"goal": [7, 7],
"description": "Port logistics β€” congestion near loading docks.",
},
"πŸ›£οΈ Ring Road": {
"size": 7,
"congestion": [[1,1],[1,5],[5,1],[5,5],[3,3]],
"start": [0, 3],
"goal": [6, 3],
"description": "Congestion at ring-road junctions β€” clear central corridor.",
},
"πŸ•οΈ Open Field": {
"size": 6,
"congestion": [],
"start": [0, 0],
"goal": [5, 5],
"description": "Minimal congestion β€” baseline comparison scenario.",
},
}
class GreenCityEnv(gym.Env):
"""Fixed and complete urban logistics environment."""
metadata = {"render_modes": []}
def __init__(self, size: int = 7):
super().__init__()
self.size = size
self.observation_space = spaces.Box(
low=0, high=size - 1, shape=(2,), dtype=np.int32
)
self.action_space = spaces.Discrete(4)
self._moves = np.array([[-1,0],[1,0],[0,-1],[0,1]])
# Will be set on reset
self.agent_pos = np.zeros(2, dtype=np.int32)
self.goal = np.array([size-1, size-1], dtype=np.int32)
self.congestion = []
self.vehicle = "Diesel"
self.steps = 0
def reset(
self,
seed: int | None = None,
start_pos: list[int] = None,
goal_pos: list[int] = None,
congestion_map: list = None,
vehicle: str = "Diesel",
options: dict | None = None,
):
super().reset(seed=seed)
self.agent_pos = np.array(start_pos or [0, 0], dtype=np.int32)
self.goal = np.array(goal_pos or [self.size-1, self.size-1], dtype=np.int32)
self.congestion = [np.array(c, dtype=np.int32) for c in (congestion_map or [])]
self.vehicle = vehicle
self.steps = 0
return self.agent_pos.copy(), {}
def step(self, action: int):
new_pos = np.clip(self.agent_pos + self._moves[action], 0, self.size - 1)
self.agent_pos = new_pos
self.steps += 1
in_congestion = any(np.array_equal(new_pos, c) for c in self.congestion)
base_cost = VEHICLE_BASE_COST.get(self.vehicle, 1.0)
multiplier = CONGESTION_MULT if in_congestion else 1.0
reward = -(base_cost * multiplier)
done = bool(np.array_equal(self.agent_pos, self.goal))
if done:
reward += DELIVERY_BONUS
truncated = self.steps >= self.size * 5
info = {"in_congestion": in_congestion, "carbon_step": base_cost * multiplier}
return self.agent_pos.copy(), float(reward), done, truncated, info
def parse_congestion(text: str, size: int) -> list[list[int]]:
"""Parse 'y,x; y,x; ...' string into list of [y,x] coords."""
zones = []
for part in text.replace(";", ",").split(","):
part = part.strip()
if not part:
continue
try:
vals = [int(v) for v in part.split() if v.isdigit() or (v.lstrip('-').isdigit())]
if len(vals) >= 2:
y, x = vals[0], vals[1]
if 0 <= y < size and 0 <= x < size:
zones.append([y, x])
except ValueError:
pass
return zones