""" Data Scientist.: Dr.Eddy Giusepe Chirinos Isidro RedisClient.py ============== Objetivo: Gerenciar nossos dados com o DB Redis. Execução: $ python RedisClient.py Versão: 1.0.0 Data: 07/07/2023 """ import redis from redis.commands.search.field import TagField, TextField, VectorField, NumericField # Olhar nossos dados para saber (Por exemplo: TagField == "category") from redis.commands.search.indexDefinition import IndexDefinition, IndexType # Aqui definimos o Índice e o tipo de Índice from redis.commands.search.query import Query # Para fazer a pesquisa dos K vizinhos mais próximos REDIS_HOST='localhost' REDIS_PORT=6379 REDIS_PASSWORD=None #REDIS_DB=0 class RedisSearchClient: def __init__(self, index_name='products', doc_prefix="doc:", vector_dim=1536): self.r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD) # Selecione o banco de dados desejado (por exemplo, banco de dados 3): self.r.select(3) self.index_name = index_name self.doc_prefix = doc_prefix self.vector_dim = vector_dim self._create_index() self.pipe = self.r.pipeline() def _create_index(self): try: self.r.ft(self.index_name).info() print("O Índice já existe 😊!") except Exception as e: print("Creating index") schema = ( TagField("id"), TagField("title"), TagField("category"), NumericField("price"), TextField("description"), TextField("image"), # A Imagem é uma URL (passamos como uma string). NumericField("rating_rate"), # Este e o campo a seguir vem de um dicionário. Mas o REDIS não suporta dic... por isso colocamos como dois campos independentes. NumericField("rating_count"), VectorField("vector", "FLAT", { # Quando tenhamos uma grande quantidade de dado é melhor usar HNSW que FLAT 🧐 "TYPE": "FLOAT32", "DIM": self.vector_dim, # Fornecido pela OpenAI "DISTANCE_METRIC": "COSINE", # Usamos a distância cosseno ("COSINE"). Para usar Distância Euclidiana --> "L2" } ), ) definition = IndexDefinition(prefix=[self.doc_prefix], index_type=IndexType.HASH) # Aqui definimos os índice. É melhor usar HASH self.r.ft(self.index_name).create_index(fields=schema, definition=definition) def delete_index(self): self.r.ft(self.index_name).dropindex(delete_documents=True) def count_documents(self): return int(self.r.ft(self.index_name).info()['num_docs']) def schema_document(self, id, doc): # doc deve estar en formato dict {} self.pipe.hset(f'{self.doc_prefix}{id}', mapping=doc) # key: value def add_document(self, id, doc): # Está função adiciona apenas um Documento self.schema_document(id, doc) self.pipe.execute() # def add_bulk_document(self, docs): # for doc in docs: # self.schema_document(doc['id'], doc) # self.pipe.execute() def add_bulk_documents(self, docs): # docs: list [{},{},{}]. Para salvar grandes quantidades de Documentos. [self.schema_document(doc['id'], doc) for doc in docs] self.pipe.execute() # execute all commands in the pipeline def search_similar_documents(self, vector, topK=5, id=None): """Função que faz pesquisa por Similaridade 🤗""" filter_query = '(@id:{'+'file_'+str(id)+'})' if id else '*' query = ( Query(f"{filter_query}=>[KNN {topK} @vector $vec as score]") # Aqui faze a pesquisa pelos KNN mais próximos .sort_by("score") # Menor valor - menor ângulo --> Mais similar .paging(0, topK) .return_fields("id", "title", "price", "description", "category", "image", "rating_rate", "rating_count", "score") .dialect(2) ) query_params = {"vec": vector} return self.r.ft(self.index_name).search(query, query_params).docs # Finalmente nos conectamos ao índice, chamamos ao método "search" e pedimos os documentos (docs)