# runner-ai-intelligence — src/services/snapshot_service.py
# Uploaded by avfranco — HF Space deploy snapshot (minimal allow-list), commit d64fd55
import uuid
from datetime import date
from typing import Optional, List, Dict, Any, Tuple
from domain.runner.profile import RunnerProfile
from domain.training.weekly_snapshot import WeeklySnapshot
from domain.training.trend_snapshot import TrendSnapshot
from domain.training.weekly_snapshot_builder import WeeklySnapshotBuilder
from engines.trend.trend_engine import TrendEngine
from persistence.repositories.runner_repo import RunnerRepository
from persistence.repositories.run_repo import RunRepository
from persistence.repositories.weekly_repo import WeeklySnapshotRepository
from persistence.repositories.trend_repo import TrendSnapshotRepository
from observability import logger as obs_logger
from observability import components as obs_components
from services.brief_service import BriefService
from datetime import datetime
from services.structure_service import StructureService
class SnapshotService:
    """
    Orchestrates the recompute flow for weekly and trend snapshots.
    Strictly coordinates between engines and repositories.
    """

    def __init__(
        self,
        runner_repo: RunnerRepository,
        run_repo: RunRepository,
        weekly_repo: WeeklySnapshotRepository,
        trend_repo: TrendSnapshotRepository,
        weekly_builder: WeeklySnapshotBuilder,
        trend_engine: TrendEngine,
        structure_service: Optional["StructureService"] = None,
        brief_service: Optional[BriefService] = None,
    ):
        """
        Wire the service to its repositories and engines.

        Args:
            runner_repo: Persistence for runner profiles.
            run_repo: Persistence for individual runs.
            weekly_repo: Persistence for weekly snapshots.
            trend_repo: Persistence for trend snapshots.
            weekly_builder: Builds WeeklySnapshot objects from raw runs.
            trend_engine: Computes week-over-week trend snapshots.
            structure_service: Optional; computes weekly structure status.
            brief_service: Optional; generates performance briefs (LLM-backed).
        """
        self.runner_repo = runner_repo
        self.run_repo = run_repo
        self.weekly_repo = weekly_repo
        self.trend_repo = trend_repo
        self.weekly_builder = weekly_builder
        self.trend_engine = trend_engine
        self.structure_service = structure_service
        self.brief_service = brief_service
        # In-memory cache of intelligence snapshots, keyed by (week_start, language).
        self.intelligence_snapshots: Dict[Tuple[date, str], Any] = {}
