|
|
import os |
|
|
import json |
|
|
from dotenv import load_dotenv |
|
|
from upstash_redis import Redis |
|
|
from src.libs.logger import logger |
|
|
from src.libs.helper_functions import chunk_data, create_uuid_from_string |
|
|
load_dotenv() |
|
|
|
|
|
url = os.getenv("UPSTASH_REDIS_REST_URL") |
|
|
token = os.getenv("UPSTASH_REDIS_REST_TOKEN") |
|
|
|
|
|
REDIS = Redis(url=url, token=token) |
|
|
REDIS_PIPELINE = REDIS.pipeline() |
|
|
|
|
|
|
|
|
@logger.instrument() |
|
|
def args_to_key(*args, **kwargs): |
|
|
""" |
|
|
This function generates a unique key based on the provided arguments and keyword arguments. |
|
|
|
|
|
Args: |
|
|
*args: A variable number of positional arguments. |
|
|
**kwargs: A variable number of keyword arguments. |
|
|
|
|
|
Returns: |
|
|
str: A unique key string generated from the provided arguments and keyword arguments. |
|
|
|
|
|
Example: |
|
|
>>> args_to_key("hello", "world", a=1, b=2) |
|
|
'hello_world_a_1_b_2' |
|
|
""" |
|
|
params = [] |
|
|
|
|
|
|
|
|
logger.debug(f"args: {args}") |
|
|
logger.debug(f"kwargs: {kwargs}") |
|
|
|
|
|
|
|
|
for arg in args: |
|
|
if callable(arg): |
|
|
params.append(arg.__name__) |
|
|
|
|
|
|
|
|
for arg in args: |
|
|
if not callable(arg): |
|
|
params.append(cast_args_to_string_and_return_first_index(arg)) |
|
|
|
|
|
for kwarg in kwargs.values(): |
|
|
params.append(str(kwarg)) |
|
|
|
|
|
|
|
|
return "_".join(params) |
|
|
|
|
|
|
|
|
@logger.instrument() |
|
|
def REDIS_CACHED(ttl: int = 3600, chunk: bool = False): |
|
|
""" |
|
|
This decorator caches the result of a function call in Redis. |
|
|
|
|
|
Args: |
|
|
ttl (int): The time-to-live (in seconds) for the cached result. Defaults to 3600 seconds (1 hour). |
|
|
chunk (bool): Whether to chunk the result of the original function call. Defaults to False. |
|
|
|
|
|
Returns: |
|
|
A wrapper function that caches the result of the original function call. |
|
|
|
|
|
Example: |
|
|
>>> @REDIS_CACHED(ttl=60, chunk=True) |
|
|
... def example_function(arg1, arg2): |
|
|
... return arg1 + arg2 |
|
|
... |
|
|
... cached_result = example_function(3, 4) |
|
|
... print(cached_result) # Output: 7 |
|
|
... print(cached_result) # Output: 7 (from cache) |
|
|
""" |
|
|
def decorator(func): |
|
|
def wrapper(*args, **kwargs): |
|
|
r = REDIS |
|
|
|
|
|
cache_key = args_to_key(func, *args, **kwargs) |
|
|
logger.debug(f"Cache key: {cache_key}") |
|
|
cache_key = str(create_uuid_from_string(cache_key)) |
|
|
logger.debug(f"Cache key: {cache_key}") |
|
|
|
|
|
|
|
|
cached = r.get(cache_key) |
|
|
if cached: |
|
|
|
|
|
return json.loads(cached) |
|
|
|
|
|
|
|
|
result = func(*args, **kwargs) |
|
|
|
|
|
|
|
|
|
|
|
cache_ttl = kwargs.get('cache_ttl') |
|
|
ttl_seconds = cache_ttl or kwargs.get('ttl', ttl) |
|
|
|
|
|
if chunk: |
|
|
chunked_result = chunk_data(result.data, 100) |
|
|
for i in range(len(chunked_result)): |
|
|
r.rpush(cache_key, json.dumps(chunked_result[i])) |
|
|
r.pexpire(cache_key, ttl_seconds) |
|
|
else: |
|
|
|
|
|
|
|
|
r.setex(cache_key, ttl_seconds, result) |
|
|
|
|
|
|
|
|
return result |
|
|
return wrapper |
|
|
return decorator |
|
|
|
|
|
|
|
|
@logger.instrument() |
|
|
def cast_args_to_string_and_return_first_index(args): |
|
|
""" |
|
|
This function takes an argument, casts it to a string, removes '<' and '>', splits it by spaces, and returns the first index. |
|
|
|
|
|
Args: |
|
|
args (any): The argument to be processed. |
|
|
|
|
|
Returns: |
|
|
str or None: The first index of the argument after processing, or None if the argument is empty. |
|
|
|
|
|
Example: |
|
|
>>> cast_args_to_string_and_return_first_index("hello <world>") |
|
|
'hello' |
|
|
""" |
|
|
args_str = str(args).strip('<>').split(' ') |
|
|
return args_str[0] if args_str else None |
|
|
|
|
|
|