| """Redis-backed cache for verdict deduplication. |
| |
| Falls back gracefully to an in-process dict when Redis is unreachable so the |
| pipeline never crashes due to cache infrastructure. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import hashlib |
| import json |
| import logging |
| import os |
| from typing import Optional |
|
|
| try: |
| import redis.asyncio as aioredis |
| _REDIS_AVAILABLE = True |
| except ImportError: |
| aioredis = None |
| _REDIS_AVAILABLE = False |
|
|
| from models.response_models import FinalVerdict |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class RedisCache:
    """Async Redis cache with transparent in-memory fallback.

    Verdicts are stored as JSON strings under ``rasad:verdict:<hash>`` and
    counters under ``rasad:counter:<key>``. If the ``redis`` package is not
    importable, or the first connection attempt fails, the instance switches
    permanently to an in-process dict.

    NOTE: in fallback mode the TTL is not applied and the dict is unbounded;
    entries live until the process exits.
    """

    def __init__(self, url: Optional[str] = None, ttl_seconds: int = 86400) -> None:
        """Configure the cache; no network I/O happens here.

        Args:
            url: Redis URL. Defaults to ``$REDIS_URL`` and then to
                ``redis://localhost:6379``.
            ttl_seconds: Default expiry, in seconds, for verdicts written to
                Redis.
        """
        self.url = url or os.getenv("REDIS_URL", "redis://localhost:6379")
        self.ttl_seconds = ttl_seconds
        self._client: Optional["aioredis.Redis"] = None
        # Fallback store: verdict JSON keyed by content hash, and counters
        # keyed by "counter:<key>" (values kept as decimal strings).
        self._fallback: dict[str, str] = {}
        self._using_fallback = not _REDIS_AVAILABLE
        self._lock = asyncio.Lock()

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return *url* with any embedded password replaced by ``***``.

        Used for logging only, so credentials in ``REDIS_URL`` never reach
        the log stream. URLs without a ``user:password@`` part are returned
        unchanged.
        """
        from urllib.parse import urlsplit, urlunsplit

        try:
            parts = urlsplit(url)
        except ValueError:
            return "<unparseable url>"
        credentials, at, host = parts.netloc.rpartition("@")
        if not at or ":" not in credentials:
            return url
        user = credentials.split(":", 1)[0]
        return urlunsplit(parts._replace(netloc=f"{user}:***@{host}"))

    @staticmethod
    async def _dispose(client) -> None:
        """Close *client*, logging (never raising) on failure.

        Prefers ``aclose()`` and falls back to ``close()`` for client
        versions that do not provide it.
        """
        closer = getattr(client, "aclose", None) or getattr(client, "close", None)
        if closer is None:
            return
        try:
            await closer()
        except Exception as e:
            logger.debug("Error while closing Redis client: %s", e)

    async def _get_client(self) -> Optional["aioredis.Redis"]:
        """Return a connected Redis client, or ``None`` when in fallback mode.

        The first caller creates the client and verifies it with ``PING``.
        Any failure flips the instance into fallback mode for good.
        """
        if self._using_fallback or not _REDIS_AVAILABLE:
            return None
        if self._client is not None:
            return self._client
        async with self._lock:
            # Re-check BOTH outcomes: while we waited for the lock another
            # coroutine may have connected, or may have failed and switched
            # to fallback. Without the fallback check every waiter would
            # retry the connection and sit through its own timeout.
            if self._using_fallback:
                return None
            if self._client is not None:
                return self._client
            candidate = None
            try:
                candidate = aioredis.from_url(
                    self.url,
                    encoding="utf-8",
                    decode_responses=True,
                    socket_connect_timeout=2,
                    socket_timeout=2,
                )
                await candidate.ping()
            except Exception as e:
                logger.warning("Redis unavailable (%s); using in-memory cache", e)
                self._using_fallback = True
                if candidate is not None:
                    # Release the connection pool of the client we abandon.
                    await self._dispose(candidate)
                return None
            # Publish only after PING succeeded so the lock-free fast path
            # above never hands out an unverified client.
            self._client = candidate
            logger.info("Redis cache connected at %s", self._redact_url(self.url))
            return self._client

    @staticmethod
    def generate_hash(content: str) -> str:
        """Return the SHA-256 hex digest of normalized content.

        Normalization strips surrounding whitespace and lower-cases the text.
        Characters that cannot be encoded as UTF-8 are dropped.
        """
        normalized = content.strip().lower().encode("utf-8", errors="ignore")
        return hashlib.sha256(normalized).hexdigest()

    async def get_cached(self, content_hash: str) -> Optional["FinalVerdict"]:
        """Look up a cached verdict.

        Returns:
            The validated ``FinalVerdict``, or ``None`` on a miss or on any
            read/parse error (errors are logged, never raised).
        """
        try:
            client = await self._get_client()
            payload: Optional[str]
            if client is not None:
                payload = await client.get(f"rasad:verdict:{content_hash}")
            else:
                payload = self._fallback.get(content_hash)
            if not payload:
                return None
            data = json.loads(payload)
            return FinalVerdict.model_validate(data)
        except Exception as e:
            logger.warning("Cache read failed for %s: %s", content_hash[:8], e)
            return None

    async def set_cache(
        self,
        content_hash: str,
        verdict: "FinalVerdict",
        ttl: Optional[int] = None,
    ) -> None:
        """Store a verdict as JSON.

        Args:
            content_hash: Key for the verdict (see ``generate_hash``).
            verdict: Verdict to serialize with ``model_dump_json``.
            ttl: Expiry in seconds for the Redis entry; defaults to
                ``self.ttl_seconds``. Not applied in fallback mode.

        Errors are logged and swallowed so a cache failure never breaks the
        caller.
        """
        try:
            payload = verdict.model_dump_json()
            ttl_actual = ttl if ttl is not None else self.ttl_seconds
            client = await self._get_client()
            if client is not None:
                await client.setex(f"rasad:verdict:{content_hash}", ttl_actual, payload)
            else:
                self._fallback[content_hash] = payload
        except Exception as e:
            logger.warning("Cache write failed for %s: %s", content_hash[:8], e)

    async def increment_counter(self, key: str) -> int:
        """Increment the named counter and return its new value.

        Returns ``0`` if the operation fails; the failure is logged.
        """
        try:
            client = await self._get_client()
            if client is not None:
                return int(await client.incr(f"rasad:counter:{key}"))
            fallback_key = f"counter:{key}"
            updated = int(self._fallback.get(fallback_key, "0")) + 1
            self._fallback[fallback_key] = str(updated)
            return updated
        except Exception as e:
            logger.warning("Counter increment failed for %s: %s", key, e)
            return 0

    async def get_counter(self, key: str) -> int:
        """Return the named counter's current value.

        Returns ``0`` if the counter is unset or the read fails; failures
        are logged.
        """
        try:
            client = await self._get_client()
            if client is not None:
                value = await client.get(f"rasad:counter:{key}")
                return int(value) if value else 0
            return int(self._fallback.get(f"counter:{key}", "0"))
        except Exception as e:
            logger.warning("Counter read failed for %s: %s", key, e)
            return 0

    async def close(self) -> None:
        """Close the Redis client, if any. Safe to call more than once."""
        # Detach first so the instance never keeps a half-closed client.
        client, self._client = self._client, None
        if client is not None:
            await self._dispose(client)
|
|