# NU-KIOSK-API: backend/tools/analysis_engine.py
"""Utility class that coordinates blueprint execution across data tables."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Optional
from ..data.catalog import DataCatalog
from .base import AnalysisContext, Blueprint, BlueprintResult, normalize_datetime
class AnalysisEngine:
"""
Lightweight executor that keeps Satyrn-inspired components wired together.
- Maintains a registry of blueprints.
- Provides normalized event data to blueprints that need it.
- Offers a fetch hook to refresh PlanIt Purple feeds.
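
    Rough usage sketch (``EventCountBlueprint``, the ``"event_count"`` name, and
    ``window_days`` are illustrative placeholders for a blueprint defined elsewhere)::

        engine = AnalysisEngine(catalog)
        engine.register_blueprint(EventCountBlueprint())
        result = engine.run("event_count", window_days=7)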
"""
def __init__(self, catalog: DataCatalog, blueprints: Optional[Iterable[Blueprint]] = None) -> None:
self.catalog = catalog
self._blueprints: Dict[str, Blueprint] = {}
self._events: List[Dict[str, Any]] = []
if blueprints:
for blueprint in blueprints:
self.register_blueprint(blueprint)
def register_blueprint(self, blueprint: Blueprint) -> None:
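        """Register ``blueprint`` under its ``name``; registering the same name again replaces the earlier entry."""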
self._blueprints[blueprint.name] = blueprint
def run(self, blueprint_name: str, **params: Any) -> BlueprintResult:
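        """Run the named blueprint against the current catalog and event cache; raises ``KeyError`` for unknown names."""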
if blueprint_name not in self._blueprints:
raise KeyError(f"Blueprint '{blueprint_name}' not registered.")
context = AnalysisContext(catalog=self.catalog, events=self._events)
return self._blueprints[blueprint_name].run(context, **params)
# ------------------------------------------------------------------ #
# Event management
# ------------------------------------------------------------------ #
def refresh_events(self, fetch_fn: Optional[Callable[[List[str]], List[Dict[str, Any]]]] = None) -> int:
"""
Update the in-memory event cache by downloading PlanIt Purple feeds.
A custom fetch function can be provided for testing or offline use.
Returns the number of events in the cache.
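
        The catalog metadata is expected to carry the feed URLs as
        ``{"event_feeds": {"urls": [...]}}``. A custom fetcher should return
        dictionaries that already follow the normalized event schema; in
        particular, the sort below assumes comparable ``"start"`` values
        (``datetime`` objects or ``None``). Purely illustrative stub::

            def offline_fetch(urls):
                return [{"title": "Demo talk", "start": datetime(2025, 1, 15, 10, 0)}]

            engine.refresh_events(fetch_fn=offline_fetch)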
"""
feed_config = self.catalog.metadata.get("event_feeds")
urls = feed_config.get("urls") if isinstance(feed_config, dict) else None
if not urls:
self._events = []
return 0
fetcher = fetch_fn or self._default_fetch
events = fetcher(urls)
        # Undated events sort last; the tuple key keeps the datetime placeholder from being compared with real start times.
        events.sort(key=lambda evt: (evt.get("start") is None, evt.get("start") or datetime.max))
self._events = events
return len(self._events)
# Internal helpers -------------------------------------------------- #
def _default_fetch(self, urls: List[str]) -> List[Dict[str, Any]]:
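        """Fetch each feed URL with ``requests`` (when available), skipping feeds that fail, and return the combined normalized events."""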
try:
import requests # type: ignore
except ImportError:
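            # requests is an optional dependency; without it there is no HTTP client, so report no events.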
return []
events: List[Dict[str, Any]] = []
for url in urls:
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
payload = response.json()
except Exception:
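                # Skip feeds that fail to download or do not return valid JSON.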
continue
events.extend(self._normalize_events(payload, source=url))
return events
def _normalize_events(self, payload: Any, *, source: str) -> List[Dict[str, Any]]:
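        """Coerce a feed payload (single event, ``events``/``items`` wrapper, or list) into the engine's common event schema."""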
raw_events: List[Dict[str, Any]] = []
if isinstance(payload, dict):
if isinstance(payload.get("events"), list):
raw_events = payload["events"]
elif isinstance(payload.get("items"), list):
raw_events = payload["items"]
else:
raw_events = [payload]
elif isinstance(payload, list):
raw_events = payload
else:
return []
normalized: List[Dict[str, Any]] = []
for item in raw_events:
if not isinstance(item, dict):
continue
start_value = (
item.get("start")
or item.get("startDate")
or item.get("start_date")
or item.get("start_datetime")
)
end_value = item.get("end") or item.get("endDate") or item.get("end_date") or item.get("end_datetime")
normalized.append(
{
"title": item.get("title") or item.get("summary") or item.get("name") or "Untitled Event",
"start": normalize_datetime(start_value),
"end": normalize_datetime(end_value),
"location": item.get("location") or item.get("location_name"),
"description": item.get("description") or item.get("summary"),
"url": item.get("url") or item.get("permalink") or source,
"tags": item.get("tags") or item.get("keywords"),
"source": source,
"raw": item,
}
)
return normalized