Spaces:
Sleeping
Sleeping
| """Utility class that coordinates blueprint execution across data tables.""" | |
| from __future__ import annotations | |
| from datetime import datetime | |
| from typing import Any, Callable, Dict, Iterable, List, Optional | |
| from ..data.catalog import DataCatalog | |
| from .base import AnalysisContext, Blueprint, BlueprintResult, normalize_datetime | |
| class AnalysisEngine: | |
| """ | |
| Lightweight executor that keeps Satyrn-inspired components wired together. | |
| - Maintains a registry of blueprints. | |
| - Provides normalized event data to blueprints that need it. | |
| - Offers a fetch hook to refresh PlanIt Purple feeds. | |
| """ | |
| def __init__(self, catalog: DataCatalog, blueprints: Optional[Iterable[Blueprint]] = None) -> None: | |
| self.catalog = catalog | |
| self._blueprints: Dict[str, Blueprint] = {} | |
| self._events: List[Dict[str, Any]] = [] | |
| if blueprints: | |
| for blueprint in blueprints: | |
| self.register_blueprint(blueprint) | |
| def register_blueprint(self, blueprint: Blueprint) -> None: | |
| self._blueprints[blueprint.name] = blueprint | |
| def run(self, blueprint_name: str, **params: Any) -> BlueprintResult: | |
| if blueprint_name not in self._blueprints: | |
| raise KeyError(f"Blueprint '{blueprint_name}' not registered.") | |
| context = AnalysisContext(catalog=self.catalog, events=self._events) | |
| return self._blueprints[blueprint_name].run(context, **params) | |
| # ------------------------------------------------------------------ # | |
| # Event management | |
| # ------------------------------------------------------------------ # | |
| def refresh_events(self, fetch_fn: Optional[Callable[[List[str]], List[Dict[str, Any]]]] = None) -> int: | |
| """ | |
| Update the in-memory event cache by downloading PlanIt Purple feeds. | |
| A custom fetch function can be provided for testing or offline use. | |
| Returns the number of events in the cache. | |
| """ | |
| feed_config = self.catalog.metadata.get("event_feeds") | |
| urls = feed_config.get("urls") if isinstance(feed_config, dict) else None | |
| if not urls: | |
| self._events = [] | |
| return 0 | |
| fetcher = fetch_fn or self._default_fetch | |
| events = fetcher(urls) | |
| events.sort(key=lambda evt: evt.get("start") or datetime.max) | |
| self._events = events | |
| return len(self._events) | |
| # Internal helpers -------------------------------------------------- # | |
| def _default_fetch(self, urls: List[str]) -> List[Dict[str, Any]]: | |
| try: | |
| import requests # type: ignore | |
| except ImportError: | |
| return [] | |
| events: List[Dict[str, Any]] = [] | |
| for url in urls: | |
| try: | |
| response = requests.get(url, timeout=10) | |
| response.raise_for_status() | |
| payload = response.json() | |
| except Exception: | |
| continue | |
| events.extend(self._normalize_events(payload, source=url)) | |
| return events | |
| def _normalize_events(self, payload: Any, *, source: str) -> List[Dict[str, Any]]: | |
| raw_events: List[Dict[str, Any]] = [] | |
| if isinstance(payload, dict): | |
| if isinstance(payload.get("events"), list): | |
| raw_events = payload["events"] | |
| elif isinstance(payload.get("items"), list): | |
| raw_events = payload["items"] | |
| else: | |
| raw_events = [payload] | |
| elif isinstance(payload, list): | |
| raw_events = payload | |
| else: | |
| return [] | |
| normalized: List[Dict[str, Any]] = [] | |
| for item in raw_events: | |
| if not isinstance(item, dict): | |
| continue | |
| start_value = ( | |
| item.get("start") | |
| or item.get("startDate") | |
| or item.get("start_date") | |
| or item.get("start_datetime") | |
| ) | |
| end_value = item.get("end") or item.get("endDate") or item.get("end_date") or item.get("end_datetime") | |
| normalized.append( | |
| { | |
| "title": item.get("title") or item.get("summary") or item.get("name") or "Untitled Event", | |
| "start": normalize_datetime(start_value), | |
| "end": normalize_datetime(end_value), | |
| "location": item.get("location") or item.get("location_name"), | |
| "description": item.get("description") or item.get("summary"), | |
| "url": item.get("url") or item.get("permalink") or source, | |
| "tags": item.get("tags") or item.get("keywords"), | |
| "source": source, | |
| "raw": item, | |
| } | |
| ) | |
| return normalized | |