|
|
import json |
|
|
from fs_s3fs import S3FS |
|
|
from src.libs.logger import logger |
|
|
from src.libs.s3fs import get_s3_credentials |
|
|
from phi.vectordb.pgvector import PgVector2 |
|
|
from phi.knowledge.json import JSONKnowledgeBase |
|
|
from src.databases.postgres import sqlalchemy_engine |
|
|
|
|
|
class JSONKnowledgeBaseExtended(JSONKnowledgeBase): |
|
|
s3fs: S3FS = None |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
s3_bucket_name, |
|
|
vector_db, |
|
|
s3_access_key_id, |
|
|
s3_secret_access_key, |
|
|
s3_endpoint_url, |
|
|
s3_region, |
|
|
): |
|
|
super().__init__(path=s3_bucket_name, vector_db=vector_db, bucket_name = s3_bucket_name) |
|
|
|
|
|
|
|
|
self.s3fs = S3FS( |
|
|
bucket_name=s3_bucket_name, |
|
|
aws_access_key_id=s3_access_key_id, |
|
|
aws_secret_access_key=s3_secret_access_key, |
|
|
endpoint_url = s3_endpoint_url, |
|
|
region = s3_region, |
|
|
) |
|
|
|
|
|
def load_knowledge_base(self, recreate: bool = False): |
|
|
json_knowledge_base.load(recreate=recreate) |
|
|
|
|
|
def store_json_data_in_s3(self, json_data, file_path): |
|
|
if file_path[0] == '/': |
|
|
file_path = f"/json-data/{file_path[1:]}" |
|
|
else: |
|
|
file_path = f"/json-data/{file_path}" |
|
|
|
|
|
logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}") |
|
|
|
|
|
|
|
|
self.s3fs.open(path = f"/{file_path}", mode = 'w').write(json.dumps(json_data, indent=2)) |
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
_s3_credendtials = get_s3_credentials() |
|
|
_json_knowledge_base_arguments = { |
|
|
"vector_db": PgVector2( |
|
|
collection="json_documents", |
|
|
db_engine=sqlalchemy_engine |
|
|
), |
|
|
**_s3_credendtials |
|
|
} |
|
|
|
|
|
|
|
|
json_knowledge_base = JSONKnowledgeBaseExtended( |
|
|
**_json_knowledge_base_arguments |
|
|
) |
|
|
|