champ-chatbot / dynamodb_helper.py
qyle's picture
deployment
392b300 verified
import os
import time
import boto3
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.exceptions import ClientError
from datetime import datetime, timezone
from uuid import uuid4
from decimal import Decimal
from dotenv import load_dotenv
load_dotenv()
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", None)
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None)
DYNAMODB_ENDPOINT = os.getenv("DYNAMODB_ENDPOINT", None)
DDB_TABLE = os.getenv("DDB_TABLE", "chatbot-conversations")
USE_LOCAL_DDB = os.getenv("USE_LOCAL_DDB", "false").lower() == "true"
def get_dynamodb_client():
if USE_LOCAL_DDB: # only for local testing with DynamoDB Local
return boto3.resource(
"dynamodb",
endpoint_url=DYNAMODB_ENDPOINT,
region_name=AWS_REGION,
aws_access_key_id="fake",
aws_secret_access_key="fake",
)
else: # production AWS DynamoDB
return boto3.resource(
"dynamodb",
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
dynamodb = get_dynamodb_client()
table = None
def create_table_if_not_exists(dynamodb):
global table
client = dynamodb.meta.client
try:
existing_tables = client.list_tables()["TableNames"]
except Exception as e:
print("Cannot list tables:", e)
return None
if DDB_TABLE in existing_tables:
print(f"Table {DDB_TABLE} already exists.")
table = dynamodb.Table(DDB_TABLE)
return table
print(f"Creating DynamoDB table {DDB_TABLE}...")
try:
table = dynamodb.create_table(
TableName=DDB_TABLE,
KeySchema=[
{"AttributeName": "PK", "KeyType": "HASH"},
{"AttributeName": "SK", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "PK", "AttributeType": "S"},
{"AttributeName": "SK", "AttributeType": "S"},
{"AttributeName": "GSI1_PK", "AttributeType": "S"},
{"AttributeName": "GSI1_SK", "AttributeType": "S"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "GSI1",
"KeySchema": [
{"AttributeName": "GSI1_PK", "KeyType": "HASH"},
{"AttributeName": "GSI1_SK", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "ALL"},
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
}
],
BillingMode="PAY_PER_REQUEST",
# ProvisionedThroughput={
# "ReadCapacityUnits": 5,
# "WriteCapacityUnits": 5
# }
)
table.wait_until_exists()
print(f"Table {DDB_TABLE} created.")
return table
except ClientError as e:
print("Error creating table:", e.response["Error"]["Message"])
return None
def iso_ts():
# Return the current timestamp in ISO 8601 format
return datetime.now(timezone.utc).isoformat()
table = create_table_if_not_exists(dynamodb)
def convert_floats(obj):
if isinstance(obj, float):
return Decimal(str(obj))
elif isinstance(obj, dict):
return {k: convert_floats(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_floats(v) for v in obj]
else:
return obj
def log_event(user_id, session_id, data):
"""
Log conversation data to DynamoDB table.
:param user_id: ID of the user
:param session_id: ID of the session
:param data: Dictionary containing conversation data
"""
global table
if table is None:
print("Table not initialized. Skipping log.")
return
ts = iso_ts()
item = {
"PK": f"SESSION#{session_id}",
"SK": f"TS#{ts}#{uuid4().hex}",
"user_id": user_id,
"GSI1_PK": f"USER#{user_id}",
"GSI1_SK": f"TS#{ts}",
"session_id": session_id,
"timestamp": ts,
"data": convert_floats(data),
}
print(f"Logging conversation: {item}")
try:
table.put_item(Item=item)
except ClientError as e:
print(f"Error logging conversation: {e.response['Error']['Message']}")