Spaces:
Running
Running
| import uuid | |
| from datetime import date | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from domain.runner.profile import RunnerProfile | |
| from domain.training.weekly_snapshot import WeeklySnapshot | |
| from domain.training.trend_snapshot import TrendSnapshot | |
| from domain.training.weekly_snapshot_builder import WeeklySnapshotBuilder | |
| from engines.trend.trend_engine import TrendEngine | |
| from persistence.repositories.runner_repo import RunnerRepository | |
| from persistence.repositories.run_repo import RunRepository | |
| from persistence.repositories.weekly_repo import WeeklySnapshotRepository | |
| from persistence.repositories.trend_repo import TrendSnapshotRepository | |
| from observability import logger as obs_logger | |
| from observability import components as obs_components | |
| from services.brief_service import BriefService | |
| from datetime import datetime | |
| from services.structure_service import StructureService | |
class SnapshotService:
    """
    Orchestrates the recompute flow for weekly and trend snapshots.
    Strictly coordinates between engines and repositories.
    """
    def __init__(
        self,
        runner_repo: RunnerRepository,
        run_repo: RunRepository,
        weekly_repo: WeeklySnapshotRepository,
        trend_repo: TrendSnapshotRepository,
        weekly_builder: WeeklySnapshotBuilder,
        trend_engine: TrendEngine,
        structure_service: Optional["StructureService"] = None,
        brief_service: Optional[BriefService] = None,
    ):
        # Repositories; each may be None/falsy — methods guard before use.
        self.runner_repo = runner_repo
        self.run_repo = run_repo
        self.weekly_repo = weekly_repo
        self.trend_repo = trend_repo
        # Computation engines; this service only coordinates them.
        self.weekly_builder = weekly_builder
        self.trend_engine = trend_engine
        # Optional enrichment services (structure status, performance briefs).
        self.structure_service = structure_service
        self.brief_service = brief_service
        # In-memory cache of intelligence snapshots keyed by (week_start, language).
        self.intelligence_snapshots: Dict[Tuple[date, str], Any] = {}
