EddyGiusepe's picture
DB Redis e Modules
1a9f185
"""
Data Scientist.: Dr.Eddy Giusepe Chirinos Isidro
RedisClient.py
==============
Objetivo: Gerenciar nossos dados com o DB Redis.
Execução:
$ python RedisClient.py
Versão: 1.0.0
Data: 07/07/2023
"""
import redis
from redis.commands.search.field import TagField, TextField, VectorField, NumericField # Olhar nossos dados para saber (Por exemplo: TagField == "category")
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # Aqui definimos o Índice e o tipo de Índice
from redis.commands.search.query import Query # Para fazer a pesquisa dos K vizinhos mais próximos
REDIS_HOST='localhost'
REDIS_PORT=6379
REDIS_PASSWORD=None
#REDIS_DB=0
class RedisSearchClient:
def __init__(self, index_name='products', doc_prefix="doc:", vector_dim=1536):
self.r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
# Selecione o banco de dados desejado (por exemplo, banco de dados 3):
self.r.select(3)
self.index_name = index_name
self.doc_prefix = doc_prefix
self.vector_dim = vector_dim
self._create_index()
self.pipe = self.r.pipeline()
def _create_index(self):
try:
self.r.ft(self.index_name).info()
print("O Índice já existe 😊!")
except Exception as e:
print("Creating index")
schema = (
TagField("id"),
TagField("title"),
TagField("category"),
NumericField("price"),
TextField("description"),
TextField("image"), # A Imagem é uma URL (passamos como uma string).
NumericField("rating_rate"), # Este e o campo a seguir vem de um dicionário. Mas o REDIS não suporta dic... por isso colocamos como dois campos independentes.
NumericField("rating_count"),
VectorField("vector",
"FLAT", { # Quando tenhamos uma grande quantidade de dado é melhor usar HNSW que FLAT 🧐
"TYPE": "FLOAT32",
"DIM": self.vector_dim, # Fornecido pela OpenAI
"DISTANCE_METRIC": "COSINE", # Usamos a distância cosseno ("COSINE"). Para usar Distância Euclidiana --> "L2"
}
),
)
definition = IndexDefinition(prefix=[self.doc_prefix], index_type=IndexType.HASH) # Aqui definimos os índice. É melhor usar HASH
self.r.ft(self.index_name).create_index(fields=schema, definition=definition)
def delete_index(self):
self.r.ft(self.index_name).dropindex(delete_documents=True)
def count_documents(self):
return int(self.r.ft(self.index_name).info()['num_docs'])
def schema_document(self, id, doc): # doc deve estar en formato dict {}
self.pipe.hset(f'{self.doc_prefix}{id}', mapping=doc) # key: value
def add_document(self, id, doc): # Está função adiciona apenas um Documento
self.schema_document(id, doc)
self.pipe.execute()
# def add_bulk_document(self, docs):
# for doc in docs:
# self.schema_document(doc['id'], doc)
# self.pipe.execute()
def add_bulk_documents(self, docs): # docs: list [{},{},{}]. Para salvar grandes quantidades de Documentos.
[self.schema_document(doc['id'], doc) for doc in docs]
self.pipe.execute() # execute all commands in the pipeline
def search_similar_documents(self, vector, topK=5, id=None):
"""Função que faz pesquisa por Similaridade 🤗"""
filter_query = '(@id:{'+'file_'+str(id)+'})' if id else '*'
query = (
Query(f"{filter_query}=>[KNN {topK} @vector $vec as score]") # Aqui faze a pesquisa pelos KNN mais próximos
.sort_by("score") # Menor valor - menor ângulo --> Mais similar
.paging(0, topK)
.return_fields("id", "title", "price", "description", "category", "image", "rating_rate", "rating_count", "score")
.dialect(2)
)
query_params = {"vec": vector}
return self.r.ft(self.index_name).search(query, query_params).docs # Finalmente nos conectamos ao índice, chamamos ao método "search" e pedimos os documentos (docs)