Spaces:
Paused
Paused
| import os | |
| import time | |
| import boto3 | |
| from boto3.dynamodb.types import TypeDeserializer, TypeSerializer | |
| from botocore.exceptions import ClientError | |
| from datetime import datetime, timezone | |
| from uuid import uuid4 | |
| from decimal import Decimal | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| AWS_REGION = os.getenv("AWS_REGION", "us-east-1") | |
| AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", None) | |
| AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) | |
| DYNAMODB_ENDPOINT = os.getenv("DYNAMODB_ENDPOINT", None) | |
| DDB_TABLE = os.getenv("DDB_TABLE", "chatbot-conversations") | |
| USE_LOCAL_DDB = os.getenv("USE_LOCAL_DDB", "false").lower() == "true" | |
| def get_dynamodb_client(): | |
| if USE_LOCAL_DDB: # only for local testing with DynamoDB Local | |
| return boto3.resource( | |
| "dynamodb", | |
| endpoint_url=DYNAMODB_ENDPOINT, | |
| region_name=AWS_REGION, | |
| aws_access_key_id="fake", | |
| aws_secret_access_key="fake", | |
| ) | |
| else: # production AWS DynamoDB | |
| return boto3.resource( | |
| "dynamodb", | |
| region_name=AWS_REGION, | |
| aws_access_key_id=AWS_ACCESS_KEY, | |
| aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
| ) | |
| dynamodb = get_dynamodb_client() | |
| table = None | |
| def create_table_if_not_exists(dynamodb): | |
| global table | |
| client = dynamodb.meta.client | |
| try: | |
| existing_tables = client.list_tables()["TableNames"] | |
| except Exception as e: | |
| print("Cannot list tables:", e) | |
| return None | |
| if DDB_TABLE in existing_tables: | |
| print(f"Table {DDB_TABLE} already exists.") | |
| table = dynamodb.Table(DDB_TABLE) | |
| return table | |
| print(f"Creating DynamoDB table {DDB_TABLE}...") | |
| try: | |
| table = dynamodb.create_table( | |
| TableName=DDB_TABLE, | |
| KeySchema=[ | |
| {"AttributeName": "PK", "KeyType": "HASH"}, | |
| {"AttributeName": "SK", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "PK", "AttributeType": "S"}, | |
| {"AttributeName": "SK", "AttributeType": "S"}, | |
| {"AttributeName": "GSI1_PK", "AttributeType": "S"}, | |
| {"AttributeName": "GSI1_SK", "AttributeType": "S"}, | |
| ], | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "GSI1", | |
| "KeySchema": [ | |
| {"AttributeName": "GSI1_PK", "KeyType": "HASH"}, | |
| {"AttributeName": "GSI1_SK", "KeyType": "RANGE"}, | |
| ], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| "ProvisionedThroughput": { | |
| "ReadCapacityUnits": 5, | |
| "WriteCapacityUnits": 5, | |
| }, | |
| } | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| # ProvisionedThroughput={ | |
| # "ReadCapacityUnits": 5, | |
| # "WriteCapacityUnits": 5 | |
| # } | |
| ) | |
| table.wait_until_exists() | |
| print(f"Table {DDB_TABLE} created.") | |
| return table | |
| except ClientError as e: | |
| print("Error creating table:", e.response["Error"]["Message"]) | |
| return None | |
| def iso_ts(): | |
| # Return the current timestamp in ISO 8601 format | |
| return datetime.now(timezone.utc).isoformat() | |
| table = create_table_if_not_exists(dynamodb) | |
| def convert_floats(obj): | |
| if isinstance(obj, float): | |
| return Decimal(str(obj)) | |
| elif isinstance(obj, dict): | |
| return {k: convert_floats(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [convert_floats(v) for v in obj] | |
| else: | |
| return obj | |
| def log_event(user_id, session_id, data): | |
| """ | |
| Log conversation data to DynamoDB table. | |
| :param user_id: ID of the user | |
| :param session_id: ID of the session | |
| :param data: Dictionary containing conversation data | |
| """ | |
| global table | |
| if table is None: | |
| print("Table not initialized. Skipping log.") | |
| return | |
| ts = iso_ts() | |
| item = { | |
| "PK": f"SESSION#{session_id}", | |
| "SK": f"TS#{ts}#{uuid4().hex}", | |
| "user_id": user_id, | |
| "GSI1_PK": f"USER#{user_id}", | |
| "GSI1_SK": f"TS#{ts}", | |
| "session_id": session_id, | |
| "timestamp": ts, | |
| "data": convert_floats(data), | |
| } | |
| print(f"Logging conversation: {item}") | |
| try: | |
| table.put_item(Item=item) | |
| except ClientError as e: | |
| print(f"Error logging conversation: {e.response['Error']['Message']}") | |