File size: 5,857 Bytes
6252f54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Graph-grounded DRL environment.

Replaces the 10 hardcoded archetypes in EAEnvironment with capabilities
loaded live from Neo4j for a specific domain. Maintains the same
STATE_DIM=20 / ACTION_DIM=10 interface so REINFORCETrainer works unchanged.
"""

import numpy as np
from typing import NamedTuple

from backend.graph.cypher_queries import (
    GET_CAPABILITIES_FOR_TRAINING,
    GET_DOMAIN_RELATIONSHIP_FLAGS,
)


class EAScenario(NamedTuple):
    """One sampled planning scenario, as built by ``GraphEAEnvironment.reset``.

    Field order matters: ``reset`` constructs this tuple positionally.
    """

    # Per-capability business value; reset() clips it to [0.1, 1.0]. These ten
    # values are also the first ten entries of the state vector.
    cap_business_values: np.ndarray
    # Per-capability effort; reset() clips it to [0.1, 1.0]. step() accumulates
    # effort / 10 and compares the running total against budget_capacity.
    cap_effort_scores: np.ndarray
    # Per-capability risk; reset() clips it to [0.05, 0.9].
    cap_risk_scores: np.ndarray
    # 10x10 float32 matrix of 0/1 flags. step() counts a violation whenever
    # dependency_matrix[a, b] == 1.0 and a is ordered before b in the action.
    dependency_matrix: np.ndarray
    # Drawn by reset() from {0.4, 0.6, 0.8, 1.0}.
    budget_capacity: float
    # Drawn by reset() from {6, 12, 18, 24, 36} and divided by 36.
    timeline_score: float
    # Drawn by reset() from {0.33, 0.67, 1.0}; step() penalises any of the
    # first three actions whose risk score exceeds it.
    risk_tolerance: float


_COMPLEXITY_TO_BV = {"low": 0.95, "medium": 0.80, "high": 0.65, "very_high": 0.50}
_COMPLEXITY_TO_EFFORT = {"low": 0.30, "medium": 0.55, "high": 0.75, "very_high": 0.90}


def _build_bv_effort_risk(caps: list[dict]) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    bv, effort, risk = [], [], []
    for c in caps:
        cx = (c.get("complexity") or "medium").lower()
        bv.append(_COMPLEXITY_TO_BV.get(cx, 0.80))
        dur = c.get("duration_weeks") or 16
        effort.append(min(float(dur) / 36.0, 1.0))
        rf = c.get("risk_factors") or []
        risk.append(min(len(rf) / 5.0, 0.90))
    # pad to exactly 10
    while len(bv) < 10:
        bv.append(0.70); effort.append(0.50); risk.append(0.30)
    return (
        np.array(bv[:10], dtype=np.float32),
        np.array(effort[:10], dtype=np.float32),
        np.array(risk[:10], dtype=np.float32),
    )


class GraphEAEnvironment:
    """DRL environment grounded in real Neo4j domain capabilities."""

    STATE_DIM = 20
    ACTION_DIM = 10

    def __init__(self, neo4j_client, domain_name: str, noise_scale: float = 0.08, seed: int | None = None):
        self._client = neo4j_client
        self._domain_name = domain_name
        self._noise = noise_scale
        self._rng = np.random.default_rng(seed)

        caps = self._client.run_query(GET_CAPABILITIES_FOR_TRAINING, domain_name=domain_name)
        self._caps = caps or []
        self._cap_names = [c.get("name", f"Cap-{i}") for i, c in enumerate(self._caps)]

        self._base_bv, self._base_ef, self._base_ri = _build_bv_effort_risk(self._caps)

        # Domain relationship flags (7 dims)
        flags_rows = self._client.run_query(GET_DOMAIN_RELATIONSHIP_FLAGS, domain_name=domain_name)
        if flags_rows:
            f = flags_rows[0]
            self._domain_flags = np.array([
                float(f.get("is_sector_hub", False)),
                float(f.get("is_enabled", False)),
                float(f.get("is_orchestrator", False)),
                float(f.get("is_governed", False)),
                float(f.get("is_sector_child", False)),
                float(f.get("enables_others", False)),
                float(f.get("has_trend", False)),
            ], dtype=np.float32)
        else:
            self._domain_flags = np.zeros(7, dtype=np.float32)

        self.scenario: EAScenario | None = None
        self.current_step = 0

    # ------------------------------------------------------------------
    def reset(self) -> np.ndarray:
        noise = self._rng.uniform(-self._noise, self._noise, 10)
        bv = np.clip(self._base_bv + noise, 0.1, 1.0)
        ef = np.clip(self._base_ef + self._rng.uniform(-self._noise, self._noise, 10), 0.1, 1.0)
        ri = np.clip(self._base_ri + self._rng.uniform(-self._noise / 2, self._noise / 2, 10), 0.05, 0.9)

        dep_matrix = np.zeros((10, 10), dtype=np.float32)
        # Light dependency: later capabilities often depend on earlier ones
        for i in range(min(len(self._caps) - 1, 9)):
            if self._rng.random() > 0.7:
                dep_matrix[i, i + 1] = 1.0

        budget_capacity = float(self._rng.choice([0.4, 0.6, 0.8, 1.0]))
        timeline_score = float(self._rng.choice([6, 12, 18, 24, 36])) / 36.0
        risk_tolerance = float(self._rng.choice([0.33, 0.67, 1.0]))

        self.scenario = EAScenario(bv, ef, ri, dep_matrix, budget_capacity, timeline_score, risk_tolerance)
        self.current_step = 0
        return self._state_vector()

    def _state_vector(self) -> np.ndarray:
        s = self.scenario
        return np.concatenate([
            s.cap_business_values,
            [s.budget_capacity],
            [s.timeline_score],
            [s.risk_tolerance],
            self._domain_flags,
        ]).astype(np.float32)

    def step(self, action_indices: np.ndarray) -> tuple[np.ndarray, float, bool]:
        s = self.scenario
        base_reward = sum(
            s.cap_business_values[idx] * (1.0 - rank / len(action_indices))
            for rank, idx in enumerate(action_indices)
        ) / len(action_indices)

        dep_violations = sum(
            1 for i, di in enumerate(action_indices)
            for j, dj in enumerate(action_indices)
            if s.dependency_matrix[dj, di] == 1.0 and j < i
        )
        dep_penalty = dep_violations * 0.15

        cum_effort, budget_penalty = 0.0, 0.0
        for idx in action_indices:
            cum_effort += s.cap_effort_scores[idx] / 10.0
            if cum_effort > s.budget_capacity:
                budget_penalty += 0.05

        risk_penalty = sum(
            s.cap_risk_scores[idx] * 0.2
            for idx in action_indices[:3]
            if s.cap_risk_scores[idx] > s.risk_tolerance
        )

        reward = float(max(-1.0, min(2.0, base_reward - dep_penalty - budget_penalty - risk_penalty)))
        self.current_step += 1
        return self._state_vector(), reward, True

    def sample_action(self) -> np.ndarray:
        return self._rng.permutation(self.ACTION_DIM).astype(np.int64)

    def get_domain_name(self) -> str:
        return self._domain_name

    def get_capability_names(self) -> list[str]:
        return self._cap_names