Spaces:
Running
Running
| """Generator agent — derives structured roadmap artifacts from graph-property context.""" | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import Any | |
| from backend.agents.retriever import EnrichedCapability | |
| from backend.llm.client import LLMClient, extract_json | |
| from backend.llm.prompts import build_epic_prompt, build_roadmap_structure_prompt | |
| from backend.schemas.request import AnalyzeRequest | |
| from backend.schemas.response import ( | |
| RoadmapPhase, | |
| EpicArtifact, | |
| Feature, | |
| UserStory, | |
| Task, | |
| ) | |
| log = logging.getLogger(__name__) | |
| COMPLEXITY_TO_PHASE = { | |
| "low": 1, | |
| "medium": 1, | |
| "high": 2, | |
| "very_high": 3, | |
| } | |
| MATURITY_TO_PHASE = { | |
| "<1yr": 1, | |
| "1-2yr": 1, | |
| "2-5yr": 2, | |
| "5+yr": 3, | |
| } | |
| def _assign_phase(cap: EnrichedCapability) -> int: | |
| cx = (cap.capability.get("implementation_complexity") or "medium").lower() | |
| phase = COMPLEXITY_TO_PHASE.get(cx, 2) | |
| if cap.trend: | |
| horizon = cap.trend.get("time_horizon") or "2-5yr" | |
| trend_phase = MATURITY_TO_PHASE.get(horizon, 2) | |
| # Use trend phase only if it pushes later, not earlier | |
| phase = max(phase, trend_phase) | |
| # Duration-based override: short tasks → earlier phases | |
| duration = cap.capability.get("typical_duration_weeks") or 12 | |
| if duration <= 8 and phase > 1: | |
| phase = 1 | |
| elif duration >= 24 and phase < 3: | |
| phase = min(phase + 1, 3) | |
| return min(phase, 3) | |
| def _build_epic_from_cap(cap: EnrichedCapability, request: AnalyzeRequest) -> dict: | |
| """Build the context dict passed to the LLM prompt.""" | |
| c = cap.capability | |
| std = cap.standard or {} | |
| trend = cap.trend or {} | |
| sd = cap.subdomain or {} | |
| return { | |
| "org_type": request.org_type, | |
| "goals": request.goals, | |
| "budget_tier": request.budget_tier, | |
| "timeline_months": request.timeline_months, | |
| "risk_tolerance": request.risk_tolerance, | |
| "domain_name": cap.domain.get("name", ""), | |
| "subdomain_name": sd.get("name", ""), | |
| "subdomain_functional_scope": sd.get("functional_scope", ""), | |
| "subdomain_business_driver": sd.get("business_driver", ""), | |
| "cap_name": c.get("name", ""), | |
| "cap_description": c.get("description", ""), | |
| "cap_business_outcomes": c.get("business_outcomes") or [], | |
| "cap_risk_factors": c.get("risk_factors") or [], | |
| "cap_kpis": c.get("kpis") or [], | |
| "cap_duration_weeks": c.get("typical_duration_weeks") or 12, | |
| "cap_complexity": c.get("implementation_complexity") or "medium", | |
| "cap_frameworks": c.get("common_frameworks") or [], | |
| "cap_solution_patterns": c.get("solution_patterns") or [], | |
| "std_name": std.get("name", ""), | |
| "std_publisher": std.get("publisher", ""), | |
| "std_version": std.get("version", ""), | |
| "std_key_principles": std.get("key_principles") or [], | |
| "std_compliance_requirements": std.get("compliance_requirements") or [], | |
| "trend_name": trend.get("name", ""), | |
| "trend_source": trend.get("source", ""), | |
| "trend_impact": trend.get("impact_level", ""), | |
| "trend_maturity": trend.get("maturity", ""), | |
| "trend_horizon": trend.get("time_horizon", ""), | |
| "trend_business_impact": trend.get("business_impact", ""), | |
| "trend_enablers": trend.get("technology_enablers") or [], | |
| "subcapabilities": [sc.get("name", "") for sc in cap.subcapabilities], | |
| } | |
| def _parse_epic_response(raw: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact: | |
| """Parse LLM JSON response into EpicArtifact, injecting compliance ACs if missing.""" | |
| std_reqs = (cap.standard or {}).get("compliance_requirements") or [] | |
| cap_kpis = cap.capability.get("kpis") or [] | |
| features: list[Feature] = [] | |
| for f in raw.get("features") or []: | |
| stories: list[UserStory] = [] | |
| for s in f.get("user_stories") or []: | |
| raw_tasks = s.get("tasks") or [] | |
| tasks = [] | |
| for t in raw_tasks: | |
| if isinstance(t, dict): | |
| tasks.append(Task( | |
| title=t.get("title") or t.get("name") or "", | |
| description=t.get("description") or "", | |
| estimated_days=int(t.get("estimated_days") or 3), | |
| assignee_role=t.get("assignee_role") or "", | |
| )) | |
| elif isinstance(t, str): | |
| tasks.append(Task(title=t)) | |
| stories.append( | |
| UserStory( | |
| role=s.get("role", "architect"), | |
| want=s.get("want", ""), | |
| so_that=s.get("so_that", ""), | |
| acceptance_criteria=s.get("acceptance_criteria") or [], | |
| tasks=tasks, | |
| ) | |
| ) | |
| features.append( | |
| Feature( | |
| title=f.get("title", ""), | |
| description=f.get("description", ""), | |
| technical_notes=f.get("technical_notes", ""), | |
| user_stories=stories, | |
| estimated_story_points=f.get("estimated_story_points"), | |
| ) | |
| ) | |
| # Ensure compliance ACs appear at epic level | |
| epic_acs: list[str] = list(raw.get("acceptance_criteria") or []) | |
| for req in std_reqs: | |
| if req and not any(req[:30] in ac for ac in epic_acs): | |
| epic_acs.append(f"[Compliance] {req}") | |
| for kpi in cap_kpis: | |
| if kpi and not any(kpi[:30] in ac for ac in epic_acs): | |
| epic_acs.append(f"[KPI] {kpi}") | |
| return EpicArtifact( | |
| epic_id=f"EPIC-{cap.capability.get('id', 'unknown')[:8].upper()}", | |
| title=raw.get("title") or cap.capability.get("name", ""), | |
| description=raw.get("description") or cap.capability.get("description") or "", | |
| business_value=raw.get("business_value") or "", | |
| strategic_rationale=raw.get("strategic_rationale") or (cap.trend or {}).get("business_impact") or "", | |
| governance_reference=( | |
| f"{(cap.standard or {}).get('name','')} — {(cap.standard or {}).get('publisher','')}" | |
| ).strip(" —"), | |
| trend_alignment=(cap.trend or {}).get("name") or "", | |
| acceptance_criteria=epic_acs, | |
| features=features, | |
| risk_register=raw.get("risks") or cap.capability.get("risk_factors") or [], | |
| estimated_sprints=max(1, (cap.capability.get("typical_duration_weeks") or 12) // 2), | |
| phase=phase_num, | |
| subdomain_group=cap.subdomain.get("name") or "", | |
| ) | |
| class GeneratorAgent: | |
| def __init__(self, llm: LLMClient): | |
| self.llm = llm | |
| async def _generate_epic(self, ctx: dict, cap: EnrichedCapability, phase_num: int) -> EpicArtifact: | |
| prompt = build_epic_prompt(ctx) | |
| try: | |
| raw_text = await self.llm.chat( | |
| messages=[{"role": "user", "content": prompt}], | |
| max_tokens=4096, | |
| temperature=0.4, | |
| ) | |
| raw = extract_json(raw_text) | |
| if isinstance(raw, dict): | |
| return _parse_epic_response(raw, cap, phase_num) | |
| except Exception as exc: | |
| log.warning(f"Epic generation failed for {cap.capability.get('name')}: {exc}") | |
| # Deterministic fallback from graph properties | |
| return _parse_epic_response({}, cap, phase_num) | |
| async def generate( | |
| self, | |
| caps: list[EnrichedCapability], | |
| request: AnalyzeRequest, | |
| compliance_issues: list[str] | None = None, | |
| ) -> list[RoadmapPhase]: | |
| phase_map: dict[int, list[EpicArtifact]] = {1: [], 2: [], 3: []} | |
| capped = caps[:20] | |
| async def _process(idx_cap: tuple[int, EnrichedCapability]) -> tuple[int, EpicArtifact]: | |
| idx, cap = idx_cap | |
| # Rank-based bucketing ensures phase variety: top third→1, mid→2, bottom→3 | |
| n = len(capped) | |
| rank_phase = 1 if idx < n // 3 else (2 if idx < 2 * n // 3 else 3) | |
| # Property-based phase can only push LATER (never earlier than rank suggests) | |
| prop_phase = _assign_phase(cap) | |
| phase_num = max(rank_phase, prop_phase) if prop_phase == 3 else rank_phase | |
| ctx = _build_epic_from_cap(cap, request) | |
| if compliance_issues: | |
| ctx["compliance_issues"] = compliance_issues | |
| epic = await self._generate_epic(ctx, cap, phase_num) | |
| return phase_num, epic | |
| results = await asyncio.gather(*[_process((i, c)) for i, c in enumerate(capped)]) | |
| for phase_num, epic in results: | |
| phase_map[phase_num].append(epic) | |
| phase_names = { | |
| 1: "Foundation & Quick Wins", | |
| 2: "Core Transformation", | |
| 3: "Advanced Capabilities", | |
| } | |
| phase_durations = { | |
| 1: min(request.timeline_months // 3, 6), | |
| 2: min(request.timeline_months // 3, 12), | |
| 3: request.timeline_months - (min(request.timeline_months // 3, 6) + min(request.timeline_months // 3, 12)), | |
| } | |
| phases: list[RoadmapPhase] = [] | |
| for num in [1, 2, 3]: | |
| epics = phase_map[num] | |
| if not epics: | |
| continue | |
| phases.append( | |
| RoadmapPhase( | |
| phase_number=num, | |
| phase_name=phase_names[num], | |
| duration_months=max(phase_durations[num], 1), | |
| epics=epics, | |
| objectives=[ | |
| e.business_value for e in epics[:3] if e.business_value | |
| ], | |
| key_milestones=[e.title for e in epics[:2]], | |
| ) | |
| ) | |
| return phases | |