async def recompute_week(
self,
runner_id: uuid.UUID,
week_start: date,
runs: Optional[List] = None,
language: str = "en",
skip_brief: bool = False,
goal_progress: Optional[Dict[str, Any]] = None,
enable_intelligence: bool = True,
) -> tuple[WeeklySnapshot, TrendSnapshot]:
"""
Orchestrates the recompute flow for a specific week.
Returns the computed snapshots directly.
"""
obs_logger.ensure_trace()
with obs_logger.start_span("snapshot_service.recompute_week", obs_components.SERVICE):
try:
obs_logger.log_event(
"info",
f"Recomputing snapshots for week {week_start.isoformat()}",
component=obs_components.SERVICE,
fields={"runner_id": str(runner_id), "week_start": week_start.isoformat(), "enable_intelligence": enable_intelligence},
)
# 1. Load runs for that week.
repo_runs = []
if self.run_repo:
obs_logger.log_event(
"info",
"Fetching all runs for week from repository",
component=obs_components.SERVICE,
)
repo_runs = self.run_repo.get_runs_for_week(str(runner_id), week_start)
# Support direct input for tests or if repo is missing
input_runs = []
if runs is not None:
from domain.training.run import Run
for r in runs:
if isinstance(r, dict):
input_runs.append(Run(**r))
else:
input_runs.append(r)
# Combine and deduplicate by ID if possible
# In production, runs passed via SnapshotStep are ALREADY in repo_runs
# because PersistRunsStep happened before.
# In tests, they might only be in input_runs.
all_runs_map = {r.id: r for r in repo_runs if r.id}
for r in input_runs:
if r.id:
all_runs_map[r.id] = r
else:
# Fallback for runs without ID
repo_runs.append(r)
if all_runs_map:
runs = list(all_runs_map.values())
else:
runs = repo_runs
# 2. Compute weekly snapshot
weekly_snapshot = self.weekly_builder.build(
runner_id=runner_id, week_start_date=week_start, runs=runs
)
# 2.1 Attach Structure Status
if self.structure_service:
# Logic moved: structure_service will handle goal context internally or defaults to 0.0
weekly_snapshot.structure_status = self.structure_service.compute_structure_status(
runner_id=runner_id,
week_start=week_start,
weekly_volume=weekly_snapshot.total_distance_km,
goal_volume=0.0,
)
# 2.2 Attach re-computed snapshot ID to existent one (this fix the issue of existent charts going empty)
affected_snapshot = self.get_snapshot(week_start, language)
if affected_snapshot:
weekly_snapshot.id = affected_snapshot.id
if self.weekly_repo:
self.weekly_repo.save(weekly_snapshot)
# Emit snapshot_built event
obs_logger.log_event(
"info",
f"Snapshot built for week {week_start.isoformat()}",
event="snapshot_built",
component=obs_components.SERVICE,
**{
"week_start": week_start.isoformat(),
"total_distance": weekly_snapshot.total_distance_km,
"run_count": weekly_snapshot.run_count,
},
)
# 3. Compute trend snapshot
previous_week_snapshot = None
if self.weekly_repo:
previous_week_snapshot = self.weekly_repo.get_previous_week(week_start)
# If no previous snapshot in DB, or DB disabled, we return empty trend
# unless the caller provides more context (not supported yet for simplicity)
if previous_week_snapshot is None:
from domain.training.trend_snapshot import TrendSnapshot
trend_snapshot = TrendSnapshot.empty(runner_id, week_start)
else:
trend_snapshot = self.trend_engine.compute_trend_snapshot(
current=weekly_snapshot, previous=previous_week_snapshot
)
if self.trend_repo:
self.trend_repo.save(trend_snapshot)
# Emit trend_built event
obs_logger.log_event(
"info",
f"Trend built for week {week_start.isoformat()}",
event="trend_built",
component=obs_components.SERVICE,
**{
"comparison_available": trend_snapshot.comparison_available,
"comparison_type": getattr(trend_snapshot, "comparison_type", "unknown"),
},
)
# 4. Generate Performance Brief if BriefService is available
if self.brief_service and not skip_brief:
# Check if we need to regenerate
new_hash = self.brief_service.compute_brief_hash(
weekly_snapshot, trend_snapshot, goal_progress=goal_progress, language=language
)
if weekly_snapshot.brief_source_hash != new_hash:
# LLM Guard Logic: Execute only when trend comparison is available AND intelligence enabled
if enable_intelligence and trend_snapshot.comparison_available:
obs_logger.log_event(
"info", "Regenerating performance brief", component=obs_components.SERVICE
)
brief, focus = await self.brief_service.generate_brief(
weekly_snapshot, trend_snapshot, goal_progress=goal_progress, language=language
)
llm_used = True
reason = "active_week_and_comparison_available"
elif not enable_intelligence:
obs_logger.log_event(
"info", "Skipping intelligence generation for historical week",
component=obs_components.SERVICE,
fields={"week_start": week_start.isoformat()}
)
brief = self._generate_deterministic_brief(weekly_snapshot)
focus = ""
llm_used = False
reason = "not_active_week"
else:
obs_logger.log_event(
"info", "Using deterministic fallback for brief", component=obs_components.SERVICE
)
brief = self._generate_deterministic_brief(weekly_snapshot)
focus = "" # Deterministic fallback focus reset to empty
llm_used = False
reason = "insufficient_data"
weekly_snapshot.performance_brief = brief
weekly_snapshot.performance_focus = focus
weekly_snapshot.brief_source_hash = new_hash
weekly_snapshot.brief_generated_at = datetime.now()
obs_logger.log_event(
"info",
"Weekly brief generated",
event="weekly_brief_generated",
component=obs_components.SERVICE,
**{
"llm_used": llm_used,
"reason": reason,
"week_start": week_start.isoformat(),
},
)
if self.weekly_repo:
self.weekly_repo.save(weekly_snapshot)
obs_logger.log_event(
"info",
"Completed snapshot recompute for week",
component=obs_components.SERVICE,
**{"week_start": week_start.isoformat()},
)
return weekly_snapshot, trend_snapshot
except Exception:
raise
def _generate_deterministic_brief(self, snapshot: WeeklySnapshot) -> str:
"""Determines fallback brief message when insufficient history exists."""
if snapshot.run_count == 0:
return "No runs recorded this week yet. Let’s get started."
# Based on requirements:
# Case B (First week): "Great start. Keep building consistency — insights will deepen as your history grows."
# Case C (Some data but no comparison): "Solid week logged. As your history builds, we’ll unlock deeper performance insights."
# We'll return Case B as the general onboarding message for single week or first-week scenarios.
return "Great start. Keep building consistency — insights will deepen as your history grows."
async def recompute_all(self, runner_id: uuid.UUID) -> None:
"""
Full rebuild of all weekly and trend snapshots for a runner.
1. Load all runs
2. Group by week
3. Compute and replace all weekly snapshots
4. Compute and replace all trend snapshots
"""
import time
from ingestion.weekly_features import week_start as get_week_start
start_ms = time.time() * 1000
obs_logger.log_event(
"info",
"Starting full historical recompute",
component=obs_components.SERVICE,
**{"runner_id": str(runner_id)},
)
# 1. Load all runs
runs = self.run_repo.get_all_for_runner(str(runner_id))
if not runs:
obs_logger.log_event(
"warning", "No runs found for runner during recompute_all", component=obs_components.SERVICE
)
return
# 2. Group by week
runs_by_week = {}
for r in runs:
if not r.start_time:
continue
# Normalize to date for grouping
ws = get_week_start(r.start_time).date()
if ws not in runs_by_week:
runs_by_week[ws] = []
runs_by_week[ws].append(r)
# 3. Compute all weekly snapshots in chronological order
sorted_weeks = sorted(runs_by_week.keys())
weekly_snapshots = []
for ws in sorted_weeks:
snapshot = self.weekly_builder.build(
runner_id=runner_id, week_start_date=ws, runs=runs_by_week[ws]
)
# Attach structure status for historical weeks too
if self.structure_service:
snapshot.structure_status = self.structure_service.compute_structure_status(
runner_id=runner_id,
week_start=ws,
weekly_volume=snapshot.total_distance_km,
goal_volume=0.0, # Historical recompute usually has no goal context unless loaded
)
weekly_snapshots.append(snapshot)
# 4. Replace weekly snapshots
self.weekly_repo.delete_all_for_runner(runner_id)
for s in weekly_snapshots:
self.weekly_repo.save(s)
# 5. Compute all trend snapshots
trend_snapshots = []
previous_snapshot = None
for current_snapshot in weekly_snapshots:
trend = self.trend_engine.compute_trend_snapshot(
current=current_snapshot, previous=previous_snapshot
)
trend_snapshots.append(trend)
previous_snapshot = current_snapshot
# 6. Replace trend snapshots
self.trend_repo.delete_all_for_runner(runner_id)
self.trend_repo.save_many(trend_snapshots)
duration_ms = (time.time() * 1000) - start_ms
obs_logger.log_event(
"info",
"Full historical recompute complete",
component=obs_components.SERVICE,
**{
"total_runs": len(runs),
"total_weeks": len(sorted_weeks),
"duration_ms": duration_ms,
},
)
def build_agent_context(self) -> tuple:
"""Helper to build consistent agent context from persisted snapshots."""
latest_weekly = self.weekly_repo.get_last_n(1) if self.weekly_repo else []
if not latest_weekly:
from domain.training.agent_models import WeeklySummary, WeeklyTrends
return WeeklySummary(), WeeklyTrends()
return self.build_agent_context_for_week(latest_weekly[0].week_start_date)
    def build_agent_context_for_week(self, target_date: date) -> tuple:
        """
        Helper to build consistent agent context from persisted snapshots
        centered on a specific week.

        Returns:
            (WeeklySummary, WeeklyTrends) built from up to 12 weeks of history
            ending at ``target_date``; empty models when no history exists.
        """
        from domain.training.agent_models import WeeklySummary, WeeklyTrends
        # Get snapshots ending at target_date
        history = self.weekly_repo.get_last_n_ending_at(target_date, 12) if self.weekly_repo else []
        summary = WeeklySummary()
        trends = WeeklyTrends()
        if history:
            # history[0] is treated as the target week — presumably the repo
            # returns most-recent-first; verify against the repo implementation.
            top_week = history[0]
            summary = WeeklySummary(
                week_start=top_week.week_start_date,
                consistency_score=int(top_week.consistency_score),
                num_runs=top_week.run_count,
                total_distance_m=top_week.total_distance_km * 1000,  # km -> m
                avg_hr_bpm=top_week.avg_hr,
                avg_pace_s_per_km=top_week.avg_pace_sec_per_km,
                total_duration_s=int(top_week.total_time_sec),
                structure_status=top_week.structure_status,
                weekly_km={},
            )
            # Build weekly_km with YYYY-WW keys as expected by agents
            for w in history:
                iso_year, iso_week, _ = w.week_start_date.isocalendar()
                key = f"{iso_year}-{iso_week:02d}"
                summary.weekly_km[key] = w.total_distance_km
            # Load trend snapshot for this specific week
            trend_snap = (
                self.trend_repo.get_by_week(top_week.week_start_date) if self.trend_repo else None
            )
            if trend_snap:
                # Map trend snapshot to the trends model agents expect
                trends = WeeklyTrends(
                    pace_trend_s_per_km=trend_snap.avg_pace_delta_s_per_km,
                    distance_trend_m=(
                        trend_snap.distance_delta_km * 1000
                        if trend_snap.distance_delta_km is not None
                        else 0.0
                    ),
                    avg_runs_per_week=float(top_week.run_count),
                    # NOTE(review): run_monotony is fed from consistency_delta —
                    # looks like a repurposed field; confirm agents expect this.
                    run_monotony=(
                        trend_snap.consistency_delta
                        if trend_snap.consistency_delta is not None
                        else 0.0
                    ),
                )
        return summary, trends
