Spaces:
Running
Running
File size: 6,320 Bytes
from core.pipeline.step import PipelineStep
from observability import logger as obs_logger
from observability import components as obs_components
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class IntelligenceStep(PipelineStep):
    """Pipeline step that produces insights, a training plan, and a weekly brief.

    Intelligence is regenerated only when the positioning state has changed
    materially since the last cycle; otherwise the previously cached
    insights/plan/brief are reused (see ``run``).
    """

    name = "intelligence_generation"

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service,
    ):
        # Agents whose calls are expensive (LLM-backed, per the note in
        # run_parallel_agents) — only invoked when the state has changed.
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        # Brief generation plus persistence of the weekly snapshot.
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        # Post-processing and change/progress detection services.
        self.guardrail_service = guardrail_service
        self.positioning_change_service = positioning_change_service
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service
async def run(self, context):
    """Generate or reuse intelligence artifacts for the current cycle.

    Skips entirely when there is no weekly snapshot. Otherwise, regenerates
    insights/plan/brief when the positioning state changed materially, or
    re-applies the cached artifacts to the fresh snapshot when it did not.
    """
    if context.weekly_snapshot is None:
        obs_logger.log_event(
            "info",
            "Skipping insights generation: no run data available",
            event="insights_skipped_no_data",
            component=obs_components.PIPELINE,
        )
        return

    state_changed = self.positioning_change_service.has_changed(
        context.last_positioning, context.positioning
    )

    if not state_changed:
        obs_logger.log_event(
            "info",
            f"Reuse intelligence for state: {context.positioning.position_status}",
            event="intelligence_reused",
            component=obs_components.ORCHESTRATOR,
            positioning_state=context.positioning.position_status
        )
        context.insights = context.last_insights
        context.plan = context.last_plan
        # Apply the cached brief to the current snapshot and persist it.
        if context.weekly_snapshot:
            context.weekly_snapshot.performance_brief = context.last_brief
            context.weekly_snapshot.performance_focus = context.last_focus
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)
        return

    obs_logger.log_event(
        "info",
        "Positioning state changed materially - regenerating intelligence",
        event="intelligence_regenerated",
        component=obs_components.ORCHESTRATOR
    )
    # Insights/plan may already be populated by the pipeline's parallel phase
    # (run_parallel_agents); only run the ones that are still missing.
    if context.insights is None:
        await self._run_insights(context)
    if context.plan is None:
        await self._run_plan(context)
    # The brief depends on trends/snapshot being stable, so it stays sequential.
    brief, focus = await self._run_brief(context)
    # Remember the fresh artifacts for reuse on the next unchanged cycle.
    self._update_cache(context, brief, focus)
def run_parallel_agents(self, context):
    """
    Extraction point for parallel execution.

    Returns a list of awaitables (coroutines) for the independent agents,
    or an empty list when there is nothing to generate this cycle.
    """
    if context.weekly_snapshot is None:
        return []
    # Check if state changed before deciding to run parallel tasks.
    # NOTE: This repeats the check done in run(), but it ensures no LLM
    # coroutines are created when the cached intelligence will be reused.
    state_changed = self.positioning_change_service.has_changed(
        context.last_positioning, context.positioning
    )
    if not state_changed:
        return []
    return [
        self._run_insights(context),
        self._run_plan(context),
    ]
async def _run_insights(self, context):
    """Run the insights agent on the latest run and store the result on context.

    Logs a skip event and returns early when there is no weekly snapshot or
    no runs available.
    """
    # Guard first: the original hydrated the latest run before this check,
    # doing a needless import/construction on the skip path.
    if context.weekly_snapshot is None or not context.runs:
        obs_logger.log_event(
            "info",
            "Skipping insights generation: no run data available",
            event="insights_skipped_no_data",
            component=obs_components.PIPELINE,
        )
        return
    latest_run = context.runs[-1]
    # Runs may arrive as raw dicts (presumably from storage — verify against
    # caller); hydrate into the domain model before handing to the agent.
    if isinstance(latest_run, dict):
        from domain.training.run import Run
        latest_run = Run(**latest_run)
    context.insights = await self.insights_agent.run(
        latest_run,
        context.trends,
        risk_level=context.risk_assessment.risk_level,
        language=context.language,
        profile=context.runner_profile,
        goal=context.active_goal,
    )
async def _run_plan(self, context):
    """Generate a training plan and store the guardrail-processed result on context."""
    plan_result = await self.plan_agent.run(
        context.summary,
        language=context.language,
        profile=context.runner_profile,
        goal=context.active_goal,
    )
    # A missing "plan" key degrades to an empty draft rather than raising.
    draft_plan = plan_result.get("plan", "")
    # Post-process the draft against the current risk assessment.
    context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment)
async def _run_brief(self, context):
    """Generate, persist, and return the weekly performance brief.

    Returns:
        (brief, focus) strings; both empty when the brief service, snapshot,
        or weekly trend is missing.
    """
    brief, focus = "", ""
    if self.brief_service and context.weekly_snapshot and context.weekly_trend:
        goal_progress = self.goal_progress_service.compute(
            self.goal_service, context.active_goal, context.weekly_snapshot, context.weekly_trend
        )
        brief, focus = await self.brief_service.generate_brief(
            context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
        )
        context.weekly_snapshot.performance_brief = brief
        context.weekly_snapshot.performance_focus = focus
        # Hash of the brief inputs — presumably used to detect stale briefs;
        # verify against consumers of brief_source_hash.
        context.weekly_snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
            context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
        )
        # NOTE(review): naive local timestamp — confirm whether a tz-aware
        # datetime.now(tz=...) is expected by consumers.
        context.weekly_snapshot.brief_generated_at = datetime.now()
    # Robustness: the original called save() even when the snapshot was None.
    if self.weekly_repo and context.weekly_snapshot:
        self.weekly_repo.save(context.weekly_snapshot)
    return brief, focus
def _update_cache(self, context, brief, focus):
    """Record the freshly generated artifacts as the reuse cache for the next cycle."""
    context.last_positioning = context.positioning
    context.last_insights = context.insights
    context.last_plan = context.plan
    context.last_brief = brief
    context.last_focus = focus
|