File size: 6,482 Bytes
b9f0345
 
 
 
 
 
 
2334370
4e13540
b9f0345
 
4e13540
b9f0345
 
 
397c16a
b9f0345
 
 
 
 
bcb1320
 
 
 
 
b9f0345
 
 
 
 
 
397c16a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9f0345
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f962ee5
b9f0345
36a0cad
 
ce0938c
36a0cad
ce0938c
397c16a
ce0938c
 
b604b62
 
ce0938c
 
b604b62
ce0938c
 
397c16a
 
36a0cad
397c16a
 
 
36a0cad
397c16a
 
36a0cad
ce0938c
 
36a0cad
ce0938c
b9f0345
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcb1320
 
 
 
 
 
 
 
 
 
 
b9f0345
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Central Event Hub wrapper around Redis streams & pub/sub.

Provides a small compatibility layer so callers can emit events
and read recent stream entries without importing `redis` directly.
"""
import json
from datetime import datetime
from typing import Any, Dict
import logging
from app.deps import get_redis

logger = logging.getLogger(__name__)
class EventHub:
    def __init__(self):
        self.redis = get_redis()
        self.is_rest_api = not hasattr(self.redis, 'pubsub')
    # Generic key helpers
    def get_key(self, key: str):
        return self.redis.get(key)

    def setex(self, key: str, ttl: int, value: str):
        try:
            return self.redis.setex(key, ttl, value)
        except Exception as e:
            logger.error(f"[hub] ❌ setex failed for {key}: {e}", exc_info=True)
            raise

    def exists(self, key: str) -> bool:
        return self.redis.exists(key)

    def delete(self, key: str):
        return self.redis.delete(key)
    
    # βœ… ADD: Raw command execution compatibility
    def execute_command(self, *args):
        """
        Execute raw Redis command (works for both TCP and Upstash)
        Usage: execute_command("XADD", "stream", "*", "field", "value")
        """
        try:
            if self.is_rest_api:
                # Upstash: pass as list to execute()
                return self.redis.execute(list(args))
            else:
                # TCP Redis: native execute_command
                return self.redis.execute_command(*args)
        except Exception as e:
            logger.error(f"[hub] ❌ Command failed {args}: {e}")
            raise

    # Stream & pub/sub helpers
    def stream_key(self, org_id: str, source_id: str) -> str:
        return f"stream:analytics:{org_id}:{source_id}"

    def trigger_channel(self, org_id: str, source_id: str) -> str:
        return f"analytics_trigger:{org_id}:{source_id}"

    def emit_kpi_update(self, org_id: str, source_id: str, kpi_data: Dict[str, Any]):
        message = {
            "type": "kpi_update",
            "timestamp": datetime.utcnow().isoformat(),
            "data": kpi_data,
        }
        return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})

    def emit_insight(self, org_id: str, source_id: str, insight: Dict[str, Any]):
        message = {
            "type": "insight",
            "timestamp": datetime.utcnow().isoformat(),
            "data": insight,
        }
        return self.redis.xadd(self.stream_key(org_id, source_id), {"message": json.dumps(message)})

    def emit_status(self, org_id: str, source_id: str, status: str, message: str = "", details: Dict | None = None):
        payload = {
            "type": "status",
            "status": status,
            "message": message,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat()
        }
        channel = f"analytics:{org_id}:{source_id}:status"
        return self.redis.publish(channel, json.dumps(payload))

    def emit_error(self, org_id: str, source_id: str, error_message: str, error_details: Dict | None = None):
        payload = {
            "type": "error",
            "message": error_message,
            "details": error_details or {},
            "timestamp": datetime.utcnow().isoformat()
        }
        channel = f"analytics:{org_id}:{source_id}:error"
        return self.redis.publish(channel, json.dumps(payload))

    # app/core/event_hub.py

    # app/core/event_hub.py - Line 89

    def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
        """Write trigger to centralized stream"""
        stream_key = "stream:analytics_triggers"
        
        payload = {
            "org_id": org_id,
            "source_id": source_id,
            "timestamp": datetime.utcnow().isoformat(),
        }
        if extra:
            payload.update(extra)

        try:
            # βœ… Use compatibility wrapper
            msg_id = self.execute_command(
                "XADD", 
                stream_key,
                "*",  # Auto-generate ID
                "message",
                json.dumps(payload)
            )
            
            logger.info(f"[hub] πŸ“€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})")
            return msg_id
        except Exception as e:
            logger.error(f"[hub] ❌ emit failed: {e}", exc_info=True)
            return None

    def ensure_consumer_group(self, stream_key: str, group: str):
        try:
            return self.redis.xgroup_create(stream_key, group, id="0", mkstream=True)
        except Exception as e:
            # ignore BUSYGROUP
            if "BUSYGROUP" in str(e):
                return None
            raise

    def read_recent_stream(self, stream_key: str, count: int = 10):
        try:
            messages = self.redis.xrevrange(stream_key, count=count)
            out = []
            for msg in messages:
                # msg -> (id, {b'message': b'...'} )
                data = msg[1].get(b"message") if isinstance(msg[1], dict) else None
                if data:
                    try:
                        out.append(json.loads(data.decode()))
                    except Exception:
                        try:
                            out.append(json.loads(data))
                        except Exception:
                            out.append({"raw": data})
            return out
        except Exception:
            return []

    def get_recent_events(self, org_id: str, source_id: str, count: int = 10):
        return self.read_recent_stream(self.stream_key(org_id, source_id), count)

    # Simple queue helpers
    def lpush(self, key: str, value: str):
        return self.redis.lpush(key, value)

    def brpop(self, key: str, timeout: int = 0):
        return self.redis.brpop(key, timeout=timeout)

    def publish(self, channel: str, message: str):
        return self.redis.publish(channel, message)

    def keys(self, pattern: str):
        return self.redis.keys(pattern)

    def pipeline(self):
        """Return a redis pipeline-like object if supported by client.

        Note: Upstash client may not support classic pipelines; callers should
        handle attribute errors and fall back to sequential commands.
        """
        try:
            return self.redis.pipeline()
        except Exception:
            return None


# Singleton
event_hub = EventHub()