# analytics-engine / app/core/event_hub.py
# Author: shaliz-kong — "refactored for self hosted redis" (commit 397c16a)
"""Central Event Hub wrapper around Redis streams & pub/sub.
Provides a small compatibility layer so callers can emit events
and read recent stream entries without importing `redis` directly.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict

from app.deps import get_redis
logger = logging.getLogger(__name__)
class EventHub:
def __init__(self):
self.redis = get_redis()
self.is_rest_api = not hasattr(self.redis, 'pubsub')
# Generic key helpers
def get_key(self, key: str):
return self.redis.get(key)
def setex(self, key: str, ttl: int, value: str):
try:
return self.redis.setex(key, ttl, value)
except Exception as e:
logger.error(f"[hub] ❌ setex failed for {key}: {e}", exc_info=True)
raise
def exists(self, key: str) -> bool:
return self.redis.exists(key)
def delete(self, key: str):
return self.redis.delete(key)
# βœ… ADD: Raw command execution compatibility
def execute_command(self, *args):
"""
Execute raw Redis command (works for both TCP and Upstash)
Usage: execute_command("XADD", "stream", "*", "field", "value")
"""
try:
if self.is_rest_api:
# Upstash: pass as list to execute()
return self.redis.execute(list(args))
else:
# TCP Redis: native execute_command
return self.redis.execute_command(*args)
except Exception as e:
logger.error(f"[hub] ❌ Command failed {args}: {e}")
raise
# Stream & pub/sub helpers
def stream_key(self, org_id: str, source_id: str) -> str:
return f"stream:analytics:{org_id}:{source_id}"
def trigger_channel(self, org_id: str, source_id: str) -> str:
return f"analytics_trigger:{org_id}:{source_id}"
def emit_kpi_update(self, org_id: str, source_id: str, kpi_data: Dict[str, Any]):
message = {
"type": "kpi_update",
"timestamp": datetime.utcnow().isoformat(),
"data": kpi_data,
}
return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})
def emit_insight(self, org_id: str, source_id: str, insight: Dict[str, Any]):
message = {
"type": "insight",
"timestamp": datetime.utcnow().isoformat(),
"data": insight,
}
return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})
def emit_status(self, org_id: str, source_id: str, status: str, message: str = "", details: Dict | None = None):
payload = {
"type": "status",
"status": status,
"message": message,
"details": details or {},
"timestamp": datetime.utcnow().isoformat()
}
channel = f"analytics:{org_id}:{source_id}:status"
return self.redis.publish(channel, json.dumps(payload))
def emit_error(self, org_id: str, source_id: str, error_message: str, error_details: Dict | None = None):
payload = {
"type": "error",
"message": error_message,
"details": error_details or {},
"timestamp": datetime.utcnow().isoformat()
}
channel = f"analytics:{org_id}:{source_id}:error"
return self.redis.publish(channel, json.dumps(payload))
# app/core/event_hub.py
# app/core/event_hub.py - Line 89
def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
"""Write trigger to centralized stream"""
stream_key = "stream:analytics_triggers"
payload = {
"org_id": org_id,
"source_id": source_id,
"timestamp": datetime.utcnow().isoformat(),
}
if extra:
payload.update(extra)
try:
# βœ… Use compatibility wrapper
msg_id = self.execute_command(
"XADD",
stream_key,
"*", # Auto-generate ID
"message",
json.dumps(payload)
)
logger.info(f"[hub] πŸ“€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})")
return msg_id
except Exception as e:
logger.error(f"[hub] ❌ emit failed: {e}", exc_info=True)
return None
def ensure_consumer_group(self, stream_key: str, group: str):
try:
return self.redis.xgroup_create(stream_key, group, id="0", mkstream=True)
except Exception as e:
# ignore BUSYGROUP
if "BUSYGROUP" in str(e):
return None
raise
def read_recent_stream(self, stream_key: str, count: int = 10):
try:
messages = self.redis.xrevrange(stream_key, count=count)
out = []
for msg in messages:
# msg -> (id, {b'message': b'...'} )
data = msg[1].get(b"message") if isinstance(msg[1], dict) else None
if data:
try:
out.append(json.loads(data.decode()))
except Exception:
try:
out.append(json.loads(data))
except Exception:
out.append({"raw": data})
return out
except Exception:
return []
def get_recent_events(self, org_id: str, source_id: str, count: int = 10):
return self.read_recent_stream(self.stream_key(org_id, source_id), count)
# Simple queue helpers
def lpush(self, key: str, value: str):
return self.redis.lpush(key, value)
def brpop(self, key: str, timeout: int = 0):
return self.redis.brpop(key, timeout=timeout)
def publish(self, channel: str, message: str):
return self.redis.publish(channel, message)
def keys(self, pattern: str):
return self.redis.keys(pattern)
def pipeline(self):
"""Return a redis pipeline-like object if supported by client.
Note: Upstash client may not support classic pipelines; callers should
handle attribute errors and fall back to sequential commands.
"""
try:
return self.redis.pipeline()
except Exception:
return None
# Singleton
# Module-level shared instance: constructed at import time, which calls
# get_redis() immediately. NOTE(review): importing this module therefore
# requires a usable Redis client factory — confirm acceptable for test setups.
event_hub = EventHub()