import json

from fs_s3fs import S3FS
from phi.knowledge.json import JSONKnowledgeBase
from phi.vectordb.pgvector import PgVector2

from src.databases.postgres import sqlalchemy_engine
from src.libs.logger import logger
from src.libs.s3fs import get_s3_credentials


class JSONKnowledgeBaseExtended(JSONKnowledgeBase):
    """JSONKnowledgeBase that reads/writes its JSON documents from an S3 bucket.

    Wraps the base class with an fs-s3fs filesystem so documents can be
    stored under the ``/json-data/`` prefix of the configured bucket.
    """

    # Explicitly declare the s3fs attribute so the (pydantic-model) base class
    # accepts it as a field. NOTE(review): annotated S3FS but defaulted to None,
    # as in the original — presumably the base model tolerates this; confirm.
    s3fs: S3FS = None

    def __init__(
        self,
        s3_bucket_name,
        vector_db,
        s3_access_key_id,
        s3_secret_access_key,
        s3_endpoint_url,
        s3_region,
    ):
        # NOTE(review): `bucket_name` forwarded to the base class as in the
        # original — confirm JSONKnowledgeBase actually accepts this kwarg.
        super().__init__(
            path=s3_bucket_name,
            vector_db=vector_db,
            bucket_name=s3_bucket_name,
        )
        # Initialize the S3 filesystem used for all document reads/writes.
        self.s3fs = S3FS(
            bucket_name=s3_bucket_name,
            aws_access_key_id=s3_access_key_id,
            aws_secret_access_key=s3_secret_access_key,
            endpoint_url=s3_endpoint_url,
            region=s3_region,
        )

    def load_knowledge_base(self, recreate: bool = False):
        """Load the knowledge base into the vector store.

        Args:
            recreate: When True, drop and rebuild the vector collection.
        """
        # BUG FIX: the original called the module-level global
        # `json_knowledge_base.load(...)`, so any *other* instance of this
        # class would silently load the singleton's data instead of its own.
        # Delegate to the inherited load() on this instance.
        self.load(recreate=recreate)

    def store_json_data_in_s3(self, json_data, file_path):
        """Serialize ``json_data`` and store it under ``/json-data/`` in the bucket.

        Args:
            json_data: Any json.dumps-serializable object.
            file_path: Destination path (with or without a leading slash).

        Returns:
            True on success; filesystem errors propagate to the caller.
        """
        # Normalize to a single absolute path rooted at /json-data/.
        file_path = f"/json-data/{file_path.lstrip('/')}"
        logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}")
        # BUG FIX: the original prepended another "/" (yielding "//json-data/...")
        # and never closed the file handle; use a context manager and the
        # already-normalized path instead.
        with self.s3fs.open(file_path, mode='w') as s3_file:
            s3_file.write(json.dumps(json_data, indent=2))
        return True


# S3 credentials (typo fixed: was `_s3_credendtials`)
_s3_credentials = get_s3_credentials()

_json_knowledge_base_arguments = {
    "vector_db": PgVector2(
        collection="json_documents",
        db_engine=sqlalchemy_engine,
    ),
    **_s3_credentials,
}

# Initialize the extended JSONKnowledgeBase with the S3 bucket name and S3 credentials.
json_knowledge_base = JSONKnowledgeBaseExtended(**_json_knowledge_base_arguments)