# runner-ai-intelligence — src/agents/orchestrator.py
# HF Space deploy snapshot (minimal allow-list), commit 557ee65 (avfranco)
import logging
import uuid
import time
from datetime import date
from typing import List, Dict, Any, Optional
from observability import logger as obs_logger
from observability import components as obs_components
# LLM imports
from llm import get_llm_client
from llm.model_registry import select_model_for_agent
# Local Agent Imports
from .feature_engineering.agent import FeatureEngineeringAgent
from .insights.agent import InsightsAgent
from .plan.agent import PlanAgent
from .visualization.agent import VisualizationAgent
from .chat_agent import ChatAgent
from .guardrail_agent import InjuryFatigueGuardrailAgent
from tools.stores import InMemoryFeatureStore, InMemoryPlanStore
from tools.viz_executor import TemplateVisualizationExecutor
from router.router import async_route
from router.hooks import NoOpHook
# New domain imports
from domain.runner.profile import RunnerProfile
from domain.training.run import Run
from domain.training.weekly_snapshot import WeeklySnapshot
from domain.training.trend_snapshot import TrendSnapshot
from domain.training.agent_models import AnalysisRecord, WeeklyTrends, WeeklySummary, RiskAssessment
# New persistence imports
from persistence.db import Database
from persistence.repositories.runner_repo import RunnerRepository
from persistence.repositories.run_repo import RunRepository
from persistence.repositories.weekly_repo import WeeklySnapshotRepository
from persistence.repositories.trend_repo import TrendSnapshotRepository
from persistence.repositories.analysis_repo import AnalysisRepository
from persistence.repositories.goal_repo import SqlGoalRepository
from persistence.repositories.null_goal_repo import NullGoalRepository
from persistence.repositories.planned_session_repository import PlannedSessionRepository
# New engine imports
from engines.trend.trend_engine import TrendEngine
from engines.period_comparison_engine import compute_period_comparison
# New stateless builders
from domain.training.weekly_snapshot_builder import WeeklySnapshotBuilder
from domain.training.weekly_trend_builder import WeeklyTrendBuilder
from domain.training.weekly_trend import WeeklyTrend
# New service imports
from services.snapshot_service import SnapshotService
from services.brief_service import BriefService
from services.goal_service import GoalService
from services.structure_service import StructureService
from services.run_persistence_service import RunPersistenceService
from application.runner_positioning_service import RunnerPositioningService
from domain.runner_positioning import RunnerPositioning
from application.positioning_service import WeeklyPositioning, PositioningEngine
from application.recommendation_service import RecommendationService
from services.guardrail_arbitration_service import GuardrailArbitrationService
from services.positioning_change_service import PositioningChangeService
from services.goal_progress_service import GoalProgressService
from application.goal_trajectory_service import GoalTrajectoryService
import config
# Pipeline imports
from core.pipeline.pipeline import RunnerPipeline
from core.pipeline.context import PipelineContext
from pipeline_steps.feature_engineering_step import FeatureEngineeringStep
from pipeline_steps.persist_runs_step import PersistRunsStep
from pipeline_steps.structure_step import StructureStep
from pipeline_steps.snapshot_step import SnapshotStep
from pipeline_steps.guardrail_step import GuardrailStep
from pipeline_steps.positioning_step import PositioningStep
from pipeline_steps.goal_trajectory_step import GoalTrajectoryStep
from pipeline_steps.intelligence_step import IntelligenceStep
from pipeline_steps.visualization_step import VisualizationStep
from pipeline_steps.comparison_step import ComparisonStep
from pipeline_steps.persist_analysis_step import PersistAnalysisStep
from core.intelligence.snapshot_builder import build_intelligence_snapshot
# Configure module-level logging (named after this module for filtering).
logger = logging.getLogger(__name__)
class RunnerOrchestrator:
    """
    Central orchestration component for Runner Intelligence.

    Coordinates pipeline execution, agents, and services using the
    RunnerPipeline architecture. Owns the persistence layer (repositories),
    the tool boundaries (feature/plan stores, visualization executor),
    per-agent LLM clients, and the session-level caches used for
    stateless-mode history.
    """
    def __init__(self, feature_store=None, plan_store=None, viz_executor=None, db=None):
        """
        Wire together storage, repositories, services, agents, and caches.

        Args:
            feature_store: Feature store override; defaults to InMemoryFeatureStore.
            plan_store: Plan store override; defaults to InMemoryPlanStore.
            viz_executor: Chart executor override; defaults to TemplateVisualizationExecutor.
            db: Pre-built Database; when None, one is resolved from config
                (file-backed when persistence is enabled, otherwise ":memory:").
        """
        logger.info("Initializing RunnerOrchestrator")
        self.session_id = config.generate_session_id()
        # Tool boundaries
        self.feature_store = feature_store or InMemoryFeatureStore()
        self.plan_store = plan_store or InMemoryPlanStore()
        self.viz_executor = viz_executor or TemplateVisualizationExecutor()
        # New persistence layer (repositories assigned below once db is resolved)
        self.db = db
        self.runner_repo = None
        self.run_repo = None
        self.weekly_repo = None
        self.trend_repo = None
        self.analysis_repo = None
        self.goal_repo = None
        self.planned_repo = None
        self.snapshot_service = None
        # Mode Discipline: Resolve storage implementation at Orchestrator boundary
        if not self.db:
            if config.is_persistence_enabled():
                self.db = Database(config.STORAGE_DB_PATH)
            else:
                # Fallback to volatile in-memory storage for HF Spaces/transient sessions
                self.db = Database(":memory:")
        # NOTE(review): schema init also runs for an injected db — assumed
        # idempotent (CREATE IF NOT EXISTS style); confirm against Database.
        self.db.initialize_schema()
        # Inject dependencies
        self.runner_repo = RunnerRepository(self.db)
        self.run_repo = RunRepository(self.db)
        self.weekly_repo = WeeklySnapshotRepository(self.db)
        self.trend_repo = TrendSnapshotRepository(self.db)
        self.analysis_repo = AnalysisRepository(self.db)
        # Goals are only persisted in persistence mode; otherwise a null repo
        # keeps the GoalService interface satisfied without storage.
        if config.is_persistence_enabled():
            self.goal_repo = SqlGoalRepository(self.db)
        else:
            self.goal_repo = NullGoalRepository()
        self.planned_repo = PlannedSessionRepository(self.db)
        # Services receive resolved dependencies
        self.structure_service = StructureService(
            planned_repo=self.planned_repo,
            goal_repo=self.goal_repo,
            runner_repo=self.runner_repo,
        )
        self.trend_engine = TrendEngine()
        self.run_persistence_service = RunPersistenceService(self.run_repo)
        # Stateless Builders
        self.weekly_snapshot_builder = WeeklySnapshotBuilder()
        self.weekly_trend_builder = WeeklyTrendBuilder()
        self.snapshot_service = SnapshotService(
            runner_repo=self.runner_repo,
            run_repo=self.run_repo,
            weekly_repo=self.weekly_repo,
            trend_repo=self.trend_repo,
            weekly_builder=self.weekly_snapshot_builder,
            trend_engine=self.trend_engine,
            structure_service=self.structure_service,
            brief_service=None,  # Will be set after brief_service is initialized
        )
        self.goal_service = GoalService(goal_repo=self.goal_repo)
        self.positioning_service = RunnerPositioningService()
        self.positioning_engine = PositioningEngine()
        self.recommendation_service = RecommendationService()
        # New isolation services
        self.guardrail_arbitration_service = GuardrailArbitrationService()
        self.positioning_change_service = PositioningChangeService()
        self.goal_progress_service = GoalProgressService()
        self.goal_trajectory_service = GoalTrajectoryService()

        # LLM Clients per Agent: build a client from the registry-selected
        # model profile for the named agent.
        def _get_agent_client(agent_name: str):
            profile = select_model_for_agent(agent_name)
            return get_llm_client(
                provider=profile.provider,
                model_name=profile.model_name,
                temperature=config.LLM_TEMPERATURE,
                max_tokens=config.LLM_MAX_TOKENS,
                # drop_params defaults to False when absent from config
                drop_params=getattr(config, "LLM_DROP_PARAMS", False),
            )

        # Instantiate agents
        logger.info("Instantiating agents with specific model profiles")
        self.feature_agent = FeatureEngineeringAgent()
        self.insights_agent = InsightsAgent(llm_client=_get_agent_client("InsightsAgent"))
        self.plan_agent = PlanAgent(llm_client=_get_agent_client("PlanAgent"))
        self.visualization_agent = VisualizationAgent(
            llm_client=_get_agent_client("VisualizationAgent")
        )
        self.chat_agent = ChatAgent(llm_client=_get_agent_client("ChatAgent"))
        # We also need a client for the router
        self.router_client = _get_agent_client("Router")
        self.guardrail_agent = InjuryFatigueGuardrailAgent()
        # Instantiate dedicated BriefService; late-bind it into SnapshotService,
        # which was constructed above with brief_service=None.
        self.brief_service = BriefService(llm_client=_get_agent_client("BriefService"))
        self.snapshot_service.brief_service = self.brief_service
        # Latest results state
        self.latest_insights = {}
        self.latest_plan = None
        self.latest_summary = {}
        self.latest_trends = {}
        self.latest_risk_assessment = None
        # Session snapshot cache for stateless mode history
        self.session_snapshots: Dict[date, WeeklySnapshot] = {}
        self.session_runner_positioning: Dict[date, RunnerPositioning] = {}
        self.session_view_positioning: Dict[date, WeeklyPositioning] = {}
        # NOTE(review): "GoalTrajectory" is not imported in this module; quoted as a
        # forward reference (PEP 526 annotations are not evaluated at runtime) —
        # confirm the proper import, likely alongside GoalTrajectoryService.
        self.session_goal_trajectory: Dict[date, "GoalTrajectory"] = {}
        # State-driven intelligence cache (v1 optimization)
        self._last_positioning: Optional[RunnerPositioning] = None
        self._last_insights: Dict[str, Any] = {}
        self._last_plan: Optional[str] = None
        self._last_brief: str = ""
        self._last_focus: str = ""
        self.latest_goal_trajectory: Optional["GoalTrajectory"] = None
        logger.info("RunnerOrchestrator initialized")
