Spaces:
Running
Running
| import asyncio | |
| import math | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Any, Awaitable, Callable, Dict, Optional, Tuple | |
| EnvFetcher = Callable[[str], Awaitable[Dict[str, Any]]] | |
| EnvWriter = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]] | |
| GeoFetcher = Callable[[float, float], Awaitable[Optional[Dict[str, Any]]]] | |
| class EnvironmentSnapshot: | |
| data: Dict[str, Any] | |
| updated_at: float = field(default_factory=lambda: time.time()) | |
| class EnvironmentContextService: | |
| """ | |
| 管理即時環境資訊: | |
| - 記憶體快取 + TTL | |
| - 節流距離/方位差 | |
| - Firestore 寫入排程(current + snapshots) | |
| - 反地理查詢背景處理 | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| min_distance_m: float, | |
| min_heading_deg: float, | |
| ttl_seconds: float, | |
| env_fetcher: EnvFetcher, | |
| env_writer: EnvWriter, | |
| snapshot_writer: Optional[EnvWriter] = None, | |
| ) -> None: | |
| self._min_distance = max(min_distance_m, 0.0) | |
| self._min_heading = max(min_heading_deg, 0.0) | |
| self._ttl = max(ttl_seconds, 1.0) | |
| self._env_fetcher = env_fetcher | |
| self._env_writer = env_writer | |
| self._snapshot_writer = snapshot_writer | |
| self._cache: Dict[str, EnvironmentSnapshot] = {} | |
| self._write_queue: "asyncio.Queue[Tuple[str, Dict[str, Any]]]" = asyncio.Queue() | |
| self._snapshot_queue: "asyncio.Queue[Tuple[str, Dict[str, Any]]]" = asyncio.Queue() | |
| self._writer_task: Optional[asyncio.Task] = None | |
| self._snapshot_task: Optional[asyncio.Task] = None | |
| self._geo_tasks: Dict[str, asyncio.Task] = {} | |
| self._lock = asyncio.Lock() | |
| # --------------------------------------------------------------------- # | |
| # 公開介面 | |
| # --------------------------------------------------------------------- # | |
| async def start(self) -> None: | |
| if self._writer_task is None: | |
| self._writer_task = asyncio.create_task(self._write_loop(), name="env-current-writer") | |
| if self._snapshot_writer and self._snapshot_task is None: | |
| self._snapshot_task = asyncio.create_task(self._snapshot_loop(), name="env-snapshot-writer") | |
| async def shutdown(self) -> None: | |
| for pending in self._geo_tasks.values(): | |
| pending.cancel() | |
| self._geo_tasks.clear() | |
| if self._writer_task: | |
| self._writer_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await self._writer_task | |
| self._writer_task = None | |
| if self._snapshot_task: | |
| self._snapshot_task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await self._snapshot_task | |
| self._snapshot_task = None | |
| async def ingest_snapshot( | |
| self, | |
| user_id: str, | |
| raw_payload: Dict[str, Any], | |
| *, | |
| geocode_provider: Optional[GeoFetcher] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| 接收前端發送的環境快照,立即回傳 ACK 與基本資料, | |
| 寫入 Firestore 與反地理查詢則交由背景處理。 | |
| """ | |
| if not user_id: | |
| raise ValueError("user_id is required for environment snapshot ingestion") | |
| normalized, write_snapshot = await self._normalize_snapshot(user_id, raw_payload) | |
| async with self._lock: | |
| self._cache[user_id] = EnvironmentSnapshot(data=normalized) | |
| await self._write_queue.put((user_id, normalized)) | |
| if write_snapshot and self._snapshot_writer: | |
| await self._snapshot_queue.put((user_id, normalized)) | |
| if geocode_provider and self._needs_geocode(normalized): | |
| await self._schedule_geocode(user_id, normalized, geocode_provider) | |
| ack = { | |
| "success": True, | |
| "geohash_7": normalized.get("geohash_7"), | |
| "heading_cardinal": normalized.get("heading_cardinal"), | |
| } | |
| return ack | |
| async def get_context(self, user_id: str, *, allow_stale: bool = False) -> Dict[str, Any]: | |
| async with self._lock: | |
| cached = self._cache.get(user_id) | |
| if cached and (allow_stale or not self._is_stale(cached)): | |
| return dict(cached.data) | |
| data = await self._env_fetcher(user_id) | |
| if data.get("success"): | |
| ctx = data.get("context") or {} | |
| async with self._lock: | |
| self._cache[user_id] = EnvironmentSnapshot(data=ctx) | |
| return dict(ctx) | |
| return {} | |
| # --------------------------------------------------------------------- # | |
| # 內部流程 | |
| # --------------------------------------------------------------------- # | |
| async def _normalize_snapshot( | |
| self, | |
| user_id: str, | |
| raw_payload: Dict[str, Any], | |
| ) -> Tuple[Dict[str, Any], bool]: | |
| lat = _safe_float(raw_payload.get("lat")) | |
| lon = _safe_float(raw_payload.get("lon")) | |
| accuracy = _safe_float(raw_payload.get("accuracy_m")) | |
| heading_deg = _safe_float(raw_payload.get("heading_deg")) | |
| ctx = { | |
| "lat": lat, | |
| "lon": lon, | |
| "accuracy_m": accuracy, | |
| "heading_deg": heading_deg, | |
| "heading_cardinal": _heading_to_cardinal(heading_deg) if heading_deg is not None else None, | |
| "tz": raw_payload.get("tz"), | |
| "locale": raw_payload.get("locale"), | |
| "device": raw_payload.get("device"), | |
| "city": raw_payload.get("city"), | |
| "admin": raw_payload.get("admin"), | |
| "country_code": raw_payload.get("country_code"), | |
| "address_display": raw_payload.get("address_display"), | |
| "geohash_7": _encode_geohash(lat, lon), | |
| "updated_at": time.time(), | |
| } | |
| previous = await self._get_cached(user_id) | |
| should_snapshot = self._should_snapshot(previous, ctx) | |
| if previous and not self._has_position_change(previous.data, ctx): | |
| # 沒有座標變化時保留先前的精細地理資訊 | |
| for key in ( | |
| "detailed_address", | |
| "label", | |
| "road", | |
| "house_number", | |
| "suburb", | |
| "city_district", | |
| "postcode", | |
| "amenity", | |
| "shop", | |
| "building", | |
| "office", | |
| "leisure", | |
| "tourism", | |
| "name", | |
| ): | |
| ctx[key] = previous.data.get(key) | |
| return ctx, should_snapshot | |
| async def _schedule_geocode( | |
| self, | |
| user_id: str, | |
| ctx: Dict[str, Any], | |
| geocode_provider: GeoFetcher, | |
| ) -> None: | |
| if user_id in self._geo_tasks: | |
| # 已有任務在跑,避免重複 | |
| return | |
| async def _task() -> None: | |
| try: | |
| if ctx.get("lat") is None or ctx.get("lon") is None: | |
| return | |
| enriched = await geocode_provider(ctx["lat"], ctx["lon"]) | |
| if not enriched: | |
| return | |
| async with self._lock: | |
| cached = self._cache.get(user_id) | |
| if not cached: | |
| cached = EnvironmentSnapshot(data=dict(ctx)) | |
| self._cache[user_id] = cached | |
| cached.data.update(enriched) | |
| cached.updated_at = time.time() | |
| await self._write_queue.put((user_id, dict(cached.data))) | |
| if self._snapshot_writer: | |
| await self._snapshot_queue.put((user_id, dict(cached.data))) | |
| finally: | |
| self._geo_tasks.pop(user_id, None) | |
| self._geo_tasks[user_id] = asyncio.create_task(_task(), name=f"env-geocode-{user_id}") | |
| async def _write_loop(self) -> None: | |
| while True: | |
| user_id, payload = await self._write_queue.get() | |
| try: | |
| await self._env_writer(user_id, payload) | |
| except Exception: | |
| # 寫入失敗時稍後重試 | |
| await asyncio.sleep(1.0) | |
| await self._write_queue.put((user_id, payload)) | |
| finally: | |
| self._write_queue.task_done() | |
| async def _snapshot_loop(self) -> None: | |
| while True: | |
| user_id, payload = await self._snapshot_queue.get() | |
| try: | |
| await self._snapshot_writer(user_id, payload) | |
| except Exception: | |
| await asyncio.sleep(2.0) | |
| await self._snapshot_queue.put((user_id, payload)) | |
| finally: | |
| self._snapshot_queue.task_done() | |
| async def _get_cached(self, user_id: str) -> Optional[EnvironmentSnapshot]: | |
| async with self._lock: | |
| return self._cache.get(user_id) | |
| def _should_snapshot(self, previous: Optional[EnvironmentSnapshot], current: Dict[str, Any]) -> bool: | |
| if previous is None: | |
| return True | |
| return self._has_position_change(previous.data, current) | |
| def _has_position_change(self, previous: Dict[str, Any], current: Dict[str, Any]) -> bool: | |
| if previous.get("lat") is None or previous.get("lon") is None: | |
| return True | |
| if current.get("lat") is None or current.get("lon") is None: | |
| return False | |
| distance = _haversine_m(previous["lat"], previous["lon"], current["lat"], current["lon"]) | |
| if distance >= self._min_distance: | |
| return True | |
| prev_heading = previous.get("heading_deg") | |
| curr_heading = current.get("heading_deg") | |
| if prev_heading is None or curr_heading is None: | |
| return False | |
| heading_diff = abs(curr_heading - prev_heading) | |
| heading_diff = min(heading_diff, 360 - heading_diff) | |
| return heading_diff >= self._min_heading | |
| def _is_stale(self, snapshot: EnvironmentSnapshot) -> bool: | |
| return (time.time() - snapshot.updated_at) > self._ttl | |
| def _needs_geocode(self, ctx: Dict[str, Any]) -> bool: | |
| if ctx.get("lat") is None or ctx.get("lon") is None: | |
| return False | |
| return not any(ctx.get(field) for field in ("city", "address_display", "label", "detailed_address")) | |
| def _safe_float(value: Any) -> Optional[float]: | |
| try: | |
| if value is None: | |
| return None | |
| return float(value) | |
| except (TypeError, ValueError): | |
| return None | |
| def _heading_to_cardinal(deg: Optional[float]) -> Optional[str]: | |
| if deg is None: | |
| return None | |
| try: | |
| val = float(deg) | |
| except (TypeError, ValueError): | |
| return None | |
| dirs = [ | |
| "N", | |
| "NNE", | |
| "NE", | |
| "ENE", | |
| "E", | |
| "ESE", | |
| "SE", | |
| "SSE", | |
| "S", | |
| "SSW", | |
| "SW", | |
| "WSW", | |
| "W", | |
| "WNW", | |
| "NW", | |
| "NNW", | |
| ] | |
| idx = int((val % 360) / 22.5 + 0.5) % len(dirs) | |
| return dirs[idx] | |
| def _encode_geohash(lat: Optional[float], lon: Optional[float]) -> Optional[str]: | |
| if lat is None or lon is None: | |
| return None | |
| try: | |
| from geohash2 import encode as gh_encode # type: ignore | |
| except Exception: | |
| return None | |
| try: | |
| return gh_encode(lat, lon, precision=7) | |
| except Exception: | |
| return None | |
| def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: | |
| """ | |
| 使用哈弗辛公式計算兩點距離(公尺) | |
| """ | |
| rad_lat1 = math.radians(lat1) | |
| rad_lat2 = math.radians(lat2) | |
| dlat = rad_lat2 - rad_lat1 | |
| dlon = math.radians(lon2 - lon1) | |
| a = math.sin(dlat / 2) ** 2 + math.cos(rad_lat1) * math.cos(rad_lat2) * math.sin(dlon / 2) ** 2 | |
| c = 2 * math.asin(math.sqrt(a)) | |
| return 6371000.0 * c | |
| import contextlib # noqa: E402 # placed at end to avoid circular import at module load | |