File size: 4,745 Bytes
c2b7a7b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Utility class that coordinates blueprint execution across data tables."""

from __future__ import annotations

from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Optional

from ..data.catalog import DataCatalog
from .base import AnalysisContext, Blueprint, BlueprintResult, normalize_datetime


class AnalysisEngine:
    """
    Lightweight executor that keeps Satyrn-inspired components wired together.

    - Maintains a registry of blueprints, keyed by ``blueprint.name``.
    - Provides normalized event data to blueprints that need it.
    - Offers a fetch hook to refresh PlanIt Purple feeds.
    """

    def __init__(self, catalog: DataCatalog, blueprints: Optional[Iterable[Blueprint]] = None) -> None:
        """
        Bind the engine to ``catalog`` and register any initial blueprints.

        The event cache starts empty; call ``refresh_events`` to populate it.
        """
        self.catalog = catalog
        self._blueprints: Dict[str, Blueprint] = {}
        self._events: List[Dict[str, Any]] = []
        for blueprint in blueprints or ():
            self.register_blueprint(blueprint)

    def register_blueprint(self, blueprint: Blueprint) -> None:
        """Register ``blueprint`` under its ``name``, replacing any previous entry."""
        self._blueprints[blueprint.name] = blueprint

    def run(self, blueprint_name: str, **params: Any) -> BlueprintResult:
        """
        Execute a registered blueprint and return its result.

        The blueprint receives an ``AnalysisContext`` built from the engine's
        catalog and its current event cache; ``params`` are forwarded as
        keyword arguments.

        Raises:
            KeyError: if no blueprint is registered under ``blueprint_name``.
        """
        try:
            blueprint = self._blueprints[blueprint_name]
        except KeyError:
            raise KeyError(f"Blueprint '{blueprint_name}' not registered.") from None
        context = AnalysisContext(catalog=self.catalog, events=self._events)
        return blueprint.run(context, **params)

    # ------------------------------------------------------------------ #
    # Event management
    # ------------------------------------------------------------------ #
    def refresh_events(self, fetch_fn: Optional[Callable[[List[str]], List[Dict[str, Any]]]] = None) -> int:
        """
        Update the in-memory event cache by downloading PlanIt Purple feeds.

        Feed URLs are read from ``catalog.metadata["event_feeds"]["urls"]``.
        When no URLs are configured the cache is cleared. A single URL given
        as a plain string is treated as a one-element list.

        A custom fetch function can be provided for testing or offline use;
        it receives the list of URLs and returns event dictionaries. The
        returned list is not modified: the cache holds a new list sorted by
        each event's ``"start"`` value, with events lacking a start placed last.

        Returns the number of events in the cache.
        """
        feed_config = self.catalog.metadata.get("event_feeds")
        urls = feed_config.get("urls") if isinstance(feed_config, dict) else None
        if not urls:
            self._events = []
            return 0
        if isinstance(urls, str):
            # Iterating a bare string would yield single characters, not URLs.
            urls = [urls]

        fetcher = fetch_fn or self._default_fetch
        fetched = fetcher(urls) or []
        self._events = sorted(fetched, key=self._event_sort_key)
        return len(self._events)

    @staticmethod
    def _event_sort_key(event: Dict[str, Any]) -> tuple:
        """
        Return a sort key that orders events by start and puts missing starts last.

        The leading flag decides the order between events with and without a
        start, so the ``datetime.max`` placeholder (which is timezone-naive)
        is never compared against a real, possibly timezone-aware, start.
        """
        start = event.get("start")
        if start:
            return (False, start)
        return (True, datetime.max)

    # Internal helpers -------------------------------------------------- #
    def _default_fetch(self, urls: List[str]) -> List[Dict[str, Any]]:
        """
        Download each feed URL with ``requests`` and return normalized events.

        This is best effort: if ``requests`` is not installed an empty list is
        returned, and a feed that fails to download or decode as JSON is
        skipped. Both situations are logged as warnings rather than raised.
        """
        import logging

        logger = logging.getLogger(__name__)
        try:
            import requests  # type: ignore
        except ImportError:
            logger.warning("The 'requests' package is not installed; no event feeds were fetched.")
            return []

        events: List[Dict[str, Any]] = []
        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                payload = response.json()
            except Exception as exc:
                # One broken feed must not prevent the remaining feeds from loading.
                logger.warning("Skipping event feed %s: %s", url, exc)
                continue
            events.extend(self._normalize_events(payload, source=url))
        return events

    @staticmethod
    def _first_value(item: Dict[str, Any], *keys: str) -> Any:
        """
        Return the first truthy ``item[key]`` among ``keys``.

        When none is truthy, the value found for the last key is returned
        (``None`` if that key is absent), matching a chained ``or`` expression.
        """
        value = None
        for key in keys:
            value = item.get(key)
            if value:
                break
        return value

    def _normalize_events(self, payload: Any, *, source: str) -> List[Dict[str, Any]]:
        """
        Convert a decoded feed payload into a list of uniform event dictionaries.

        Accepted payload shapes:
        - a dict holding a list under ``"events"`` or ``"items"``;
        - any other dict, treated as a single event;
        - a list of events.
        Any other payload type yields an empty list, and non-dict entries are
        skipped.

        Each result has the keys ``title``, ``start``, ``end``, ``location``,
        ``description``, ``url``, ``tags``, ``source`` and ``raw`` (the
        original item). ``start`` and ``end`` are passed through
        ``normalize_datetime``; ``url`` falls back to ``source``.
        """
        raw_events: List[Dict[str, Any]] = []
        if isinstance(payload, dict):
            if isinstance(payload.get("events"), list):
                raw_events = payload["events"]
            elif isinstance(payload.get("items"), list):
                raw_events = payload["items"]
            else:
                raw_events = [payload]
        elif isinstance(payload, list):
            raw_events = payload
        else:
            return []

        pick = self._first_value
        normalized: List[Dict[str, Any]] = []
        for item in raw_events:
            if not isinstance(item, dict):
                continue
            start_value = pick(item, "start", "startDate", "start_date", "start_datetime")
            end_value = pick(item, "end", "endDate", "end_date", "end_datetime")
            normalized.append(
                {
                    "title": pick(item, "title", "summary", "name") or "Untitled Event",
                    "start": normalize_datetime(start_value),
                    "end": normalize_datetime(end_value),
                    "location": pick(item, "location", "location_name"),
                    "description": pick(item, "description", "summary"),
                    "url": pick(item, "url", "permalink") or source,
                    "tags": pick(item, "tags", "keywords"),
                    "source": source,
                    "raw": item,
                }
            )
        return normalized