File size: 3,112 Bytes
cb054fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
Adapter between OverflowObservation (2D road grid) and the OpenENV policy
observation format (ego state + ticket matrix).

Nearby cars are converted to collision_risk tickets so TicketAttentionPolicy
can reason about them using the same mechanism it was designed for.
"""

from __future__ import annotations

import math
import numpy as np

try:
    from ..policies.policy_spec import build_obs, build_ticket_vector, OBS_DIM
except ImportError:
    from policies.policy_spec import build_obs, build_ticket_vector, OBS_DIM


def overflow_obs_to_policy_obs(obs) -> np.ndarray:
    """OverflowObservation → 603-dim numpy vector for our policies."""
    cars = obs.cars
    if not cars:
        return np.zeros(OBS_DIM, dtype=np.float32)

    ego = next((c for c in cars if c.carId == 0), cars[0])
    ego_speed_ms = ego.speed / 4.5            # OverflowEnv speed units → m/s
    ego_x        = ego.position.x
    ego_y        = (ego.lane - 2) * 3.7      # lane → lateral metres

    ticket_vectors = []
    for car in cars:
        if car.carId == 0:
            continue
        rel_x    = car.position.x - ego.position.x
        rel_y    = (car.lane - ego.lane) * 3.7
        car_spd  = car.speed / 4.5
        distance = math.sqrt(rel_x ** 2 + rel_y ** 2)
        if distance > 80:
            continue
        closing  = max(ego_speed_ms - car_spd * math.copysign(1, max(rel_x, 0.01)), 0.1)
        ttc      = min(distance / closing, 30.0)
        severity = 1.0 if distance < 8 else (0.75 if distance < 15 else 0.5)
        ticket_vectors.append(build_ticket_vector(
            severity_weight=severity, ttl=5.0,
            pos_x=rel_x, pos_y=rel_y, pos_z=0.0,
            vel_x=car_spd, vel_y=0.0, vel_z=0.0,
            heading=0.0,
            size_length=4.0, size_width=2.0, size_height=1.5,
            distance=distance, time_to_collision=ttc,
            bearing=math.atan2(rel_y, max(rel_x, 0.01)),
            ticket_type="collision_risk", entity_type="vehicle", confidence=1.0,
        ))

    tv = np.array(ticket_vectors, dtype=np.float32) if ticket_vectors else None
    return build_obs(
        ego_x=ego_x, ego_y=ego_y, ego_z=0.0,
        ego_vx=ego_speed_ms, ego_vy=0.0,
        heading=0.0, speed=ego_speed_ms,
        steer=0.0, throttle=0.5, brake=0.0,
        ticket_vectors=tv,
    )


def policy_action_to_decision(action_vec: np.ndarray) -> tuple[str, str]:
    """Continuous [steer, throttle, brake] → (text decision, reasoning)."""
    steer, throttle, brake = float(action_vec[0]), float(action_vec[1]), float(action_vec[2])
    if abs(steer) > 0.35:
        decision  = "lane_change_left" if steer < 0 else "lane_change_right"
        reasoning = f"steer={steer:.2f}: lateral avoidance"
    elif brake > 0.25:
        decision  = "brake"
        reasoning = f"brake={brake:.2f}: closing gap"
    elif throttle > 0.20:
        decision  = "accelerate"
        reasoning = f"throttle={throttle:.2f}: clear ahead"
    else:
        decision  = "maintain"
        reasoning = f"s={steer:.2f} t={throttle:.2f} b={brake:.2f}: holding course"
    return decision, reasoning