File size: 6,320 Bytes
557ee65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from core.pipeline.step import PipelineStep
from observability import logger as obs_logger
from observability import components as obs_components
from datetime import datetime
import logging

# Standard-library logger named after this module.
logger = logging.getLogger(__name__)

class IntelligenceStep(PipelineStep):
    """Pipeline step that produces insights, a plan and a weekly brief.

    The step asks ``positioning_change_service.has_changed`` whether
    ``context.positioning`` differs from ``context.last_positioning``.
    When it does, the insights agent, the plan agent and the brief service
    are run and their results are stored in the ``context.last_*`` cache
    fields. Otherwise the cached values are copied back onto the context
    and onto the current weekly snapshot.
    """

    name = "intelligence_generation"

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service
    ):
        """Store the collaborators used by the step.

        Args:
            insights_agent: Object whose awaitable ``run(...)`` returns the
                insights stored on ``context.insights``.
            plan_agent: Object whose awaitable ``run(...)`` returns a mapping
                from which the ``"plan"`` entry is read.
            brief_service: Provides awaitable ``generate_brief(...)`` and
                ``compute_brief_hash(...)``. A falsy value disables brief
                generation.
            weekly_repo: Provides ``save(snapshot)``. A falsy value disables
                persistence of the weekly snapshot.
            guardrail_service: Provides ``apply(draft_plan, risk_assessment)``.
            positioning_change_service: Provides
                ``has_changed(previous, current)``.
            goal_progress_service: Provides
                ``compute(goal_service, goal, snapshot, trend)``.
            goal_service: Passed through to ``goal_progress_service.compute``.
        """
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        self.guardrail_service = guardrail_service
        self.positioning_change_service = positioning_change_service
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service

    def _log_insights_skipped(self) -> None:
        """Emit the log event used whenever generation is skipped for lack of data."""
        obs_logger.log_event(
            "info",
            "Skipping insights generation: no run data available",
            event="insights_skipped_no_data",
            component=obs_components.PIPELINE,
        )

    async def run(self, context) -> None:
        """Regenerate or reuse intelligence for ``context``.

        Does nothing (besides logging) when ``context.weekly_snapshot`` is
        ``None``. When the positioning state changed, any of
        ``context.insights`` / ``context.plan`` that is still ``None`` is
        generated, the brief is generated, and the cache fields are updated.
        When it did not change, the cached insights, plan, brief and focus
        are applied instead.
        """
        if context.weekly_snapshot is None:
            self._log_insights_skipped()
            return

        state_changed = self.positioning_change_service.has_changed(
            context.last_positioning, context.positioning
        )

        if state_changed:
            obs_logger.log_event(
                "info",
                "Positioning state changed materially - regenerating intelligence",
                event="intelligence_regenerated",
                component=obs_components.ORCHESTRATOR
            )

            # Insights and plan may already have been populated by the
            # coroutines handed out by run_parallel_agents; only fill the gaps.
            if context.insights is None:
                await self._run_insights(context)

            if context.plan is None:
                await self._run_plan(context)

            # The brief is always produced here, after insights and plan.
            brief, focus = await self._run_brief(context)

            self._update_cache(context, brief, focus)
            return

        # Unchanged state: reuse everything cached by the last regeneration.
        position_status = context.positioning.position_status
        obs_logger.log_event(
            "info",
            f"Reuse intelligence for state: {position_status}",
            event="intelligence_reused",
            component=obs_components.ORCHESTRATOR,
            positioning_state=position_status
        )
        context.insights = context.last_insights
        context.plan = context.last_plan

        # Apply the cached brief to the current snapshot and persist it.
        if context.weekly_snapshot:
            context.weekly_snapshot.performance_brief = context.last_brief
            context.weekly_snapshot.performance_focus = context.last_focus
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)

    def run_parallel_agents(self, context) -> list:
        """Return the coroutines of the agents that can run independently.

        Returns:
            A list with the un-awaited ``_run_insights`` and ``_run_plan``
            coroutines, or an empty list when there is no weekly snapshot or
            the positioning state did not change. The caller must await every
            returned coroutine.
        """
        if context.weekly_snapshot is None:
            return []

        # Same check as in run(); repeated here so that no agent call is
        # started when the cached intelligence is going to be reused anyway.
        state_changed = self.positioning_change_service.has_changed(
            context.last_positioning, context.positioning
        )
        if not state_changed:
            return []

        return [
            self._run_insights(context),
            self._run_plan(context)
        ]

    async def _run_insights(self, context) -> None:
        """Run the insights agent on the latest run and store the result.

        Skips (with a log event) when there is no weekly snapshot or no runs;
        ``context.insights`` is left untouched in that case.
        """
        # Guard first so that the skip path neither imports nor builds a Run
        # from data that is about to be ignored.
        if context.weekly_snapshot is None or not context.runs:
            self._log_insights_skipped()
            return

        latest_run = context.runs[-1]
        if isinstance(latest_run, dict):
            # Imported lazily, only when a dict has to be converted.
            from domain.training.run import Run
            latest_run = Run(**latest_run)

        context.insights = await self.insights_agent.run(
            latest_run,
            context.trends,
            risk_level=context.risk_assessment.risk_level,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )

    async def _run_plan(self, context) -> None:
        """Run the plan agent and store the guardrailed plan on the context."""
        plan_result = await self.plan_agent.run(
            context.summary,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )
        # A result without a "plan" entry is treated as an empty draft.
        draft_plan = plan_result.get("plan", "")
        context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment)

    async def _run_brief(self, context):
        """Generate the weekly brief and write it onto the weekly snapshot.

        Returns:
            The ``(brief, focus)`` pair from ``brief_service.generate_brief``,
            or ``("", "")`` when the brief service, the weekly snapshot or the
            weekly trend is missing/falsy.
        """
        if not (self.brief_service and context.weekly_snapshot and context.weekly_trend):
            return "", ""

        snapshot = context.weekly_snapshot
        trend = context.weekly_trend

        goal_progress = self.goal_progress_service.compute(
            self.goal_service, context.active_goal, snapshot, trend
        )
        brief, focus = await self.brief_service.generate_brief(
            snapshot, trend, goal_progress=goal_progress, language=context.language
        )
        snapshot.performance_brief = brief
        snapshot.performance_focus = focus
        # The hash is computed after brief/focus are written onto the
        # snapshot; keep this order in case the hash covers those fields.
        snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
            snapshot, trend, goal_progress=goal_progress, language=context.language
        )
        # NOTE(review): naive local timestamp; confirm whether consumers
        # expect a timezone-aware value.
        snapshot.brief_generated_at = datetime.now()
        if self.weekly_repo:
            self.weekly_repo.save(snapshot)
        return brief, focus

    def _update_cache(self, context, brief, focus) -> None:
        """Remember the current positioning and outputs in the ``last_*`` fields."""
        context.last_positioning = context.positioning
        context.last_insights = context.insights
        context.last_plan = context.plan
        context.last_brief = brief
        context.last_focus = focus