# app/src/databases/redis.py
import functools
import json
import os

from dotenv import load_dotenv
from upstash_redis import Redis

from src.libs.logger import logger
from src.libs.helper_functions import chunk_data, create_uuid_from_string
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()
# Upstash REST credentials; these are None if the variables are not set.
url = os.getenv("UPSTASH_REDIS_REST_URL")
token = os.getenv("UPSTASH_REDIS_REST_TOKEN")
# Module-level Redis client shared by all caching helpers in this module.
REDIS = Redis(url=url, token=token)
# Pre-created pipeline for batching Redis commands.
# NOTE(review): unused within this file -- presumably consumed by importers; verify.
REDIS_PIPELINE = REDIS.pipeline()
@logger.instrument()
def args_to_key(*args, **kwargs):
    """
    Generate a deterministic cache-key string from arbitrary arguments.

    Callable positional arguments contribute their ``__name__`` (so the
    wrapped function's identity is part of the key), non-callable
    positional arguments are reduced to a short string token, and keyword
    arguments contribute both their name and their value.

    Args:
        *args: Positional arguments (callables and/or plain values).
        **kwargs: Keyword arguments to fold into the key.

    Returns:
        str: An underscore-joined key built from all arguments.

    Example:
        >>> args_to_key("hello", "world", a=1, b=2)
        'hello_world_a_1_b_2'
    """
    params = []
    logger.debug(f"args: {args}")
    logger.debug(f"kwargs: {kwargs}")
    # Callable args (typically the cached function itself) contribute their
    # name first, so keys are grouped by function.
    for arg in args:
        if callable(arg):
            params.append(arg.__name__)
    # Non-callable positional args follow, reduced to a short string token.
    for arg in args:
        if not callable(arg):
            params.append(cast_args_to_string_and_return_first_index(arg))
    # BUG FIX: include the keyword *names* as well as the values, matching the
    # documented example above; values alone would let distinct calls such as
    # f(a=1) and f(b=1) collide on the same cache key.
    for key, value in kwargs.items():
        params.append(str(key))
        params.append(str(value))
    # Join the collected tokens with '_' separators.
    return "_".join(params)
@logger.instrument()
def REDIS_CACHED(ttl: int = 3600, chunk: bool = False):
    """
    Decorator factory that caches a function's result in Redis.

    Args:
        ttl (int): Default time-to-live in seconds for cached entries.
            Defaults to 3600 (1 hour). May be overridden per call via a
            ``cache_ttl`` (or ``ttl``) keyword argument passed to the
            wrapped function.
        chunk (bool): When True, the result's ``.data`` attribute is split
            into chunks of 100 items and stored as a Redis list; otherwise
            the whole result is stored JSON-encoded under a single key.

    Returns:
        A decorator that wraps the target function with Redis caching.

    Example:
        >>> @REDIS_CACHED(ttl=60)
        ... def example_function(arg1, arg2):
        ...     return arg1 + arg2
        ...
        ... cached_result = example_function(3, 4)
        ... print(cached_result)  # Output: 7
        ... print(cached_result)  # Output: 7 (from cache)
    """
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            r = REDIS
            raw_key = args_to_key(func, *args, **kwargs)
            logger.debug(f"Cache key: {raw_key}")
            cache_key = str(create_uuid_from_string(raw_key))
            logger.debug(f"Cache key: {cache_key}")
            if chunk:
                # BUG FIX: chunked entries are written with RPUSH (a Redis
                # list), so they must be read back with LRANGE -- the old
                # GET call raised WRONGTYPE and chunked caching never hit.
                cached_chunks = r.lrange(cache_key, 0, -1)
                if cached_chunks:
                    # NOTE(review): a chunked cache hit returns the flattened
                    # data list, not the original result wrapper object --
                    # confirm callers accept both shapes.
                    data = []
                    for chunk_json in cached_chunks:
                        data.extend(json.loads(chunk_json))
                    return data
            else:
                cached = r.get(cache_key)
                if cached:
                    # Found in cache, return the decoded value.
                    return json.loads(cached)
            # Cache miss: call through to the wrapped function.
            result = func(*args, **kwargs)
            # Per-call TTL override (cache_ttl, then ttl), falling back to
            # the decorator default.
            ttl_seconds = kwargs.get('cache_ttl') or kwargs.get('ttl', ttl)
            if chunk:
                for piece in chunk_data(result.data, 100):
                    r.rpush(cache_key, json.dumps(piece))
                # BUG FIX: EXPIRE takes seconds; the old PEXPIRE call passed
                # seconds to a milliseconds command, expiring entries 1000x
                # too early.
                r.expire(cache_key, ttl_seconds)
            else:
                # BUG FIX: JSON-encode before storing so the read path's
                # json.loads() round-trips correctly; the old code stored the
                # raw Python object.
                r.setex(cache_key, ttl_seconds, json.dumps(result))
            # Return the fresh result transparently.
            return result
        return wrapper
    return decorator
@logger.instrument()
def cast_args_to_string_and_return_first_index(args):
    """
    Reduce an arbitrary value to a short string token.

    The value is stringified, leading/trailing angle brackets are stripped
    (e.g. from reprs like ``<function f at 0x...>``), and the text before
    the first space is returned.

    Args:
        args (any): The value to tokenize.

    Returns:
        str: The first space-delimited token of the processed string
        (the empty string for an empty input).

    Example:
        >>> cast_args_to_string_and_return_first_index("hello <world>")
        'hello'
    """
    stripped = str(args).strip('<>')
    # partition(' ')[0] is equivalent to split(' ')[0] and never raises.
    first_token, _, _ = stripped.partition(' ')
    return first_token