def persist_snapshot(self, snapshot):
"""Persist weekly snapshot using repository owned by the service."""
if not self.weekly_repo:
return
self.weekly_repo.save(snapshot)
def persist_intelligence_snapshot(self, snapshot):
"""Persist intelligence snapshot using repository owned by the service."""
if not self.weekly_repo:
return
self.weekly_repo.save_intelligence_snapshot(snapshot)
def persist_trend(self, trend_snapshot):
"""Persist trend snapshot using repository owned by the service."""
if not self.trend_repo:
return
self.trend_repo.save(trend_snapshot)
def persist_profile(self, profile: RunnerProfile):
"""Persist runner profile using repository owned by the service."""
if not self.runner_repo:
return
self.runner_repo.save(profile)
def get_history(self, n: int = 12):
"""Retrieves historical weekly snapshots."""
if not self.weekly_repo:
return []
return self.weekly_repo.get_last_n(n)
def ensure_empty_week(self, runner_id, week_start):
snapshot = WeeklySnapshot(
runner_id=runner_id,
week_start_date=week_start,
run_count=0,
total_distance_km=0.0,
total_time_sec=0,
avg_hr=None,
avg_pace_sec_per_km=None,
consistency_score=0,
)
self.persist_snapshot(snapshot)
return snapshot
def initialize_baseline_if_needed(self, runner_profile):
from datetime import datetime
if not runner_profile or runner_profile.baseline_weekly_km is not None:
return runner_profile
summary, _ = self.build_agent_context()
weekly_km = summary.weekly_km
if weekly_km:
km_vals = list(weekly_km.values())
if km_vals:
runner_profile.baseline_weekly_km = float(sum(km_vals) / len(km_vals))
runner_profile.updated_at = datetime.now()
self.persist_profile(runner_profile)
return runner_profile
def get_snapshot(self, week_start_date: date, language: str = "en") -> Optional[Any]:
"""Retrieves a cached intelligence snapshot if it exists."""
return self.intelligence_snapshots.get((week_start_date, language))
def has_snapshot(self, week_start_date: date, language: str = "en") -> bool:
"""Checks if an intelligence snapshot exists for the given week."""
return (week_start_date, language) in self.intelligence_snapshots
def store_snapshot(self, week_start_date: date, snapshot: Any, language: str = "en") -> None:
"""Stores a recomputed intelligence snapshot in memory."""
self.intelligence_snapshots[(week_start_date, language)] = snapshot
def invalidate_weeks(self, runner_id: str, weeks: List[date]) -> None:
"""
Deletes intelligence snapshots for given weeks.
Safe no-op if snapshots do not exist.
"""
import logging
logger = logging.getLogger(__name__)
for week in weeks:
logger.debug(f"[Snapshot] Invalidated week: {week}")
# Ensure DB level is cleared if repo exists
if self.weekly_repo:
# Assuming weekly_repo has delete method, we just delete or let it naturally overwrite.
# According to the instructions, we do `self.weekly_repo.delete(runner_id, week)`
if hasattr(self.weekly_repo, "delete"):
self.weekly_repo.delete(runner_id, week)
# Clear memory cache regardless of language
keys_to_delete = [
k for k in self.intelligence_snapshots.keys()
if isinstance(k, tuple) and len(k) == 2 and k[0] == week
]
for key in keys_to_delete:
del self.intelligence_snapshots[key]
def is_week_dirty(self, week_start_date: date, weekly_summary: Any) -> bool:
"""
Determines if a week needs intelligence recomputation.
Only compares stable training metrics as per optimization guardrails.
"""
snapshot = self.get_snapshot(week_start_date)
if not snapshot:
return True
# Guardrail 1: Compare ONLY stable training fields
# Snapshot fields (RunnerIntelligenceSnapshot)
s_dist = getattr(snapshot, "weekly_distance_km", 0.0)
s_runs = getattr(snapshot, "run_count", 0)
s_pace = getattr(snapshot, "avg_pace", 0.0)
s_cons = getattr(snapshot, "consistency_score", 0)
# WeeklySummary fields (from build_agent_context_for_week)
w_dist = weekly_summary.total_distance_m / 1000.0 if hasattr(weekly_summary, "total_distance_m") else 0.0
w_runs = weekly_summary.num_runs if hasattr(weekly_summary, "num_runs") else 0
w_pace = weekly_summary.avg_pace_s_per_km if hasattr(weekly_summary, "avg_pace_s_per_km") else 0.0
w_cons = weekly_summary.consistency_score if hasattr(weekly_summary, "consistency_score") else 0
# Deterministic comparison with tolerance for floats
if abs(s_dist - w_dist) > 0.01:
return True
if s_runs != w_runs:
return True
if abs((s_pace or 0) - (w_pace or 0)) > 1.0: # 1 second/km tolerance
return True
if s_cons != w_cons:
return True
return False
@staticmethod
def has_intelligence(snapshot: Any) -> bool:
"""
Check if an intelligence snapshot has coaching results.
Lightweight check for UI/Orchestrator gating.
"""
if not snapshot:
return False
# Check RunnerIntelligenceSnapshot (UI view model) fields
if hasattr(snapshot, "insights") and snapshot.insights:
return True
if hasattr(snapshot, "recommendation") and snapshot.recommendation:
return True
if hasattr(snapshot, "performance_brief") and snapshot.performance_brief:
return True
return False