lemdaddy's picture
Major knowledgebase update
6a7abb7
raw
history blame
1.98 kB
import json
from fs_s3fs import S3FS
from src.libs.logger import logger
from src.libs.s3fs import get_s3_credentials
from phi.vectordb.pgvector import PgVector2
from phi.knowledge.json import JSONKnowledgeBase
from src.databases.postgres import sqlalchemy_engine
class JSONKnowledgeBaseExtended(JSONKnowledgeBase):
    """JSON knowledge base backed by an S3 bucket via fs-s3fs.

    Extends phi's ``JSONKnowledgeBase`` by attaching an ``S3FS`` filesystem so
    JSON documents can be written directly into the bucket that the knowledge
    base loads from.
    """

    # Explicitly declare the s3fs attribute so the (pydantic-based) parent
    # class accepts assignment to it.
    s3fs: S3FS = None

    def __init__(
        self,
        s3_bucket_name,
        vector_db,
        s3_access_key_id,
        s3_secret_access_key,
        s3_endpoint_url,
        s3_region,
    ):
        """Initialize the knowledge base and its S3 filesystem.

        :param s3_bucket_name: Bucket that holds the JSON documents; also used
            as the knowledge-base ``path``.
        :param vector_db: Vector store (e.g. ``PgVector2``) for embeddings.
        :param s3_access_key_id: AWS access key id.
        :param s3_secret_access_key: AWS secret access key.
        :param s3_endpoint_url: S3-compatible endpoint URL.
        :param s3_region: Bucket region.
        """
        super().__init__(path=s3_bucket_name, vector_db=vector_db, bucket_name=s3_bucket_name)
        # Initialize the S3 filesystem used for writing documents.
        self.s3fs = S3FS(
            bucket_name=s3_bucket_name,
            aws_access_key_id=s3_access_key_id,
            aws_secret_access_key=s3_secret_access_key,
            endpoint_url=s3_endpoint_url,
            region=s3_region,
        )

    def load_knowledge_base(self, recreate: bool = False):
        """(Re)load this knowledge base into the vector store.

        :param recreate: When True, drop and rebuild the vector collection.
        """
        # BUGFIX: previously called the module-level singleton
        # ``json_knowledge_base`` instead of ``self``, which broke any other
        # instance of this class (and module initialization order).
        self.load(recreate=recreate)

    def store_json_data_in_s3(self, json_data, file_path):
        """Serialize ``json_data`` and write it under ``/json-data/`` in the bucket.

        :param json_data: Any ``json.dumps``-serializable object.
        :param file_path: Destination path, with or without a leading slash.
        :return: True on success.
        """
        # Normalize to exactly one "/json-data/" prefix; startswith() is safe
        # for an empty file_path (the old file_path[0] check raised IndexError).
        relative = file_path[1:] if file_path.startswith("/") else file_path
        file_path = f"/json-data/{relative}"
        logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}")
        # NOTE(review): the extra f"/{file_path}" prefix yields a double
        # leading slash ("//json-data/..."); preserved for compatibility since
        # PyFilesystem appears to normalize it — confirm before changing.
        # Use a context manager so the S3 file handle is always closed
        # (the previous code leaked the handle returned by open()).
        with self.s3fs.open(path=f"/{file_path}", mode='w') as fh:
            fh.write(json.dumps(json_data, indent=2))
        return True
# S3 credentials pulled from the project configuration/environment.
# (Renamed from the misspelled "_s3_credendtials"; underscore-private, used
# only within this module.)
_s3_credentials = get_s3_credentials()

# Arguments for the extended knowledge base: a pgvector-backed vector store
# plus the S3 credential kwargs (bucket name, keys, endpoint, region).
_json_knowledge_base_arguments = {
    "vector_db": PgVector2(
        collection="json_documents",
        db_engine=sqlalchemy_engine
    ),
    **_s3_credentials
}

# Module-level singleton: the extended JSONKnowledgeBase wired to the S3
# bucket and the pgvector collection above.
json_knowledge_base = JSONKnowledgeBaseExtended(
    **_json_knowledge_base_arguments
)