Games-In-Arabic / utils /redis_cache.py
qqqsfasdf's picture
Upload 60 files
8d21059 verified
# utils/redis_cache.py
import logging
import json
from typing import Any
import config
logger = logging.getLogger(__name__)
_redis = None
_redis_available = None # None=لم يجرَّب، True=يعمل، False=غير متاح
def get_redis():
global _redis, _redis_available
if _redis_available is False:
return None
if _redis is not None:
return _redis
try:
import redis
r = redis.Redis(
host=config.REDIS_HOST,
port=config.REDIS_PORT,
password=config.REDIS_PASSWORD or None,
db=config.REDIS_DB,
decode_responses=True,
socket_connect_timeout=3,
socket_timeout=3,
)
r.ping()
_redis = r
_redis_available = True
logger.info("✅ Redis متصل — الكاش نشط")
return _redis
except Exception as e:
_redis_available = False
logger.warning(f"⚠️ Redis غير متاح: {e} — سيعمل البوت بكاش ذاكرة فقط")
return None
# ── كاش الذاكرة الاحتياطي (fallback) ──────────────────────
_mem_cache: dict[str, tuple[Any, float]] = {}
def cache_get(key: str) -> Any | None:
import time
r = get_redis()
if r:
try:
val = r.get(key)
if val:
return json.loads(val)
return None
except Exception:
pass
# fallback للذاكرة
entry = _mem_cache.get(key)
if entry:
value, expires_at = entry
if time.time() < expires_at:
return value
del _mem_cache[key]
return None
def cache_set(key: str, value: Any, ttl: int = None) -> bool:
import time
if ttl is None:
ttl = config.CACHE_TTL # 20 ساعة افتراضياً
r = get_redis()
if r:
try:
r.setex(key, ttl, json.dumps(value, ensure_ascii=False))
return True
except Exception:
pass
# fallback للذاكرة
_mem_cache[key] = (value, time.time() + ttl)
# تنظيف الكاش القديم إذا كبر
if len(_mem_cache) > 500:
now = time.time()
expired = [k for k, (_, exp) in _mem_cache.items() if now > exp]
for k in expired:
_mem_cache.pop(k, None)
return True
def cache_delete(key: str) -> bool:
r = get_redis()
if r:
try:
r.delete(key)
except Exception:
pass
_mem_cache.pop(key, None)
return True
def cache_exists(key: str) -> bool:
return cache_get(key) is not None