"""Utility class that coordinates blueprint execution across data tables."""

from __future__ import annotations

import logging
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Optional

from ..data.catalog import DataCatalog
from .base import AnalysisContext, Blueprint, BlueprintResult, normalize_datetime

# Private so that star-imports of this module do not pick up a new public name.
_logger = logging.getLogger(__name__)


class AnalysisEngine:
    """
    Lightweight executor that keeps Satyrn-inspired components wired together.

    - Maintains a registry of blueprints.
    - Provides normalized event data to blueprints that need it.
    - Offers a fetch hook to refresh PlanIt Purple feeds.
    """

    def __init__(self, catalog: DataCatalog, blueprints: Optional[Iterable[Blueprint]] = None) -> None:
        """
        Create an engine bound to ``catalog`` with an empty event cache.

        Args:
            catalog: Data catalog handed to every blueprint through the
                analysis context; its ``metadata`` is read by
                :meth:`refresh_events`.
            blueprints: Optional blueprints to register immediately.
        """
        self.catalog = catalog
        self._blueprints: Dict[str, Blueprint] = {}
        self._events: List[Dict[str, Any]] = []
        if blueprints:
            for blueprint in blueprints:
                self.register_blueprint(blueprint)

    def register_blueprint(self, blueprint: Blueprint) -> None:
        """Register ``blueprint`` under its ``name``, replacing any previous entry."""
        self._blueprints[blueprint.name] = blueprint

    def run(self, blueprint_name: str, **params: Any) -> BlueprintResult:
        """
        Execute a registered blueprint against the current catalog and events.

        Args:
            blueprint_name: Name the blueprint was registered under.
            **params: Forwarded unchanged to the blueprint's ``run`` method.

        Returns:
            Whatever the blueprint's ``run`` method returns.

        Raises:
            KeyError: If no blueprint is registered under ``blueprint_name``.
        """
        if blueprint_name not in self._blueprints:
            raise KeyError(f"Blueprint '{blueprint_name}' not registered.")
        context = AnalysisContext(catalog=self.catalog, events=self._events)
        return self._blueprints[blueprint_name].run(context, **params)

    # ------------------------------------------------------------------ #
    # Event management
    # ------------------------------------------------------------------ #
    def refresh_events(self, fetch_fn: Optional[Callable[[List[str]], List[Dict[str, Any]]]] = None) -> int:
        """
        Update the in-memory event cache by downloading PlanIt Purple feeds.

        Feed URLs are read from ``catalog.metadata["event_feeds"]["urls"]``.
        When that entry is missing or empty the cache is cleared.

        Args:
            fetch_fn: Optional replacement for the default downloader, useful
                for testing or offline use. It receives the list of feed URLs
                and returns the event dictionaries to cache.

        Returns:
            The number of events in the cache.
        """
        feed_config = self.catalog.metadata.get("event_feeds")
        urls = feed_config.get("urls") if isinstance(feed_config, dict) else None
        if isinstance(urls, str):
            # A single URL configured as a bare string would otherwise be
            # iterated character by character by the fetcher.
            urls = [urls]
        if not urls:
            self._events = []
            return 0

        fetcher = fetch_fn or self._default_fetch
        # Tolerate a fetcher that returns None, and build a new list with
        # sorted() so the caller's own list is never reordered in place.
        fetched = fetcher(urls) or []
        self._events = sorted(fetched, key=self._start_sort_key)
        return len(self._events)

    # Internal helpers -------------------------------------------------- #
    @staticmethod
    def _start_sort_key(event: Dict[str, Any]) -> tuple:
        """
        Return a sort key that orders events by ``start`` with undated ones last.

        The leading rank decides dated-versus-undated, so a real start value is
        never compared against the naive ``datetime.max`` placeholder; such a
        comparison raises ``TypeError`` when the start is timezone-aware.
        """
        start = event.get("start")
        if not start:
            return (1, datetime.max)
        return (0, start)

    def _default_fetch(self, urls: List[str]) -> List[Dict[str, Any]]:
        """
        Download each feed URL with ``requests`` and normalize its JSON payload.

        This is best-effort: a missing ``requests`` package yields an empty
        list, and a feed that fails to download or parse is skipped. Both
        situations are logged instead of being silently ignored.

        Args:
            urls: Feed URLs to download.

        Returns:
            Normalized events from every feed that could be read.
        """
        try:
            import requests  # type: ignore
        except ImportError:
            _logger.warning("The 'requests' package is not installed; no event feeds were fetched.")
            return []

        events: List[Dict[str, Any]] = []
        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                payload = response.json()
            except Exception:
                # Deliberately broad: one bad feed must not abort the refresh.
                _logger.warning("Skipping event feed %s: fetch or JSON decode failed.", url, exc_info=True)
                continue
            events.extend(self._normalize_events(payload, source=url))
        return events

    def _normalize_events(self, payload: Any, *, source: str) -> List[Dict[str, Any]]:
        """
        Convert a decoded feed payload into a list of uniform event dicts.

        A dict payload is searched for a list under ``"events"`` and then
        ``"items"``; failing both, the dict itself is treated as one event.
        A list payload is used as-is. Any other payload type yields ``[]``.
        Entries that are not dicts are skipped.

        Args:
            payload: Decoded JSON value from a feed.
            source: URL the payload came from; stored on each event and used
                as the fallback ``"url"``.

        Returns:
            Event dicts with the keys ``title``, ``start``, ``end``,
            ``location``, ``description``, ``url``, ``tags``, ``source`` and
            ``raw`` (the original item).
        """
        raw_events: List[Dict[str, Any]]
        if isinstance(payload, dict):
            for container_key in ("events", "items"):
                if isinstance(payload.get(container_key), list):
                    raw_events = payload[container_key]
                    break
            else:
                # No recognised container: the dict itself is a single event.
                raw_events = [payload]
        elif isinstance(payload, list):
            raw_events = payload
        else:
            return []

        normalized: List[Dict[str, Any]] = []
        for item in raw_events:
            if not isinstance(item, dict):
                continue
            # Feeds disagree on field names, so take the first non-empty alias.
            start_value = (
                item.get("start")
                or item.get("startDate")
                or item.get("start_date")
                or item.get("start_datetime")
            )
            end_value = (
                item.get("end")
                or item.get("endDate")
                or item.get("end_date")
                or item.get("end_datetime")
            )
            normalized.append(
                {
                    "title": item.get("title") or item.get("summary") or item.get("name") or "Untitled Event",
                    "start": normalize_datetime(start_value),
                    "end": normalize_datetime(end_value),
                    "location": item.get("location") or item.get("location_name"),
                    "description": item.get("description") or item.get("summary"),
                    "url": item.get("url") or item.get("permalink") or source,
                    "tags": item.get("tags") or item.get("keywords"),
                    "source": source,
                    "raw": item,
                }
            )
        return normalized