Spaces:
Sleeping
Sleeping
File size: 4,344 Bytes
0f123dc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | """Redis-backed JSON cache with graceful degradation."""
from __future__ import annotations

import json
import logging
import os
import threading
import time
from typing import Any, Optional
from urllib.parse import quote
try:
import redis
except Exception: # pragma: no cover - dependency may be absent in local test envs.
redis = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
def _to_bool(value: str | None) -> bool:
    """Interpret an environment-style flag as a boolean.

    Surrounding whitespace and letter case are ignored. Only ``1``, ``true``,
    ``yes`` and ``on`` count as true; anything else, including ``None`` and
    the empty string, is false.
    """
    truthy_tokens = {"1", "true", "yes", "on"}
    normalized = str(value or "").strip().lower()
    return normalized in truthy_tokens
def _build_redis_url() -> str:
    """Assemble the Redis connection URL from environment variables.

    ``REDIS_URL`` takes precedence: when it is set to a non-blank value it is
    returned with surrounding whitespace stripped and nothing else is read.
    Otherwise the URL is built from ``REDIS_HOST`` (default ``localhost``),
    ``REDIS_PORT`` (default ``6379``), ``REDIS_PASS`` (optional) and
    ``REDIS_TLS`` (truthy selects the ``rediss`` scheme). Database ``0`` is
    always selected.

    Returns:
        A ``redis://`` or ``rediss://`` URL string.

    Raises:
        ValueError: If ``REDIS_PORT`` is set but is not an integer.
    """
    explicit = (os.getenv("REDIS_URL") or "").strip()
    if explicit:
        return explicit

    host = (os.getenv("REDIS_HOST") or "localhost").strip()
    port = int((os.getenv("REDIS_PORT") or "6379").strip())
    scheme = "rediss" if _to_bool(os.getenv("REDIS_TLS")) else "redis"

    password = os.getenv("REDIS_PASS")
    if not password:
        return f"{scheme}://{host}:{port}/0"

    # Percent-encode the secret so reserved characters in it (@ : / # ? %)
    # cannot be mistaken for URL delimiters. URL-based clients decode the
    # userinfo component again, so the server still receives the raw password.
    encoded_password = quote(password, safe="")
    return f"{scheme}://:{encoded_password}@{host}:{port}/0"
class RedisJSONCache:
    """Thread-safe Redis cache helper used across agent live-data flows.

    Values are stored as compact JSON strings under ``namespace + key``.
    Every operation is best-effort: when the ``redis`` package is missing,
    the server cannot be reached, or a command fails, the method logs the
    problem and returns a neutral value (``None``, ``0`` or ``False``)
    instead of raising.
    """

    # Seconds to wait after a failed connection attempt before dialling Redis
    # again, so an outage does not add a connect timeout to every cache call.
    _RETRY_COOLDOWN_SECONDS = 5.0

    # Monotonic-clock deadline before which no new connection is attempted.
    # Class-level default; shadowed per instance after a failed attempt.
    _retry_after = float("-inf")

    _instance: Optional["RedisJSONCache"] = None
    _instance_lock = threading.Lock()

    def __init__(self, namespace: str = "zico:strategy:live_data:") -> None:
        """Create a cache whose keys are all prefixed with ``namespace``.

        No connection is opened here; the client is created lazily on the
        first operation.
        """
        self.namespace = namespace
        self._client: Optional[Any] = None
        self._client_lock = threading.Lock()

    @classmethod
    def instance(cls) -> "RedisJSONCache":
        """Return the process-wide shared cache, creating it on first use."""
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def _key(self, key: str) -> str:
        """Return ``key`` prefixed with this cache's namespace."""
        return f"{self.namespace}{key}"

    def _get_client(self) -> Optional[Any]:
        """Return a connected Redis client, or ``None`` when unavailable.

        The client is created once and reused. After a failed attempt no new
        connection is tried until ``_RETRY_COOLDOWN_SECONDS`` have elapsed,
        so callers degrade immediately instead of each waiting for the
        connect timeout.
        """
        cached = self._client
        if cached is not None:
            return cached
        if redis is None:
            return None
        if time.monotonic() < self._retry_after:
            return None

        with self._client_lock:
            # Re-check under the lock: another thread may have connected, or
            # failed and started the cooldown, while this one was waiting.
            if self._client is not None:
                return self._client
            if time.monotonic() < self._retry_after:
                return None
            try:
                client = redis.Redis.from_url(
                    _build_redis_url(),
                    decode_responses=True,
                    socket_connect_timeout=1.5,
                    socket_timeout=1.5,
                    health_check_interval=30,
                    retry_on_timeout=True,
                )
                # Fail here rather than on the caller's first real command.
                client.ping()
            except Exception as exc:
                self._retry_after = time.monotonic() + self._RETRY_COOLDOWN_SECONDS
                logger.warning("Redis unavailable; cache will degrade to source fetches: %s", exc)
                return None
            self._client = client
            return client

    def get_json(self, key: str) -> Any:
        """Return the decoded JSON value stored under ``key``.

        Returns ``None`` when the key is missing or empty, when Redis is
        unavailable, or when the stored text is not valid JSON.
        """
        client = self._get_client()
        if client is None:
            return None
        try:
            raw = client.get(self._key(key))
            if not raw:
                return None
            return json.loads(raw)
        except Exception as exc:
            logger.debug("Redis get_json failed for key=%s: %s", key, exc)
            return None

    def set_json(self, key: str, value: Any, ttl: int) -> None:
        """Store ``value`` as compact JSON under ``key`` for ``ttl`` seconds.

        ``ttl`` is clamped to at least one second. Serialisation and Redis
        failures are logged at debug level and otherwise ignored.
        """
        client = self._get_client()
        if client is None:
            return
        try:
            payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True)
            client.set(self._key(key), payload, ex=max(int(ttl), 1))
        except Exception as exc:
            logger.debug("Redis set_json failed for key=%s: %s", key, exc)

    def set_error(self, key: str, error_payload: Any, ttl: int) -> None:
        """Store ``error_payload`` wrapped as ``{"__error__": True, "payload": ...}``."""
        wrapped = {"__error__": True, "payload": error_payload}
        self.set_json(key, wrapped, ttl)

    def incr(self, key: str, ttl: int) -> int:
        """Increment the counter under ``key`` and return its new value.

        The counter expires ``ttl`` seconds (minimum one) after it is
        created; later increments do not extend that expiry. Returns ``0``
        when Redis is unavailable or the command fails.
        """
        client = self._get_client()
        if client is None:
            return 0
        try:
            nkey = self._key(key)
            # Read the TTL in the same transaction as the increment. A counter
            # whose EXPIRE was lost earlier (timeout or crash between the two
            # commands) would otherwise never expire; here it is re-armed.
            pipe = client.pipeline()
            pipe.incr(nkey)
            pipe.ttl(nkey)
            value, remaining = pipe.execute()
            if int(remaining) < 0:  # -1: the key exists but has no expiry
                client.expire(nkey, max(int(ttl), 1))
            return int(value)
        except Exception as exc:
            logger.debug("Redis incr failed for key=%s: %s", key, exc)
            return 0

    def exists(self, key: str) -> bool:
        """Return whether ``key`` is present; ``False`` when Redis is unavailable."""
        client = self._get_client()
        if client is None:
            return False
        try:
            return bool(client.exists(self._key(key)))
        except Exception as exc:
            logger.debug("Redis exists failed for key=%s: %s", key, exc)
            return False
|