"""Thin helpers around a boto3 DynamoDB client: item CRUD and table metadata."""
import boto3

# NOTE(review): the star import is expected to supply aws_access_key_id,
# aws_secret_access_key, data_structure, db_map_to_py_dict and
# py_dict_to_db_map -- none of them are defined in this file. Confirm against
# utility.py before tightening this to explicit imports.
from utility import *  # noqa: F401,F403

# initialize dynamodb instance
db_client = boto3.client(
    "dynamodb",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name="us-east-1",
)


# ---------------------------------------------------------------------------
# dynamodb data operations
# ---------------------------------------------------------------------------

def get_table(table_name: str) -> list:
    """Get the list of items from a table in dynamodb.

    The projected attributes are taken from
    ``data_structure[table_name]["fields"]``. A single Scan call returns at
    most one page of results, so the scan is repeated with
    ``ExclusiveStartKey`` until DynamoDB stops returning a
    ``LastEvaluatedKey``.

    Args:
        table_name (str): the name of the table in dynamodb

    Returns:
        list: every item in the table, each converted with
        ``db_map_to_py_dict``

    Raises:
        KeyError: if ``table_name`` has no entry in ``data_structure``.
        Exception: any error raised by the boto3 client is propagated; this
            function does not wrap failures in an ``{"Error": ...}`` dict.
    """
    scan_kwargs = {
        "TableName": table_name,
        "AttributesToGet": data_structure[table_name]["fields"],
    }
    items = []
    while True:
        page = db_client.scan(**scan_kwargs)
        items.extend(db_map_to_py_dict(raw) for raw in page.get("Items", []))
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            # No continuation token: the whole table has been read.
            break
        scan_kwargs["ExclusiveStartKey"] = last_key
    return items


def _write_item(table_name: str, item: dict) -> dict:
    """Issue a PutItem call; shared implementation of post_item and put_item."""
    try:
        # The conversion stays inside the try so that a bad ``item`` is
        # reported through the same {"Error": ...} channel as a client error.
        response = db_client.put_item(
            TableName=table_name,
            Item=py_dict_to_db_map(item),
        )
    except Exception as e:  # deliberate catch-all: callers expect a dict, not a raise
        return {"Error": e}
    return response


# add a new item to a table in dynamodb, return error if failed
def post_item(table_name: str, item: dict) -> dict:
    """Add a new item to a table in dynamodb, return error if failed.

    This is a plain PutItem with no condition expression, so an existing
    item with the same primary key is replaced rather than rejected.

    Args:
        table_name (str): the name of the table in dynamodb
        item (dict): the item to be added to the table

    Returns:
        dict: the PutItem response on success, or ``{"Error": exception}``
        if the conversion or the call raised.
    """
    return _write_item(table_name, item)


# update an item in a table in dynamodb, return error if failed
def put_item(table_name: str, item: dict) -> dict:
    """Update an item in a table in dynamodb, return error if failed.

    The item is written with PutItem, which replaces the whole stored item;
    attributes absent from ``item`` are not preserved.

    Args:
        table_name (str): the name of the table in dynamodb
        item (dict): the full item to store in the table

    Returns:
        dict: the PutItem response on success, or ``{"Error": exception}``
        if the conversion or the call raised.
    """
    return _write_item(table_name, item)


# delete an item in a table in dynamodb
def delete_item(table_name: str, key: dict) -> dict:
    """Delete an item in a table in dynamodb, return error if failed.

    NOTE: DeleteItem succeeds even when no item matches ``key``, so a missing
    item does NOT produce an ``{"Error": ...}`` result here.

    Args:
        table_name (str): the name of the table in dynamodb
        key (dict): the key of the item to be deleted

    Returns:
        dict: the DeleteItem response on success, or ``{"Error": exception}``
        if the conversion or the call raised.
    """
    try:
        response = db_client.delete_item(
            TableName=table_name,
            Key=py_dict_to_db_map(key),
        )
    except Exception as e:  # deliberate catch-all: callers expect a dict, not a raise
        return {"Error": e}
    return response


def get_item(table_name: str, key: dict) -> dict:
    """Get an item from a table in dynamodb, return error if failed.

    The raw GetItem response is returned without conversion. When no item
    matches ``key`` the call still succeeds and the response simply has no
    ``"Item"`` entry; callers must check for that themselves.

    Args:
        table_name (str): the name of the table in dynamodb
        key (dict): the key of the item to be fetched

    Returns:
        dict: the GetItem response on success, or ``{"Error": exception}``
        if the conversion or the call raised.
    """
    try:
        response = db_client.get_item(
            TableName=table_name,
            Key=py_dict_to_db_map(key),
        )
    except Exception as e:  # deliberate catch-all: callers expect a dict, not a raise
        return {"Error": e}
    return response


# ---------------------------------------------------------------------------
# dynamodb structure management
# ---------------------------------------------------------------------------

def get_structure(table_name: str) -> list:
    """Get the structure of a table in dynamodb.

    Args:
        table_name (str): the name of the table in dynamodb

    Returns:
        list: the ``AttributeDefinitions`` entry of the DescribeTable
        response for the table.

    Raises:
        Exception: any error raised by the boto3 client is propagated.
    """
    description = db_client.describe_table(TableName=table_name)
    return description["Table"]["AttributeDefinitions"]