async def _initialize(self):
"""Asynchronous initialization (RunnerOrchestrator session initialization removed)."""
logger.info("Orchestrator skipping RunnerOrchestrator session initialization")
pass
def reset(self):
"""Resets the orchestrator state by clearing stores and results."""
logger.info("Resetting orchestrator state")
self.feature_store.clear()
self.plan_store.clear()
self.latest_insights = {}
self.latest_plan = None
self.latest_summary = {}
self.latest_risk_assessment = None
self.session_snapshots = {}
self.session_runner_positioning = {}
self.session_view_positioning = {}
self.session_goal_trajectory = {}
self.latest_goal_trajectory = None
self._last_positioning = None
self._last_insights = {}
self._last_plan = None
self._last_brief = ""
self._last_focus = ""
self.latest_goal_trajectory = None
self.latest_positioning = None
self.latest_plan = None
self.latest_brief = ""
self.latest_focus = ""
self.latest_risk_assessment = None
self.latest_summary = {}
self.latest_positioning = None
self.latest_goal_trajectory = None
self.analysis_repo = None
    async def run(self, runs: List[Dict[str, Any]], language: str = "en") -> Dict[str, Any]:
        """
        Execute the full analysis pipeline over a batch of uploaded runs.

        Builds a PipelineContext seeded with the session caches and the cached
        intelligence state, loads/creates the default runner profile and active
        goal, executes the RunnerPipeline steps, then mirrors the results back
        into the legacy stores and the orchestrator's "latest" fields.

        Args:
            runs: Parsed run dicts (e.g. from .tcx/.fit uploads).
            language: Output language code passed through to the agents.

        Returns:
            Dict with features, insights, plan, risk assessment, trends,
            period comparison, charts, profile, goal progress/trajectory,
            positioning and recommendation payloads.

        Raises:
            Re-raises any exception from pipeline execution after emitting
            an error event with the elapsed duration.
        """
        obs_logger.bind_new_trace_id()
        with obs_logger.start_span("orchestrator.run", obs_components.ORCHESTRATOR):
            start_time = time.time()
            logger.info(f"Starting orchestration run with trace_id: {obs_logger.get_trace_id()}")
            # 1. Initialize Pipeline Context
            context = PipelineContext(
                runs=runs,
                language=language,
            )
            context.sessions = self.session_snapshots  # Link to session cache
            # Restore state for intelligence reuse (written back after the run)
            context.last_positioning = self._last_positioning
            context.last_insights = self._last_insights
            context.last_plan = self._last_plan
            context.last_brief = self._last_brief
            context.last_focus = self._last_focus
            # Load or create the default runner profile
            from datetime import datetime
            if self.runner_repo:
                profile = self.runner_repo.get_runner_profile(uuid.UUID(config.DEFAULT_RUNNER_ID))
                if not profile:
                    logger.info("No profile found, creating default")
                    profile = RunnerProfile(
                        runner_id=uuid.UUID(config.DEFAULT_RUNNER_ID),
                        created_at=datetime.now(),
                        updated_at=datetime.now(),
                    )
                    self.runner_repo.save(profile)
                context.runner_profile = profile
            if self.goal_repo:
                context.active_goal = self.goal_repo.get_active_goal(uuid.UUID(config.DEFAULT_RUNNER_ID))
            # 2. Build Pipeline — step order matters: features must exist
            # before persistence/snapshots, guardrails run before positioning,
            # and intelligence precedes visualization/comparison/persistence.
            pipeline = RunnerPipeline([
                FeatureEngineeringStep(self.feature_agent),
                PersistRunsStep(self.run_persistence_service),
                StructureStep(self.structure_service),
                SnapshotStep(self.snapshot_service, self.weekly_trend_builder),
                GuardrailStep(self.guardrail_agent),
                PositioningStep(
                    self.positioning_service,
                    self.recommendation_service,
                    self.goal_progress_service,
                    self.goal_service
                ),
                GoalTrajectoryStep(self.goal_service, self.goal_trajectory_service),
                IntelligenceStep(
                    self.insights_agent,
                    self.plan_agent,
                    self.brief_service,
                    self.weekly_repo,
                    self.guardrail_arbitration_service,
                    self.positioning_change_service,
                    self.goal_progress_service,
                    self.goal_service
                ),
                VisualizationStep(self.visualization_agent, self.viz_executor),
                ComparisonStep(self.weekly_repo),
                PersistAnalysisStep(self.analysis_repo)
            ])
            # 3. Execute Pipeline
            try:
                await pipeline.execute(context)
                # Produce Intelligence Snapshot
                context.intelligence_snapshot = build_intelligence_snapshot(context)
                if context.intelligence_snapshot and context.intelligence_snapshot.week_start:
                    obs_logger.log_event(
                        "info",
                        "Runner intelligence snapshot generated",
                        component=obs_components.APPLICATION,
                        week=str(context.intelligence_snapshot.week_start),
                    )
                # Update legacy stores for backward compatibility
                # (context.runs may hold pydantic models or plain dicts)
                detailed_features = [
                    f.model_dump(mode="json") if hasattr(f, "model_dump") else f for f in context.runs
                ]
                for feat in detailed_features:
                    # Fall back to a random id when the feature dict has none
                    run_id = feat.get("id") or str(uuid.uuid4())
                    self.feature_store.put_features(run_id, feat)
                if context.summary:
                    self.feature_store.put_weekly_summary("latest", context.summary)
                if context.plan:
                    self.plan_store.save_plan("latest", context.plan)
                # Store latest in orchestrator for chat consistency
                self.latest_insights = context.insights
                self.latest_plan = context.plan
                self.latest_summary = context.summary
                self.latest_risk_assessment = context.risk_assessment
                # Update orchestrator cache from context
                self._last_positioning = context.last_positioning
                self._last_insights = context.last_insights
                self._last_plan = context.last_plan
                self._last_brief = context.last_brief
                self._last_focus = context.last_focus
                self.latest_goal_trajectory = getattr(context, "goal_trajectory", None)
                if self.latest_goal_trajectory and context.weekly_snapshot:
                    self.session_goal_trajectory[context.weekly_snapshot.week_start_date] = self.latest_goal_trajectory
                logger.info("Orchestration run completed via Pipeline architecture")
                return {
                    "features": detailed_features,
                    "insights": context.insights,
                    "plan": context.plan,
                    "risk_assessment": context.risk_assessment.model_dump() if context.risk_assessment else None,
                    "trends": context.trends,
                    "period_comparison": context.period_comparison,
                    "charts": context.charts,
                    "mode": "RunnerOrchestrator",
                    "profile": context.runner_profile.model_dump() if context.runner_profile else None,
                    # NOTE(review): goal progress is recomputed here even though
                    # PositioningStep already received goal_progress_service —
                    # confirm this duplicate compute is intentional.
                    "goal_progress": self.goal_progress_service.compute(self.goal_service, context.active_goal, context.weekly_snapshot, context.period_comparison),
                    "goal_trajectory": context.goal_trajectory if hasattr(context, "goal_trajectory") else None,
                    "positioning": context.positioning.model_dump() if context.positioning else None,
                    "recommendation": context.recommendation.model_dump() if context.recommendation else None,
                }
            except Exception as e:
                obs_logger.log_event(
                    "error",
                    f"Orchestration pipeline failed: {e}",
                    event="error",
                    component=obs_components.ORCHESTRATOR,
                    error_type=type(e).__name__,
                    error_message=str(e),
                    duration_ms=(time.time() - start_time) * 1000,
                )
                raise
