""" Data Scientist.: PhD.Eddy Giusepe Chirinos Isidro Baseado no maravilhoso canal de 🤗YouTube IAnews --> https://www.youtube.com/@ianewsglobal/videos🤗 redis_client.py =============== Objetivo: Este script é um módulo para a gestão de dados no redis. Para realizar esses processo criamos uma classe 🤗! Versão: 1.0.0 Data: 27/06/2023 Autor: Dr.Eddy Giusepe Método de execução: $ uvicorn main:app --reload """ import redis, json from redis.exceptions import ConnectionError class RedisClient: """Com esta classe fazemos as gestão do redis.""" def __init__(self, host, port): self.host = host self.port = port self.redis = redis.Redis(host=self.host, port=self.port) def set(self, key, value): # key, que é a chave que será usada para armazenar o valor. value --> é um vetor. """Está função armazena um valor no objeto 'self.redis', usando a chave key""" try: self.redis.set(key, json.dumps(value)) # A função json.dumps() é usada para converter o valor em uma string JSON. except ConnectionError: print("Connection Error") def get(self, key): # key é a chave usada para recuperar o valor. """Função usada para obter o valor armazenado no objeto self.redis usando a chave key""" try: return json.loads(self.redis.get(key)) # O valor retornado por self.redis.get(key) é uma representação em string do valor armazenado. except ConnectionError: print("Connection Error") def delete(self, key): """Função que elimina um dado, isso é feito através da key""" try: self.redis.delete(key) except ConnectionError: print("Connection Error") def exists(self, key): """Esta função verifica a existência de certo dado, em nosso caso a existência do Embedding.""" try: return self.redis.exists(key) except ConnectionError: print("Connection Error") def keys(self, pattern): # pattern = "*" # O parâmetro pattern é um padrão usado para buscar chaves. """Esta função serve para poder obter todos nossos dados. Em nosso caso para obter todos os Embeddings.""" try: return self.redis.keys(pattern) except ConnectionError: print("Connection Error") def flush(self): """Função que nos permite eliminar toda a informação que está no redis.""" try: self.redis.flushall() except ConnectionError: print("Connection Error")