File size: 4,337 Bytes
1a9f185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Data Scientist.: Dr.Eddy Giusepe Chirinos Isidro

RedisClient.py
==============

Objetivo: Gerenciar nossos dados com o DB Redis.

Execução: 
         $ python RedisClient.py

Versão: 1.0.0     
Data: 07/07/2023             
"""
import redis
from redis.commands.search.field import TagField, TextField, VectorField, NumericField # Olhar nossos dados para saber (Por exemplo: TagField == "category")
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # Aqui definimos o Índice e o tipo de Índice
from redis.commands.search.query import Query # Para fazer a pesquisa dos K vizinhos mais próximos

REDIS_HOST='localhost'
REDIS_PORT=6379
REDIS_PASSWORD=None
#REDIS_DB=0

class RedisSearchClient:

    def __init__(self, index_name='products', doc_prefix="doc:", vector_dim=1536):
        self.r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
        # Selecione o banco de dados desejado (por exemplo, banco de dados 3):
        self.r.select(3)
        self.index_name = index_name 
        self.doc_prefix = doc_prefix
        self.vector_dim = vector_dim
        self._create_index()
        self.pipe = self.r.pipeline()
    
    def _create_index(self):
        try:
            self.r.ft(self.index_name).info()
            print("O Índice já existe 😊!")
        except Exception as e:
            print("Creating index")
            schema = (
                TagField("id"),
                TagField("title"),
                TagField("category"),
                NumericField("price"),
                TextField("description"),
                TextField("image"), # A Imagem é uma URL (passamos como uma string).
                NumericField("rating_rate"), # Este e o campo a seguir vem de um dicionário. Mas o REDIS não suporta dic... por isso colocamos como dois campos independentes.
                NumericField("rating_count"),
                VectorField("vector",
                    "FLAT", {  # Quando tenhamos uma grande quantidade de dado é melhor usar HNSW que FLAT 🧐
                        "TYPE": "FLOAT32",
                        "DIM": self.vector_dim, # Fornecido pela OpenAI 
                        "DISTANCE_METRIC": "COSINE", # Usamos a distância cosseno ("COSINE"). Para usar Distância Euclidiana --> "L2" 
                    }
                ),
            )
            definition = IndexDefinition(prefix=[self.doc_prefix], index_type=IndexType.HASH) # Aqui definimos os índice. É melhor usar HASH 
            self.r.ft(self.index_name).create_index(fields=schema, definition=definition)
        
    def delete_index(self):
        self.r.ft(self.index_name).dropindex(delete_documents=True)

    def count_documents(self):
        return int(self.r.ft(self.index_name).info()['num_docs'])

    def schema_document(self, id, doc): # doc deve estar en formato dict {}
        self.pipe.hset(f'{self.doc_prefix}{id}', mapping=doc) # key: value
    
    def add_document(self, id, doc): # Está função adiciona apenas um Documento
        self.schema_document(id, doc)
        self.pipe.execute()

    # def add_bulk_document(self, docs):
    #     for doc in docs:
    #         self.schema_document(doc['id'], doc)
    #         self.pipe.execute()

    def add_bulk_documents(self, docs): # docs: list [{},{},{}]. Para salvar grandes quantidades de Documentos.
        [self.schema_document(doc['id'], doc) for doc in docs] 
        self.pipe.execute() # execute all commands in the pipeline

    def search_similar_documents(self, vector, topK=5, id=None):
        """Função que faz  pesquisa por Similaridade 🤗"""
        filter_query = '(@id:{'+'file_'+str(id)+'})' if id else '*'    
        query = (
            Query(f"{filter_query}=>[KNN {topK} @vector $vec as score]") # Aqui faze a pesquisa pelos KNN mais próximos
            .sort_by("score") # Menor valor - menor ângulo --> Mais similar
            .paging(0, topK)
            .return_fields("id", "title", "price", "description", "category", "image", "rating_rate", "rating_count", "score")   
            .dialect(2)
        )
        query_params = {"vec": vector}
        return self.r.ft(self.index_name).search(query, query_params).docs # Finalmente nos conectamos ao índice, chamamos ao método "search" e pedimos os documentos (docs)