# TheQuantEd's picture
# deploy: AMD EA Strategy Optimizer — Neo4j + FastAPI + Streamlit
# 6252f54
"""Generator agent — derives structured roadmap artifacts from graph-property context."""
import asyncio
import json
import logging
from typing import Any
from backend.agents.retriever import EnrichedCapability
from backend.llm.client import LLMClient, extract_json
from backend.llm.prompts import build_epic_prompt, build_roadmap_structure_prompt
from backend.schemas.request import AnalyzeRequest
from backend.schemas.response import (
RoadmapPhase,
EpicArtifact,
Feature,
UserStory,
Task,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Roadmap phase (1-3) suggested by a capability's "implementation_complexity"
# value. Keys are matched after lower-casing by the lookup code.
COMPLEXITY_TO_PHASE: dict[str, int] = {
    "low": 1,
    "medium": 1,
    "high": 2,
    "very_high": 3,
}

# Roadmap phase (1-3) suggested by a trend's "time_horizon" value.
MATURITY_TO_PHASE: dict[str, int] = {
    "<1yr": 1,
    "1-2yr": 1,
    "2-5yr": 2,
    "5+yr": 3,
}
def _assign_phase(cap: EnrichedCapability) -> int:
    """Derive a roadmap phase (1-3) for a capability from its graph properties.

    The phase starts from the capability's ``implementation_complexity``
    (missing -> "medium", unknown value -> phase 2). If a trend is attached,
    its ``time_horizon`` (missing -> "2-5yr", unknown value -> phase 2) may
    move the phase later but never earlier. Finally ``typical_duration_weeks``
    (missing -> 12) overrides: 8 weeks or fewer forces phase 1, 24 weeks or
    more bumps the phase by one, capped at 3.
    """
    props = cap.capability
    complexity = (props.get("implementation_complexity") or "medium").lower()
    assigned = COMPLEXITY_TO_PHASE.get(complexity, 2)

    trend = cap.trend
    if trend:
        trend_phase = MATURITY_TO_PHASE.get(trend.get("time_horizon") or "2-5yr", 2)
        # The trend may only delay the work, never pull it forward.
        if trend_phase > assigned:
            assigned = trend_phase

    weeks = props.get("typical_duration_weeks") or 12
    if weeks <= 8:
        # Short efforts are always treated as quick wins.
        assigned = 1
    elif weeks >= 24 and assigned < 3:
        # Long efforts slip one phase later.
        assigned += 1

    return min(assigned, 3)
def _build_epic_from_cap(cap: EnrichedCapability, request: AnalyzeRequest) -> dict:
    """Assemble the flat context dict that is handed to the epic prompt builder.

    The result combines request-level settings with properties read from the
    capability and its related domain, subdomain, standard and trend records.
    Standard, trend and subdomain are treated as optional (an empty dict is
    substituted); list-valued properties default to ``[]``.
    """
    capability = cap.capability
    standard = cap.standard or {}
    trend_info = cap.trend or {}
    subdomain = cap.subdomain or {}

    request_part = {
        "org_type": request.org_type,
        "goals": request.goals,
        "budget_tier": request.budget_tier,
        "timeline_months": request.timeline_months,
        "risk_tolerance": request.risk_tolerance,
    }
    placement_part = {
        "domain_name": cap.domain.get("name", ""),
        "subdomain_name": subdomain.get("name", ""),
        "subdomain_functional_scope": subdomain.get("functional_scope", ""),
        "subdomain_business_driver": subdomain.get("business_driver", ""),
    }
    capability_part = {
        "cap_name": capability.get("name", ""),
        "cap_description": capability.get("description", ""),
        "cap_business_outcomes": capability.get("business_outcomes") or [],
        "cap_risk_factors": capability.get("risk_factors") or [],
        "cap_kpis": capability.get("kpis") or [],
        "cap_duration_weeks": capability.get("typical_duration_weeks") or 12,
        "cap_complexity": capability.get("implementation_complexity") or "medium",
        "cap_frameworks": capability.get("common_frameworks") or [],
        "cap_solution_patterns": capability.get("solution_patterns") or [],
    }
    standard_part = {
        "std_name": standard.get("name", ""),
        "std_publisher": standard.get("publisher", ""),
        "std_version": standard.get("version", ""),
        "std_key_principles": standard.get("key_principles") or [],
        "std_compliance_requirements": standard.get("compliance_requirements") or [],
    }
    trend_part = {
        "trend_name": trend_info.get("name", ""),
        "trend_source": trend_info.get("source", ""),
        "trend_impact": trend_info.get("impact_level", ""),
        "trend_maturity": trend_info.get("maturity", ""),
        "trend_horizon": trend_info.get("time_horizon", ""),
        "trend_business_impact": trend_info.get("business_impact", ""),
        "trend_enablers": trend_info.get("technology_enablers") or [],
    }
    subcapability_names = [sub.get("name", "") for sub in cap.subcapabilities]

    # Merge in the original key order so any consumer that iterates the
    # context sees the same sequence.
    return {
        **request_part,
        **placement_part,
        **capability_part,
        **standard_part,
        **trend_part,
        "subcapabilities": subcapability_names,
    }
def _as_list(value: Any) -> list:
    """Normalise a loosely-typed JSON value into a list.

    Lists/tuples are copied, a lone string or dict is wrapped in a one-element
    list (so a string is never iterated character by character), and anything
    else -- including ``None`` and other falsy values -- becomes ``[]``.
    """
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    if isinstance(value, (str, dict)):
        return [value]
    return []


def _coerce_estimated_days(value: Any, default: int = 3) -> int:
    """Return *value* as a whole number of days, or *default* if unusable.

    Falsy values fall back to *default*. Numeric strings such as ``"2.5"`` are
    truncated; text that is not a number (e.g. ``"3 days"``) yields *default*
    instead of raising.
    """
    if not value:
        return default
    for convert in (int, float):
        try:
            return int(convert(value))
        except (TypeError, ValueError, OverflowError):
            continue
    return default


def _parse_tasks(raw_tasks: Any) -> list[Task]:
    """Build Task objects from a story's ``tasks`` value (dicts or plain strings)."""
    tasks: list[Task] = []
    for item in _as_list(raw_tasks):
        if isinstance(item, dict):
            tasks.append(
                Task(
                    title=item.get("title") or item.get("name") or "",
                    description=item.get("description") or "",
                    estimated_days=_coerce_estimated_days(item.get("estimated_days")),
                    assignee_role=item.get("assignee_role") or "",
                )
            )
        elif isinstance(item, str):
            tasks.append(Task(title=item))
    return tasks


def _append_missing_criteria(criteria: list[str], items: list, tag: str) -> None:
    """Append ``"[tag] item"`` to *criteria* for each item not already covered.

    An item counts as covered when its first 30 characters occur inside any
    existing criterion. *criteria* is modified in place.
    """
    for item in items:
        if not item:
            continue
        text = str(item)
        if not any(text[:30] in existing for existing in criteria):
            criteria.append(f"[{tag}] {text}")


def _parse_epic_response(raw: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact:
    """Parse LLM JSON response into EpicArtifact, injecting compliance ACs if missing.

    Args:
        raw: Decoded JSON object from the LLM; ``{}`` yields an epic built
            purely from the capability's graph properties.
        cap: Capability with its optional standard, trend and subdomain records.
        phase_num: Roadmap phase stored on the resulting epic.

    Returns:
        An EpicArtifact whose acceptance criteria always include the
        standard's compliance requirements and the capability's KPIs.

    Malformed pieces of the LLM output (non-dict features/stories, a string
    where a list was expected, non-numeric ``estimated_days``) are skipped or
    defaulted rather than raised, so one bad field does not discard the epic.
    """
    capability = cap.capability
    standard = cap.standard or {}
    trend = cap.trend or {}
    # Subdomain is optional elsewhere in this module, so guard it here too.
    subdomain = cap.subdomain or {}

    features: list[Feature] = []
    for feature_raw in _as_list(raw.get("features")):
        if not isinstance(feature_raw, dict):
            continue
        stories: list[UserStory] = []
        for story_raw in _as_list(feature_raw.get("user_stories")):
            if not isinstance(story_raw, dict):
                continue
            stories.append(
                UserStory(
                    role=story_raw.get("role", "architect"),
                    want=story_raw.get("want", ""),
                    so_that=story_raw.get("so_that", ""),
                    acceptance_criteria=_as_list(story_raw.get("acceptance_criteria")),
                    tasks=_parse_tasks(story_raw.get("tasks")),
                )
            )
        features.append(
            Feature(
                title=feature_raw.get("title", ""),
                description=feature_raw.get("description", ""),
                technical_notes=feature_raw.get("technical_notes", ""),
                user_stories=stories,
                estimated_story_points=feature_raw.get("estimated_story_points"),
            )
        )

    # Ensure compliance requirements and KPIs appear at epic level.
    epic_acs: list[str] = [
        ac if isinstance(ac, str) else str(ac)
        for ac in _as_list(raw.get("acceptance_criteria"))
        if ac is not None
    ]
    _append_missing_criteria(epic_acs, _as_list(standard.get("compliance_requirements")), "Compliance")
    _append_missing_criteria(epic_acs, _as_list(capability.get("kpis")), "KPI")

    # The id may be missing, None or non-string; coerce before slicing.
    cap_id = str(capability.get("id") or "unknown")

    return EpicArtifact(
        epic_id=f"EPIC-{cap_id[:8].upper()}",
        title=raw.get("title") or capability.get("name", ""),
        description=raw.get("description") or capability.get("description") or "",
        business_value=raw.get("business_value") or "",
        strategic_rationale=raw.get("strategic_rationale") or trend.get("business_impact") or "",
        # strip() removes the dangling separator when name or publisher is empty.
        governance_reference=(
            f"{standard.get('name', '')} — {standard.get('publisher', '')}"
        ).strip(" —"),
        trend_alignment=trend.get("name") or "",
        acceptance_criteria=epic_acs,
        features=features,
        risk_register=raw.get("risks") or capability.get("risk_factors") or [],
        # Two weeks per sprint, at least one sprint.
        estimated_sprints=max(1, (capability.get("typical_duration_weeks") or 12) // 2),
        phase=phase_num,
        subdomain_group=subdomain.get("name") or "",
    )
class GeneratorAgent:
    """Generates phased roadmap epics for a ranked list of capabilities.

    Each capability is turned into an epic via one LLM call; if the call
    fails or does not yield a JSON object, the epic is built from the
    capability's graph properties alone.
    """

    # Only the first MAX_CAPABILITIES entries of the input are processed.
    MAX_CAPABILITIES = 20

    def __init__(self, llm: LLMClient):
        self.llm = llm

    @staticmethod
    def _rank_phase(index: int, total: int) -> int:
        """Map a capability's rank to phase 1, 2 or 3 by thirds.

        For three or more capabilities the cut points are ``total // 3`` and
        ``2 * total // 3``. For fewer than three those cut points would be 0,
        pushing the top-ranked capability out of phase 1, so each leading
        bucket is given a minimum size of one.
        """
        first_cut = max(1, total // 3)
        second_cut = max(first_cut + 1, 2 * total // 3)
        if index < first_cut:
            return 1
        if index < second_cut:
            return 2
        return 3

    async def _generate_epic(self, ctx: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact:
        """Ask the LLM for an epic and parse it, falling back to graph data.

        Any exception raised by the LLM call, JSON extraction or parsing is
        logged and swallowed; the fallback epic is then produced by parsing
        an empty response.
        """
        prompt = build_epic_prompt(ctx)
        try:
            raw_text = await self.llm.chat(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=4096,
                temperature=0.4,
            )
            raw = extract_json(raw_text)
            if isinstance(raw, dict):
                return _parse_epic_response(raw, cap, phase_num)
            # Make the silent fallback visible in the logs.
            log.warning(
                "Epic generation for %s returned no JSON object; using fallback",
                cap.capability.get("name"),
            )
        except Exception as exc:
            log.warning("Epic generation failed for %s: %s", cap.capability.get("name"), exc)
        # Deterministic fallback from graph properties
        return _parse_epic_response({}, cap, phase_num)

    async def generate(
        self,
        caps: list[EnrichedCapability],
        request: AnalyzeRequest,
        compliance_issues: list[str] | None = None,
    ) -> list[RoadmapPhase]:
        """Build the roadmap phases for the given capabilities.

        Args:
            caps: Capabilities in rank order; only the first
                ``MAX_CAPABILITIES`` are used.
            request: Supplies the prompt context and ``timeline_months``.
            compliance_issues: When non-empty, added to every prompt context
                under ``"compliance_issues"``.

        Returns:
            Phases in ascending order; phases with no epics are omitted.
        """
        capped = caps[: self.MAX_CAPABILITIES]
        total = len(capped)

        async def _process(idx: int, cap: EnrichedCapability) -> tuple[int, EpicArtifact]:
            # Rank-based bucketing ensures phase variety: top third→1, mid→2, bottom→3
            rank_phase = self._rank_phase(idx, total)
            # The property-based phase overrides the rank only when it is 3,
            # i.e. it can push a capability to the last phase but never earlier.
            prop_phase = _assign_phase(cap)
            phase_num = max(rank_phase, prop_phase) if prop_phase == 3 else rank_phase
            ctx = _build_epic_from_cap(cap, request)
            if compliance_issues:
                ctx["compliance_issues"] = compliance_issues
            epic = await self._generate_epic(ctx, cap, phase_num)
            return phase_num, epic

        # All epics are generated concurrently; gather keeps input order.
        results = await asyncio.gather(*(_process(i, c) for i, c in enumerate(capped)))

        phase_map: dict[int, list[EpicArtifact]] = {1: [], 2: [], 3: []}
        for phase_num, epic in results:
            phase_map[phase_num].append(epic)

        phase_names = {
            1: "Foundation & Quick Wins",
            2: "Core Transformation",
            3: "Advanced Capabilities",
        }

        # Phases 1 and 2 each get a third of the timeline (capped at 6 and 12
        # months); phase 3 takes whatever remains.
        third = request.timeline_months // 3
        first_months = min(third, 6)
        second_months = min(third, 12)
        phase_durations = {
            1: first_months,
            2: second_months,
            3: request.timeline_months - (first_months + second_months),
        }

        phases: list[RoadmapPhase] = []
        for num in (1, 2, 3):
            epics = phase_map[num]
            if not epics:
                continue
            phases.append(
                RoadmapPhase(
                    phase_number=num,
                    phase_name=phase_names[num],
                    # Never report a zero or negative duration.
                    duration_months=max(phase_durations[num], 1),
                    epics=epics,
                    objectives=[e.business_value for e in epics[:3] if e.business_value],
                    key_milestones=[e.title for e in epics[:2]],
                )
            )
        return phases