Spaces:
Runtime error
Runtime error
File size: 2,665 Bytes
8d21059 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | # utils/redis_cache.py
import logging
import json
from typing import Any
import config
logger = logging.getLogger(__name__)
_redis = None
_redis_available = None # None=لم يجرَّب، True=يعمل، False=غير متاح
def get_redis():
global _redis, _redis_available
if _redis_available is False:
return None
if _redis is not None:
return _redis
try:
import redis
r = redis.Redis(
host=config.REDIS_HOST,
port=config.REDIS_PORT,
password=config.REDIS_PASSWORD or None,
db=config.REDIS_DB,
decode_responses=True,
socket_connect_timeout=3,
socket_timeout=3,
)
r.ping()
_redis = r
_redis_available = True
logger.info("✅ Redis متصل — الكاش نشط")
return _redis
except Exception as e:
_redis_available = False
logger.warning(f"⚠️ Redis غير متاح: {e} — سيعمل البوت بكاش ذاكرة فقط")
return None
# ── كاش الذاكرة الاحتياطي (fallback) ──────────────────────
_mem_cache: dict[str, tuple[Any, float]] = {}
def cache_get(key: str) -> Any | None:
import time
r = get_redis()
if r:
try:
val = r.get(key)
if val:
return json.loads(val)
return None
except Exception:
pass
# fallback للذاكرة
entry = _mem_cache.get(key)
if entry:
value, expires_at = entry
if time.time() < expires_at:
return value
del _mem_cache[key]
return None
def cache_set(key: str, value: Any, ttl: int = None) -> bool:
import time
if ttl is None:
ttl = config.CACHE_TTL # 20 ساعة افتراضياً
r = get_redis()
if r:
try:
r.setex(key, ttl, json.dumps(value, ensure_ascii=False))
return True
except Exception:
pass
# fallback للذاكرة
_mem_cache[key] = (value, time.time() + ttl)
# تنظيف الكاش القديم إذا كبر
if len(_mem_cache) > 500:
now = time.time()
expired = [k for k, (_, exp) in _mem_cache.items() if now > exp]
for k in expired:
_mem_cache.pop(k, None)
return True
def cache_delete(key: str) -> bool:
r = get_redis()
if r:
try:
r.delete(key)
except Exception:
pass
_mem_cache.pop(key, None)
return True
def cache_exists(key: str) -> bool:
return cache_get(key) is not None
|