| async def recompute_week( | |
| self, | |
| runner_id: uuid.UUID, | |
| week_start: date, | |
| runs: Optional[List] = None, | |
| language: str = "en", | |
| skip_brief: bool = False, | |
| goal_progress: Optional[Dict[str, Any]] = None, | |
| enable_intelligence: bool = True, | |
| ) -> tuple[WeeklySnapshot, TrendSnapshot]: | |
| """ | |
| Orchestrates the recompute flow for a specific week. | |
| Returns the computed snapshots directly. | |
| """ | |
| obs_logger.ensure_trace() | |
| with obs_logger.start_span("snapshot_service.recompute_week", obs_components.SERVICE): | |
| try: | |
| obs_logger.log_event( | |
| "info", | |
| f"Recomputing snapshots for week {week_start.isoformat()}", | |
| component=obs_components.SERVICE, | |
| fields={"runner_id": str(runner_id), "week_start": week_start.isoformat(), "enable_intelligence": enable_intelligence}, | |
| ) | |
| # 1. Load runs for that week. | |
| repo_runs = [] | |
| if self.run_repo: | |
| obs_logger.log_event( | |
| "info", | |
| "Fetching all runs for week from repository", | |
| component=obs_components.SERVICE, | |
| ) | |
| repo_runs = self.run_repo.get_runs_for_week(str(runner_id), week_start) | |
| # Support direct input for tests or if repo is missing | |
| input_runs = [] | |
| if runs is not None: | |
| from domain.training.run import Run | |
| for r in runs: | |
| if isinstance(r, dict): | |
| input_runs.append(Run(**r)) | |
| else: | |
| input_runs.append(r) | |
| # Combine and deduplicate by ID if possible | |
| # In production, runs passed via SnapshotStep are ALREADY in repo_runs | |
| # because PersistRunsStep happened before. | |
| # In tests, they might only be in input_runs. | |
| all_runs_map = {r.id: r for r in repo_runs if r.id} | |
| for r in input_runs: | |
| if r.id: | |
| all_runs_map[r.id] = r | |
| else: | |
| # Fallback for runs without ID | |
| repo_runs.append(r) | |
| if all_runs_map: | |
| runs = list(all_runs_map.values()) | |
| else: | |
| runs = repo_runs | |
| # 2. Compute weekly snapshot | |
| weekly_snapshot = self.weekly_builder.build( | |
| runner_id=runner_id, week_start_date=week_start, runs=runs | |
| ) | |
| # 2.1 Attach Structure Status | |
| if self.structure_service: | |
| # Logic moved: structure_service will handle goal context internally or defaults to 0.0 | |
| weekly_snapshot.structure_status = self.structure_service.compute_structure_status( | |
| runner_id=runner_id, | |
| week_start=week_start, | |
| weekly_volume=weekly_snapshot.total_distance_km, | |
| goal_volume=0.0, | |
| ) | |
| # 2.2 Attach re-computed snapshot ID to existent one (this fix the issue of existent charts going empty) | |
| affected_snapshot = self.get_snapshot(week_start, language) | |
| if affected_snapshot: | |
| weekly_snapshot.id = affected_snapshot.id | |
| if self.weekly_repo: | |
| self.weekly_repo.save(weekly_snapshot) | |
| # Emit snapshot_built event | |
| obs_logger.log_event( | |
| "info", | |
| f"Snapshot built for week {week_start.isoformat()}", | |
| event="snapshot_built", | |
| component=obs_components.SERVICE, | |
| **{ | |
| "week_start": week_start.isoformat(), | |
| "total_distance": weekly_snapshot.total_distance_km, | |
| "run_count": weekly_snapshot.run_count, | |
| }, | |
| ) | |
| # 3. Compute trend snapshot | |
| previous_week_snapshot = None | |
| if self.weekly_repo: | |
| previous_week_snapshot = self.weekly_repo.get_previous_week(week_start) | |
| # If no previous snapshot in DB, or DB disabled, we return empty trend | |
| # unless the caller provides more context (not supported yet for simplicity) | |
| if previous_week_snapshot is None: | |
| from domain.training.trend_snapshot import TrendSnapshot | |
| trend_snapshot = TrendSnapshot.empty(runner_id, week_start) | |
| else: | |
| trend_snapshot = self.trend_engine.compute_trend_snapshot( | |
| current=weekly_snapshot, previous=previous_week_snapshot | |
| ) | |
| if self.trend_repo: | |
| self.trend_repo.save(trend_snapshot) | |
| # Emit trend_built event | |
| obs_logger.log_event( | |
| "info", | |
| f"Trend built for week {week_start.isoformat()}", | |
| event="trend_built", | |
| component=obs_components.SERVICE, | |
| **{ | |
| "comparison_available": trend_snapshot.comparison_available, | |
| "comparison_type": getattr(trend_snapshot, "comparison_type", "unknown"), | |
| }, | |
| ) | |
| # 4. Generate Performance Brief if BriefService is available | |
| if self.brief_service and not skip_brief: | |
| # Check if we need to regenerate | |
| new_hash = self.brief_service.compute_brief_hash( | |
| weekly_snapshot, trend_snapshot, goal_progress=goal_progress, language=language | |
| ) | |
| if weekly_snapshot.brief_source_hash != new_hash: | |
| # LLM Guard Logic: Execute only when trend comparison is available AND intelligence enabled | |
| if enable_intelligence and trend_snapshot.comparison_available: | |
| obs_logger.log_event( | |
| "info", "Regenerating performance brief", component=obs_components.SERVICE | |
| ) | |
| brief, focus = await self.brief_service.generate_brief( | |
| weekly_snapshot, trend_snapshot, goal_progress=goal_progress, language=language | |
| ) | |
| llm_used = True | |
| reason = "active_week_and_comparison_available" | |
| elif not enable_intelligence: | |
| obs_logger.log_event( | |
| "info", "Skipping intelligence generation for historical week", | |
| component=obs_components.SERVICE, | |
| fields={"week_start": week_start.isoformat()} | |
| ) | |
| brief = self._generate_deterministic_brief(weekly_snapshot) | |
| focus = "" | |
| llm_used = False | |
| reason = "not_active_week" | |
| else: | |
| obs_logger.log_event( | |
| "info", "Using deterministic fallback for brief", component=obs_components.SERVICE | |
| ) | |
| brief = self._generate_deterministic_brief(weekly_snapshot) | |
| focus = "" # Deterministic fallback focus reset to empty | |
| llm_used = False | |
| reason = "insufficient_data" | |
| weekly_snapshot.performance_brief = brief | |
| weekly_snapshot.performance_focus = focus | |
| weekly_snapshot.brief_source_hash = new_hash | |
| weekly_snapshot.brief_generated_at = datetime.now() | |
| obs_logger.log_event( | |
| "info", | |
| "Weekly brief generated", | |
| event="weekly_brief_generated", | |
| component=obs_components.SERVICE, | |
| **{ | |
| "llm_used": llm_used, | |
| "reason": reason, | |
| "week_start": week_start.isoformat(), | |
| }, | |
| ) | |
| if self.weekly_repo: | |
| self.weekly_repo.save(weekly_snapshot) | |
| obs_logger.log_event( | |
| "info", | |
| "Completed snapshot recompute for week", | |
| component=obs_components.SERVICE, | |
| **{"week_start": week_start.isoformat()}, | |
| ) | |
| return weekly_snapshot, trend_snapshot | |
| except Exception: | |
| raise | |
| def _generate_deterministic_brief(self, snapshot: WeeklySnapshot) -> str: | |
| """Determines fallback brief message when insufficient history exists.""" | |
| if snapshot.run_count == 0: | |
| return "No runs recorded this week yet. Let’s get started." | |
| # Based on requirements: | |
| # Case B (First week): "Great start. Keep building consistency — insights will deepen as your history grows." | |
| # Case C (Some data but no comparison): "Solid week logged. As your history builds, we’ll unlock deeper performance insights." | |
| # We'll return Case B as the general onboarding message for single week or first-week scenarios. | |
| return "Great start. Keep building consistency — insights will deepen as your history grows." | |
| async def recompute_all(self, runner_id: uuid.UUID) -> None: | |
| """ | |
| Full rebuild of all weekly and trend snapshots for a runner. | |
| 1. Load all runs | |
| 2. Group by week | |
| 3. Compute and replace all weekly snapshots | |
| 4. Compute and replace all trend snapshots | |
| """ | |
| import time | |
| from ingestion.weekly_features import week_start as get_week_start | |
| start_ms = time.time() * 1000 | |
| obs_logger.log_event( | |
| "info", | |
| "Starting full historical recompute", | |
| component=obs_components.SERVICE, | |
| **{"runner_id": str(runner_id)}, | |
| ) | |
| # 1. Load all runs | |
| runs = self.run_repo.get_all_for_runner(str(runner_id)) | |
| if not runs: | |
| obs_logger.log_event( | |
| "warning", "No runs found for runner during recompute_all", component=obs_components.SERVICE | |
| ) | |
| return | |
| # 2. Group by week | |
| runs_by_week = {} | |
| for r in runs: | |
| if not r.start_time: | |
| continue | |
| # Normalize to date for grouping | |
| ws = get_week_start(r.start_time).date() | |
| if ws not in runs_by_week: | |
| runs_by_week[ws] = [] | |
| runs_by_week[ws].append(r) | |
| # 3. Compute all weekly snapshots in chronological order | |
| sorted_weeks = sorted(runs_by_week.keys()) | |
| weekly_snapshots = [] | |
| for ws in sorted_weeks: | |
| snapshot = self.weekly_builder.build( | |
| runner_id=runner_id, week_start_date=ws, runs=runs_by_week[ws] | |
| ) | |
| # Attach structure status for historical weeks too | |
| if self.structure_service: | |
| snapshot.structure_status = self.structure_service.compute_structure_status( | |
| runner_id=runner_id, | |
| week_start=ws, | |
| weekly_volume=snapshot.total_distance_km, | |
| goal_volume=0.0, # Historical recompute usually has no goal context unless loaded | |
| ) | |
| weekly_snapshots.append(snapshot) | |
| # 4. Replace weekly snapshots | |
| self.weekly_repo.delete_all_for_runner(runner_id) | |
| for s in weekly_snapshots: | |
| self.weekly_repo.save(s) | |
| # 5. Compute all trend snapshots | |
| trend_snapshots = [] | |
| previous_snapshot = None | |
| for current_snapshot in weekly_snapshots: | |
| trend = self.trend_engine.compute_trend_snapshot( | |
| current=current_snapshot, previous=previous_snapshot | |
| ) | |
| trend_snapshots.append(trend) | |
| previous_snapshot = current_snapshot | |
| # 6. Replace trend snapshots | |
| self.trend_repo.delete_all_for_runner(runner_id) | |
| self.trend_repo.save_many(trend_snapshots) | |
| duration_ms = (time.time() * 1000) - start_ms | |
| obs_logger.log_event( | |
| "info", | |
| "Full historical recompute complete", | |
| component=obs_components.SERVICE, | |
| **{ | |
| "total_runs": len(runs), | |
| "total_weeks": len(sorted_weeks), | |
| "duration_ms": duration_ms, | |
| }, | |
| ) | |
| def build_agent_context(self) -> tuple: | |
| """Helper to build consistent agent context from persisted snapshots.""" | |
| latest_weekly = self.weekly_repo.get_last_n(1) if self.weekly_repo else [] | |
| if not latest_weekly: | |
| from domain.training.agent_models import WeeklySummary, WeeklyTrends | |
| return WeeklySummary(), WeeklyTrends() | |
| return self.build_agent_context_for_week(latest_weekly[0].week_start_date) | |
    def build_agent_context_for_week(self, target_date: date) -> tuple:
        """
        Helper to build consistent agent context from persisted snapshots
        centered on a specific week.

        Returns:
            (WeeklySummary, WeeklyTrends); both empty defaults when no
            persisted history ends at target_date.
        """
        from domain.training.agent_models import WeeklySummary, WeeklyTrends
        # Get up to 12 weekly snapshots ending at target_date.
        # history[0] is treated as the target week below — assumes the repo
        # returns newest-first; TODO confirm ordering contract.
        history = self.weekly_repo.get_last_n_ending_at(target_date, 12) if self.weekly_repo else []
        summary = WeeklySummary()
        trends = WeeklyTrends()
        if history:
            top_week = history[0]
            summary = WeeklySummary(
                week_start=top_week.week_start_date,
                consistency_score=int(top_week.consistency_score),
                num_runs=top_week.run_count,
                total_distance_m=top_week.total_distance_km * 1000,  # km -> meters
                avg_hr_bpm=top_week.avg_hr,
                avg_pace_s_per_km=top_week.avg_pace_sec_per_km,
                total_duration_s=int(top_week.total_time_sec),
                structure_status=top_week.structure_status,
                weekly_km={},
            )
            # Build weekly_km with YYYY-WW keys as expected by agents
            for w in history:
                iso_year, iso_week, _ = w.week_start_date.isocalendar()
                key = f"{iso_year}-{iso_week:02d}"
                summary.weekly_km[key] = w.total_distance_km
            # Load trend snapshot for this specific week
            trend_snap = (
                self.trend_repo.get_by_week(top_week.week_start_date) if self.trend_repo else None
            )
            if trend_snap:
                # Map trend snapshot to the trends model agents expect
                trends = WeeklyTrends(
                    pace_trend_s_per_km=trend_snap.avg_pace_delta_s_per_km,
                    distance_trend_m=(
                        trend_snap.distance_delta_km * 1000  # km -> meters; 0.0 when unknown
                        if trend_snap.distance_delta_km is not None
                        else 0.0
                    ),
                    avg_runs_per_week=float(top_week.run_count),
                    # NOTE(review): consistency_delta is used to fill
                    # run_monotony — looks intentional, but confirm the
                    # agents expect that semantic.
                    run_monotony=(
                        trend_snap.consistency_delta
                        if trend_snap.consistency_delta is not None
                        else 0.0
                    ),
                )
        return summary, trends
