File size: 1,975 Bytes
6a7abb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import json
from fs_s3fs import S3FS
from src.libs.logger import logger
from src.libs.s3fs import get_s3_credentials
from phi.vectordb.pgvector import PgVector2
from phi.knowledge.json import JSONKnowledgeBase
from src.databases.postgres import sqlalchemy_engine

class JSONKnowledgeBaseExtended(JSONKnowledgeBase):
    """JSON knowledge base that can also write JSON documents to an S3 bucket.

    On top of ``JSONKnowledgeBase`` this class keeps an ``S3FS`` handle
    (``self.s3fs``) and offers ``store_json_data_in_s3`` to write JSON
    payloads under the bucket's ``/json-data/`` prefix.
    """

    s3fs: S3FS = None  # Explicitly declare the s3fs attribute

    def __init__(
        self,
        s3_bucket_name,
        vector_db,
        s3_access_key_id,
        s3_secret_access_key,
        s3_endpoint_url,
        s3_region,
    ):
        """Set up the knowledge base and its S3 filesystem handle.

        Args:
            s3_bucket_name: Bucket name; passed to the parent as both ``path``
                and ``bucket_name`` and used as the ``S3FS`` bucket.
            vector_db: Vector database forwarded to the parent class.
            s3_access_key_id: Access key id handed to ``S3FS``.
            s3_secret_access_key: Secret access key handed to ``S3FS``.
            s3_endpoint_url: Endpoint URL handed to ``S3FS``.
            s3_region: Region handed to ``S3FS``.
        """
        super().__init__(
            path=s3_bucket_name,
            vector_db=vector_db,
            bucket_name=s3_bucket_name,
        )

        # Initialize the S3 filesystem
        self.s3fs = S3FS(
            bucket_name=s3_bucket_name,
            aws_access_key_id=s3_access_key_id,
            aws_secret_access_key=s3_secret_access_key,
            endpoint_url=s3_endpoint_url,
            region=s3_region,
        )

    def load_knowledge_base(self, recreate: bool = False):
        """Load this knowledge base via ``self.load``.

        Args:
            recreate: Forwarded unchanged to ``load`` as ``recreate``.
        """
        # Operate on this instance rather than on the module-level singleton,
        # so the method works for any instance of the class.
        self.load(recreate=recreate)

    def store_json_data_in_s3(self, json_data, file_path):
        """Serialize ``json_data`` and write it below ``/json-data/`` in S3.

        Args:
            json_data: Object serialized with ``json.dumps(..., indent=2)``.
            file_path: Target path relative to ``/json-data/``; a single
                leading ``/`` is dropped before the prefix is applied.

        Returns:
            ``True`` once the data has been written and the file closed.

        Raises:
            ValueError: If ``file_path`` is empty.
            TypeError: Propagated from ``json.dumps`` when ``json_data`` is
                not JSON-serializable.
        """
        if not file_path:
            raise ValueError("file_path must be a non-empty string")

        # Drop one leading slash (if present) so the prefix is not doubled.
        relative_path = file_path[1:] if file_path.startswith("/") else file_path
        file_path = f"/json-data/{relative_path}"

        logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}")

        # Serialize before opening the target so a non-serializable payload
        # fails without a write-mode file ever having been opened.
        payload = json.dumps(json_data, indent=2)

        # The context manager closes the file deterministically instead of
        # leaving the flush of the written data to garbage collection.
        with self.s3fs.open(path=file_path, mode="w") as s3_file:
            s3_file.write(payload)
        return True


# S3 settings, fetched once at import time; they are unpacked below as
# keyword arguments for the knowledge base constructor.
_s3_credendtials = get_s3_credentials()

# Start from the vector store, then layer the S3 settings on top (same
# precedence as unpacking them after the "vector_db" entry).
_json_knowledge_base_arguments = {
    "vector_db": PgVector2(collection="json_documents", db_engine=sqlalchemy_engine),
}
_json_knowledge_base_arguments.update(_s3_credendtials)

# Module-level instance of the extended JSON knowledge base, built from the
# vector store plus the S3 settings gathered above.
json_knowledge_base = JSONKnowledgeBaseExtended(**_json_knowledge_base_arguments)