import json
from fs_s3fs import S3FS
from src.libs.logger import logger
from src.libs.s3fs import get_s3_credentials
from phi.vectordb.pgvector import PgVector2
from phi.knowledge.json import JSONKnowledgeBase
from src.databases.postgres import sqlalchemy_engine
class JSONKnowledgeBaseExtended(JSONKnowledgeBase):
    """JSONKnowledgeBase backed by an S3-compatible object store.

    Raw JSON payloads are written into the bucket under a ``/json-data/``
    prefix via an ``S3FS`` filesystem; vectorization is delegated to the
    configured ``vector_db`` through the parent class.
    """

    # S3 filesystem handle, populated in __init__. Declared at class level
    # so the (pydantic-based) parent model accepts the attribute.
    s3fs: S3FS = None

    def __init__(
        self,
        s3_bucket_name,
        vector_db,
        s3_access_key_id,
        s3_secret_access_key,
        s3_endpoint_url,
        s3_region,
    ):
        super().__init__(path=s3_bucket_name, vector_db=vector_db, bucket_name=s3_bucket_name)
        # Initialize the S3 filesystem used for raw JSON storage.
        self.s3fs = S3FS(
            bucket_name=s3_bucket_name,
            aws_access_key_id=s3_access_key_id,
            aws_secret_access_key=s3_secret_access_key,
            endpoint_url=s3_endpoint_url,
            region=s3_region,
        )

    def load_knowledge_base(self, recreate: bool = False):
        """Load (and optionally rebuild) the vector index for this instance.

        Args:
            recreate: When True, drop and rebuild the existing collection.
        """
        # Bug fix: previously called the module-level ``json_knowledge_base``
        # global instead of this instance, which broke for any other instance
        # and raised NameError before module initialization finished.
        self.load(recreate=recreate)

    def store_json_data_in_s3(self, json_data, file_path):
        """Serialize ``json_data`` and store it in the bucket under ``/json-data/``.

        Args:
            json_data: Any JSON-serializable object.
            file_path: Destination path within the bucket; a leading slash
                is tolerated and stripped.

        Returns:
            True on success (exceptions from S3FS propagate to the caller).
        """
        # Normalize to a single "/json-data/<relative path>" key. The old
        # code also prefixed an extra "/" at open() time, producing
        # "//json-data/..." keys.
        file_path = f"/json-data/{file_path.lstrip('/')}"
        # NOTE(review): _bucket_name is a private S3FS attribute — confirm it
        # remains available across fs-s3fs versions.
        logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}")
        # Bug fix: the file handle was previously never closed; use a context
        # manager so the write is flushed and the handle released.
        with self.s3fs.open(path=file_path, mode='w') as fh:
            fh.write(json.dumps(json_data, indent=2))
        return True
# Connection settings for the S3-compatible store backing the knowledge base.
_s3_credentials = get_s3_credentials()

# Constructor arguments: the pgvector collection for vectorized documents,
# merged with the S3 connection settings fetched above.
_json_knowledge_base_arguments = {
    "vector_db": PgVector2(
        collection="json_documents",
        db_engine=sqlalchemy_engine,
    ),
    **_s3_credentials,
}

# Module-level singleton knowledge base used by the rest of the application.
json_knowledge_base = JSONKnowledgeBaseExtended(**_json_knowledge_base_arguments)