| def persist_snapshot(self, snapshot): | |
| """Persist weekly snapshot using repository owned by the service.""" | |
| if not self.weekly_repo: | |
| return | |
| self.weekly_repo.save(snapshot) | |
| def persist_intelligence_snapshot(self, snapshot): | |
| """Persist intelligence snapshot using repository owned by the service.""" | |
| if not self.weekly_repo: | |
| return | |
| self.weekly_repo.save_intelligence_snapshot(snapshot) | |
| def persist_trend(self, trend_snapshot): | |
| """Persist trend snapshot using repository owned by the service.""" | |
| if not self.trend_repo: | |
| return | |
| self.trend_repo.save(trend_snapshot) | |
| def persist_profile(self, profile: RunnerProfile): | |
| """Persist runner profile using repository owned by the service.""" | |
| if not self.runner_repo: | |
| return | |
| self.runner_repo.save(profile) | |
| def get_history(self, n: int = 12): | |
| """Retrieves historical weekly snapshots.""" | |
| if not self.weekly_repo: | |
| return [] | |
| return self.weekly_repo.get_last_n(n) | |
| def ensure_empty_week(self, runner_id, week_start): | |
| snapshot = WeeklySnapshot( | |
| runner_id=runner_id, | |
| week_start_date=week_start, | |
| run_count=0, | |
| total_distance_km=0.0, | |
| total_time_sec=0, | |
| avg_hr=None, | |
| avg_pace_sec_per_km=None, | |
| consistency_score=0, | |
| ) | |
| self.persist_snapshot(snapshot) | |
| return snapshot | |
| def initialize_baseline_if_needed(self, runner_profile): | |
| from datetime import datetime | |
| if not runner_profile or runner_profile.baseline_weekly_km is not None: | |
| return runner_profile | |
| summary, _ = self.build_agent_context() | |
| weekly_km = summary.weekly_km | |
| if weekly_km: | |
| km_vals = list(weekly_km.values()) | |
| if km_vals: | |
| runner_profile.baseline_weekly_km = float(sum(km_vals) / len(km_vals)) | |
| runner_profile.updated_at = datetime.now() | |
| self.persist_profile(runner_profile) | |
| return runner_profile | |
| def get_snapshot(self, week_start_date: date, language: str = "en") -> Optional[Any]: | |
| """Retrieves a cached intelligence snapshot if it exists.""" | |
| return self.intelligence_snapshots.get((week_start_date, language)) | |
| def has_snapshot(self, week_start_date: date, language: str = "en") -> bool: | |
| """Checks if an intelligence snapshot exists for the given week.""" | |
| return (week_start_date, language) in self.intelligence_snapshots | |
| def store_snapshot(self, week_start_date: date, snapshot: Any, language: str = "en") -> None: | |
| """Stores a recomputed intelligence snapshot in memory.""" | |
| self.intelligence_snapshots[(week_start_date, language)] = snapshot | |
| def invalidate_weeks(self, runner_id: str, weeks: List[date]) -> None: | |
| """ | |
| Deletes intelligence snapshots for given weeks. | |
| Safe no-op if snapshots do not exist. | |
| """ | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| for week in weeks: | |
| logger.debug(f"[Snapshot] Invalidated week: {week}") | |
| # Ensure DB level is cleared if repo exists | |
| if self.weekly_repo: | |
| # Assuming weekly_repo has delete method, we just delete or let it naturally overwrite. | |
| # According to the instructions, we do `self.weekly_repo.delete(runner_id, week)` | |
| if hasattr(self.weekly_repo, "delete"): | |
| self.weekly_repo.delete(runner_id, week) | |
| # Clear memory cache regardless of language | |
| keys_to_delete = [ | |
| k for k in self.intelligence_snapshots.keys() | |
| if isinstance(k, tuple) and len(k) == 2 and k[0] == week | |
| ] | |
| for key in keys_to_delete: | |
| del self.intelligence_snapshots[key] | |
| def is_week_dirty(self, week_start_date: date, weekly_summary: Any) -> bool: | |
| """ | |
| Determines if a week needs intelligence recomputation. | |
| Only compares stable training metrics as per optimization guardrails. | |
| """ | |
| snapshot = self.get_snapshot(week_start_date) | |
| if not snapshot: | |
| return True | |
| # Guardrail 1: Compare ONLY stable training fields | |
| # Snapshot fields (RunnerIntelligenceSnapshot) | |
| s_dist = getattr(snapshot, "weekly_distance_km", 0.0) | |
| s_runs = getattr(snapshot, "run_count", 0) | |
| s_pace = getattr(snapshot, "avg_pace", 0.0) | |
| s_cons = getattr(snapshot, "consistency_score", 0) | |
| # WeeklySummary fields (from build_agent_context_for_week) | |
| w_dist = weekly_summary.total_distance_m / 1000.0 if hasattr(weekly_summary, "total_distance_m") else 0.0 | |
| w_runs = weekly_summary.num_runs if hasattr(weekly_summary, "num_runs") else 0 | |
| w_pace = weekly_summary.avg_pace_s_per_km if hasattr(weekly_summary, "avg_pace_s_per_km") else 0.0 | |
| w_cons = weekly_summary.consistency_score if hasattr(weekly_summary, "consistency_score") else 0 | |
| # Deterministic comparison with tolerance for floats | |
| if abs(s_dist - w_dist) > 0.01: | |
| return True | |
| if s_runs != w_runs: | |
| return True | |
| if abs((s_pace or 0) - (w_pace or 0)) > 1.0: # 1 second/km tolerance | |
| return True | |
| if s_cons != w_cons: | |
| return True | |
| return False | |
def has_intelligence(snapshot: Any) -> bool:
    """
    Check if an intelligence snapshot has coaching results.
    Lightweight check for UI/Orchestrator gating.
    """
    if not snapshot:
        return False
    # Any non-empty coaching field on the UI view model counts.
    # getattr(..., None) is equivalent to the hasattr-and-truthy check:
    # a missing attribute yields None, a present-but-empty one stays falsy.
    for field_name in ("insights", "recommendation", "performance_brief"):
        if getattr(snapshot, field_name, None):
            return True
    return False