Spaces:
Running
Running
| import redis.asyncio as redis | |
| from typing import Optional, Any, Dict, List | |
| import json | |
| import logging | |
| from datetime import datetime | |
| import pickle | |
| from pathlib import Path | |
| from .config import settings | |
| logger = logging.getLogger(__name__) | |
| class RedisCache: | |
| """Redis cache manager for certificate analysis results.""" | |
| def __init__(self, redis_url: Optional[str] = None): | |
| self.redis_url = redis_url or settings.redis_url | |
| self.client: Optional[redis.Redis] = None | |
| self.connected: bool = False | |
| async def connect(self): | |
| """Connect to Redis.""" | |
| if self.connected: | |
| return | |
| try: | |
| self.client = redis.from_url( | |
| self.redis_url, | |
| encoding="utf-8", | |
| decode_responses=False, | |
| ) | |
| await self.client.ping() | |
| self.connected = True | |
| logger.info(f"Connected to Redis at {self.redis_url}") | |
| except Exception as e: | |
| self.connected = False | |
| logger.error(f"Failed to connect to Redis: {e}") | |
| async def disconnect(self): | |
| """Disconnect from Redis.""" | |
| if self.client and self.connected: | |
| await self.client.close() | |
| self.connected = False | |
| logger.info("Disconnected from Redis") | |
| async def get(self, key: str) -> Optional[Any]: | |
| """Get value from cache.""" | |
| if not self.connected or not self.client: | |
| return None | |
| try: | |
| data = await self.client.get(key) | |
| if data is None: | |
| return None | |
| # Try JSON first | |
| try: | |
| return json.loads(data.decode("utf-8")) | |
| except Exception: | |
| return pickle.loads(data) | |
| except Exception as e: | |
| logger.error(f"Cache get failed for key {key}: {e}") | |
| return None | |
| async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: | |
| """Set value in cache, optionally with TTL.""" | |
| if not self.connected or not self.client: | |
| return False | |
| try: | |
| # Serialize value | |
| try: | |
| data = json.dumps(value, default=str).encode("utf-8") | |
| except Exception: | |
| data = pickle.dumps(value) | |
| if ttl: | |
| await self.client.setex(key, ttl, data) | |
| else: | |
| await self.client.set(key, data) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Cache set failed for key {key}: {e}") | |
| return False | |
| # Add a proper setex wrapper for legacy code | |
| async def setex(self, key: str, ttl: int, value: Any) -> bool: | |
| """Set value with TTL (alias for compatibility).""" | |
| return await self.set(key, value, ttl) | |
| async def delete(self, key: str) -> bool: | |
| """Delete key from cache.""" | |
| if not self.connected or not self.client: | |
| return False | |
| try: | |
| result = await self.client.delete(key) | |
| return result > 0 | |
| except Exception as e: | |
| logger.error(f"Cache delete failed for key {key}: {e}") | |
| return False | |
| async def exists(self, key: str) -> bool: | |
| """Check if key exists in cache.""" | |
| if not self.connected or not self.client: | |
| return False | |
| try: | |
| return await self.client.exists(key) > 0 | |
| except Exception as e: | |
| logger.error(f"Cache exists check failed for key {key}: {e}") | |
| return False | |
| async def keys(self, pattern: str = "*") -> List[str]: | |
| """Get keys matching pattern.""" | |
| if not self.connected or not self.client: | |
| return [] | |
| try: | |
| return await self.client.keys(pattern) | |
| except Exception as e: | |
| logger.error(f"Cache keys failed for pattern {pattern}: {e}") | |
| return [] | |
| async def clear_pattern(self, pattern: str) -> int: | |
| """Clear keys matching pattern.""" | |
| if not self.connected or not self.client: | |
| return 0 | |
| try: | |
| keys = await self.keys(pattern) | |
| if keys: | |
| await self.client.delete(*keys) | |
| logger.info(f"Cleared {len(keys)} keys matching pattern: {pattern}") | |
| return len(keys) | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"Cache clear pattern failed for {pattern}: {e}") | |
| return 0 | |
| async def get_stats(self) -> Dict[str, Any]: | |
| """Get cache statistics.""" | |
| if not self.connected or not self.client: | |
| return {"status": "disconnected"} | |
| try: | |
| info = await self.client.info() | |
| analysis_keys = await self.keys("analysis:*") | |
| certificate_keys = await self.keys("certificate:*") | |
| return { | |
| "status": "connected", | |
| "redis_version": info.get("redis_version", "unknown"), | |
| "used_memory": info.get("used_memory_human", "unknown"), | |
| "connected_clients": info.get("connected_clients", 0), | |
| "total_analysis_cache": len(analysis_keys), | |
| "total_certificate_cache": len(certificate_keys), | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| except Exception as e: | |
| logger.error(f"Cache stats failed: {e}") | |
| return {"status": "error", "error": str(e)} | |
| # Global cache instance | |
| redis_client = RedisCache() | |
| async def get_redis_client() -> RedisCache: | |
| """Return global Redis client instance.""" | |
| if not redis_client.connected: | |
| await redis_client.connect() | |
| return redis_client | |
| async def init_cache(): | |
| """Initialize cache connection.""" | |
| await redis_client.connect() | |
| async def close_cache(): | |
| """Close cache connection.""" | |
| await redis_client.disconnect() | |
| # import redis.asyncio as redis | |
| # from typing import Optional, Any, Dict | |
| # import json | |
| # import logging | |
| # from datetime import datetime | |
| # import pickle | |
| # from .config import settings | |
| # logger = logging.getLogger(__name__) | |
| # class RedisCache: | |
| # """Redis cache manager for certificate analysis results""" | |
| # def __init__(self, redis_url: Optional[str] = None): | |
| # self.redis_url = redis_url or settings.redis_url | |
| # self.client: Optional[redis.Redis] = None | |
| # self.connected = False | |
| # async def connect(self): | |
| # """Connect to Redis""" | |
| # try: | |
| # self.client = redis.from_url( | |
| # self.redis_url, | |
| # encoding="utf-8", | |
| # decode_responses=False | |
| # ) | |
| # await self.client.ping() | |
| # self.connected = True | |
| # logger.info(f"Connected to Redis at {self.redis_url}") | |
| # except Exception as e: | |
| # logger.error(f"Failed to connect to Redis: {e}") | |
| # self.connected = False | |
| # async def disconnect(self): | |
| # """Disconnect from Redis""" | |
| # if self.client and self.connected: | |
| # await self.client.close() | |
| # self.connected = False | |
| # logger.info("Disconnected from Redis") | |
| # async def get(self, key: str) -> Optional[Any]: | |
| # """Get value from cache""" | |
| # if not self.connected or not self.client: | |
| # return None | |
| # try: | |
| # data = await self.client.get(key) | |
| # if data: | |
| # try: | |
| # # Try JSON first | |
| # return json.loads(data.decode('utf-8')) | |
| # except: | |
| # # Fallback to pickle | |
| # return pickle.loads(data) | |
| # return None | |
| # except Exception as e: | |
| # logger.error(f"Cache get failed for key {key}: {e}") | |
| # return None | |
| # async def set(self, key: str, value: Any, ttl: Optional[int] = None): | |
| # """Set value in cache""" | |
| # if not self.connected or not self.client: | |
| # return False | |
| # try: | |
| # # Try JSON serialization first | |
| # try: | |
| # data = json.dumps(value, default=str).encode('utf-8') | |
| # except: | |
| # # Fallback to pickle | |
| # data = pickle.dumps(value) | |
| # if ttl: | |
| # await self.client.setex(key, ttl, data) | |
| # else: | |
| # await self.client.set(key, data) | |
| # return True | |
| # except Exception as e: | |
| # logger.error(f"Cache set failed for key {key}: {e}") | |
| # return False | |
| # async def delete(self, key: str) -> bool: | |
| # """Delete key from cache""" | |
| # if not self.connected or not self.client: | |
| # return False | |
| # try: | |
| # result = await self.client.delete(key) | |
| # return result > 0 | |
| # except Exception as e: | |
| # logger.error(f"Cache delete failed for key {key}: {e}") | |
| # return False | |
| # async def exists(self, key: str) -> bool: | |
| # """Check if key exists in cache""" | |
| # if not self.connected or not self.client: | |
| # return False | |
| # try: | |
| # return await self.client.exists(key) > 0 | |
| # except Exception as e: | |
| # logger.error(f"Cache exists check failed for key {key}: {e}") | |
| # return False | |
| # async def keys(self, pattern: str = "*") -> list: | |
| # """Get keys matching pattern""" | |
| # if not self.connected or not self.client: | |
| # return [] | |
| # try: | |
| # return await self.client.keys(pattern) | |
| # except Exception as e: | |
| # logger.error(f"Cache keys failed for pattern {pattern}: {e}") | |
| # return [] | |
| # async def clear_pattern(self, pattern: str) -> int: | |
| # """Clear keys matching pattern""" | |
| # if not self.connected or not self.client: | |
| # return 0 | |
| # try: | |
| # keys = await self.keys(pattern) | |
| # if keys: | |
| # await self.client.delete(*keys) | |
| # logger.info(f"Cleared {len(keys)} keys matching pattern: {pattern}") | |
| # return len(keys) | |
| # return 0 | |
| # except Exception as e: | |
| # logger.error(f"Cache clear pattern failed for {pattern}: {e}") | |
| # return 0 | |
| # async def get_stats(self) -> Dict[str, Any]: | |
| # """Get cache statistics""" | |
| # if not self.connected or not self.client: | |
| # return {"status": "disconnected"} | |
| # try: | |
| # info = await self.client.info() | |
| # # Get analysis cache stats | |
| # analysis_keys = await self.keys("analysis:*") | |
| # certificate_keys = await self.keys("certificate:*") | |
| # return { | |
| # "status": "connected", | |
| # "redis_version": info.get("redis_version", "unknown"), | |
| # "used_memory": info.get("used_memory_human", "unknown"), | |
| # "connected_clients": info.get("connected_clients", 0), | |
| # "total_analysis_cache": len(analysis_keys), | |
| # "total_certificate_cache": len(certificate_keys), | |
| # "timestamp": datetime.now().isoformat() | |
| # } | |
| # except Exception as e: | |
| # logger.error(f"Cache stats failed: {e}") | |
| # return {"status": "error", "error": str(e)} | |
| # # Global cache instance | |
| # redis_client = RedisCache() | |
| # async def get_redis_client() -> RedisCache: | |
| # """Get Redis client instance""" | |
| # if not redis_client.connected: | |
| # await redis_client.connect() | |
| # return redis_client | |
| # async def init_cache(): | |
| # """Initialize cache connection""" | |
| # await redis_client.connect() | |
| # async def close_cache(): | |
| # """Close cache connection""" | |
| # await redis_client.disconnect() |