shaliz-kong committed on
Commit
397c16a
·
1 Parent(s): 935eaff

Refactored for self-hosted Redis

Browse files
Dockerfile CHANGED
@@ -1,4 +1,3 @@
1
-
2
  # ---- 1. base image ---------------------------------------------------------
3
  FROM python:3.11-slim
4
 
@@ -13,30 +12,33 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
13
  ca-certificates \
14
  wget \
15
  unzip \
 
 
16
  && rm -rf /var/lib/apt/lists/*
17
 
18
  # ---- 3. upgrade pip & enable pre-built wheels ------------------------------
19
  RUN pip install --no-cache-dir --upgrade pip setuptools wheel
20
 
21
-
22
- # ---- 4. install Python deps (+ DuckDB driver) ------------------------------
23
  # ---- 4. install Python deps (+ DuckDB driver) ------------------------------
24
  COPY requirements.txt /tmp/requirements.txt
25
  RUN pip install --no-cache-dir --prefer-binary -r /tmp/requirements.txt && \
26
  pip install --no-cache-dir "duckdb>=1.0.0"
27
 
28
- # ---- 5. Pre-download VSS extension (matches DuckDB v1.0.0) ---------------
29
- # This prevents runtime download failures and startup crashes
30
- RUN mkdir -p /root/.duckdb/extensions/v1.0.0/linux_amd64 && \
31
- wget -q https://extensions.duckdb.org/v1.0.0/linux_amd64/vss.duckdb_extension.gz \
32
- -O /root/.duckdb/extensions/v1.0.0/linux_amd64/vss.duckdb_extension.gz && \
33
- gunzip /root/.duckdb/extensions/v1.0.0/linux_amd64/vss.duckdb_extension.gz
34
  # ---- 6. copy source --------------------------------------------------------
35
  COPY . /app
36
  WORKDIR /app
37
 
38
  # ---- 7. scheduler loop ----------------------------------------------------
39
  COPY scheduler_loop.py /app/scheduler_loop.py
40
- # corrected duck db
41
- # ---- 8. start both services -----------------------------------------------
42
- CMD sh -c "python -m uvicorn app.main:app --host 0.0.0.0 --port 7860 & python /app/scheduler_loop.py"
 
 
 
 
 
1
  # ---- 1. base image ---------------------------------------------------------
2
  FROM python:3.11-slim
3
 
 
12
  ca-certificates \
13
  wget \
14
  unzip \
15
+ redis-server \ # βœ… ADD THIS
16
+ supervisor \ # βœ… ADD THIS
17
  && rm -rf /var/lib/apt/lists/*
18
 
19
  # ---- 3. upgrade pip & enable pre-built wheels ------------------------------
20
  RUN pip install --no-cache-dir --upgrade pip setuptools wheel
21
 
 
 
22
  # ---- 4. install Python deps (+ DuckDB driver) ------------------------------
23
  COPY requirements.txt /tmp/requirements.txt
24
  RUN pip install --no-cache-dir --prefer-binary -r /tmp/requirements.txt && \
25
  pip install --no-cache-dir "duckdb>=1.0.0"
26
 
27
+ # ---- 5. Pre-download VSS extension (matches DuckDB v1.1.3) ---------------
28
+ # βœ… CHANGED: v1.1.3 instead of v1.0.0
29
+ RUN mkdir -p /root/.duckdb/extensions/v1.1.3/linux_amd64 && \
30
+ wget -q https://extensions.duckdb.org/v1.1.3/linux_amd64/vss.duckdb_extension.gz \
31
+ -O /root/.duckdb/extensions/v1.1.3/linux_amd64/vss.duckdb_extension.gz && \
32
+ gunzip /root/.duckdb/extensions/v1.1.3/linux_amd64/vss.duckdb_extension.gz
33
# ---- 6. copy source --------------------------------------------------------
COPY . /app
WORKDIR /app

# ---- 7. scheduler loop ----------------------------------------------------
COPY scheduler_loop.py /app/scheduler_loop.py

# ---- 8. copy supervisord config -------------------------------------------
# NOTE: Dockerfile comments must start at the beginning of a line; a trailing
# "# ..." after COPY becomes extra source arguments and fails the build.
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# ---- 9. start all services via supervisord --------------------------------
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
app/core/event_hub.py CHANGED
@@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
13
  class EventHub:
14
  def __init__(self):
15
  self.redis = get_redis()
16
-
17
  # Generic key helpers
18
  def get_key(self, key: str):
19
  return self.redis.get(key)
@@ -30,6 +30,23 @@ class EventHub:
30
 
31
  def delete(self, key: str):
32
  return self.redis.delete(key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
  # Stream & pub/sub helpers
35
  def stream_key(self, org_id: str, source_id: str) -> str:
@@ -82,7 +99,7 @@ class EventHub:
82
  def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
83
  """Write trigger to centralized stream"""
84
  stream_key = "stream:analytics_triggers"
85
-
86
  payload = {
87
  "org_id": org_id,
88
  "source_id": source_id,
@@ -92,15 +109,15 @@ class EventHub:
92
  payload.update(extra)
93
 
94
  try:
95
- # βœ… FIX: Pass command as a LIST, not multiple arguments
96
- msg_id = self.redis.execute([
97
  "XADD",
98
- stream_key,
99
- "*", # Auto-generate ID
100
- "message",
101
  json.dumps(payload)
102
- ])
103
-
104
  logger.info(f"[hub] πŸ“€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})")
105
  return msg_id
106
  except Exception as e:
 
13
  class EventHub:
14
  def __init__(self):
15
  self.redis = get_redis()
16
+ self.is_rest_api = not hasattr(self.redis, 'pubsub')
17
  # Generic key helpers
18
  def get_key(self, key: str):
19
  return self.redis.get(key)
 
30
 
31
  def delete(self, key: str):
32
  return self.redis.delete(key)
33
+
34
+ # βœ… ADD: Raw command execution compatibility
35
+ def execute_command(self, *args):
36
+ """
37
+ Execute raw Redis command (works for both TCP and Upstash)
38
+ Usage: execute_command("XADD", "stream", "*", "field", "value")
39
+ """
40
+ try:
41
+ if self.is_rest_api:
42
+ # Upstash: pass as list to execute()
43
+ return self.redis.execute(list(args))
44
+ else:
45
+ # TCP Redis: native execute_command
46
+ return self.redis.execute_command(*args)
47
+ except Exception as e:
48
+ logger.error(f"[hub] ❌ Command failed {args}: {e}")
49
+ raise
50
 
51
  # Stream & pub/sub helpers
52
  def stream_key(self, org_id: str, source_id: str) -> str:
 
99
  def emit_analytics_trigger(self, org_id: str, source_id: str, extra: dict | None = None):
100
  """Write trigger to centralized stream"""
101
  stream_key = "stream:analytics_triggers"
102
+
103
  payload = {
104
  "org_id": org_id,
105
  "source_id": source_id,
 
109
  payload.update(extra)
110
 
111
  try:
112
+ # βœ… Use compatibility wrapper
113
+ msg_id = self.execute_command(
114
  "XADD",
115
+ stream_key,
116
+ "*", # Auto-generate ID
117
+ "message",
118
  json.dumps(payload)
119
+ )
120
+
121
  logger.info(f"[hub] πŸ“€ trigger emitted: {org_id}:{source_id} (msg: {msg_id})")
122
  return msg_id
123
  except Exception as e:
app/deps.py CHANGED
@@ -130,7 +130,7 @@ def get_duckdb(org_id: str) -> duckdb.DuckDBPyConnection:
130
 
131
  try:
132
  conn = duckdb.connect(str(db_file), read_only=False)
133
- conn.execute("SET hnsw_enable_experimental_persistence = true")
134
  # Enable VSS
135
  conn.execute("INSTALL vss;")
136
  conn.execute("LOAD vss;")
@@ -222,91 +222,69 @@ def get_vector_db(org_id: Optional[str] = None) -> duckdb.DuckDBPyConnection:
222
  return _vector_db_connections[org_id]
223
 
224
 
225
- # ── Redis Client (TCP + Upstash Compatible) ─────────────────────────────────────
226
  _redis_client = None
227
- _redis_config_cache: Dict[str, Any] = {}
228
-
229
  def get_redis():
230
  """
231
- πŸ”„ Returns Redis client (TCP or Upstash HTTP)
232
- Singleton pattern with config caching
 
 
233
  """
234
- global _redis_client, _redis_config_cache
235
-
236
- if _redis_client is not None:
237
- return _redis_client
238
 
239
- # Check for TCP Redis first
240
- redis_host = os.getenv("REDIS_HOST")
241
- if redis_host:
242
- logger.info("[REDIS] πŸ”Œ Initializing TCP Redis client")
243
-
244
- import redis as redis_py
245
 
246
- redis_url = os.getenv("REDIS_URL")
247
- if redis_url and redis_url.startswith("redis://"):
248
- from urllib.parse import urlparse
249
- parsed = urlparse(redis_url)
250
-
251
- _redis_client = redis_py.Redis(
252
- host=parsed.hostname or redis_host,
253
- port=parsed.port or int(os.getenv("REDIS_PORT", 6379)),
254
- password=parsed.password or os.getenv("REDIS_PASSWORD"),
255
- username=parsed.username or os.getenv("REDIS_USER"),
256
- decode_responses=True,
257
- ssl=bool(os.getenv("REDIS_SSL", False)),
258
- ssl_cert_reqs=None,
259
- socket_keepalive=True,
260
- socket_connect_timeout=5,
261
- socket_timeout=5,
262
- connection_pool=redis_py.ConnectionPool(
263
- max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "10")),
264
- retry_on_timeout=True,
265
- socket_keepalive=True,
266
- )
267
- )
268
- else:
269
- _redis_client = redis_py.Redis(
270
- host=redis_host,
271
- port=int(os.getenv("REDIS_PORT", 6379)),
272
- password=os.getenv("REDIS_PASSWORD", None),
273
- decode_responses=True,
274
- socket_keepalive=True,
275
- connection_pool=redis_py.ConnectionPool(
276
- max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "10")),
277
  )
278
- )
 
 
 
 
 
279
 
280
- _redis_config_cache["type"] = "tcp"
281
- return _redis_client
282
-
283
- # Fallback to Upstash HTTP
284
- if REDIS_URL and REDIS_TOKEN:
285
- logger.info("[REDIS] πŸ”Œ Initializing Upstash HTTP Redis client")
 
 
286
 
287
- _redis_client = Redis(url=REDIS_URL, token=REDIS_TOKEN)
288
- _redis_config_cache["type"] = "upstash"
 
 
289
  return _redis_client
290
-
291
- # Local dev fallback
292
- logger.warning("[REDIS] ⚠️ No config, using localhost:6379")
293
- import redis as redis_py
294
- _redis_client = redis_py.Redis(host="localhost", port=6379, decode_responses=True)
295
- _redis_config_cache["type"] = "local"
296
- return _redis_client
297
 
298
 
299
- def reset_redis_client():
300
- """SRE: Reset connection pool if needed"""
301
  global _redis_client
302
- if _redis_client:
303
- try:
304
- _redis_client.close()
305
- except Exception:
306
- pass
307
  _redis_client = None
308
 
309
 
 
 
 
 
 
 
310
  # ── QStash (Optional) ───────────────────────────────────────────────────────────
311
  _qstash_client = None
312
  _qstash_verifier = None
 
130
 
131
  try:
132
  conn = duckdb.connect(str(db_file), read_only=False)
133
+
134
  # Enable VSS
135
  conn.execute("INSTALL vss;")
136
  conn.execute("LOAD vss;")
 
222
  return _vector_db_connections[org_id]
223
 
224
 
225
# ── Redis Client (self hosted TCP + Upstash Compatible) ─────────────────────
_redis_client = None
_redis_lock = threading.Lock()

def get_redis():
    """
    🎯 Redis connection with clear priority:
      1. Self-hosted (TCP) - HF Spaces with supervisord
      2. Upstash (HTTP)    - fallback only
      3. Local dev mock    - last resort

    Returns a process-wide singleton; creation is serialized by _redis_lock.
    """
    global _redis_client

    with _redis_lock:
        if _redis_client is not None:
            return _redis_client

        # 1. Self-hosted Redis (HF Spaces)
        redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
        if redis_url.startswith("redis://"):
            try:
                import redis as redis_py
                _redis_client = redis_py.from_url(
                    redis_url,
                    decode_responses=True,
                    socket_connect_timeout=2,
                    socket_timeout=2,
                    retry_on_timeout=True,
                )
                # Test connection immediately
                _redis_client.ping()
                logger.info(f"✅ Redis connected: {redis_url} (TCP)")
                return _redis_client
            except Exception as e:
                # FIX: clear the cached client — if from_url() succeeded but
                # ping() failed, _redis_client would otherwise stay bound to a
                # dead connection and the fast path above would return it on
                # every later call.
                _redis_client = None
                logger.warning(f"⚠️ TCP Redis failed: {e}")

        # 2. Upstash fallback (only if explicitly configured)
        upstash_url = os.getenv("UPSTASH_REDIS_REST_URL")
        upstash_token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

        if upstash_url and upstash_token:
            _redis_client = Redis(url=upstash_url, token=upstash_token)
            logger.info("📡 Redis connected: Upstash (HTTP)")
            return _redis_client

        # 3. Mock for local dev — every call "succeeds" but does nothing real
        logger.error("❌ No Redis available, using mock!")
        from unittest.mock import Mock
        _redis_client = Mock()
        return _redis_client
 
 
 
 
 
 
 
274
 
275
 
276
def reset_redis():
    """Drop the cached Redis singleton so the next get_redis() reconnects.

    Intended for tests / SRE recovery.
    """
    global _redis_client
    _redis_client = None
280
 
281
 
282
# ── Event Hub Connection Type Detection ─────────────────────────────────────
def is_tcp_redis() -> bool:
    """Check if using TCP Redis (pub/sub capable).

    FIX: default REDIS_URL to "redis://localhost:6379" to match get_redis(),
    which attempts a TCP connection when the variable is unset; previously
    this function returned False in that case while get_redis() used TCP.
    """
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
    return redis_url.startswith("redis://")
287
+
288
  # ── QStash (Optional) ───────────────────────────────────────────────────────────
289
  _qstash_client = None
290
  _qstash_verifier = None
app/redis_async_client.py DELETED
@@ -1,39 +0,0 @@
1
- # app/redis_async_client.py
2
- """
3
- Async Redis client for Upstash using native Redis protocol.
4
- This is separate from the Upstash HTTP client used elsewhere.
5
- """
6
-
7
- import redis.asyncio as aioredis
8
- import os
9
- from urllib.parse import urlparse
10
-
11
- # Load from HF Secrets (exact names)
12
- UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
13
- UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
14
-
15
- if not UPSTASH_REDIS_REST_URL:
16
- raise RuntimeError("UPSTASH_REDIS_REST_URL not found in HF Secrets")
17
-
18
- # Parse the REST URL to get host and port
19
- parsed = urlparse(UPSTASH_REDIS_REST_URL)
20
-
21
- host = parsed.hostname
22
- port = parsed.port if parsed.port else 6379
23
-
24
- # Construct native Redis protocol URL
25
- REDIS_URL = f"redis://{host}:{port}"
26
-
27
- # Create async Redis client
28
- redis_async = aioredis.from_url(
29
- REDIS_URL,
30
- password=UPSTASH_REDIS_REST_TOKEN,
31
- decode_responses=True,
32
- retry_on_timeout=True,
33
- socket_keepalive=True,
34
- socket_connect_timeout=10,
35
- max_connections=10,
36
- )
37
-
38
- # Export for use
39
- __all__ = ["redis_async"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app/redis_client.py CHANGED
@@ -1,13 +1,13 @@
1
- # app/redis_client.py
2
  from app.deps import get_redis
3
 
4
- # Export the singleton instance
5
  redis = get_redis()
6
 
7
- # Test on import
8
- try:
9
- redis.ping()
10
- print("βœ… Redis bridge connected")
11
- except Exception as e:
12
- print(f"❌ Redis connection failed: {e}")
13
- raise RuntimeError(f"Redis not available: {e}")
 
1
# app/redis_client.py – module-level Redis singleton bridge
from app.deps import get_redis

# NOTE: get_redis() runs at import time and does connect (it pings the TCP
# backend), but it never raises — on failure it falls back to Upstash or a
# mock — so importing this module cannot crash startup the way the old
# ping-and-raise-on-import version did.
redis = get_redis()
supervisord.conf ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
[supervisord]
nodaemon=true
logfile=/var/log/supervisor/supervisord.log
pidfile=/var/run/supervisord.pid

[program:redis]
command=/usr/bin/redis-server /etc/redis/redis.conf
autostart=true
autorestart=true
priority=1
stdout_logfile=/var/log/supervisor/redis.log
stderr_logfile=/var/log/supervisor/redis_error.log
; FIX: inline "# ..." comments after a value are read as part of the value by
; supervisord's INI parser and break integer options like startsecs — keep
; comments on their own lines, using ";".
; Wait 2 seconds to ensure Redis is ready
startsecs=2

[program:uvicorn]
command=python -m uvicorn app.main:app --host 0.0.0.0 --port 7860 --workers 1
directory=/app
autostart=true
autorestart=true
priority=2
stdout_logfile=/var/log/supervisor/uvicorn.log
stderr_logfile=/var/log/supervisor/uvicorn_error.log
; Wait for Redis to be ready
startsecs=3

[program:scheduler]
command=python /app/scheduler_loop.py
directory=/app
autostart=true
autorestart=true
priority=3
stdout_logfile=/var/log/supervisor/scheduler.log
stderr_logfile=/var/log/supervisor/scheduler_error.log