File size: 4,344 Bytes
0f123dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Redis-backed JSON cache with graceful degradation."""

from __future__ import annotations

import json
import logging
import os
import threading
from typing import Any, Optional
from urllib.parse import quote

try:
    import redis
except Exception:  # pragma: no cover - dependency may be absent in local test envs.
    redis = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)


def _to_bool(value: str | None) -> bool:
    return str(value or "").strip().lower() in {"1", "true", "yes", "on"}


def _build_redis_url() -> str:
    """Return the Redis connection URL described by the environment.

    ``REDIS_URL`` takes precedence: when it is set to a non-blank value it is
    returned stripped of surrounding whitespace and otherwise untouched.
    Otherwise the URL is assembled from:

    * ``REDIS_HOST`` (default ``localhost``)
    * ``REDIS_PORT`` (default ``6379``)
    * ``REDIS_PASS`` (optional; treated as the raw, unencoded password)
    * ``REDIS_TLS``  (truthy flag selecting the ``rediss`` scheme)

    The assembled URL always targets database ``0``.

    Raises:
        ValueError: if ``REDIS_PORT`` is set but is not an integer.
    """
    explicit = (os.getenv("REDIS_URL") or "").strip()
    if explicit:
        return explicit

    host = (os.getenv("REDIS_HOST") or "localhost").strip()
    port = int((os.getenv("REDIS_PORT") or "6379").strip())
    password = os.getenv("REDIS_PASS")
    scheme = "rediss" if _to_bool(os.getenv("REDIS_TLS")) else "redis"

    credentials = ""
    if password:
        # Percent-encode the password so reserved URL characters such as
        # '@', ':', '/', '#', '?' or '%' cannot corrupt the authority part of
        # the URL. redis-py percent-decodes the password when parsing the URL.
        encoded = quote(password, safe="")
        credentials = f":{encoded}@"
    return f"{scheme}://{credentials}{host}:{port}/0"


class RedisJSONCache:
    """Redis-backed JSON cache helper used across agent live-data flows.

    Every operation is best-effort: when the ``redis`` package is missing, the
    server cannot be reached, or a command fails, the method logs the problem
    and returns a neutral value (``None``, ``0`` or ``False``) instead of
    raising, so callers can fall back to fetching from the source.

    The Redis client is created lazily on first use. Both the client and the
    shared singleton are created under a lock so concurrent callers end up
    with a single instance.

    NOTE(review): a failed connection attempt is not remembered, so while
    Redis is unreachable every operation retries the connection and may block
    for up to the connect timeout.
    """

    # Shared instance handed out by instance(); created on first request.
    _instance: Optional["RedisJSONCache"] = None
    # Serializes creation of the shared instance.
    _instance_lock = threading.Lock()

    def __init__(self, namespace: str = "zico:strategy:live_data:") -> None:
        """Create a cache whose keys are all prefixed with *namespace*.

        No connection is opened here; that happens on the first operation.
        """
        self.namespace = namespace
        self._client: Optional[Any] = None
        self._client_lock = threading.Lock()

    @classmethod
    def instance(cls) -> "RedisJSONCache":
        """Return the shared cache, creating it with defaults on first use."""
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def _key(self, key: str) -> str:
        """Return *key* prefixed with this cache's namespace."""
        return f"{self.namespace}{key}"

    def _get_client(self) -> Optional[Any]:
        """Return a connected Redis client, or ``None`` if Redis is unusable.

        ``None`` is returned when the ``redis`` package could not be imported
        or when connecting/pinging fails; the failure is logged as a warning.
        """
        # Reuse an established client without taking the lock.
        if self._client is not None:
            return self._client
        if redis is None:
            return None
        with self._client_lock:
            # Re-check: another thread may have connected while we waited.
            if self._client is not None:
                return self._client
            try:
                client = redis.Redis.from_url(
                    _build_redis_url(),
                    decode_responses=True,
                    socket_connect_timeout=1.5,
                    socket_timeout=1.5,
                    health_check_interval=30,
                    retry_on_timeout=True,
                )
                # Only publish the client once the server has answered.
                client.ping()
                self._client = client
                return client
            except Exception as exc:
                logger.warning("Redis unavailable; cache will degrade to source fetches: %s", exc)
                self._client = None
                return None

    def get_json(self, key: str) -> Any:
        """Return the decoded JSON value stored under *key*.

        Returns ``None`` when the key is absent or empty, when Redis is
        unavailable, or when reading/decoding fails (logged at debug level).
        """
        client = self._get_client()
        if client is None:
            return None
        try:
            raw = client.get(self._key(key))
            if not raw:
                return None
            return json.loads(raw)
        except Exception as exc:
            logger.debug("Redis get_json failed for key=%s: %s", key, exc)
            return None

    def set_json(self, key: str, value: Any, ttl: int) -> None:
        """Store *value* as compact JSON under *key* for *ttl* seconds.

        *ttl* is coerced to ``int`` and raised to at least one second.
        Serialization or Redis errors are logged at debug level and swallowed.
        """
        client = self._get_client()
        if client is None:
            return
        try:
            payload = json.dumps(value, separators=(",", ":"), ensure_ascii=True)
            client.set(self._key(key), payload, ex=max(int(ttl), 1))
        except Exception as exc:
            logger.debug("Redis set_json failed for key=%s: %s", key, exc)

    def set_error(self, key: str, error_payload: Any, ttl: int) -> None:
        """Store *error_payload* under *key*, wrapped in an error envelope.

        The stored value is ``{"__error__": True, "payload": error_payload}``.
        """
        wrapped = {"__error__": True, "payload": error_payload}
        self.set_json(key, wrapped, ttl)

    def incr(self, key: str, ttl: int) -> int:
        """Increment the counter at *key* and return its new value.

        The counter expires *ttl* seconds (at least one) after it is created.
        The expiry is also re-applied whenever the counter is found without
        one, so a counter whose earlier ``EXPIRE`` never ran does not live
        forever.

        Returns ``0`` when Redis is unavailable or the operation fails
        (logged at debug level).
        """
        client = self._get_client()
        if client is None:
            return 0
        try:
            nkey = self._key(key)
            # Validate the TTL before touching the counter so a bad value
            # cannot leave behind an incremented key with no expiry.
            expiry = max(int(ttl), 1)
            # Fetch the new value and its remaining TTL in one round trip.
            with client.pipeline() as pipe:
                pipe.incr(nkey)
                pipe.ttl(nkey)
                raw_value, remaining = pipe.execute()
            value = int(raw_value)
            # TTL reports -1 for a key that exists without an expiry.
            if value == 1 or remaining == -1:
                client.expire(nkey, expiry)
            return value
        except Exception as exc:
            logger.debug("Redis incr failed for key=%s: %s", key, exc)
            return 0

    def exists(self, key: str) -> bool:
        """Return ``True`` when *key* is present in Redis.

        Returns ``False`` when the key is absent, Redis is unavailable, or the
        check fails (logged at debug level).
        """
        client = self._get_client()
        if client is None:
            return False
        try:
            return bool(client.exists(self._key(key)))
        except Exception as exc:
            logger.debug("Redis exists failed for key=%s: %s", key, exc)
            return False