Spaces:
PhilSpiel
/
Sleeping

3H9Z / app.py
PhilSpiel's picture
Update app.py
bc27619 verified
raw
history blame
19.2 kB
import gradio as gr
import os
from openai import OpenAI
from datetime import datetime
import threading
import time
import os.path
################# Start PERSONA-SPECIFIC VALUES ######################
coach_code = os.getenv("COACH_CODE")
coach_name_short = os.getenv("COACH_NAME_SHORT")
coach_name_upper = os.getenv("COACH_NAME_UPPER")
coach_name_long = os.getenv("COACH_NAME_LONG")
sys_prompt_new = os.getenv("PROMPT_NEW")
sys_prompt_hist = os.getenv("PROMPT_HIST")
theme=os.getenv("THEME")
################# End PERSONA-SPECIFIC VALUES ######################
################# Start OpenAI-SPECIFIC VALUES ######################
# Initialize OpenAI API client with API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# OpenAI model
openai_model = os.getenv("OPENAI_MODEL")
################# End OpenAI-SPECIFIC VALUES ######################
# define file name prefix
prefix = os.getenv("PREFIX") # "/data/" if in HF or "data/" if local
tx = os.getenv("TX")
overwrite = os.getenv("OVERWRITE")
# Assistants API prompts
bullet_instructions = os.getenv("PROMPT_BULLET")
##################################################################
########################### END OF HF SECRETS ####################
# Get dateTime string to build a filename reflecting the UserID + Timestamp
dt = datetime.now()
dt_string = str(dt)
# User inactivity check interval in seconds
CHECK_INTERVAL = 10
# User inactivity timeout in seconds (initially set to 1 minute)
USER_TIMEOUT = 1 * 60
# Keep track of the last interaction time, current user_id, and the last file modification time
last_interaction_time = time.time()
user_id = "NOTLOGGEDIN"
last_file_mod_time = None
# Function to determine whether user is new or returning, and generate the appropriate SYSTEM PROMPT
def get_system_prompt(user_id):
past_summary_bullet = ""
pics_file = ""
user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt"
pics_file_path = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-pics.txt"
# Create pics file if not exist
if not os.path.exists(pics_file_path):
pics_file = "The user has not shared any images."
with open(pics_file_path, "w", encoding="UTF-8") as file:
file.write(pics_file)
if os.path.exists(pics_file_path):
with open(pics_file_path, "r", encoding="UTF-8") as file:
user_pics_content = file.read().strip()
# Check if the bullet summary file exists and read from it
if os.path.exists(user_summ_file_bullet):
with open(user_summ_file_bullet, "r", encoding="UTF-8") as file:
past_summary_bullet = file.read().strip() # Read the contents of the summary file
# past_summary = past_summary_bullet + "\n" + past_summary_summ
past_summary = past_summary_bullet
# System prompt for returning users includes past summary
return [
{"role": "system", "content": sys_prompt_hist + past_summary + "The user may have shared pictures with you. If so, each picture will have a title and detailed description and would be summarized here: " + user_pics_content}
]
else:
# System prompt for new users
return [
{"role": "system", "content": "IDENTITY: " + sys_prompt_new + " If the user asks about sharing or uploading pictures, refer them to the 'DatingFinesse.com' website for instructions."}
]
# Function that returns the modification time of the user's history file or None if it does not exist
def get_user_hist_file_mod_time(user_id):
user_hist_file = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}.txt"
if os.path.exists(user_hist_file):
return os.path.getmtime(user_hist_file)
else:
return None
# Function to be run in the case of user timeout
############################## LAUNCH DUAL SUMMARIZER ASSISTANT
def on_timeout(user_id):
global last_file_mod_time
current_mod_time = get_user_hist_file_mod_time(user_id)
if current_mod_time is not None and (last_file_mod_time is None or current_mod_time > last_file_mod_time):
last_file_mod_time = current_mod_time
# Construct the filename for the user's bullet file
user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt"
# Construct the filename for the user's transcript file
transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt"
if os.path.exists(transcript_file_path):
with open(transcript_file_path, "r", encoding="UTF-8") as file:
transcript = file.read().strip()
else:
transcript = ""
assistant = client.beta.assistants.create(
name="Summarizer_Bullet",
instructions=bullet_instructions + transcript,
model="gpt-4o"
)
assistant_id = assistant.id
# Create a conversation thread with the assistant
thread = client.beta.threads.create(
messages=[
{
"role": "user",
"content": " ", # Start the thread with a greeting message. CANNOT BE EMPTY ""
}
]
)
# Start a run to get the assistant's reply
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
# The 'instructions' parameter is commented out, but indicates what kind of responses are expected from the assistant.
)
# Function to wait for the run to complete before fetching the response
def wait_on_run(run, thread):
while run.status == "queued" or run.status == "in_progress":
# Poll for the run's completion status
run = client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
time.sleep(0.5) # Sleep for 0.5 second between each poll to avoid hammering the API
return run
# Call the function to wait for the run to complete
run = wait_on_run(run, thread)
# Get the messages from the thread
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
# Print out all messages in reverse order (to show the conversation flow correctly)
for message in reversed(messages.data):
try:
print(message.content[0].text.value)
except (IndexError, AttributeError) as e:
print(f"An error occurred while accessing message content: {e}")
# Open the file for appending messages
with open(user_summ_file_bullet, "a", encoding="UTF-8") as file:
# Add a separator between previous and new bullet points
file.write("\n--- New Conversation ---")
# Loop through each message and append its contents
for message in reversed(messages.data):
try:
# Attempt to write message content
file.write(message.content[0].text.value + "\n")
except IndexError:
# Handle cases where message.content list is not as expected
file.write("\n")
except AttributeError:
# Handle cases where .text or .value attributes are not present
file.write("An error occurred while accessing message text attributes.\n")
client.beta.assistants.delete(assistant_id)
# Delete the TRANSCRIPT file
if os.path.exists(f"{prefix}{coach_code}-{user_id}-transcript.txt"):
os.remove(f"{prefix}{coach_code}-{user_id}-transcript.txt")
user_id = "NOTLOGGEDIN"
############################## END OF SUMMARIZER ASSISTANT
############################## USER INACTIVITY CHECK
# Function to keep checking for user inactivity
def check_for_inactivity():
global last_interaction_time, user_id, USER_TIMEOUT
while True:
time.sleep(CHECK_INTERVAL)
current_time = time.time()
if current_time - last_interaction_time > USER_TIMEOUT and os.path.exists(f"{prefix}{coach_code}-{user_id}-transcript.txt") and user_id is not None:
on_timeout(user_id)
# Reset the last interaction time (WHY???)
# last_interaction_time = current_time
# Initialize and start the inactivity check thread
inactivity_thread = threading.Thread(target=check_for_inactivity)
inactivity_thread.daemon = True
inactivity_thread.start()
#### UPDATE FILE
def update_file(file_path, content):
try:
# Create a backup of the existing file
backup_file_path = file_path + "-OLD"
if os.path.exists(file_path):
# If the backup file already exists, delete it
if os.path.exists(backup_file_path):
os.remove(backup_file_path)
# Rename the original file to the backup file
os.rename(file_path, backup_file_path)
# Write the new content to the file
with open(file_path, "w", encoding="UTF-8") as file:
file.write(content)
return "File updated successfully. Backup created as " + backup_file_path
except Exception as e:
return f"Error updating file: {str(e)}"
############### CHAT ###################
def predict(user_input, history):
max_length = 2000
if len(user_input) > max_length:
user_input = ""
global last_interaction_time, user_id, last_file_mod_time
# ... [other global variables that are used]
# Update the last interaction time
last_interaction_time = time.time()
user_hist_file = f"{prefix}{coach_code}-{user_id}.txt"
user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt"
pics_file_path = f"{prefix}{coach_code}-{user_id}-pics.txt"
####### READ #######
if user_input == tx + coach_code:
try:
# Prepare the transcript for the Textbox output
if os.path.exists(user_summ_file_bullet):
with open(user_summ_file_bullet, "r", encoding="UTF-8") as file:
output = file.read()
yield output
return
except FileNotFoundError:
yield "File '" + user_summ_file_bullet + "' not found."
return
if user_input == tx + coach_code + "hist":
try:
# Prepare the transcript for the Textbox output
if os.path.exists(user_hist_file):
with open(user_hist_file, "r", encoding="UTF-8") as file:
output = file.read()
yield output
return
except FileNotFoundError:
yield "File '" + user_hist_file + "' not found."
return
if user_input == tx + coach_code + "pics":
try:
# Prepare the transcript for the Textbox output
if os.path.exists(pics_file_path):
with open(pics_file_path, "r", encoding="UTF-8") as file:
output = file.read()
yield output
return
except FileNotFoundError:
yield "File '" + pics_file_path + "' not found."
return
####### UPDATE #######
if user_input.startswith(overwrite + "bull:"):
try:
file_path, content = user_input[12:].split("|", 1)
file_path = user_summ_file_bullet.strip()
content = content.strip()
if file_path and content:
result = update_file(file_path, content)
yield result
return
else:
yield "Invalid format. Please use 'update_file: <file_path> | <content>'."
return
except Exception as e:
yield f"Error updating file: {str(e)}"
return
if user_input.startswith(overwrite + "pics:"):
try:
file_path, content = user_input[12:].split("|", 1)
file_path = pics_file_path.strip()
content = content.strip()
if file_path and content:
result = update_file(file_path, content)
yield result
return
else:
yield "Invalid format. Please use 'update_file: <file_path> | <content>'."
return
except Exception as e:
yield f"Error updating file: {str(e)}"
return
if user_id == "NOTLOGGEDIN":
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.")
if user_id == "":
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.")
if user_id == None:
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.")
if user_hist_file == prefix + coach_code + "-None.txt":
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.")
if user_hist_file == prefix + coach_code + "-.txt":
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.")
# Determine appropriate system prompt based on new or returning user
# This is a placeholder line, replace it with your actual function logic that creates the prompt message
system_prompt = get_system_prompt(user_id)
history_openai_format = system_prompt
for human, assistant in history:
history_openai_format.append({"role": "user", "content": human })
history_openai_format.append({"role": "assistant", "content":assistant})
history_openai_format.append({"role": "user", "content": user_input})
completion = client.chat.completions.create(
model=openai_model,
messages= history_openai_format,
temperature=1.0,
frequency_penalty=0.4,
presence_penalty=0.1,
stream=True
)
output_stream = ""
for chunk in completion:
if chunk.choices[0].delta.content is not None:
output_stream = output_stream + (chunk.choices[0].delta.content)
yield output_stream
message_content = output_stream
# Prepare the transcript for the Textbox output
transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt"
if os.path.exists(transcript_file_path):
with open(transcript_file_path, "r", encoding="UTF-8") as file:
transcript = file.read()
else:
transcript = "START OF CONVERSATION - Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"
# Append latest user and assistant messages to the transcript
transcript += f"YOU: {user_input}\n"
transcript += f"{coach_name_upper}: {message_content}\n\n"
# Write the updated transcript to the file
with open(transcript_file_path, "w", encoding="UTF-8") as file:
file.write(transcript)
# Get the last exchange (the user input and the assistant's message)
last_exchange = f"User: {user_input}\nAssistant: {message_content}\n\n"
# Write the last exchange to the history file
with open(user_hist_file, "a+", encoding="UTF-8") as file:
file.write(last_exchange)
return message_content, transcript
# Function to update transcript
def update_transcript(transcript_update, user_id):
transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt"
if os.path.exists(transcript_file_path):
with open(transcript_file_path, "r", encoding="UTF-8") as file:
transcript = file.read()
else:
transcript = "START OF CONVERSATION - Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"
transcript += transcript_update
with open(transcript_file_path, "w", encoding="UTF-8") as file:
file.write(transcript)
return transcript
def get_user_summary(name):
global user_id
name_cap = str(name)
user_id = name.strip().upper().replace(" ", "")
name_cap = name_cap.title()
summary = "Welcome! Please proceed to the '" + coach_name_long + "' tab (at top of chatbot) to chat."
return summary, admin()
def admin():
if user_id == "POIPOIPOI":
# Set the directory path
directory_path = "data" # different on huggingface
# Get a list of all files in the directory
file_list = os.listdir(directory_path)
# Print the list of files
for file_name in file_list:
print(file_name)
#GUI
with gr.Blocks(theme) as demo:
output_choice = gr.State('Text') # Set 'Text' as the default state value
with gr.Tabs() as tabs:
with gr.TabItem("Log In"):
username_input = gr.Textbox(label="Username (REQUIRED):", placeholder="Enter your USERNAME", type="text")
password_input = gr.Textbox(label="Emailed Code (REQUIRED):", placeholder="Enter CODE from your email", type="password")
# The submit_name function will expect two inputs
name = ""
password = ""
def submit_name(name, password):
name = name + "7777"
name = name.upper()
pwd = password
pwd = pwd.upper()
if name != pwd:
name = "NOTLOGGEDIN"
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page, log in, and try again.")
if name == "":
name = "NOTLOGGEDIN"
raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page, log in, and try again.")
else:
name = name[:-4]
user_summary = get_user_summary(name)
return user_summary[0] if isinstance(user_summary, tuple) else user_summary
submit_name_button = gr.Button("Log in")
status_text = gr.Label(label="", container=False)
submit_name_button.click(
fn=submit_name,
inputs=[username_input, password_input],
outputs=[status_text]
)
# Attach submit event to both the username and password fields
username_input.submit(
fn=submit_name,
inputs=[username_input, password_input],
outputs=[status_text]
)
# Make sure this is done for the password field as well
password_input.submit(
fn=submit_name,
inputs=[username_input, password_input],
outputs=[status_text]
)
with gr.TabItem(coach_name_long):
gr.ChatInterface(
predict,
submit_btn="Chat with " + coach_name_short,
retry_btn=None,
undo_btn=None,
clear_btn=None,
css=".gradio-container {height: 1600px !important;}" # Add custom CSS
)
demo.launch(show_api=False)