from core.pipeline.step import PipelineStep from observability import logger as obs_logger from observability import components as obs_components from datetime import datetime import logging logger = logging.getLogger(__name__) class IntelligenceStep(PipelineStep): name = "intelligence_generation" def __init__( self, insights_agent, plan_agent, brief_service, weekly_repo, guardrail_service, positioning_change_service, goal_progress_service, goal_service ): self.insights_agent = insights_agent self.plan_agent = plan_agent self.brief_service = brief_service self.weekly_repo = weekly_repo self.guardrail_service = guardrail_service self.positioning_change_service = positioning_change_service self.goal_progress_service = goal_progress_service self.goal_service = goal_service async def run(self, context): if context.weekly_snapshot is None: obs_logger.log_event( "info", "Skipping insights generation: no run data available", event="insights_skipped_no_data", component=obs_components.PIPELINE, ) return state_changed = self.positioning_change_service.has_changed(context.last_positioning, context.positioning) if state_changed: obs_logger.log_event( "info", "Positioning state changed materially - regenerating intelligence", event="intelligence_regenerated", component=obs_components.ORCHESTRATOR ) # If insights or plan are not already populated (by parallel execution in pipeline), run them sequentially if context.insights is None: await self._run_insights(context) if context.plan is None: await self._run_plan(context) # Brief is always sequential as it depends on trends/snapshot being stable brief, focus = await self._run_brief(context) # Update cache in context self._update_cache(context, brief, focus) else: obs_logger.log_event( "info", f"Reuse intelligence for state: {context.positioning.position_status}", event="intelligence_reused", component=obs_components.ORCHESTRATOR, positioning_state=context.positioning.position_status ) context.insights = context.last_insights context.plan = context.last_plan # Apply cached brief to current snapshot if context.weekly_snapshot: context.weekly_snapshot.performance_brief = context.last_brief context.weekly_snapshot.performance_focus = context.last_focus if self.weekly_repo: self.weekly_repo.save(context.weekly_snapshot) def run_parallel_agents(self, context): """ Extraction point for parallel execution. Returns a list of awaitable tasks for independent agents. """ if context.weekly_snapshot is None: return [] # Check if state changed before deciding to run parallel tasks # NOTE: This repeats the logic but ensures we don't fire LLM calls if not needed state_changed = self.positioning_change_service.has_changed(context.last_positioning, context.positioning) if not state_changed: return [] return [ self._run_insights(context), self._run_plan(context) ] async def _run_insights(self, context): latest_run = context.runs[-1] if context.runs else None if isinstance(latest_run, dict): from domain.training.run import Run latest_run = Run(**latest_run) if context.weekly_snapshot is None or not context.runs: obs_logger.log_event( "info", "Skipping insights generation: no run data available", event="insights_skipped_no_data", component=obs_components.PIPELINE, ) return context.insights = await self.insights_agent.run( latest_run, context.trends, risk_level=context.risk_assessment.risk_level, language=context.language, profile=context.runner_profile, goal=context.active_goal, ) async def _run_plan(self, context): plan_result = await self.plan_agent.run( context.summary, language=context.language, profile=context.runner_profile, goal=context.active_goal ) draft_plan = plan_result.get("plan", "") context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment) async def _run_brief(self, context): brief, focus = "", "" if self.brief_service and context.weekly_snapshot and context.weekly_trend: goal_progress = self.goal_progress_service.compute( self.goal_service, context.active_goal, context.weekly_snapshot, context.weekly_trend ) brief, focus = await self.brief_service.generate_brief( context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language ) context.weekly_snapshot.performance_brief = brief context.weekly_snapshot.performance_focus = focus context.weekly_snapshot.brief_source_hash = self.brief_service.compute_brief_hash( context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language ) context.weekly_snapshot.brief_generated_at = datetime.now() if self.weekly_repo: self.weekly_repo.save(context.weekly_snapshot) return brief, focus def _update_cache(self, context, brief, focus): context.last_positioning = context.positioning context.last_insights = context.insights context.last_plan = context.plan context.last_brief = brief context.last_focus = focus