| | import gradio as gr
|
| | from gradio_calendar import Calendar
|
| | import pandas as pd
|
| | import psycopg2
|
| | from psycopg2 import sql
|
| | from openai import OpenAI
|
| | from pprint import pprint
|
| | from user import User
|
| | from dotenv import load_dotenv
|
| | import os
|
| | from datetime import timedelta
|
| | import json
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | load_dotenv()
|
| |
|
| | client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
|
| | user_info = None
|
| | user = None
|
| | history = []
|
| |
|
| |
|
| | def fetch_users():
|
| | db_params = {
|
| | 'dbname': 'ourcoach',
|
| | 'user': 'ourcoach',
|
| | 'password': 'hvcTL3kN3pOG5KteT17T',
|
| | 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
|
| | 'port': '5432'
|
| | }
|
| | try:
|
| | with psycopg2.connect(**db_params) as conn:
|
| | with conn.cursor() as cursor:
|
| | query = sql.SQL("SELECT id, name, created_at FROM {table}").format(
|
| | table=sql.Identifier('public', 'users')
|
| | )
|
| | cursor.execute(query)
|
| | rows = cursor.fetchall()
|
| | colnames = [desc[0] for desc in cursor.description]
|
| | df = pd.DataFrame(rows, columns=colnames)
|
| | return df[['id', 'name', 'created_at']]
|
| | except psycopg2.Error as e:
|
| | print(f"An error occurred: {e}")
|
| | return pd.DataFrame()
|
| |
|
| |
|
| | user_df = fetch_users()
|
| |
|
| | def get_user_info(user_id):
|
| | db_params = {
|
| | 'dbname': 'ourcoach',
|
| | 'user': 'ourcoach',
|
| | 'password': 'hvcTL3kN3pOG5KteT17T',
|
| | 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
|
| | 'port': '5432'
|
| | }
|
| | try:
|
| | with psycopg2.connect(**db_params) as conn:
|
| | with conn.cursor() as cursor:
|
| | query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(
|
| | table=sql.Identifier('public', 'users')
|
| | )
|
| | cursor.execute(query, (user_id,))
|
| | row = cursor.fetchone()
|
| | if row:
|
| | colnames = [desc[0] for desc in cursor.description]
|
| | user_data = dict(zip(colnames, row))
|
| | return user_data
|
| | else:
|
| | return None
|
| | except psycopg2.Error as e:
|
| | print(f"An error occurred: {e}")
|
| | return None
|
| |
|
| | def display_info(user_id, selected_date):
|
| | global user_info, user, history
|
| | user_info = get_user_info(user_id)
|
| |
|
| |
|
| | user = User(user_id=user_info['id'], user_info=user_info, client=client)
|
| |
|
| | if user_info:
|
| | history.append({'role': 'assistant', 'content': f"Chatbot initialized for {user_info['name']} (User ID: {user_id}) on {selected_date}."})
|
| | return history
|
| | else:
|
| | history.append({'role': 'assistant', 'content': "User not found. Please select a valid user."})
|
| | return history
|
| |
|
| | def chatbot_response(message):
|
| | global user, history
|
| | print(f"User message: {message}")
|
| | user.send_message(message)
|
| | return history + user.get_messages()
|
| |
|
| |
|
| | last_selected_date = pd.Timestamp.now().strftime('%Y-%m-%d')
|
| |
|
| | def date_changed(selected_date):
|
| | global user, history, last_selected_date
|
| |
|
| |
|
| | if selected_date == last_selected_date:
|
| | return history
|
| |
|
| |
|
| | last_selected_date = selected_date
|
| |
|
| | if user is not None:
|
| | user.change_date(selected_date)
|
| | history = user.get_messages()[:-2] + [{'role': 'assistant', 'content': f"Date changed to {selected_date}."}] + [user.get_messages()[-1]]
|
| | return history
|
| |
|
| | def next_day():
|
| | global last_selected_date
|
| | next_date = pd.to_datetime(last_selected_date) + timedelta(days=1)
|
| | last_selected_date = next_date.strftime('%Y-%m-%d')
|
| | return last_selected_date, date_changed(last_selected_date)
|
| |
|
| | def fetch_mementos(selected_user):
|
| |
|
| | user_id = selected_user.split(" - ")[0]
|
| | mementos = []
|
| | user_mementos_path = f"mementos/{user_id}"
|
| |
|
| |
|
| | if os.path.exists(user_mementos_path):
|
| | for memento_file in os.listdir(user_mementos_path):
|
| | with open(f"{user_mementos_path}/{memento_file}", 'r') as f:
|
| | mementos.append(json.load(f))
|
| | else:
|
| | mementos = [{"message": "No mementos found for this user."}]
|
| |
|
| | print("Fetched mementos:", mementos)
|
| | return mementos
|
| |
|
| | display_info(user_df.loc[0, 'id'], pd.Timestamp.now())
|
| |
|
| | with gr.Blocks() as app:
|
| | gr.Markdown("## Demo")
|
| |
|
| |
|
| | user_content = gr.State("")
|
| |
|
| |
|
| | user_dropdown = gr.Dropdown(
|
| | choices=user_df['id'].apply(lambda x: f"{x} - {user_df.loc[user_df['id'] == x, 'name'].values[0]}").tolist(),
|
| | label="Select User"
|
| | )
|
| |
|
| |
|
| | date_picker = Calendar(type="datetime", label="Select Date and Time")
|
| |
|
| |
|
| | next_day_button = gr.Button("Next Day")
|
| |
|
| |
|
| | with gr.Tabs():
|
| | with gr.Tab("Chat"):
|
| |
|
| | chatbot = gr.Chatbot(type='messages')
|
| | chatbot_input = gr.Textbox(label="Your Message")
|
| | send_button = gr.Button("Send")
|
| | reset_button = gr.Button("Reset")
|
| | with gr.Tab("Mementos/Goals"):
|
| |
|
| | mementos_display = gr.JSON(label="Mementos / Goals")
|
| | with gr.Tab("Raw Message History"):
|
| |
|
| | message_history_display = gr.Textbox(label="Raw Message History", interactive=False, lines=20, show_copy_button=True, show_label=True, autofocus=True)
|
| |
|
| |
|
| | def build_user(selected_user, selected_date):
|
| | user_id = selected_user.split(" - ")[0]
|
| | return display_info(user_id, selected_date)
|
| |
|
| | def reset():
|
| | global history, user
|
| | history = [{'role': 'assistant', 'content': "Chatbot reset."}]
|
| | user.reset()
|
| |
|
| | user_mementos_path = f"mementos/{user.user_id}"
|
| | if os.path.exists(user_mementos_path):
|
| | for memento_file in os.listdir(user_mementos_path):
|
| | os.remove(f"{user_mementos_path}/{memento_file}")
|
| | return history
|
| |
|
| | def update_message_history():
|
| |
|
| | return "\n".join([f"{msg['role']}: {msg['content']}" for msg in user.get_messages(False)])
|
| |
|
| | user_dropdown.change(
|
| | fn=build_user,
|
| | inputs=[user_dropdown, date_picker],
|
| | outputs=chatbot
|
| | ).then(
|
| | fn=fetch_mementos,
|
| | inputs=user_dropdown,
|
| | outputs=mementos_display
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| | def user_input(user_message, history):
|
| | user_content = user_message
|
| | return "", history + [{"role": "user", "content": user_message}], user_content
|
| |
|
| | chatbot_input.submit(
|
| | fn=user_input,
|
| | inputs=[chatbot_input, chatbot],
|
| | outputs=[chatbot_input, chatbot, user_content],
|
| | queue=False
|
| | ).then(
|
| | fn=chatbot_response,
|
| | inputs=user_content,
|
| | outputs=chatbot
|
| | ).then(
|
| | fn=fetch_mementos,
|
| | inputs=user_dropdown,
|
| | outputs=mementos_display
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| | send_button.click(
|
| | fn=user_input,
|
| | inputs=[chatbot_input, chatbot],
|
| | outputs=[chatbot_input, chatbot, user_content],
|
| | queue=False
|
| | ).then(
|
| | fn=chatbot_response,
|
| | inputs=user_content,
|
| | outputs=chatbot
|
| | ).then(
|
| | fn=fetch_mementos,
|
| | inputs=user_dropdown,
|
| | outputs=mementos_display
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| | reset_button.click(
|
| | fn=reset,
|
| | inputs=None,
|
| | outputs=chatbot
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| | date_picker.change(
|
| | fn=date_changed,
|
| | inputs=date_picker,
|
| | outputs=chatbot
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| | next_day_button.click(
|
| | fn=next_day,
|
| | inputs=None,
|
| | outputs=[date_picker, chatbot]
|
| | ).then(
|
| | fn=update_message_history,
|
| | inputs=None,
|
| | outputs=message_history_display
|
| | )
|
| |
|
| |
|
| | app.launch() |