# runner-ai-intelligence/src/pipeline_steps/intelligence_step.py
# (HF Space deploy snapshot, commit 557ee65)
from core.pipeline.step import PipelineStep
from observability import logger as obs_logger
from observability import components as obs_components
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class IntelligenceStep(PipelineStep):
    """Pipeline step that generates (or reuses) weekly intelligence artifacts.

    Produces three artifacts on the pipeline context:

    * ``context.insights`` -- via ``insights_agent``
    * ``context.plan``     -- via ``plan_agent``, filtered through
      ``guardrail_service``
    * a performance brief/focus pair written onto ``context.weekly_snapshot``
      via ``brief_service`` and persisted through ``weekly_repo``

    When ``positioning_change_service`` reports no material change between
    ``context.last_positioning`` and ``context.positioning``, the cached
    ``context.last_*`` results are reapplied instead of re-invoking the
    LLM agents.
    """

    name = "intelligence_generation"

    def __init__(
        self,
        insights_agent,
        plan_agent,
        brief_service,
        weekly_repo,
        guardrail_service,
        positioning_change_service,
        goal_progress_service,
        goal_service,
    ):
        # Pure dependency injection; this step holds no other state.
        self.insights_agent = insights_agent
        self.plan_agent = plan_agent
        self.brief_service = brief_service
        self.weekly_repo = weekly_repo
        self.guardrail_service = guardrail_service
        self.positioning_change_service = positioning_change_service
        self.goal_progress_service = goal_progress_service
        self.goal_service = goal_service

    async def run(self, context):
        """Generate or reuse intelligence for the current pipeline context.

        Returns early (logging a skip event) when there is no weekly
        snapshot. Otherwise regenerates insights/plan/brief when positioning
        changed materially, or restores the cached ``context.last_*`` values
        when it did not.
        """
        if context.weekly_snapshot is None:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return
        state_changed = self.positioning_change_service.has_changed(
            context.last_positioning, context.positioning
        )
        if state_changed:
            obs_logger.log_event(
                "info",
                "Positioning state changed materially - regenerating intelligence",
                event="intelligence_regenerated",
                component=obs_components.ORCHESTRATOR
            )
            # Insights/plan may already be populated by parallel execution
            # (see run_parallel_agents); only run sequentially as a fallback.
            if context.insights is None:
                await self._run_insights(context)
            if context.plan is None:
                await self._run_plan(context)
            # Brief is always sequential as it depends on trends/snapshot
            # being stable.
            brief, focus = await self._run_brief(context)
            # Update cache in context so an unchanged state can reuse it.
            self._update_cache(context, brief, focus)
        else:
            obs_logger.log_event(
                "info",
                f"Reuse intelligence for state: {context.positioning.position_status}",
                event="intelligence_reused",
                component=obs_components.ORCHESTRATOR,
                positioning_state=context.positioning.position_status
            )
            context.insights = context.last_insights
            context.plan = context.last_plan
            # Apply the cached brief to the current snapshot and persist it.
            if context.weekly_snapshot:
                context.weekly_snapshot.performance_brief = context.last_brief
                context.weekly_snapshot.performance_focus = context.last_focus
                if self.weekly_repo:
                    self.weekly_repo.save(context.weekly_snapshot)

    def run_parallel_agents(self, context):
        """
        Extraction point for parallel execution.
        Returns a list of awaitable tasks for independent agents, or an
        empty list when there is nothing to regenerate.
        """
        if context.weekly_snapshot is None:
            return []
        # Check if state changed before deciding to run parallel tasks.
        # NOTE: This repeats the logic in run() but ensures we don't fire
        # LLM calls if not needed.
        state_changed = self.positioning_change_service.has_changed(
            context.last_positioning, context.positioning
        )
        if not state_changed:
            return []
        return [
            self._run_insights(context),
            self._run_plan(context)
        ]

    async def _run_insights(self, context):
        """Run the insights agent on the latest run and store the result.

        Sets ``context.insights``; logs a skip event and returns when there
        is no snapshot or no run data.
        """
        # Guard first so we do no work (and never build a Run model) when
        # there is nothing to analyse.
        if context.weekly_snapshot is None or not context.runs:
            obs_logger.log_event(
                "info",
                "Skipping insights generation: no run data available",
                event="insights_skipped_no_data",
                component=obs_components.PIPELINE,
            )
            return
        latest_run = context.runs[-1]
        if isinstance(latest_run, dict):
            # Runs may arrive as raw dicts (e.g. deserialized); coerce to the
            # domain model. Local import keeps the dependency lazy.
            from domain.training.run import Run
            latest_run = Run(**latest_run)
        context.insights = await self.insights_agent.run(
            latest_run,
            context.trends,
            risk_level=context.risk_assessment.risk_level,
            language=context.language,
            profile=context.runner_profile,
            goal=context.active_goal,
        )

    async def _run_plan(self, context):
        """Run the plan agent and store the guardrail-filtered plan.

        Sets ``context.plan`` to the draft plan after
        ``guardrail_service.apply`` has vetted it against the risk assessment.
        """
        plan_result = await self.plan_agent.run(
            context.summary, language=context.language, profile=context.runner_profile, goal=context.active_goal
        )
        draft_plan = plan_result.get("plan", "")
        context.plan = self.guardrail_service.apply(draft_plan, context.risk_assessment)

    async def _run_brief(self, context):
        """Generate the weekly performance brief/focus and persist them.

        Returns a ``(brief, focus)`` tuple; both are empty strings when the
        brief service, snapshot, or weekly trend is missing.
        """
        brief, focus = "", ""
        if self.brief_service and context.weekly_snapshot and context.weekly_trend:
            goal_progress = self.goal_progress_service.compute(
                self.goal_service, context.active_goal, context.weekly_snapshot, context.weekly_trend
            )
            brief, focus = await self.brief_service.generate_brief(
                context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
            )
            context.weekly_snapshot.performance_brief = brief
            context.weekly_snapshot.performance_focus = focus
            # Hash of the inputs lets later runs detect a stale brief.
            context.weekly_snapshot.brief_source_hash = self.brief_service.compute_brief_hash(
                context.weekly_snapshot, context.weekly_trend, goal_progress=goal_progress, language=context.language
            )
            # NOTE(review): naive local time; consider datetime.now(tz=UTC)
            # if snapshots are compared across zones -- confirm downstream
            # expectations before changing.
            context.weekly_snapshot.brief_generated_at = datetime.now()
            if self.weekly_repo:
                self.weekly_repo.save(context.weekly_snapshot)
        return brief, focus

    def _update_cache(self, context, brief, focus):
        """Snapshot the freshly generated artifacts into the context cache."""
        context.last_positioning = context.positioning
        context.last_insights = context.insights
        context.last_plan = context.plan
        context.last_brief = brief
        context.last_focus = focus