| | import gradio as gr
|
| | from gradio_calendar import Calendar
|
| | import pandas as pd
|
| | import psycopg2
|
| | from psycopg2 import sql
|
| | from openai import OpenAI
|
| | from pprint import pprint
|
| | from user import User
|
| | from dotenv import load_dotenv
|
| | import os
|
| | from datetime import timedelta
|
| | import json
|
| | import traceback
|
| | import shutil
|
| |
|
| |
|
| | load_dotenv()
|
| |
|
| |
|
def fetch_users():
    """Load the id, name and creation time of every row in ``public.users``.

    Connection settings come from the environment (``DB_NAME``, ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT``). The non-secret settings fall
    back to the staging values this function previously hard-coded; the
    password has no fallback and must be supplied via the environment (for
    example in the ``.env`` file read by ``load_dotenv()``), so that it no
    longer lives in the source file.

    Returns:
        pandas.DataFrame: columns ``id``, ``name`` and ``created_at``, one row
        per user. When the database cannot be queried the error is printed
        and an *empty* frame with those same three columns is returned, so
        callers can still index ``df['id']`` / ``df['name']``.
    """
    columns = ['id', 'name', 'created_at']
    db_params = {
        'dbname': os.getenv('DB_NAME', 'ourcoach'),
        'user': os.getenv('DB_USER', 'ourcoach'),
        'password': os.getenv('DB_PASSWORD'),
        'host': os.getenv(
            'DB_HOST',
            'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        ),
        'port': os.getenv('DB_PORT', '5432'),
    }
    try:
        conn = psycopg2.connect(**db_params)
        try:
            # ``with conn`` only scopes the transaction; psycopg2 does not
            # close the connection on exit, hence the explicit close() below.
            with conn, conn.cursor() as cursor:
                query = sql.SQL("SELECT id, name, created_at FROM {table}").format(
                    table=sql.Identifier('public', 'users')
                )
                cursor.execute(query)
                rows = cursor.fetchall()
                colnames = [desc[0] for desc in cursor.description]
        finally:
            conn.close()
    except psycopg2.Error as e:
        print(f"An error occurred: {e}")
        return pd.DataFrame(columns=columns)

    df = pd.DataFrame(rows, columns=colnames)
    return df[columns]
|
| |
|
def _format_user_profile(profile):
    """Render the decoded onboarding answers in *profile* as profile text.

    Raises:
        KeyError: if an expected onboarding field is missing.
        IndexError: if ``mattersMost`` holds fewer than five entries.
    """
    first_name = profile['firstName']
    matters = profile['mattersMost']
    do_living = "\n".join(
        f"- {item['question']} : {item['answer']}" for item in profile['doLiving']
    )
    who_important = "\n".join(
        f"- {item['question']} : {item['answer']}" for item in profile['whoImportant']
    )
    # NOTE(review): the original template's leading whitespace was lost in
    # SOURCE; lines are emitted without indentation - confirm against the
    # prompt the assistant expects.
    lines = [
        "",
        "### USER PROFILE ###",
        "",
        f"Name: {first_name}",
        f"{first_name}'s Legend Persona: {profile['legendPersona']}",
        f"Pronouns: {profile['pronouns']}",
        f"Birthday: {profile['birthDate']}",
        f"{first_name}'s MBTI: {profile['mbti']}",
        f"{first_name}'s Love Language: {profile['loveLanguage']}",
        f"Has {first_name} tried coaching before: {profile['triedCoaching']}",
        f"Belief in Astrology: {profile['astrology']}",
        f"What matters the most in {first_name}'s life: {matters[0]}",
        "Other things that also matters (ordered from the most to least important):",
        f"1. {matters[1]}",
        f"2. {matters[2]}",
        f"3. {matters[3]}",
        f"4. {matters[4]} (Matters the least)",
        f"What does {first_name} do for a living:",
        f"{do_living}",
        f"{first_name}'s current situation: {profile['mySituation']}",
        f"{first_name}'s most important person:",
        f"{who_important}",
        "",
    ]
    return "\n".join(lines)


def get_user_info(user_id):
    """Fetch one user's onboarding answers and format them as profile text.

    Connection settings come from the environment (``DB_NAME``, ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT``). The non-secret settings fall
    back to the staging values previously hard-coded here; the password has
    no fallback and must be provided via the environment.

    Args:
        user_id: value matched against ``public.users.id``.

    Returns:
        str | None: the formatted "USER PROFILE" text, or ``None`` when the
        user does not exist, has no onboarding data, or the database query
        fails (the error is printed).

    Raises:
        KeyError, IndexError: if the onboarding document lacks a field the
        profile template reads.
    """
    db_params = {
        'dbname': os.getenv('DB_NAME', 'ourcoach'),
        'user': os.getenv('DB_USER', 'ourcoach'),
        'password': os.getenv('DB_PASSWORD'),
        'host': os.getenv(
            'DB_HOST',
            'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        ),
        'port': os.getenv('DB_PORT', '5432'),
    }
    try:
        conn = psycopg2.connect(**db_params)
        try:
            # ``with conn`` only scopes the transaction; psycopg2 leaves the
            # connection open afterwards, so it is closed explicitly.
            with conn, conn.cursor() as cursor:
                query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(
                    table=sql.Identifier('public', 'users')
                )
                cursor.execute(query, (user_id,))
                row = cursor.fetchone()
                colnames = [desc[0] for desc in cursor.description]
        finally:
            conn.close()
    except psycopg2.Error as e:
        print(f"An error occurred: {e}")
        return None

    if not row:
        return None

    user_data = dict(zip(colnames, row))
    onboarding = user_data['onboarding']
    if onboarding is None:
        print(f"User {user_id} has no onboarding data.")
        return None
    # A json/jsonb column is already decoded by the driver; a text column
    # still needs parsing. Accept both.
    if isinstance(onboarding, (str, bytes, bytearray)):
        onboarding = json.loads(onboarding)

    return _format_user_profile(onboarding)
|
| |
|
| |
|
# Load the selectable users once at start-up.
user_df = fetch_users()

# NOTE(review): SOURCE lost its indentation; the nesting of the components
# below (which widgets sit inside which tab) is inferred - confirm. Event
# handlers and listeners defined later in the file presumably belong inside
# this same `with gr.Blocks()` context as well.
with gr.Blocks() as app:
    gr.Markdown("## Demo 2")

    # Shared OpenAI client plus per-session state holders.
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    user_message = gr.State("")
    user = gr.State()
    prev_date = gr.State(pd.Timestamp.now().date())

    # Build the "<id> - <name>" labels in a single pass over the rows. The
    # column check keeps start-up alive when fetch_users() could not reach
    # the database and handed back a frame without these columns.
    # NOTE(review): assumes ids are unique; with duplicate ids each label now
    # carries its own row's name rather than the first match.
    if {'id', 'name'} <= set(user_df.columns):
        user_choices = [
            f"{uid} - {name}" for uid, name in zip(user_df['id'], user_df['name'])
        ]
    else:
        user_choices = []

    user_dropdown = gr.Dropdown(
        choices=user_choices,
        label="Please Select User First",
        value=None,
    )

    assistant = gr.Dropdown(
        choices=[
            ("Backup of Coach Steve", "asst_SI8I6oLdqPAQTAiUL3tTO8E4"),
            ("Coach Steve", "asst_C7oBmqDRi085X8V7DPdMxhRF"),
        ],
        label="Please Select an Assistant",
    )

    date_picker = Calendar(type="string", label="Select Date and Time")

    change_day_button = gr.Button("Change Day")

    with gr.Tabs():
        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(type='messages')
            chatbot_input = gr.Textbox(label="Your Message")
            send_button = gr.Button("Send")
            reset_button = gr.Button("Reset")
        with gr.Tab("Mementos/Goals"):
            mementos_display = gr.JSON(label="Mementos / Goals")
        with gr.Tab("Raw Message History"):
            message_history_display = gr.Textbox(
                label="Raw Message History",
                interactive=False,
                lines=20,
                show_copy_button=True,
                show_label=True,
                autofocus=True,
            )
|
| |
|
| |
|
def build_user(selected_user, assistant):
    """Create the ``User`` session for the entry picked in the user dropdown.

    Args:
        selected_user: dropdown label of the form ``"<id> - <name>"``.
        assistant: assistant id passed through to ``User``.

    Returns:
        tuple: the new ``User`` and a one-message chat history announcing
        that the chatbot was initialised.
    """
    print("Building user")
    # Everything before the first " - " is the user id.
    user_id, _, _ = selected_user.partition(" - ")
    profile_text = get_user_info(user_id)
    session = User(user_id, profile_text, client, assistant)
    greeting = {
        'role': 'assistant',
        'content': f"Chatbot initialized for user {selected_user}.",
    }
    return session, [greeting]
|
| |
|
def change_assistant(selected_user, assistant, user):
    """Switch the active assistant, reusing the current session when possible.

    Args:
        selected_user: dropdown label ``"<id> - <name>"``, or ``None`` when
            no user has been selected yet.
        assistant: id of the newly selected assistant.
        user: the current session object, or ``None`` if none was built.

    Returns:
        tuple: ``(session, history)`` where *history* is a one-message chat
        history. If no user is selected the session is returned unchanged
        with a prompt to select one. If the selected user matches the current
        session, ``user.change_assistant(assistant)`` is called and its
        result is used as the session - falling back to *user* itself when
        the method returns ``None``. Otherwise a fresh ``User`` is built.
    """
    print("Building user")
    if not selected_user:
        # The assistant dropdown can change before any user is picked.
        return user, [{'role': 'assistant', 'content': "Please select a user first."}]

    notice = [{'role': 'assistant', 'content': f"Assistant changed to:{assistant}."}]
    user_id = selected_user.split(" - ")[0]

    if user is not None and user_id == user.user_id:
        updated = user.change_assistant(assistant)
        # Keep the existing session if change_assistant() mutates in place
        # and returns nothing; otherwise the session state would be lost.
        return (user if updated is None else updated), notice

    user_info = get_user_info(user_id)
    return User(user_id, user_info, client, assistant), notice
|
| |
|
def reset(history, user):
    """Reset the session and delete the user's locally stored mementos.

    Args:
        history: current chat history; ignored, a fresh one is returned.
        user: session object exposing ``reset()`` and ``user_id``.

    Returns:
        list: a chat history holding only the "Chatbot reset." notice.
    """
    user.reset()

    # Remove every entry under mementos/<user_id>, if that folder exists.
    mementos_dir = f"mementos/{user.user_id}"
    if os.path.exists(mementos_dir):
        for entry in os.listdir(mementos_dir):
            os.remove(f"{mementos_dir}/{entry}")

    return [{'role': 'assistant', 'content': "Chatbot reset."}]
|
| |
|
def update_message_history(user):
    """Render the session's messages as ``role: content`` lines.

    Args:
        user: session object; ``user.get_messages(False)`` supplies the
            messages as dicts with ``role`` and ``content`` keys.

    Returns:
        str: one line per message, joined with newlines.
    """
    rendered = []
    for entry in user.get_messages(False):
        rendered.append(f"{entry['role']}: {entry['content']}")
    return "\n".join(rendered)
|
| |
|
def fetch_mementos(selected_user):
    """Load every memento JSON file stored locally for the selected user.

    Args:
        selected_user: dropdown label ``"<id> - <name>"``, or ``None`` when
            no user has been selected yet.

    Returns:
        list: the decoded content of each file under ``mementos/<id>`` in
        filename order, or a single placeholder message when no user is
        selected or the user has no mementos folder.

    Raises:
        json.JSONDecodeError: if a file in the folder is not valid JSON.
    """
    mementos = [{"message": "No mementos found for this user."}]

    if selected_user:
        user_id = selected_user.split(" - ")[0]
        user_mementos_path = f"mementos/{user_id}"
        if os.path.isdir(user_mementos_path):
            mementos = []
            # Sorted so the display order is stable across refreshes.
            for memento_file in sorted(os.listdir(user_mementos_path)):
                file_path = os.path.join(user_mementos_path, memento_file)
                # Read as UTF-8 rather than the platform default encoding.
                with open(file_path, 'r', encoding='utf-8') as f:
                    mementos.append(json.load(f))

    print("Fetched mementos:", mementos)
    return mementos
|
| |
|
# NOTE(review): SOURCE lost its indentation; these listeners presumably sit
# inside the `with gr.Blocks() as app:` context - confirm when re-indenting.

# Picking a user builds the session, then refreshes the mementos tab.
user_selected = user_dropdown.change(
    fn=build_user,
    inputs=[user_dropdown, assistant],
    outputs=[user, chatbot],
)
user_selected.then(
    fn=fetch_mementos,
    inputs=user_dropdown,
    outputs=mementos_display,
)

# Picking an assistant updates the session, then refreshes the mementos tab
# and the raw message history, in that order.
assistant_selected = assistant.change(
    fn=change_assistant,
    inputs=[user_dropdown, assistant, user],
    outputs=[user, chatbot],
)
assistant_mementos_refreshed = assistant_selected.then(
    fn=fetch_mementos,
    inputs=user_dropdown,
    outputs=mementos_display,
)
assistant_mementos_refreshed.then(
    fn=update_message_history,
    inputs=user,
    outputs=message_history_display,
)
|
| |
|
def user_input(user_message, history):
    """Move the typed message from the textbox into the chat history.

    Args:
        user_message: text currently in the input box.
        history: chat history as a list of message dicts.

    Returns:
        tuple: ``("", new_history, user_message)`` - an empty string to clear
        the textbox, the history with the user's message appended, and the
        raw message text for the next step in the chain.
    """
    new_entry = {"role": "user", "content": user_message}
    extended_history = history + [new_entry]
    return "", extended_history, user_message
|
| |
|
def chatbot_response(user, message):
    """Send *message* through the session and return the updated messages.

    Args:
        user: session object exposing ``send_message`` and ``get_messages``.
        message: the text the user just submitted.

    Returns:
        Whatever ``user.get_messages()`` returns after the message was sent.
    """
    print(f"User message: {message}")
    user.send_message(message)
    updated_messages = user.get_messages()
    return updated_messages
|
| |
|
# NOTE(review): SOURCE lost its indentation; these listeners presumably sit
# inside the `with gr.Blocks() as app:` context - confirm when re-indenting.

# Pressing Enter in the textbox and clicking "Send" run the same chain:
# echo the user's message, get the assistant's reply, then refresh the
# mementos tab and the raw message history.
for send_trigger in (chatbot_input.submit, send_button.click):
    send_trigger(
        fn=user_input,
        inputs=[chatbot_input, chatbot],
        outputs=[chatbot_input, chatbot, user_message],
        queue=False,
    ).then(
        fn=chatbot_response,
        inputs=[user, user_message],
        outputs=chatbot,
    ).then(
        fn=fetch_mementos,
        inputs=user_dropdown,
        outputs=mementos_display,
    ).then(
        fn=update_message_history,
        inputs=user,
        outputs=message_history_display,
    )
|
| |
|
# NOTE(review): SOURCE lost its indentation; this listener presumably sits
# inside the `with gr.Blocks() as app:` context - confirm when re-indenting.

# "Reset" clears the session, then re-renders the raw message history.
reset_clicked = reset_button.click(
    fn=reset,
    inputs=[chatbot, user],
    outputs=chatbot,
)
reset_clicked.then(
    fn=update_message_history,
    inputs=user,
    outputs=message_history_display,
)
|
| |
|
def date_changed(selected_date, user):
    """Tell the session the date changed and build the resulting chat history.

    Args:
        selected_date: the new date; forwarded to ``user.change_date`` and
            interpolated into the notice text.
        user: session object exposing ``change_date`` and ``get_messages``.

    Returns:
        list: the session's messages with a "Date changed to ..." notice
        inserted just before the last message. If the session has no
        messages, only the notice is returned.
    """
    user.change_date(selected_date)

    # Fetch once: the two halves of the result must come from the same
    # snapshot of the conversation.
    messages = user.get_messages()
    notice = {'role': 'assistant', 'content': f"Date changed to {selected_date}."}
    if not messages:
        return [notice]
    return [*messages[:-1], notice, messages[-1]]
|
| |
|
| |
|
def _upload_pending_mementos(user):
    """Upload each pending ``.json`` memento for *user*, then archive it locally."""
    to_upload_folder = f"mementos/to_upload/{user.user_id}"
    destination_folder = f"mementos/{user.user_id}"

    if not os.path.exists(to_upload_folder):
        return

    os.makedirs(destination_folder, exist_ok=True)

    for filename in os.listdir(to_upload_folder):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(to_upload_folder, filename)
        try:
            with open(file_path, "rb") as f:
                memory_file = client.files.create(file=f, purpose="assistants")
            client.beta.vector_stores.files.create(
                vector_store_id=user.conversations.user_personal_memory.id,
                file_id=memory_file.id,
            )
            # Move only after the handle is closed and both API calls succeeded.
            shutil.move(file_path, os.path.join(destination_folder, filename))
        except Exception as e:
            # Best effort: one failing file must not block the others.
            print(f"Failed to upload and move file {filename}: {e}")
            traceback.print_exc()


def next_day(selected_date, prev_date, user):
    """Advance the simulated date and flush pending mementos.

    Args:
        selected_date: date string from the calendar widget; may be empty or
            ``None`` when nothing was picked.
        prev_date: the ``datetime.date`` applied by the previous call.
        user: session object for the active user.

    Returns:
        tuple: ``(chat_history, new_date, new_date_string)`` where
        *new_date* is a ``datetime.date`` and *new_date_string* is that date
        formatted as ``YYYY-MM-DD``.

    If the picked date equals *prev_date* - or no valid date was picked, in
    which case *prev_date* is used as the starting point - the date moves
    forward by one day.
    """
    parsed = pd.to_datetime(selected_date) if selected_date else None
    if parsed is None or pd.isna(parsed):
        # Nothing usable came from the calendar: continue from the last date.
        next_date = prev_date
    else:
        next_date = parsed.date()

    print("Selected date:", next_date)
    print("Previous date:", prev_date)
    if next_date == prev_date:
        next_date = next_date + timedelta(days=1)

    user.infer_memento_follow_ups()
    _upload_pending_mementos(user)

    return date_changed(next_date, user), next_date, next_date.strftime("%Y-%m-%d")
|
| |
|
# NOTE(review): SOURCE lost its indentation; the listener below presumably
# sits inside the `with gr.Blocks() as app:` context, while `app.launch()`
# runs after it - confirm when re-indenting.

# "Change Day" advances the date, then refreshes the mementos tab and the raw
# message history, in that order.
day_changed = change_day_button.click(
    fn=next_day,
    inputs=[date_picker, prev_date, user],
    outputs=[chatbot, prev_date, date_picker],
)
day_mementos_refreshed = day_changed.then(
    fn=fetch_mementos,
    inputs=user_dropdown,
    outputs=mementos_display,
)
day_mementos_refreshed.then(
    fn=update_message_history,
    inputs=user,
    outputs=message_history_display,
)

# Start the Gradio server.
app.launch()