Spaces:
Running
Running
| from core.pipeline.step import PipelineStep | |
| from observability import logger as obs_logger | |
| from observability import components as obs_components | |
| from datetime import datetime | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class IntelligenceStep(PipelineStep):
    """Pipeline step that fills the context with insights, a plan and a brief.

    On each run the step asks ``positioning_change_service`` whether the
    positioning differs from the one cached on the context. If it does, the
    insights, plan and brief are (re)generated and the cache fields on the
    context are refreshed; otherwise the cached values are copied back onto
    the context and the current weekly snapshot.
    """

    name = "intelligence_generation"

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service,
    ):
        """Store the collaborators used by the step; no other work is done."""
        # Agents awaited to produce insights and the draft plan.
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        # Brief generation and snapshot persistence (both are treated as
        # optional: they are truth-tested before use).
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        # Post-processing applied to the draft plan.
        self.guardrail_service = guardrail_service
        # Decides whether cached intelligence can be reused.
        self.positioning_change_service = positioning_change_service
        # Used together to compute goal progress for the brief.
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service

    async def run(self, context):
        """Generate or reuse intelligence for ``context``.

        Returns ``None`` in every case; results are written onto ``context``.
        When ``context.weekly_snapshot`` is ``None`` the step only logs a skip
        event and exits.
        """
        if context.weekly_snapshot is None:
            self._log_no_data_skip()
            return

        positioning_moved = self.positioning_change_service.has_changed(
            context.last_positioning, context.positioning
        )
        if not positioning_moved:
            self._reuse_cached_intelligence(context)
            return

        await self._regenerate_intelligence(context)

    def run_parallel_agents(self, context):
        """
        Extraction point for parallel execution.

        Returns a list of awaitables (insights, then plan) for the caller to
        await, or an empty list when there is no weekly snapshot or the
        positioning has not changed.
        """
        # The change check is repeated here (it is also done in ``run``) so
        # that no agent coroutine is created when regeneration is not needed.
        # Short-circuiting keeps ``has_changed`` from being called when the
        # snapshot is missing.
        needs_refresh = (
            context.weekly_snapshot is not None
            and self.positioning_change_service.has_changed(
                context.last_positioning, context.positioning
            )
        )
        if not needs_refresh:
            return []

        return [
            self._run_insights(context),
            self._run_plan(context),
        ]

    def _log_no_data_skip(self):
        """Emit the 'insights skipped: no data' observability event."""
        obs_logger.log_event(
            "info",
            "Skipping insights generation: no run data available",
            event="insights_skipped_no_data",
            component=obs_components.PIPELINE,
        )

    async def _regenerate_intelligence(self, context):
        """Run the agents that have not produced output yet, then the brief."""
        obs_logger.log_event(
            "info",
            "Positioning state changed materially - regenerating intelligence",
            event="intelligence_regenerated",
            component=obs_components.ORCHESTRATOR,
        )

        # Insights / plan may already have been filled in (e.g. by the
        # awaitables handed out from ``run_parallel_agents``); only run the
        # ones that are still missing, one after the other.
        if context.insights is None:
            await self._run_insights(context)
        if context.plan is None:
            await self._run_plan(context)

        # The brief is always produced sequentially, after insights and plan.
        new_brief, new_focus = await self._run_brief(context)

        # Remember what was produced so an unchanged positioning can reuse it.
        self._update_cache(context, new_brief, new_focus)

    def _reuse_cached_intelligence(self, context):
        """Copy the cached insights, plan, brief and focus back onto ``context``."""
        obs_logger.log_event(
            "info",
            f"Reuse intelligence for state: {context.positioning.position_status}",
            event="intelligence_reused",
            component=obs_components.ORCHESTRATOR,
            positioning_state=context.positioning.position_status,
        )

        context.insights = context.last_insights
        context.plan = context.last_plan

        # Apply the cached brief to the current snapshot.
        # NOTE(review): the original indentation was lost; the save is assumed
        # to be nested under the snapshot check - confirm against the repo.
        if context.weekly_snapshot:
            context.weekly_snapshot.performance_brief = context.last_brief
            context.weekly_snapshot.performance_focus = context.last_focus
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)

    async def _run_insights(self, context):
        """Await the insights agent on the latest run and store the result.

        Logs a skip event and leaves ``context.insights`` untouched when there
        is no weekly snapshot or no runs.
        """
        most_recent = context.runs[-1] if context.runs else None
        if isinstance(most_recent, dict):
            # Runs may arrive as plain dicts; promote them to the domain type.
            from domain.training.run import Run

            most_recent = Run(**most_recent)

        if context.weekly_snapshot is None or not context.runs:
            self._log_no_data_skip()
            return

        context.insights = await self.insights_agent.run(
            most_recent,
            context.trends,
            risk_level=context.risk_assessment.risk_level,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )

    async def _run_plan(self, context):
        """Await the plan agent and store its plan after guardrails are applied."""
        agent_output = await self.plan_agent.run(
            context.summary,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )
        # A result without a "plan" key yields an empty draft.
        unguarded_plan = agent_output.get("plan", "")
        context.plan = self.guardrail_service.apply(
            unguarded_plan, context.risk_assessment
        )

    async def _run_brief(self, context):
        """Generate the weekly brief and write it onto the weekly snapshot.

        Returns a ``(brief, focus)`` pair. Both are empty strings when the
        brief service, the weekly snapshot or the weekly trend is missing or
        falsy, in which case nothing is written.
        """
        can_generate = (
            self.brief_service and context.weekly_snapshot and context.weekly_trend
        )
        if not can_generate:
            return "", ""

        progress = self.goal_progress_service.compute(
            self.goal_service,
            context.active_goal,
            context.weekly_snapshot,
            context.weekly_trend,
        )
        brief_text, focus_text = await self.brief_service.generate_brief(
            context.weekly_snapshot,
            context.weekly_trend,
            goal_progress=progress,
            language=context.language,
        )

        snapshot = context.weekly_snapshot
        snapshot.performance_brief = brief_text
        snapshot.performance_focus = focus_text
        # The hash is computed after the brief and focus have been written
        # onto the snapshot, matching the original statement order.
        snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
            context.weekly_snapshot,
            context.weekly_trend,
            goal_progress=progress,
            language=context.language,
        )
        snapshot.brief_generated_at = datetime.now()

        # NOTE(review): the original indentation was lost; persisting is
        # assumed to happen only when a brief was generated - confirm.
        if self.weekly_repo:
            self.weekly_repo.save(context.weekly_snapshot)

        return brief_text, focus_text

    def _update_cache(self, context, brief, focus):
        """Copy the current positioning, insights, plan, brief and focus into
        the ``last_*`` cache fields on ``context``."""
        context.last_positioning = context.positioning
        context.last_insights = context.insights
        context.last_plan = context.plan
        context.last_brief = brief
        context.last_focus = focus