| import os |
| import json |
| from dotenv import load_dotenv |
| from upstash_redis import Redis |
| from src.libs.logger import logger |
| from src.libs.helper_functions import chunk_data, create_uuid_from_string |
| load_dotenv() |
|
|
| url = os.getenv("UPSTASH_REDIS_REST_URL") |
| token = os.getenv("UPSTASH_REDIS_REST_TOKEN") |
|
|
| REDIS = Redis(url=url, token=token) |
| REDIS_PIPELINE = REDIS.pipeline() |
|
|
|
|
| @logger.instrument() |
| def args_to_key(*args, **kwargs): |
| """ |
| This function generates a unique key based on the provided arguments and keyword arguments. |
| |
| Args: |
| *args: A variable number of positional arguments. |
| **kwargs: A variable number of keyword arguments. |
| |
| Returns: |
| str: A unique key string generated from the provided arguments and keyword arguments. |
| |
| Example: |
| >>> args_to_key("hello", "world", a=1, b=2) |
| 'hello_world_a_1_b_2' |
| """ |
| params = [] |
|
|
| |
| logger.debug(f"args: {args}") |
| logger.debug(f"kwargs: {kwargs}") |
|
|
| |
| for arg in args: |
| if callable(arg): |
| params.append(arg.__name__) |
|
|
| |
| for arg in args: |
| if not callable(arg): |
| params.append(cast_args_to_string_and_return_first_index(arg)) |
|
|
| for kwarg in kwargs.values(): |
| params.append(str(kwarg)) |
|
|
| |
| return "_".join(params) |
|
|
|
|
| @logger.instrument() |
| def REDIS_CACHED(ttl: int = 3600, chunk: bool = False): |
| """ |
| This decorator caches the result of a function call in Redis. |
| |
| Args: |
| ttl (int): The time-to-live (in seconds) for the cached result. Defaults to 3600 seconds (1 hour). |
| chunk (bool): Whether to chunk the result of the original function call. Defaults to False. |
| |
| Returns: |
| A wrapper function that caches the result of the original function call. |
| |
| Example: |
| >>> @REDIS_CACHED(ttl=60, chunk=True) |
| ... def example_function(arg1, arg2): |
| ... return arg1 + arg2 |
| ... |
| ... cached_result = example_function(3, 4) |
| ... print(cached_result) # Output: 7 |
| ... print(cached_result) # Output: 7 (from cache) |
| """ |
| def decorator(func): |
| def wrapper(*args, **kwargs): |
| r = REDIS |
|
|
| cache_key = args_to_key(func, *args, **kwargs) |
| logger.debug(f"Cache key: {cache_key}") |
| cache_key = str(create_uuid_from_string(cache_key)) |
| logger.debug(f"Cache key: {cache_key}") |
|
|
| |
| cached = r.get(cache_key) |
| if cached: |
| |
| return json.loads(cached) |
|
|
| |
| result = func(*args, **kwargs) |
|
|
| |
| |
| cache_ttl = kwargs.get('cache_ttl') |
| ttl_seconds = cache_ttl or kwargs.get('ttl', ttl) |
|
|
| if chunk: |
| chunked_result = chunk_data(result.data, 100) |
| for i in range(len(chunked_result)): |
| r.rpush(cache_key, json.dumps(chunked_result[i])) |
| r.pexpire(cache_key, ttl_seconds) |
| else: |
| |
| |
| r.setex(cache_key, ttl_seconds, result) |
|
|
| |
| return result |
| return wrapper |
| return decorator |
|
|
|
|
| @logger.instrument() |
| def cast_args_to_string_and_return_first_index(args): |
| """ |
| This function takes an argument, casts it to a string, removes '<' and '>', splits it by spaces, and returns the first index. |
| |
| Args: |
| args (any): The argument to be processed. |
| |
| Returns: |
| str or None: The first index of the argument after processing, or None if the argument is empty. |
| |
| Example: |
| >>> cast_args_to_string_and_return_first_index("hello <world>") |
| 'hello' |
| """ |
| args_str = str(args).strip('<>').split(' ') |
| return args_str[0] if args_str else None |
|
|
|
|