Spaces:
Running
Running
File size: 9,658 Bytes
6252f54 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 | """Generator agent — derives structured roadmap artifacts from graph-property context."""
import asyncio
import json
import logging
from typing import Any
from backend.agents.retriever import EnrichedCapability
from backend.llm.client import LLMClient, extract_json
from backend.llm.prompts import build_epic_prompt, build_roadmap_structure_prompt
from backend.schemas.request import AnalyzeRequest
from backend.schemas.response import (
RoadmapPhase,
EpicArtifact,
Feature,
UserStory,
Task,
)
log = logging.getLogger(__name__)
COMPLEXITY_TO_PHASE = {
"low": 1,
"medium": 1,
"high": 2,
"very_high": 3,
}
MATURITY_TO_PHASE = {
"<1yr": 1,
"1-2yr": 1,
"2-5yr": 2,
"5+yr": 3,
}
def _assign_phase(cap: EnrichedCapability) -> int:
cx = (cap.capability.get("implementation_complexity") or "medium").lower()
phase = COMPLEXITY_TO_PHASE.get(cx, 2)
if cap.trend:
horizon = cap.trend.get("time_horizon") or "2-5yr"
trend_phase = MATURITY_TO_PHASE.get(horizon, 2)
# Use trend phase only if it pushes later, not earlier
phase = max(phase, trend_phase)
# Duration-based override: short tasks → earlier phases
duration = cap.capability.get("typical_duration_weeks") or 12
if duration <= 8 and phase > 1:
phase = 1
elif duration >= 24 and phase < 3:
phase = min(phase + 1, 3)
return min(phase, 3)
def _build_epic_from_cap(cap: EnrichedCapability, request: AnalyzeRequest) -> dict:
"""Build the context dict passed to the LLM prompt."""
c = cap.capability
std = cap.standard or {}
trend = cap.trend or {}
sd = cap.subdomain or {}
return {
"org_type": request.org_type,
"goals": request.goals,
"budget_tier": request.budget_tier,
"timeline_months": request.timeline_months,
"risk_tolerance": request.risk_tolerance,
"domain_name": cap.domain.get("name", ""),
"subdomain_name": sd.get("name", ""),
"subdomain_functional_scope": sd.get("functional_scope", ""),
"subdomain_business_driver": sd.get("business_driver", ""),
"cap_name": c.get("name", ""),
"cap_description": c.get("description", ""),
"cap_business_outcomes": c.get("business_outcomes") or [],
"cap_risk_factors": c.get("risk_factors") or [],
"cap_kpis": c.get("kpis") or [],
"cap_duration_weeks": c.get("typical_duration_weeks") or 12,
"cap_complexity": c.get("implementation_complexity") or "medium",
"cap_frameworks": c.get("common_frameworks") or [],
"cap_solution_patterns": c.get("solution_patterns") or [],
"std_name": std.get("name", ""),
"std_publisher": std.get("publisher", ""),
"std_version": std.get("version", ""),
"std_key_principles": std.get("key_principles") or [],
"std_compliance_requirements": std.get("compliance_requirements") or [],
"trend_name": trend.get("name", ""),
"trend_source": trend.get("source", ""),
"trend_impact": trend.get("impact_level", ""),
"trend_maturity": trend.get("maturity", ""),
"trend_horizon": trend.get("time_horizon", ""),
"trend_business_impact": trend.get("business_impact", ""),
"trend_enablers": trend.get("technology_enablers") or [],
"subcapabilities": [sc.get("name", "") for sc in cap.subcapabilities],
}
def _parse_epic_response(raw: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact:
"""Parse LLM JSON response into EpicArtifact, injecting compliance ACs if missing."""
std_reqs = (cap.standard or {}).get("compliance_requirements") or []
cap_kpis = cap.capability.get("kpis") or []
features: list[Feature] = []
for f in raw.get("features") or []:
stories: list[UserStory] = []
for s in f.get("user_stories") or []:
raw_tasks = s.get("tasks") or []
tasks = []
for t in raw_tasks:
if isinstance(t, dict):
tasks.append(Task(
title=t.get("title") or t.get("name") or "",
description=t.get("description") or "",
estimated_days=int(t.get("estimated_days") or 3),
assignee_role=t.get("assignee_role") or "",
))
elif isinstance(t, str):
tasks.append(Task(title=t))
stories.append(
UserStory(
role=s.get("role", "architect"),
want=s.get("want", ""),
so_that=s.get("so_that", ""),
acceptance_criteria=s.get("acceptance_criteria") or [],
tasks=tasks,
)
)
features.append(
Feature(
title=f.get("title", ""),
description=f.get("description", ""),
technical_notes=f.get("technical_notes", ""),
user_stories=stories,
estimated_story_points=f.get("estimated_story_points"),
)
)
# Ensure compliance ACs appear at epic level
epic_acs: list[str] = list(raw.get("acceptance_criteria") or [])
for req in std_reqs:
if req and not any(req[:30] in ac for ac in epic_acs):
epic_acs.append(f"[Compliance] {req}")
for kpi in cap_kpis:
if kpi and not any(kpi[:30] in ac for ac in epic_acs):
epic_acs.append(f"[KPI] {kpi}")
return EpicArtifact(
epic_id=f"EPIC-{cap.capability.get('id', 'unknown')[:8].upper()}",
title=raw.get("title") or cap.capability.get("name", ""),
description=raw.get("description") or cap.capability.get("description") or "",
business_value=raw.get("business_value") or "",
strategic_rationale=raw.get("strategic_rationale") or (cap.trend or {}).get("business_impact") or "",
governance_reference=(
f"{(cap.standard or {}).get('name','')} — {(cap.standard or {}).get('publisher','')}"
).strip(" —"),
trend_alignment=(cap.trend or {}).get("name") or "",
acceptance_criteria=epic_acs,
features=features,
risk_register=raw.get("risks") or cap.capability.get("risk_factors") or [],
estimated_sprints=max(1, (cap.capability.get("typical_duration_weeks") or 12) // 2),
phase=phase_num,
subdomain_group=cap.subdomain.get("name") or "",
)
class GeneratorAgent:
def __init__(self, llm: LLMClient):
self.llm = llm
async def _generate_epic(self, ctx: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact:
prompt = build_epic_prompt(ctx)
try:
raw_text = await self.llm.chat(
messages=[{"role": "user", "content": prompt}],
max_tokens=4096,
temperature=0.4,
)
raw = extract_json(raw_text)
if isinstance(raw, dict):
return _parse_epic_response(raw, cap, phase_num)
except Exception as exc:
log.warning(f"Epic generation failed for {cap.capability.get('name')}: {exc}")
# Deterministic fallback from graph properties
return _parse_epic_response({}, cap, phase_num)
async def generate(
self,
caps: list[EnrichedCapability],
request: AnalyzeRequest,
compliance_issues: list[str] | None = None,
) -> list[RoadmapPhase]:
phase_map: dict[int, list[EpicArtifact]] = {1: [], 2: [], 3: []}
capped = caps[:20]
async def _process(idx_cap: tuple[int, EnrichedCapability]) -> tuple[int, EpicArtifact]:
idx, cap = idx_cap
# Rank-based bucketing ensures phase variety: top third→1, mid→2, bottom→3
n = len(capped)
rank_phase = 1 if idx < n // 3 else (2 if idx < 2 * n // 3 else 3)
# Property-based phase can only push LATER (never earlier than rank suggests)
prop_phase = _assign_phase(cap)
phase_num = max(rank_phase, prop_phase) if prop_phase == 3 else rank_phase
ctx = _build_epic_from_cap(cap, request)
if compliance_issues:
ctx["compliance_issues"] = compliance_issues
epic = await self._generate_epic(ctx, cap, phase_num)
return phase_num, epic
results = await asyncio.gather(*[_process((i, c)) for i, c in enumerate(capped)])
for phase_num, epic in results:
phase_map[phase_num].append(epic)
phase_names = {
1: "Foundation & Quick Wins",
2: "Core Transformation",
3: "Advanced Capabilities",
}
phase_durations = {
1: min(request.timeline_months // 3, 6),
2: min(request.timeline_months // 3, 12),
3: request.timeline_months - (min(request.timeline_months // 3, 6) + min(request.timeline_months // 3, 12)),
}
phases: list[RoadmapPhase] = []
for num in [1, 2, 3]:
epics = phase_map[num]
if not epics:
continue
phases.append(
RoadmapPhase(
phase_number=num,
phase_name=phase_names[num],
duration_months=max(phase_durations[num], 1),
epics=epics,
objectives=[
e.business_value for e in epics[:3] if e.business_value
],
key_milestones=[e.title for e in epics[:2]],
)
)
return phases
|