"""Central Event Hub wrapper around Redis streams & pub/sub.

Provides a small compatibility layer so callers can emit events
and read recent stream entries without importing `redis` directly.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict

from app.deps import get_redis
|
|
| logger = logging.getLogger(__name__) |
| class EventHub: |
| def __init__(self): |
| self.redis = get_redis() |
| self.is_rest_api = not hasattr(self.redis, 'pubsub') |
| |
| def get_key(self, key: str): |
| return self.redis.get(key) |
|
|
| def setex(self, key: str, ttl: int, value: str): |
| try: |
| return self.redis.setex(key, ttl, value) |
| except Exception as e: |
| logger.error(f"[hub] β setex failed for {key}: {e}", exc_info=True) |
| raise |
|
|
| def exists(self, key: str) -> bool: |
| return self.redis.exists(key) |
|
|
| def delete(self, key: str): |
| return self.redis.delete(key) |
| |
| |
| def execute_command(self, *args): |
| """ |
| Execute raw Redis command (works for both TCP and Upstash) |
| Usage: execute_command("XADD", "stream", "*", "field", "value") |
| """ |
| try: |
| if self.is_rest_api: |
| |
| return self.redis.execute(list(args)) |
| else: |
| |
| return self.redis.execute_command(*args) |
| except Exception as e: |
| logger.error(f"[hub] β Command failed {args}: {e}") |
| raise |
|
|
| |
| def stream_key(self, org_id: str, source_id: str) -> str: |
| return f"stream:analytics:{org_id}:{source_id}" |
|
|
| def trigger_channel(self, org_id: str, source_id: str) -> str: |
| return f"analytics_trigger:{org_id}:{source_id}" |
|
|
| def emit_kpi_update(self, org_id: str, source_id: str, kpi_data: Dict[str, Any]): |
| message = { |
| "type": "kpi_update", |
| "timestamp": datetime.utcnow().isoformat(), |
| "data": kpi_data, |
| } |
| return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)}) |
|
|
| def emit_insight(self, org_id: str, source_id: str, insight: Dict[str, Any]): |
| message = { |
| "type": "insight", |
| "timestamp": datetime.utcnow().isoformat(), |
| "data": insight, |
| } |
| return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)}) |
|
|
| def emit_status(self, org_id: str, source_id: str, status: str, message: str = "", details: Dict | None = None): |
| payload = { |
| "type": "status", |
| "status": status, |
| "message": message, |
| "details": details or {}, |
| "timestamp": datetime.utcnow().isoformat() |
| } |
| channel = f"analytics:{org_id}:{source_id}:status" |
| return self.redis.publish(channel, json.dumps(payload)) |
|
|
| def emit_error(self, org_id: str, source_id: str, error_message: str, error_details: Dict | None = None): |
| payload = { |
| "type": "error", |
| "message": error_message, |
| "details": error_details or {}, |
| "timestamp": datetime.utcnow().isoformat() |
| } |
| channel = f"analytics:{org_id}:{source_id}:error" |
| return self.redis.publish(channel, json.dumps(payload)) |
|
|
| |
|
|
| |
|
|
| def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None): |
| """Write trigger to centralized stream""" |
| stream_key = "stream:analytics_triggers" |
| |
| payload = { |
| "org_id": org_id, |
| "source_id": source_id, |
| "timestamp": datetime.utcnow().isoformat(), |
| } |
| if extra: |
| payload.update(extra) |
|
|
| try: |
| |
| msg_id = self.execute_command( |
| "XADD", |
| stream_key, |
| "*", |
| "message", |
| json.dumps(payload) |
| ) |
| |
| logger.info(f"[hub] π€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})") |
| return msg_id |
| except Exception as e: |
| logger.error(f"[hub] β emit failed: {e}", exc_info=True) |
| return None |
|
|
| def ensure_consumer_group(self, stream_key: str, group: str): |
| try: |
| return self.redis.xgroup_create(stream_key, group, id="0", mkstream=True) |
| except Exception as e: |
| |
| if "BUSYGROUP" in str(e): |
| return None |
| raise |
|
|
| def read_recent_stream(self, stream_key: str, count: int = 10): |
| try: |
| messages = self.redis.xrevrange(stream_key, count=count) |
| out = [] |
| for msg in messages: |
| |
| data = msg[1].get(b"message") if isinstance(msg[1], dict) else None |
| if data: |
| try: |
| out.append(json.loads(data.decode())) |
| except Exception: |
| try: |
| out.append(json.loads(data)) |
| except Exception: |
| out.append({"raw": data}) |
| return out |
| except Exception: |
| return [] |
|
|
| def get_recent_events(self, org_id: str, source_id: str, count: int = 10): |
| return self.read_recent_stream(self.stream_key(org_id, source_id), count) |
|
|
| |
| def lpush(self, key: str, value: str): |
| return self.redis.lpush(key, value) |
|
|
| def brpop(self, key: str, timeout: int = 0): |
| return self.redis.brpop(key, timeout=timeout) |
|
|
| def publish(self, channel: str, message: str): |
| return self.redis.publish(channel, message) |
|
|
| def keys(self, pattern: str): |
| return self.redis.keys(pattern) |
|
|
| def pipeline(self): |
| """Return a redis pipeline-like object if supported by client. |
| |
| Note: Upstash client may not support classic pipelines; callers should |
| handle attribute errors and fall back to sequential commands. |
| """ |
| try: |
| return self.redis.pipeline() |
| except Exception: |
| return None |
|
|
|
|
| |
# Module-level EventHub instance, constructed once when this module is imported.
event_hub: EventHub = EventHub()
|
|