def _persist_analysis(
self,
features: List[Dict],
insights: Any,
plan: str,
risk_assessment: RiskAssessment,
trends: WeeklyTrends,
):
"""Helper to persist analysis artefacts safely."""
if not self.analysis_repo:
return
try:
# Combine trends and risk assessment into run_summary
run_summary = trends.model_dump()
if risk_assessment:
run_summary["risk_assessment"] = risk_assessment.model_dump()
record = AnalysisRecord(
source_files=[],
formats=[],
run_summary=run_summary,
run_timeseries=features,
insights_json=(
insights
if isinstance(insights, dict)
else (
insights.model_dump()
if hasattr(insights, "model_dump")
else {"content": str(insights)}
)
),
plan_json={"content": plan},
route_json={},
)
self.analysis_repo.save(record)
logger.info(f"Analysis persisted with ID: {record.id}")
except Exception as e:
logger.warning(f"Failed to persist analysis: {e}")
    async def chat(self, message: str, language: str = "en") -> Dict[str, Any]:
        """
        Handles a chat message from the user.

        Returns a dictionary with 'response' (text) and optionally 'chart' (figure).

        Merges session features with persisted historic features, auto-injects
        recent insights when no upload happened this session (Issue #30),
        routes the message (CHART vs. conversational), and delegates to the
        visualization or chat agent. Any exception is converted into an
        error-text response rather than raised to the UI layer.
        """
        obs_logger.bind_new_trace_id()
        with obs_logger.start_span("orchestrator.chat", obs_components.ORCHESTRATOR):
            start_time = time.time()
            logger.info(f"Starting chat handling with trace_id: {obs_logger.get_trace_id()}")
            try:
                session_features = self.feature_store.get_all_features()
                # Load all historic features to provide a complete picture
                historic_features = []
                # NOTE(review): the guard checks run_repo but the lookup goes
                # through analysis_repo — confirm which repository is intended.
                if self.run_repo:
                    try:
                        # Note: RunRepository needs mapping from analyses for now
                        # but get_all_features usually returns all from all analyses
                        historic_features = (
                            self.analysis_repo.get_all_features()
                            if hasattr(self.analysis_repo, "get_all_features")
                            else []
                        )
                    except Exception as e:
                        logger.warning(f"Failed to load historic features: {e}")
                # Merge and deduplicate; session features win because they are
                # written into the map after the historic ones.
                all_features_map = {f.get("id") or f.get("start_time"): f for f in historic_features}
                for f in session_features:
                    key = f.get("id") or f.get("start_time")
                    all_features_map[key] = f
                all_features = list(all_features_map.values())
                # Issue #30: Auto-inject last 4 insights if no new upload in this session
                auto_injected_insights = []
                is_no_upload = not session_features
                if is_no_upload and self.analysis_repo:
                    try:
                        # Fetch exactly 4 per requirement
                        auto_injected_insights = self.analysis_repo.get_recent_insights(limit=4)
                        logger.info(
                            f"Auto-injected {len(auto_injected_insights)} recent insights (no upload detected)"
                        )
                    except Exception as e:
                        logger.warning(f"Failed to load auto-injected insights: {e}")
                # Global "No Data" check
                if not all_features and not auto_injected_insights:
                    return {
                        "response": "I don't have any running data from you yet. Please upload your .tcx / .fit files so I can help you!"
                    }
                # 1. Route the request using the new structured router
                decision = await async_route(
                    message, raw_context={"features": all_features}, llm_client=self.router_client
                )
                intent = decision.route
                logger.info(f"Chat intent detected: {intent}")
                if intent == "CHART":
                    # Delegate to Visualization Agent to get specs
                    chart_specs = await self.visualization_agent.run(all_features, query=message)
                    if chart_specs:
                        # Render the first one for the chat interface
                        spec = chart_specs[0]
                        fig = self.viz_executor.render_chart(spec, all_features)
                        return {
                            "response": f"Here is the {spec.title or spec.chart_type} you asked for.",
                            "chart": fig,
                        }
                    else:
                        return {
                            "response": "I couldn't find a way to visualize those metrics with your current data. Could you try asking for a summary instead?"
                        }
                else:
                    # Delegate to Chat Agent
                    summary = self.latest_summary
                    trends = self.latest_trends
                    # If session state is raw dict (e.g. from previous run before refactor),
                    # convert to model or fallback to building from snapshots
                    if isinstance(summary, dict) or isinstance(trends, dict) or not summary:
                        summary, trends = self.snapshot_service.build_agent_context()
                    latest_plan = self.latest_plan or self.plan_store.get_latest_plan()
                    current_insights = self.latest_insights
                    # Maintain the 10-limit historic context for general memory if needed,
                    # but we'll use auto_injected_insights specifically for the prompt injection.
                    historic_insights = []
                    if self.analysis_repo:
                        try:
                            historic_insights = self.analysis_repo.get_recent_insights(limit=10)
                        except Exception as e:
                            logger.warning(f"Failed to load historic insights: {e}")
                    response_text = await self.chat_agent.run(
                        message,
                        context={
                            "features": all_features,
                            "insights": current_insights,
                            "plan": latest_plan,
                            "summary": summary,
                            "historic_insights": historic_insights,
                            "auto_injected_insights": auto_injected_insights,
                            "is_no_upload": is_no_upload,
                        },
                        language=language,
                    )
                    return {"response": response_text}
            except Exception as e:
                # Broad catch by design: the chat surface answers with an error
                # string instead of propagating exceptions to the UI.
                return {"response": f"I encountered an error: {str(e)}"}
