import logging import uuid import time from datetime import date from typing import List, Dict, Any, Optional from observability import logger as obs_logger from observability import components as obs_components # LLM imports from llm import get_llm_client from llm.model_registry import select_model_for_agent # Local Agent Imports from .feature_engineering.agent import FeatureEngineeringAgent from .insights.agent import InsightsAgent from .plan.agent import PlanAgent from .visualization.agent import VisualizationAgent from .chat_agent import ChatAgent from .guardrail_agent import InjuryFatigueGuardrailAgent from tools.stores import InMemoryFeatureStore, InMemoryPlanStore from tools.viz_executor import TemplateVisualizationExecutor from router.router import async_route from router.hooks import NoOpHook # New domain imports from domain.runner.profile import RunnerProfile from domain.training.run import Run from domain.training.weekly_snapshot import WeeklySnapshot from domain.training.trend_snapshot import TrendSnapshot from domain.training.agent_models import AnalysisRecord, WeeklyTrends, WeeklySummary, RiskAssessment # New persistence imports from persistence.db import Database from persistence.repositories.runner_repo import RunnerRepository from persistence.repositories.run_repo import RunRepository from persistence.repositories.weekly_repo import WeeklySnapshotRepository from persistence.repositories.trend_repo import TrendSnapshotRepository from persistence.repositories.analysis_repo import AnalysisRepository from persistence.repositories.goal_repo import SqlGoalRepository from persistence.repositories.null_goal_repo import NullGoalRepository from persistence.repositories.planned_session_repository import PlannedSessionRepository # New engine imports from engines.trend.trend_engine import TrendEngine from engines.period_comparison_engine import compute_period_comparison # New stateless builders from domain.training.weekly_snapshot_builder import WeeklySnapshotBuilder from domain.training.weekly_trend_builder import WeeklyTrendBuilder from domain.training.weekly_trend import WeeklyTrend # New service imports from services.snapshot_service import SnapshotService from services.brief_service import BriefService from services.goal_service import GoalService from services.structure_service import StructureService from services.run_persistence_service import RunPersistenceService from application.runner_positioning_service import RunnerPositioningService from domain.runner_positioning import RunnerPositioning from application.positioning_service import WeeklyPositioning, PositioningEngine from application.recommendation_service import RecommendationService from services.guardrail_arbitration_service import GuardrailArbitrationService from services.positioning_change_service import PositioningChangeService from services.goal_progress_service import GoalProgressService from application.goal_trajectory_service import GoalTrajectoryService import config # Pipeline imports from core.pipeline.pipeline import RunnerPipeline from core.pipeline.context import PipelineContext from pipeline_steps.feature_engineering_step import FeatureEngineeringStep from pipeline_steps.persist_runs_step import PersistRunsStep from pipeline_steps.structure_step import StructureStep from pipeline_steps.snapshot_step import SnapshotStep from pipeline_steps.guardrail_step import GuardrailStep from pipeline_steps.positioning_step import PositioningStep from pipeline_steps.goal_trajectory_step import GoalTrajectoryStep from pipeline_steps.intelligence_step import IntelligenceStep from pipeline_steps.visualization_step import VisualizationStep from pipeline_steps.comparison_step import ComparisonStep from pipeline_steps.persist_analysis_step import PersistAnalysisStep from core.intelligence.snapshot_builder import build_intelligence_snapshot # Configure logging logger = logging.getLogger(__name__) class RunnerOrchestrator: """ Central orchestration component for Runner Intelligence. Coordinates pipeline execution, agents, and services using the RunnerPipeline architecture. """ def __init__(self, feature_store=None, plan_store=None, viz_executor=None, db=None): logger.info("Initializing RunnerOrchestrator") self.session_id = config.generate_session_id() # Tool boundaries self.feature_store = feature_store or InMemoryFeatureStore() self.plan_store = plan_store or InMemoryPlanStore() self.viz_executor = viz_executor or TemplateVisualizationExecutor() # New persistence layer self.db = db self.runner_repo = None self.run_repo = None self.weekly_repo = None self.trend_repo = None self.analysis_repo = None self.goal_repo = None self.planned_repo = None self.snapshot_service = None # Mode Discipline: Resolve storage implementation at Orchestrator boundary if not self.db: if config.is_persistence_enabled(): self.db = Database(config.STORAGE_DB_PATH) else: # Fallback to volatile in-memory storage for HF Spaces/transient sessions self.db = Database(":memory:") self.db.initialize_schema() # Inject dependencies self.runner_repo = RunnerRepository(self.db) self.run_repo = RunRepository(self.db) self.weekly_repo = WeeklySnapshotRepository(self.db) self.trend_repo = TrendSnapshotRepository(self.db) self.analysis_repo = AnalysisRepository(self.db) if config.is_persistence_enabled(): self.goal_repo = SqlGoalRepository(self.db) else: self.goal_repo = NullGoalRepository() self.planned_repo = PlannedSessionRepository(self.db) # Services receive resolved dependencies self.structure_service = StructureService( planned_repo=self.planned_repo, goal_repo=self.goal_repo, runner_repo=self.runner_repo, ) self.trend_engine = TrendEngine() self.run_persistence_service = RunPersistenceService(self.run_repo) # Stateless Builders self.weekly_snapshot_builder = WeeklySnapshotBuilder() self.weekly_trend_builder = WeeklyTrendBuilder() self.snapshot_service = SnapshotService( runner_repo=self.runner_repo, run_repo=self.run_repo, weekly_repo=self.weekly_repo, trend_repo=self.trend_repo, weekly_builder=self.weekly_snapshot_builder, trend_engine=self.trend_engine, structure_service=self.structure_service, brief_service=None, # Will be set after brief_service is initialized ) self.goal_service = GoalService(goal_repo=self.goal_repo) self.positioning_service = RunnerPositioningService() self.positioning_engine = PositioningEngine() self.recommendation_service = RecommendationService() # New isolation services self.guardrail_arbitration_service = GuardrailArbitrationService() self.positioning_change_service = PositioningChangeService() self.goal_progress_service = GoalProgressService() self.goal_trajectory_service = GoalTrajectoryService() # LLM Clients per Agent def _get_agent_client(agent_name: str): profile = select_model_for_agent(agent_name) return get_llm_client( provider=profile.provider, model_name=profile.model_name, temperature=config.LLM_TEMPERATURE, max_tokens=config.LLM_MAX_TOKENS, drop_params=getattr(config, "LLM_DROP_PARAMS", False), ) # Instantiate agents logger.info("Instantiating agents with specific model profiles") self.feature_agent = FeatureEngineeringAgent() self.insights_agent = InsightsAgent(llm_client=_get_agent_client("InsightsAgent")) self.plan_agent = PlanAgent(llm_client=_get_agent_client("PlanAgent")) self.visualization_agent = VisualizationAgent( llm_client=_get_agent_client("VisualizationAgent") ) self.chat_agent = ChatAgent(llm_client=_get_agent_client("ChatAgent")) # We also need a client for the router self.router_client = _get_agent_client("Router") self.guardrail_agent = InjuryFatigueGuardrailAgent() # Instantiate dedicated BriefService self.brief_service = BriefService(llm_client=_get_agent_client("BriefService")) self.snapshot_service.brief_service = self.brief_service # Latest results state self.latest_insights = {} self.latest_plan = None self.latest_summary = {} self.latest_trends = {} self.latest_risk_assessment = None # Session snapshot cache for stateless mode history self.session_snapshots: Dict[date, WeeklySnapshot] = {} self.session_runner_positioning: Dict[date, RunnerPositioning] = {} self.session_view_positioning: Dict[date, WeeklyPositioning] = {} self.session_goal_trajectory: Dict[date, GoalTrajectory] = {} # State-driven intelligence cache (v1 optimization) self._last_positioning: Optional[RunnerPositioning] = None self._last_insights: Dict[str, Any] = {} self._last_plan: Optional[str] = None self._last_brief: str = "" self._last_focus: str = "" self.latest_goal_trajectory: Optional[GoalTrajectory] = None logger.info("RunnerOrchestrator initialized") async def _initialize(self): """Asynchronous initialization (RunnerOrchestrator session initialization removed).""" logger.info("Orchestrator skipping RunnerOrchestrator session initialization") pass def reset(self): """Resets the orchestrator state by clearing stores and results.""" logger.info("Resetting orchestrator state") self.feature_store.clear() self.plan_store.clear() self.latest_insights = {} self.latest_plan = None self.latest_summary = {} self.latest_risk_assessment = None self.session_snapshots = {} self.session_runner_positioning = {} self.session_view_positioning = {} self.session_goal_trajectory = {} self.latest_goal_trajectory = None self._last_positioning = None self._last_insights = {} self._last_plan = None self._last_brief = "" self._last_focus = "" self.latest_goal_trajectory = None self.latest_positioning = None self.latest_plan = None self.latest_brief = "" self.latest_focus = "" self.latest_risk_assessment = None self.latest_summary = {} self.latest_positioning = None self.latest_goal_trajectory = None self.analysis_repo = None async def run(self, runs: List[Dict[str, Any]], language: str = "en") -> Dict[str, Any]: obs_logger.bind_new_trace_id() with obs_logger.start_span("orchestrator.run", obs_components.ORCHESTRATOR): start_time = time.time() logger.info(f"Starting orchestration run with trace_id: {obs_logger.get_trace_id()}") # 1. Initialize Pipeline Context context = PipelineContext( runs=runs, language=language, ) context.sessions = self.session_snapshots # Link to session cache # Restore state for intelligence reuse context.last_positioning = self._last_positioning context.last_insights = self._last_insights context.last_plan = self._last_plan context.last_brief = self._last_brief context.last_focus = self._last_focus # Load or create profile from datetime import datetime if self.runner_repo: profile = self.runner_repo.get_runner_profile(uuid.UUID(config.DEFAULT_RUNNER_ID)) if not profile: logger.info("No profile found, creating default") profile = RunnerProfile( runner_id=uuid.UUID(config.DEFAULT_RUNNER_ID), created_at=datetime.now(), updated_at=datetime.now(), ) self.runner_repo.save(profile) context.runner_profile = profile if self.goal_repo: context.active_goal = self.goal_repo.get_active_goal(uuid.UUID(config.DEFAULT_RUNNER_ID)) # 2. Build Pipeline pipeline = RunnerPipeline([ FeatureEngineeringStep(self.feature_agent), PersistRunsStep(self.run_persistence_service), StructureStep(self.structure_service), SnapshotStep(self.snapshot_service, self.weekly_trend_builder), GuardrailStep(self.guardrail_agent), PositioningStep( self.positioning_service, self.recommendation_service, self.goal_progress_service, self.goal_service ), GoalTrajectoryStep(self.goal_service, self.goal_trajectory_service), IntelligenceStep( self.insights_agent, self.plan_agent, self.brief_service, self.weekly_repo, self.guardrail_arbitration_service, self.positioning_change_service, self.goal_progress_service, self.goal_service ), VisualizationStep(self.visualization_agent, self.viz_executor), ComparisonStep(self.weekly_repo), PersistAnalysisStep(self.analysis_repo) ]) # 3. Execute Pipeline try: await pipeline.execute(context) # Produce Intelligence Snapshot context.intelligence_snapshot = build_intelligence_snapshot(context) if context.intelligence_snapshot and context.intelligence_snapshot.week_start: obs_logger.log_event( "info", "Runner intelligence snapshot generated", component=obs_components.APPLICATION, week=str(context.intelligence_snapshot.week_start), ) # Update legacy stores for backward compatibility detailed_features = [ f.model_dump(mode="json") if hasattr(f, "model_dump") else f for f in context.runs ] for feat in detailed_features: run_id = feat.get("id") or str(uuid.uuid4()) self.feature_store.put_features(run_id, feat) if context.summary: self.feature_store.put_weekly_summary("latest", context.summary) if context.plan: self.plan_store.save_plan("latest", context.plan) # Store latest in orchestrator for chat consistency self.latest_insights = context.insights self.latest_plan = context.plan self.latest_summary = context.summary self.latest_risk_assessment = context.risk_assessment # Update orchestrator cache from context self._last_positioning = context.last_positioning self._last_insights = context.last_insights self._last_plan = context.last_plan self._last_brief = context.last_brief self._last_focus = context.last_focus self.latest_goal_trajectory = getattr(context, "goal_trajectory", None) if self.latest_goal_trajectory and context.weekly_snapshot: self.session_goal_trajectory[context.weekly_snapshot.week_start_date] = self.latest_goal_trajectory logger.info("Orchestration run completed via Pipeline architecture") return { "features": detailed_features, "insights": context.insights, "plan": context.plan, "risk_assessment": context.risk_assessment.model_dump() if context.risk_assessment else None, "trends": context.trends, "period_comparison": context.period_comparison, "charts": context.charts, "mode": "RunnerOrchestrator", "profile": context.runner_profile.model_dump() if context.runner_profile else None, "goal_progress": self.goal_progress_service.compute(self.goal_service, context.active_goal, context.weekly_snapshot, context.period_comparison), "goal_trajectory": context.goal_trajectory if hasattr(context, "goal_trajectory") else None, "positioning": context.positioning.model_dump() if context.positioning else None, "recommendation": context.recommendation.model_dump() if context.recommendation else None, } except Exception as e: obs_logger.log_event( "error", f"Orchestration pipeline failed: {e}", event="error", component=obs_components.ORCHESTRATOR, error_type=type(e).__name__, error_message=str(e), duration_ms=(time.time() - start_time) * 1000, ) raise def _persist_analysis( self, features: List[Dict], insights: Any, plan: str, risk_assessment: RiskAssessment, trends: WeeklyTrends, ): """Helper to persist analysis artefacts safely.""" if not self.analysis_repo: return try: # Combine trends and risk assessment into run_summary run_summary = trends.model_dump() if risk_assessment: run_summary["risk_assessment"] = risk_assessment.model_dump() record = AnalysisRecord( source_files=[], formats=[], run_summary=run_summary, run_timeseries=features, insights_json=( insights if isinstance(insights, dict) else ( insights.model_dump() if hasattr(insights, "model_dump") else {"content": str(insights)} ) ), plan_json={"content": plan}, route_json={}, ) self.analysis_repo.save(record) logger.info(f"Analysis persisted with ID: {record.id}") except Exception as e: logger.warning(f"Failed to persist analysis: {e}") async def chat(self, message: str, language: str = "en") -> Dict[str, Any]: """ Handles a chat message from the user. Returns a dictionary with 'response' (text) and optionally 'chart' (figure). """ obs_logger.bind_new_trace_id() with obs_logger.start_span("orchestrator.chat", obs_components.ORCHESTRATOR): start_time = time.time() logger.info(f"Starting chat handling with trace_id: {obs_logger.get_trace_id()}") try: session_features = self.feature_store.get_all_features() # Load all historic features to provide a complete picture historic_features = [] if self.run_repo: try: # Note: RunRepository needs mapping from analyses for now # but get_all_features usually returns all from all analyses historic_features = ( self.analysis_repo.get_all_features() if hasattr(self.analysis_repo, "get_all_features") else [] ) except Exception as e: logger.warning(f"Failed to load historic features: {e}") # Merge and deduplicate all_features_map = {f.get("id") or f.get("start_time"): f for f in historic_features} for f in session_features: key = f.get("id") or f.get("start_time") all_features_map[key] = f all_features = list(all_features_map.values()) # Issue #30: Auto-inject last 4 insights if no new upload in this session auto_injected_insights = [] is_no_upload = not session_features if is_no_upload and self.analysis_repo: try: # Fetch exactly 4 per requirement auto_injected_insights = self.analysis_repo.get_recent_insights(limit=4) logger.info( f"Auto-injected {len(auto_injected_insights)} recent insights (no upload detected)" ) except Exception as e: logger.warning(f"Failed to load auto-injected insights: {e}") # Global "No Data" check if not all_features and not auto_injected_insights: return { "response": "I don't have any running data from you yet. Please upload your .tcx / .fit files so I can help you!" } # 1. Route the request using the new structured router decision = await async_route( message, raw_context={"features": all_features}, llm_client=self.router_client ) intent = decision.route logger.info(f"Chat intent detected: {intent}") if intent == "CHART": # Delegate to Visualization Agent to get specs chart_specs = await self.visualization_agent.run(all_features, query=message) if chart_specs: # Render the first one for the chat interface spec = chart_specs[0] fig = self.viz_executor.render_chart(spec, all_features) return { "response": f"Here is the {spec.title or spec.chart_type} you asked for.", "chart": fig, } else: return { "response": "I couldn't find a way to visualize those metrics with your current data. Could you try asking for a summary instead?" } else: # Delegate to Chat Agent summary = self.latest_summary trends = self.latest_trends # If session state is raw dict (e.g. from previous run before refactor), # convert to model or fallback to building from snapshots if isinstance(summary, dict) or isinstance(trends, dict) or not summary: summary, trends = self.snapshot_service.build_agent_context() latest_plan = self.latest_plan or self.plan_store.get_latest_plan() current_insights = self.latest_insights # Maintain the 10-limit historic context for general memory if needed, # but we'll use auto_injected_insights specifically for the prompt injection. historic_insights = [] if self.analysis_repo: try: historic_insights = self.analysis_repo.get_recent_insights(limit=10) except Exception as e: logger.warning(f"Failed to load historic insights: {e}") response_text = await self.chat_agent.run( message, context={ "features": all_features, "insights": current_insights, "plan": latest_plan, "summary": summary, "historic_insights": historic_insights, "auto_injected_insights": auto_injected_insights, "is_no_upload": is_no_upload, }, language=language, ) return {"response": response_text} except Exception as e: return {"response": f"I encountered an error: {str(e)}"} def load_last_analysis(self) -> Optional[AnalysisRecord]: """Debug helper to load the last persisted analysis.""" if not self.analysis_repo: return None return self.analysis_repo.get_last_analysis() async def generate_performance_card( self, snapshot: WeeklySnapshot, trend: WeeklyTrend, language: str = "en" ) -> Any: """ Centralized entry point for generating performance cards. Enforces architecture boundary by keeping BriefService internal. """ obs_logger.ensure_trace() with obs_logger.start_span("orchestrator.performance_card", component=obs_components.ORCHESTRATOR): from services.performance_card_service import PerformanceCardService # Boundary Enforcement: Orchestrator owns the nested span and event emission with obs_logger.start_span("performance_card.generate", component=obs_components.APPLICATION): service = PerformanceCardService() card = await service.generate( snapshot=snapshot, trend=trend, brief_service=self.brief_service, language=language, ) obs_logger.log_event( "info", "Performance card generated successfully", event="performance_card_generated", component=obs_components.ORCHESTRATOR, **{ "week_start": str(snapshot.week_start_date), "comparison_available": trend.comparison_available, "comparison_type": trend.comparison_type, "llm_used": card.llm_used, # Use actual execution status }, ) return card async def generate_runner_positioning( self, snapshot: WeeklySnapshot, trend: WeeklyTrend, language: str = "en" ) -> WeeklyPositioning: """ Generates runner positioning assessment (v1). Uses session memory to avoid recomputation if possible. """ obs_logger.ensure_trace() with obs_logger.start_span("positioning.generate", component=obs_components.APPLICATION): # 1. Check View Cache if snapshot.week_start_date in self.session_view_positioning: return self.session_view_positioning[snapshot.week_start_date] # 2. Recompute if missing (e.g. initial load without new analysis) positioning = self.positioning_engine.compute(snapshot, trend) self.session_view_positioning[snapshot.week_start_date] = positioning # 3. Emit Event (STEP 10) obs_logger.log_event( "info", "Runner positioning computed", event="positioning_computed", component=obs_components.APPLICATION, week=str(snapshot.week_start_date), status=positioning.status, # WeeklyPositioning has 'status' signal_strength=str(positioning.signal_strength) ) return positioning async def get_orchestrator(use_RunnerOrchestrator: bool = True): orchestrator = RunnerOrchestrator() await orchestrator._initialize() return orchestrator