"""Central Event Hub wrapper around Redis streams & pub/sub.
Provides a small compatibility layer so callers can emit events
and read recent stream entries without importing `redis` directly.
"""
import json
from datetime import datetime
from typing import Any, Dict
import logging
from app.deps import get_redis
logger = logging.getLogger(__name__)
class EventHub:
def __init__(self):
self.redis = get_redis()
self.is_rest_api = not hasattr(self.redis, 'pubsub')
# Generic key helpers
def get_key(self, key: str):
return self.redis.get(key)
def setex(self, key: str, ttl: int, value: str):
try:
return self.redis.setex(key, ttl, value)
except Exception as e:
logger.error(f"[hub] β setex failed for {key}: {e}", exc_info=True)
raise
def exists(self, key: str) -> bool:
return self.redis.exists(key)
def delete(self, key: str):
return self.redis.delete(key)
# β
ADD: Raw command execution compatibility
def execute_command(self, *args):
"""
Execute raw Redis command (works for both TCP and Upstash)
Usage: execute_command("XADD", "stream", "*", "field", "value")
"""
try:
if self.is_rest_api:
# Upstash: pass as list to execute()
return self.redis.execute(list(args))
else:
# TCP Redis: native execute_command
return self.redis.execute_command(*args)
except Exception as e:
logger.error(f"[hub] β Command failed {args}: {e}")
raise
# Stream & pub/sub helpers
def stream_key(self, org_id: str, source_id: str) -> str:
return f"stream:analytics:{org_id}:{source_id}"
def trigger_channel(self, org_id: str, source_id: str) -> str:
return f"analytics_trigger:{org_id}:{source_id}"
def emit_kpi_update(self, org_id: str, source_id: str, kpi_data: Dict[str, Any]):
message = {
"type": "kpi_update",
"timestamp": datetime.utcnow().isoformat(),
"data": kpi_data,
}
return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})
def emit_insight(self, org_id: str, source_id: str, insight: Dict[str, Any]):
message = {
"type": "insight",
"timestamp": datetime.utcnow().isoformat(),
"data": insight,
}
return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})
def emit_status(self, org_id: str, source_id: str, status: str, message: str = "", details: Dict | None = None):
payload = {
"type": "status",
"status": status,
"message": message,
"details": details or {},
"timestamp": datetime.utcnow().isoformat()
}
channel = f"analytics:{org_id}:{source_id}:status"
return self.redis.publish(channel, json.dumps(payload))
def emit_error(self, org_id: str, source_id: str, error_message: str, error_details: Dict | None = None):
payload = {
"type": "error",
"message": error_message,
"details": error_details or {},
"timestamp": datetime.utcnow().isoformat()
}
channel = f"analytics:{org_id}:{source_id}:error"
return self.redis.publish(channel, json.dumps(payload))
# app/core/event_hub.py
# app/core/event_hub.py - Line 89
def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
"""Write trigger to centralized stream"""
stream_key = "stream:analytics_triggers"
payload = {
"org_id": org_id,
"source_id": source_id,
"timestamp": datetime.utcnow().isoformat(),
}
if extra:
payload.update(extra)
try:
# β
Use compatibility wrapper
msg_id = self.execute_command(
"XADD",
stream_key,
"*", # Auto-generate ID
"message",
json.dumps(payload)
)
logger.info(f"[hub] π€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})")
return msg_id
except Exception as e:
logger.error(f"[hub] β emit failed: {e}", exc_info=True)
return None
def ensure_consumer_group(self, stream_key: str, group: str):
try:
return self.redis.xgroup_create(stream_key, group, id="0", mkstream=True)
except Exception as e:
# ignore BUSYGROUP
if "BUSYGROUP" in str(e):
return None
raise
def read_recent_stream(self, stream_key: str, count: int = 10):
try:
messages = self.redis.xrevrange(stream_key, count=count)
out = []
for msg in messages:
# msg -> (id, {b'message': b'...'} )
data = msg[1].get(b"message") if isinstance(msg[1], dict) else None
if data:
try:
out.append(json.loads(data.decode()))
except Exception:
try:
out.append(json.loads(data))
except Exception:
out.append({"raw": data})
return out
except Exception:
return []
def get_recent_events(self, org_id: str, source_id: str, count: int = 10):
return self.read_recent_stream(self.stream_key(org_id, source_id), count)
# Simple queue helpers
def lpush(self, key: str, value: str):
return self.redis.lpush(key, value)
def brpop(self, key: str, timeout: int = 0):
return self.redis.brpop(key, timeout=timeout)
def publish(self, channel: str, message: str):
return self.redis.publish(channel, message)
def keys(self, pattern: str):
return self.redis.keys(pattern)
def pipeline(self):
"""Return a redis pipeline-like object if supported by client.
Note: Upstash client may not support classic pipelines; callers should
handle attribute errors and fall back to sequential commands.
"""
try:
return self.redis.pipeline()
except Exception:
return None
# Singleton: module-level instance created once at import time. Import
# `event_hub` from this module rather than constructing EventHub directly.
event_hub = EventHub()
|