import boto3 from utility import * # initialize dynamodb instance db_client = boto3.client( "dynamodb", aws_access_key_id = aws_access_key_id, aws_secret_access_key = aws_secret_access_key, region_name = "us-east-1" ) ''' dynamodb data operations ''' # get the list of articles from articles table in dynamodb @terminal_print def get_table(table_name:str): result = db_client.scan(TableName = table_name) items = result["Items"] while "LastEvaluatedKey" in result: result = db_client.scan(TableName = table_name,ExclusiveStartKey = result["LastEvaluatedKey"]) items.extend(result["Items"]) return [db_map_to_py_dict(r) for r in items] # add a new article to table articles in dynamodb, return error if failed def post_item(table_name:str,item:dict): try: res = db_client.put_item( TableName = table_name, Item = py_dict_to_db_map(item) ) except Exception as e: return {"Error":e} return res # update an article in table articles in dynamodb, return error if failed @terminal_print def put_item(table_name:str,item:dict): try: res = db_client.put_item( TableName = table_name, Item = py_dict_to_db_map(item) ) except Exception as e: return {"Error":e} return res # delete an article in table articles in dynamodb, return error if not found. @terminal_print def delete_item(table_name:str,key:dict): try: res = db_client.delete_item( TableName = table_name, Key = py_dict_to_db_map(key) ) except Exception as e: return {"Error":e} return res ''' ''' @terminal_print def get_item(table_name:str,key:dict): try: res = db_client.get_item( TableName = table_name, Key = py_dict_to_db_map(key) ) except Exception as e: return {"Error":e} return res ''' dynamodb structure management ''' @terminal_print def get_structure(table_name:str): result = db_client.describe_table(TableName = table_name) return result["Table"]["AttributeDefinitions"]