def load_last_analysis(self) -> Optional[AnalysisRecord]:
"""Debug helper to load the last persisted analysis."""
if not self.analysis_repo:
return None
return self.analysis_repo.get_last_analysis()
async def generate_performance_card(
self, snapshot: WeeklySnapshot, trend: WeeklyTrend, language: str = "en"
) -> Any:
"""
Centralized entry point for generating performance cards.
Enforces architecture boundary by keeping BriefService internal.
"""
obs_logger.ensure_trace()
with obs_logger.start_span("orchestrator.performance_card", component=obs_components.ORCHESTRATOR):
from services.performance_card_service import PerformanceCardService
# Boundary Enforcement: Orchestrator owns the nested span and event emission
with obs_logger.start_span("performance_card.generate", component=obs_components.APPLICATION):
service = PerformanceCardService()
card = await service.generate(
snapshot=snapshot,
trend=trend,
brief_service=self.brief_service,
language=language,
)
obs_logger.log_event(
"info",
"Performance card generated successfully",
event="performance_card_generated",
component=obs_components.ORCHESTRATOR,
**{
"week_start": str(snapshot.week_start_date),
"comparison_available": trend.comparison_available,
"comparison_type": trend.comparison_type,
"llm_used": card.llm_used, # Use actual execution status
},
)
return card
async def generate_runner_positioning(
self, snapshot: WeeklySnapshot, trend: WeeklyTrend, language: str = "en"
) -> WeeklyPositioning:
"""
Generates runner positioning assessment (v1).
Uses session memory to avoid recomputation if possible.
"""
obs_logger.ensure_trace()
with obs_logger.start_span("positioning.generate", component=obs_components.APPLICATION):
# 1. Check View Cache
if snapshot.week_start_date in self.session_view_positioning:
return self.session_view_positioning[snapshot.week_start_date]
# 2. Recompute if missing (e.g. initial load without new analysis)
positioning = self.positioning_engine.compute(snapshot, trend)
self.session_view_positioning[snapshot.week_start_date] = positioning
# 3. Emit Event (STEP 10)
obs_logger.log_event(
"info",
"Runner positioning computed",
event="positioning_computed",
component=obs_components.APPLICATION,
week=str(snapshot.week_start_date),
status=positioning.status, # WeeklyPositioning has 'status'
signal_strength=str(positioning.signal_strength)
)
return positioning
async def get_orchestrator(use_RunnerOrchestrator: bool = True):
    """
    Build and asynchronously initialize a RunnerOrchestrator.

    Args:
        use_RunnerOrchestrator: Currently unused — kept for backward
            compatibility with callers that still pass it.

    Returns:
        A RunnerOrchestrator instance after its async _initialize hook ran.
    """
    orchestrator = RunnerOrchestrator()
    await orchestrator._initialize()
    return orchestrator