from __future__ import annotations import asyncio import logging import random import time from dataclasses import dataclass from datetime import datetime from typing import Any import httpx from dateutil import parser as date_parser from defusedxml import ElementTree as ET from .config import Settings from .models import Measurement, resolve_trend logger = logging.getLogger(__name__) MAX_ATTEMPTS = 4 BASE_BACKOFF = 0.5 TIMEOUT_SECONDS = 8.0 LEVEL_KEYS = ("level", "levelcm", "pegel", "wasserstand", "value", "stand") TIME_KEYS = ( "timestamp", "zeit", "time", "datetime", "datum", "messzeit", "uhrzeit", ) TREND_KEYS = ("trend", "tendency", "richtung") @dataclass(slots=True) class FetchResult: measurement: Measurement latency_ms: float | None is_demo: bool error: str | None = None class FetchError(Exception): """Raised when fetching from the remote source fails.""" async def fetch_latest( settings: Settings, previous: Measurement | None, *, force_demo: bool = False, ) -> FetchResult: if force_demo: measurement = _generate_demo(previous, settings) return FetchResult(measurement=measurement, latency_ms=None, is_demo=True) try: measurement, latency = await _fetch_from_source(settings, previous) return FetchResult(measurement=measurement, latency_ms=latency, is_demo=False) except Exception as exc: # noqa: BLE001 logger.warning("Falling back to demo mode due to: %s", exc) measurement = _generate_demo(previous, settings) return FetchResult( measurement=measurement, latency_ms=None, is_demo=True, error=str(exc), ) async def _fetch_from_source( settings: Settings, previous: Measurement | None, ) -> tuple[Measurement, float]: url = str(settings.source_url) timeout = httpx.Timeout(TIMEOUT_SECONDS) async with httpx.AsyncClient(timeout=timeout) as client: last_error: Exception | None = None for attempt in range(1, MAX_ATTEMPTS + 1): start = time.perf_counter() try: response = await client.get(url) elapsed_ms = (time.perf_counter() - start) * 1000 response.raise_for_status() measurement = _parse_response(response, settings, previous) return measurement, elapsed_ms except Exception as exc: # noqa: BLE001 last_error = exc delay = BASE_BACKOFF * 2 ** (attempt - 1) logger.debug( "Fetch attempt %s failed (%s); retrying in %.2fs", attempt, exc, delay, ) if attempt == MAX_ATTEMPTS: break await asyncio.sleep(delay) raise FetchError(f"Failed to fetch data from {url}") from last_error def _parse_response( response: httpx.Response, settings: Settings, previous: Measurement | None, ) -> Measurement: content_type = response.headers.get("content-type", "").lower() if "json" in content_type: payload = response.json() return _build_measurement_from_payload(payload, settings, previous) text = response.text # Try JSON first regardless of header to be robust try: payload = response.json() return _build_measurement_from_payload(payload, settings, previous) except Exception: # noqa: BLE001 pass return _build_measurement_from_xml(text, settings, previous) def _build_measurement_from_payload( payload: Any, settings: Settings, previous: Measurement | None, ) -> Measurement: record = _select_record(payload) if record is None: raise ValueError("No valid record found in payload") normalized = {str(k).lower(): v for k, v in record.items()} level_cm = _extract_level(normalized) timestamp = _extract_timestamp(normalized, settings) provided_trend = _extract_trend(normalized) trend = resolve_trend(level_cm, provided_trend, previous) return Measurement(level_cm=level_cm, timestamp=timestamp, trend=trend) def _build_measurement_from_xml( text: str, settings: Settings, previous: Measurement | None, ) -> Measurement: root = ET.fromstring(text) flat: dict[str, Any] = {} for element in root.iter(): if element.text and element.text.strip(): flat[element.tag.lower()] = element.text.strip() for key, value in element.attrib.items(): flat[f"{element.tag.lower()}_{key.lower()}"] = value if not flat: raise ValueError("XML payload did not contain any usable data") level_cm = _extract_level(flat) timestamp = _extract_timestamp(flat, settings) provided_trend = _extract_trend(flat) trend = resolve_trend(level_cm, provided_trend, previous) return Measurement(level_cm=level_cm, timestamp=timestamp, trend=trend) def _select_record(payload: Any) -> dict[str, Any] | None: if isinstance(payload, dict): if any(isinstance(v, (dict, list)) for v in payload.values()): for key in ("latest", "current", "data", "value", "measurement", "item", "werte"): if key in payload: nested = _select_record(payload[key]) if nested: return nested return payload if isinstance(payload, list): for item in reversed(payload): nested = _select_record(item) if nested: return nested return None def _extract_level(data: dict[str, Any]) -> int: for key in LEVEL_KEYS: if key in data: value = data[key] numeric = _coerce_float(value) if numeric is not None: # Values below 20 are likely metres; convert to centimetres. if numeric < 20: numeric *= 100 return max(0, int(round(numeric))) raise ValueError("Could not determine level value") def _extract_timestamp(data: dict[str, Any], settings: Settings) -> datetime: tz = settings.timezone # Handle separate date/time fields (e.g. Datum + Uhrzeit) date_value = data.get("datum") or data.get("date") time_value = ( data.get("uhrzeit") or data.get("zeit") or data.get("time") or data.get("messzeit") ) if date_value and time_value: combined = f"{date_value} {time_value}" parsed = _coerce_datetime(combined, tz) if parsed is not None: return parsed for key in TIME_KEYS: if key in data and data[key] is not None: value = data[key] parsed = _coerce_datetime(value, tz) if parsed is not None: return parsed return datetime.now(tz) def _extract_trend(data: dict[str, Any]) -> int | None: for key in TREND_KEYS: if key in data: raw = data[key] if raw is None: continue if isinstance(raw, (int, float)) and raw in (-1, 0, 1): return int(raw) text = str(raw).strip().lower() if text in {"-1", "falling", "down", "sinkend"}: return -1 if text in {"1", "rising", "up", "steigend"}: return 1 if text in {"0", "stable", "gleich", "steady"}: return 0 return None def _coerce_float(value: Any) -> float | None: if value is None: return None if isinstance(value, (int, float)): return float(value) text = str(value).strip().replace(",", ".") try: return float(text) except ValueError: return None def _coerce_datetime(value: Any, tz) -> datetime | None: if value is None: return None if isinstance(value, (int, float)): # Assume Unix timestamp (seconds) return datetime.fromtimestamp(float(value), tz=tz) text = str(value).strip() if not text: return None try: parsed = date_parser.parse(text) except (ValueError, TypeError) as exc: logger.debug("Failed to parse datetime %s: %s", value, exc) return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=tz) return parsed.astimezone(tz) def _generate_demo(previous: Measurement | None, settings: Settings) -> Measurement: base = previous.level_cm if previous else 380 jitter = random.randint(-10, 10) drift = random.choice([-3, -2, -1, 0, 1, 2, 3]) level = max(250, min(900, base + drift + jitter)) timestamp = datetime.now(settings.timezone) trend = resolve_trend(level, None, previous) return Measurement(level_cm=level, timestamp=timestamp, trend=trend, is_demo=True)