Anirudh Esthuri
Add better error handling for AWS Bedrock credentials
4d77281
raw
history blame
23 kB
import os
import secrets
import time
from typing import cast
from urllib.parse import urlencode
import requests
import streamlit as st
from gateway_client import delete_profile, ingest_and_rewrite
from llm import chat, set_model
from model_config import MODEL_CHOICES, MODEL_TO_PROVIDER, MODEL_DISPLAY_NAMES
def _generate_session_name(base: str = "Session") -> str:
existing = set(st.session_state.get("session_order", []))
idx = 1
while True:
candidate = f"{base} {idx}"
if candidate not in existing:
return candidate
idx += 1
def ensure_session_state() -> None:
if "sessions" not in st.session_state:
st.session_state.sessions = {}
if "session_order" not in st.session_state:
st.session_state.session_order = []
if (
"active_session_id" not in st.session_state
or st.session_state.active_session_id not in st.session_state.sessions
):
default_name = _generate_session_name()
st.session_state.sessions.setdefault(default_name, {"history": []})
if default_name not in st.session_state.session_order:
st.session_state.session_order.append(default_name)
st.session_state.active_session_id = default_name
if "session_select" not in st.session_state:
st.session_state.session_select = st.session_state.active_session_id
if st.session_state.session_select not in st.session_state.sessions:
st.session_state.session_select = st.session_state.active_session_id
st.session_state.setdefault(
"rename_session_name", st.session_state.active_session_id
)
st.session_state.setdefault(
"rename_session_synced_to", st.session_state.active_session_id
)
st.session_state.history = cast(
list[dict],
st.session_state.sessions[
st.session_state.active_session_id
].setdefault("history", []),
)
def create_session(session_name: str | None = None) -> tuple[bool, str]:
ensure_session_state()
candidate = (session_name or "").strip()
if not candidate:
candidate = _generate_session_name()
if candidate in st.session_state.sessions:
return False, candidate
st.session_state.sessions[candidate] = {"history": []}
st.session_state.session_order.append(candidate)
st.session_state.active_session_id = candidate
st.session_state.session_select = candidate
st.session_state.history = cast(
list[dict], st.session_state.sessions[candidate]["history"]
)
st.session_state.rename_session_name = candidate
st.session_state.rename_session_synced_to = candidate
return True, candidate
def rename_session(current_name: str, new_name: str) -> bool:
ensure_session_state()
target = new_name.strip()
if not target or target == current_name:
return False
if target in st.session_state.sessions:
return False
st.session_state.sessions[target] = st.session_state.sessions.pop(current_name)
order = st.session_state.session_order
order[order.index(current_name)] = target
if st.session_state.active_session_id == current_name:
st.session_state.active_session_id = target
st.session_state.session_select = target
st.session_state.history = cast(
list[dict],
st.session_state.sessions[st.session_state.active_session_id]["history"],
)
st.session_state.rename_session_name = target
st.session_state.rename_session_synced_to = target
return True
def delete_session(session_name: str) -> bool:
ensure_session_state()
if session_name not in st.session_state.sessions:
return False
if len(st.session_state.session_order) <= 1:
return False
st.session_state.sessions.pop(session_name, None)
st.session_state.session_order.remove(session_name)
if st.session_state.active_session_id == session_name:
st.session_state.active_session_id = st.session_state.session_order[-1]
st.session_state.session_select = st.session_state.active_session_id
st.session_state.rename_session_name = st.session_state.active_session_id
st.session_state.rename_session_synced_to = st.session_state.active_session_id
st.session_state.history = cast(
list[dict],
st.session_state.sessions[st.session_state.active_session_id]["history"],
)
return True
def rewrite_message(
msg: str, persona_name: str, show_rationale: bool
) -> str:
if persona_name.lower() == "control":
rewritten_msg = msg
if show_rationale:
rewritten_msg += " At the beginning of your response, please say the following in ITALIC: 'Persona Rationale: No personalization applied.'. Begin your answer on the next line."
return rewritten_msg
try:
rewritten_msg = ingest_and_rewrite(
user_id=persona_name, query=msg
)
if show_rationale:
rewritten_msg += " At the beginning of your response, please say the following in ITALIC: 'Persona Rationale: ' followed by 1 sentence about how your reasoning for how the persona traits influenced this response, also in italics. Begin your answer on the next line."
except Exception as e:
st.error(f"Failed to ingest_and_append message: {e}")
raise
print(rewritten_msg)
return rewritten_msg
# ──────────────────────────────────────────────────────────────
# Page setup & CSS
# ──────────────────────────────────────────────────────────────
st.set_page_config(page_title="MemMachine Chatbot", layout="wide")
try:
with open("./styles.css") as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
except FileNotFoundError:
pass
ensure_session_state()
HEADER_STYLE = """
<style>
.memmachine-header-wrapper {
display: flex;
justify-content: flex-end;
margin-bottom: 1.2rem;
}
.memmachine-header-links {
display: inline-flex;
gap: 14px;
align-items: center;
background: transparent;
padding: 0;
border-radius: 0;
}
.memmachine-header-links .powered-by {
color: #0a6cff;
font-weight: 700;
font-size: 16px;
margin-right: 6px;
white-space: nowrap;
}
.memmachine-header-links a {
text-decoration: none;
color: inherit;
display: flex;
align-items: center;
justify-content: center;
padding: 0;
border-radius: 0;
transition: opacity 0.2s ease;
}
.memmachine-header-links a:hover {
opacity: 0.7;
}
.memmachine-header-links img,
.memmachine-header-links svg {
width: 22px;
height: 22px;
}
@media (max-width: 768px) {
.memmachine-header-wrapper {
justify-content: center;
margin-bottom: 0.8rem;
}
.memmachine-header-links {
flex-wrap: wrap;
row-gap: 8px;
justify-content: center;
}
}
</style>
"""
HEADER_HTML = """
<div class="memmachine-header-wrapper">
<div class="memmachine-header-links">
<span class="powered-by">Powered by MemMachine</span>
<a href="https://memmachine.ai/" target="_blank" title="MemMachine">
<img src="https://avatars.githubusercontent.com/u/226739620?s=48&v=4" alt="MemMachine logo"/>
</a>
<a href="https://github.com/MemMachine/MemMachine" target="_blank" title="GitHub Repository">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" fill="currentColor">
<path d="M12 0c-6.626 0-12 5.373-12 12 0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23.957-.266 1.983-.399 3.003-.404 1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 4.765-1.589 8.199-6.086 8.199-11.386 0-6.627-5.373-12-12-12z"/>
</svg>
</a>
<a href="https://discord.gg/usydANvKqD" target="_blank" title="Discord Community">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" fill="currentColor">
<path d="M20.317 4.37a19.791 19.791 0 0 0-4.885-1.515.074.074 0 0 0-.079.037c-.21.375-.444.864-.608 1.25a18.27 18.27 0 0 0-5.487 0 12.64 12.64 0 0 0-.617-1.25.077.077 0 0 0-.079-.037A19.736 19.736 0 0 0 3.677 4.37a.07.07 0 0 0-.032.027C.533 9.046-.32 13.58.099 18.057a.082.082 0 0 0 .031.057 19.9 19.9 0 0 0 5.993 3.03.078.078 0 0 0 .084-.028c.462-.63.874-1.295 1.226-1.994a.076.076 0 0 0-.041-.106 13.107 13.107 0 0 1-1.872-.892.077.077 0 0 1-.008-.128 10.2 10.2 0 0 0 .372-.292.074.074 0 0 1 .077-.01c3.928 1.793 8.18 1.793 12.062 0a.074.074 0 0 1 .078.01c.12.098.246.198.373.292a.077.077 0 0 1-.006.127 12.299 12.299 0 0 1-1.873.892.077.077 0 0 0-.041.107c.36.698.772 1.362 1.225 1.993a.076.076 0 0 0 .084.028 19.839 19.839 0 0 0 6.002-3.03.077.077 0 0 0 .032-.054c.5-5.177-.838-9.674-3.549-13.66a.061.061 0 0 0-.031-.03zM8.02 15.33c-1.183 0-2.157-1.085-2.157-2.419 0-1.333.956-2.419 2.157-2.419 1.21 0 2.176 1.096 2.157 2.42 0 1.333-.956 2.418-2.157 2.418zm7.975 0c-1.183 0-2.157-1.085-2.157-2.419 0-1.333.955-2.419 2.157-2.419 1.21 0 2.176 1.096 2.157 2.42 0 1.333-.946 2.418-2.157 2.418z"/>
</svg>
</a>
</div>
</div>
"""
st.markdown(HEADER_STYLE, unsafe_allow_html=True)
st.markdown(HEADER_HTML, unsafe_allow_html=True)
# ──────────────────────────────────────────────────────────────
# Sidebar
# ──────────────────────────────────────────────────────────────
default_model = MODEL_CHOICES[0] if MODEL_CHOICES else "gpt-4.1-mini"
model_id = default_model
provider = MODEL_TO_PROVIDER.get(default_model, "openai")
selected_persona = "Charlie"
persona_name = "Charlie"
skip_rewrite = False
compare_personas = False
show_rationale = False
with st.sidebar:
st.markdown("#### Sessions")
session_options = st.session_state.session_order
active_session = st.session_state.active_session_id
if st.session_state.rename_session_synced_to != active_session:
st.session_state.rename_session_name = active_session
st.session_state.rename_session_synced_to = active_session
for idx, session_name in enumerate(session_options, start=1):
is_active = session_name == active_session
button_label = f"{session_name}"
row = st.container()
with row:
button_col, menu_col = st.columns([0.8, 0.2])
with button_col:
if st.button(
button_label,
key=f"session_button_{session_name}",
use_container_width=True,
type="primary" if is_active else "secondary",
):
if not is_active:
st.session_state.active_session_id = session_name
st.session_state.session_select = session_name
st.session_state.history = cast(
list[dict],
st.session_state.sessions[session_name]["history"],
)
st.session_state.rename_session_name = session_name
st.session_state.rename_session_synced_to = session_name
st.rerun()
with menu_col:
if hasattr(st, "popover"):
menu_container = st.popover("β‹―", use_container_width=True)
else:
menu_container = st.expander(
"β‹―", expanded=False, key=f"session_actions_{session_name}"
)
with menu_container:
st.markdown(f"**Actions for {session_name}**")
rename_value = st.text_input(
"Rename session",
value=session_name,
key=f"rename_session_input_{session_name}",
)
if st.button(
"Rename",
use_container_width=True,
key=f"rename_session_button_{session_name}",
):
rename_target = rename_value.strip()
if not rename_target:
st.warning("Enter a session name to rename.")
elif rename_target == session_name:
st.info("Session name unchanged.")
elif rename_target in st.session_state.sessions:
st.warning(f"Session '{rename_target}' already exists.")
elif rename_session(session_name, rename_target):
st.success(f"Session renamed to '{rename_target}'.")
st.rerun()
else:
st.error("Unable to rename session. Please try again.")
st.divider()
if st.button(
"Delete session",
use_container_width=True,
type="secondary",
key=f"delete_session_button_{session_name}",
):
if delete_session(session_name):
new_active = st.session_state.active_session_id
st.session_state.session_select = new_active
st.session_state.rename_session_name = new_active
st.session_state.rename_session_synced_to = new_active
st.success(f"Session '{session_name}' deleted.")
st.rerun()
else:
st.warning("Cannot delete the last remaining session.")
with st.form("create_session_form", clear_on_submit=True):
new_session_name = st.text_input(
"New session name",
key="create_session_name",
placeholder="Leave blank for automatic name",
)
if st.form_submit_button("Create session", use_container_width=True):
success, created_name = create_session(new_session_name)
if success:
st.success(f"Session '{created_name}' created.")
st.rerun()
else:
st.warning(f"Session '{created_name}' already exists.")
st.divider()
st.markdown("#### Choose Model")
# Create display options with categories
display_options = [MODEL_DISPLAY_NAMES[model] for model in MODEL_CHOICES]
selected_display = st.selectbox(
"Choose Model", display_options, index=0, label_visibility="collapsed"
)
# Get the actual model ID from the display name
model_id = next(model for model, display in MODEL_DISPLAY_NAMES.items()
if display == selected_display)
provider = MODEL_TO_PROVIDER[model_id]
set_model(model_id)
st.markdown("#### Choose user persona")
selected_persona = st.selectbox(
"Choose user persona",
["Charlie", "Jing", "Charles", "Control"],
label_visibility="collapsed",
)
custom_persona = st.text_input("Or enter your name", "")
persona_name = (
custom_persona.strip() if custom_persona.strip() else selected_persona
)
compare_personas = st.checkbox("Compare with Control persona")
show_rationale = st.checkbox("Show Persona Rationale")
st.divider()
if st.button("Clear chat", use_container_width=True):
active = st.session_state.active_session_id
st.session_state.sessions[active]["history"].clear()
st.session_state.history = cast(
list[dict],
st.session_state.sessions[active]["history"],
)
st.rerun()
if st.button("Delete Profile", use_container_width=True):
success = delete_profile(persona_name)
active = st.session_state.active_session_id
st.session_state.sessions[active]["history"].clear()
st.session_state.history = cast(
list[dict],
st.session_state.sessions[active]["history"],
)
if success:
st.success(f"Profile for '{persona_name}' deleted.")
else:
st.error(f"Failed to delete profile for '{persona_name}'.")
st.divider()
# ──────────────────────────────────────────────────────────────
# Enforce alternating roles
# ──────────────────────────────────────────────────────────────
def clean_history(history: list[dict], persona: str) -> list[dict]:
out = []
for turn in history:
if turn.get("role") == "user":
out.append({"role": "user", "content": turn["content"]})
elif turn.get("role") == "assistant" and turn.get("persona") == persona:
out.append({"role": "assistant", "content": turn["content"]})
cleaned = []
last_role = None
for msg in out:
if msg["role"] != last_role:
cleaned.append(msg)
last_role = msg["role"]
return cleaned
def append_user_turn(msgs: list[dict], new_user_msg: str) -> list[dict]:
if msgs and msgs[-1]["role"] == "user":
msgs[-1] = {"role": "user", "content": new_user_msg}
else:
msgs.append({"role": "user", "content": new_user_msg})
return msgs
def typewriter_effect(text: str, speed: float = 0.02):
"""Generator that yields text word by word to create a typing effect."""
words = text.split(" ")
for i, word in enumerate(words):
if i == 0:
yield word
else:
yield " " + word
time.sleep(speed)
msg = st.chat_input("Type your message…")
if msg:
st.session_state.history.append({"role": "user", "content": msg})
if compare_personas:
all_answers = {}
rewritten_msg = rewrite_message(msg, persona_name, show_rationale)
msgs = clean_history(st.session_state.history, persona_name)
msgs = append_user_turn(msgs, rewritten_msg)
try:
txt, lat, tok, tps = chat(msgs, persona_name)
all_answers[persona_name] = txt
except ValueError as e:
st.error(f"❌ {str(e)}")
st.stop()
rewritten_msg_control = rewrite_message(msg, "Control", show_rationale)
msgs_control = clean_history(st.session_state.history, "Control")
msgs_control = append_user_turn(msgs_control, rewritten_msg_control)
try:
txt_control, lat, tok, tps = chat(msgs_control, "Arnold")
all_answers["Control"] = txt_control
except ValueError as e:
st.error(f"❌ {str(e)}")
st.stop()
st.session_state.history.append(
{"role": "assistant_all", "axis": "role", "content": all_answers, "is_new": True}
)
else:
rewritten_msg = rewrite_message(msg, persona_name, show_rationale)
msgs = clean_history(st.session_state.history, persona_name)
msgs = append_user_turn(msgs, rewritten_msg)
try:
txt, lat, tok, tps = chat(
msgs, "Arnold" if persona_name == "Control" else persona_name
)
st.session_state.history.append(
{"role": "assistant", "persona": persona_name, "content": txt, "is_new": True}
)
except ValueError as e:
st.error(f"❌ {str(e)}")
st.stop()
st.rerun()
# ──────────────────────────────────────────────────────────────
# Chat history display
# ──────────────────────────────────────────────────────────────
for turn in st.session_state.history:
if turn.get("role") == "user":
st.chat_message("user").write(turn["content"])
elif turn.get("role") == "assistant":
with st.chat_message("assistant"):
# Use typing effect for new messages, normal display for old ones
if turn.get("is_new", False):
st.write_stream(typewriter_effect(turn["content"]))
# Mark as no longer new so it displays normally on rerun
turn["is_new"] = False
else:
st.write(turn["content"])
elif turn.get("role") == "assistant_all":
content_items = list(turn["content"].items())
is_new = turn.get("is_new", False)
if len(content_items) >= 2:
cols = st.columns([1, 0.03, 1])
persona_label, persona_response = content_items[0]
control_label, control_response = content_items[1]
with cols[0]:
st.markdown(f"**{persona_label}**")
if is_new:
st.write_stream(typewriter_effect(persona_response))
else:
st.markdown(
f'<div class="answer">{persona_response}</div>',
unsafe_allow_html=True,
)
with cols[1]:
st.markdown(
'<div class="vertical-divider"></div>', unsafe_allow_html=True
)
with cols[2]:
st.markdown(f"**{control_label}**")
if is_new:
st.write_stream(typewriter_effect(control_response))
else:
st.markdown(
f'<div class="answer">{control_response}</div>',
unsafe_allow_html=True,
)
else:
for label, response in content_items:
st.markdown(f"**{label}**")
if is_new:
st.write_stream(typewriter_effect(response))
else:
st.markdown(
f'<div class="answer">{response}</div>', unsafe_allow_html=True
)
# Mark as no longer new
if is_new:
turn["is_new"] = False