|
|
import boto3 |
|
|
|
|
|
from utility import * |
|
|
|
|
|
|
|
|
# Module-level low-level DynamoDB client shared by every helper in this file.
# NOTE(review): the two credential names are not defined in this file; they are
# presumably supplied by the wildcard import from `utility` - confirm.
db_client = boto3.client(
    "dynamodb",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
|
|
|
|
|
'''
DynamoDB data operations
'''
|
|
|
|
|
|
|
|
def get_table(table_name: str):
    """Return every item of a DynamoDB table, converted to Python values.

    Scans ``table_name`` requesting only the attributes listed for it in
    ``data_structure`` and passes the collected raw items through
    ``db_list_to_py_list``.

    A single ``scan`` call returns only one page of results; when more data
    remains the response carries ``LastEvaluatedKey``. The scan is repeated
    with that key as ``ExclusiveStartKey`` until it is absent, so large
    tables are not silently truncated to their first page.

    Args:
        table_name: Name of the table to scan. It is also used to index
            ``data_structure``.

    Returns:
        The result of ``db_list_to_py_list`` applied to all scanned items.

    Raises:
        Nothing is caught here: a failed ``data_structure[table_name]``
        lookup (presumably ``KeyError`` if it is a dict - confirm) and any
        client error propagate to the caller.
    """
    scan_kwargs = {
        "TableName": table_name,
        "AttributesToGet": data_structure[table_name],
    }
    items = []
    while True:
        page = db_client.scan(**scan_kwargs)
        items.extend(page.get("Items", []))
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            # No continuation key: the whole table has been read.
            break
        # Resume the next page right after the last key that was evaluated.
        scan_kwargs["ExclusiveStartKey"] = last_key
    return db_list_to_py_list(items)
|
|
|
|
|
|
|
|
def post_item(table_name: str, item: dict):
    """Store ``item`` in ``table_name`` through the client's ``put_item`` call.

    The item is converted with ``py_dict_to_db_map`` before being sent. Any
    exception raised while converting or writing is caught and returned to
    the caller instead of propagating.

    NOTE(review): this body is the same operation as ``put_item`` in this
    file; per the DynamoDB PutItem documentation an existing item with the
    same key is replaced, so this does not act as a "create only" call.

    Args:
        table_name: Name of the target table.
        item: The item to store, as a plain dict.

    Returns:
        The raw ``put_item`` response on success, otherwise
        ``{"Error": <exception instance>}``.
    """
    try:
        # The conversion stays inside the try so that its failures are
        # reported the same way as client errors.
        request = {"TableName": table_name, "Item": py_dict_to_db_map(item)}
        return db_client.put_item(**request)
    except Exception as err:
        return {"Error": err}
|
|
|
|
|
|
|
|
def put_item(table_name: str, item: dict):
    """Write ``item`` to ``table_name`` using the client's ``put_item`` call.

    ``item`` is translated with ``py_dict_to_db_map`` first. Failures in
    either the translation or the write are not raised; they are returned
    inside a dict.

    Args:
        table_name: Name of the target table.
        item: The item to write, as a plain dict.

    Returns:
        The raw ``put_item`` response on success, otherwise
        ``{"Error": <exception instance>}``.
    """
    try:
        db_item = py_dict_to_db_map(item)
        response = db_client.put_item(TableName=table_name, Item=db_item)
    except Exception as err:
        # Errors are handed back as a value rather than raised.
        return {"Error": err}
    else:
        return response
|
|
|
|
|
|
|
|
def delete_item(table_name: str, key: dict):
    """Delete the item identified by ``key`` from ``table_name``.

    ``key`` is converted with ``py_dict_to_db_map`` and passed as the ``Key``
    argument of the client's ``delete_item`` call. Exceptions from the
    conversion or the call are caught and returned, not raised.

    Args:
        table_name: Name of the table to delete from.
        key: Key of the item to remove, as a plain dict.

    Returns:
        The raw ``delete_item`` response on success, otherwise
        ``{"Error": <exception instance>}``.
    """
    try:
        db_key = py_dict_to_db_map(key)
        outcome = db_client.delete_item(TableName=table_name, Key=db_key)
    except Exception as err:
        outcome = {"Error": err}
    return outcome
|
|
|
|
|
|
|
|
|
|
|
def get_item(table_name: str, key: dict):
    """Fetch the item identified by ``key`` from ``table_name``.

    ``key`` is converted with ``py_dict_to_db_map`` and passed as the ``Key``
    argument of the client's ``get_item`` call. The response is returned
    as-is; no conversion back to plain Python values happens here.

    NOTE(review): per the DynamoDB GetItem documentation the response has no
    ``"Item"`` entry when nothing matches the key, so callers should check
    for it.

    Args:
        table_name: Name of the table to read from.
        key: Key of the wanted item, as a plain dict.

    Returns:
        The raw ``get_item`` response on success, otherwise
        ``{"Error": <exception instance>}``.
    """
    try:
        # Conversion and lookup share one try so both report errors alike.
        lookup = {"TableName": table_name, "Key": py_dict_to_db_map(key)}
        return db_client.get_item(**lookup)
    except Exception as err:
        return {"Error": err}
|
|
|
|
|
'''
DynamoDB structure management
'''
|
|
def get_structure(table_name: str):
    """Return the attribute definitions reported for ``table_name``.

    Calls the client's ``describe_table`` and extracts
    ``["Table"]["AttributeDefinitions"]`` from the response. Nothing is
    caught here, so client errors propagate to the caller.

    Args:
        table_name: Name of the table to describe.

    Returns:
        The ``AttributeDefinitions`` value of the table description.
    """
    description = db_client.describe_table(TableName=table_name)
    table_info = description["Table"]
    return table_info["AttributeDefinitions"]