Johnny / app.py
PhilSpiel's picture
Update app.py
3a986c3
import gradio as gr
import os
from openai import OpenAI
from datetime import datetime
import threading
import time
import os.path
import requests
# Set the ElevenLabs API key using an environment variable
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
# User inactivity check interval in seconds
CHECK_INTERVAL = 10
# User inactivity timeout in seconds (initially set to 4 minutes)
USER_TIMEOUT = 4 * 60
# Keep track of the last interaction time, current user_id, and the last file modification time
last_interaction_time = time.time()
current_user_id = None
last_file_mod_time = None
# Function to determine whether user is new or returning, and generate the appropriate SYSTEM PROMPT
def get_system_prompt(user_id):
user_summ_file = f"jcTSS-{user_id.lower().replace(' ', '')}-summ.txt"
if os.path.exists(user_summ_file):
with open(user_summ_file, "r", encoding="UTF-8") as file:
past_summary = file.read().strip() # Read the contents of the summary file
# System prompt for returning users includes past summary
return [
{"role": "system", "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT2") + past_summary}
]
else:
# System prompt for new users
return [
{"role": "system", "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT3")}
]
# Function that returns the modification time of the user's history file or None if it does not exist
def get_user_hist_file_mod_time(user_id):
user_hist_file = f"jcTSS-{user_id.lower().replace(' ', '')}.txt"
if os.path.exists(user_hist_file):
return os.path.getmtime(user_hist_file)
else:
return None
# Function to be run in the case of user timeout
############################## LAUNCH SUMMARIZER ASSISTANT
def on_timeout(user_id):
global last_file_mod_time
current_mod_time = get_user_hist_file_mod_time(user_id)
if current_mod_time is not None and (last_file_mod_time is None or current_mod_time > last_file_mod_time):
#print(f"User with ID {user_id} has been inactive, but the history file was updated. Running timeout script.")
last_file_mod_time = current_mod_time
# Insert your timeout script here
# Initialize the OpenAI API client with the provided API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # This is your API key, be careful not to share it publicly.
# Construct the filename for the user's summary file & user's history file
user_summ_file = "jcTSS-" + current_user_id + "-summ.txt" # Filename where the user history summary will be stored.
user_hist_file = "jcTSS-" + current_user_id + ".txt" # Filename where the user history is stored.
if os.path.exists(user_hist_file):
with open(user_hist_file, "r", encoding="UTF-8") as file:
user_hist = file.read().strip() # Remove leading/trailing whitespace
assistant = client.beta.assistants.create(
name="GP-Summarizer",
instructions=os.getenv("ASST_PROMPT") + user_hist,
model="gpt-3.5-turbo-1106"
)
####################################################
# Function to read last N lines of the file
#def LastNlines(fname, N):
# opening file using with() method so that file get closed after completing work
#with open(fname) as file:
# loop to read last N lines and print it
#for line in (file.readlines() [-N:]):
#print(line, end ='')
#################################
assistant_id=assistant.id
# Create a conversation thread with the assistant
thread = client.beta.threads.create(
messages=[
{
"role": "user",
"content": "", # Start the thread with a greeting message
}
]
)
# Start a run to get the assistant's reply
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
# The 'instructions' parameter is commented out, but indicates what kind of responses are expected from the assistant.
)
# Function to wait for the run to complete before fetching the response
def wait_on_run(run, thread):
while run.status == "queued" or run.status == "in_progress":
# Poll for the run's completion status
run = client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
time.sleep(0.5) # Sleep for 0.5 second between each poll to avoid hammering the API
return run
# Call the function to wait for the run to complete
run = wait_on_run(run, thread)
# Get the messages from the thread
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
# Print out all messages in reverse order (to show the conversation flow correctly)
for message in reversed(messages.data):
print(message.content[0].text.value)
# create user summary file exists
with open(user_summ_file, "w", encoding="UTF-8") as file:
file.write("Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n")
for message in reversed(messages.data):
file.write(message.content[0].text.value)
new_user="False"
client.beta.assistants.delete(assistant_id)
# Delete the TRANSCRIPT file
if os.path.exists(f"jcTSS-{current_user_id}-transcript.txt"):
os.remove(f"jcTSS-{current_user_id}-transcript.txt")
else:
#print("New User")
new_user="True"
############################## USER INACTIVITY CHECK
# Function to keep checking for user inactivity
def check_for_inactivity():
global last_interaction_time, current_user_id, USER_TIMEOUT
while True:
time.sleep(CHECK_INTERVAL)
current_time = time.time()
if current_time - last_interaction_time > USER_TIMEOUT and current_user_id is not None:
on_timeout(current_user_id)
# Reset the last interaction time
last_interaction_time = current_time
# Initialize and start the inactivity check thread
inactivity_thread = threading.Thread(target=check_for_inactivity)
inactivity_thread.daemon = True
inactivity_thread.start()
# Get dateTime strubg to build a filename reflecting the UserID + Timestamp
dt = datetime.now()
dt_string = str(dt)
# Initialize OpenAI API client with API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Define a generate_speech function that updates the last_interaction_time
# Update the generate_speech function to reset the last_file_mod_time when the user interacts with the server
def generate_speech(name, input_text):
global last_interaction_time, current_user_id, last_file_mod_time
# Check if the name field is empty and return a message if it is
if not name.strip():
return None, "Please enter your FIRST NAME to continue."
# Update the last interaction time and file mod time
last_interaction_time = time.time()
# Assign the current user_id and check if user has changed NAME
user_id = name.lower().replace(" ", "")
if current_user_id is not None and user_id != current_user_id:
user_id = current_user_id
return None, "Do not change your name DURING a session."
else:
current_user_id = user_id
user_hist_file = f"jcTSS-{user_id}.txt"
last_file_mod_time = get_user_hist_file_mod_time(user_id)
# Check if the user's history file exists and read or create accordingly
if os.path.exists(user_hist_file):
with open(user_hist_file, "r", encoding="UTF-8") as file:
user_hist = file.read().strip()
else:
with open(user_hist_file, "w", encoding="UTF-8") as file:
file.write("User ID: " + user_id)
# Determine appropriate system prompt based on new or returning user
history_openai_format = get_system_prompt(current_user_id)
# Append user message to history with the name included
#input_text1 = f"I'm {name}. " + input_text if name else input_text
input_text1 = input_text
history_openai_format.append({"role": "user", "content": input_text1})
# Build completion with OpenAI using the accumulated history
completion = client.chat.completions.create(
model="gpt-3.5-turbo-1106",
messages=history_openai_format
)
# Extract generated text (response by the assistant) from completion
message_content = completion.choices[0].message.content.strip()
# Remove "Johnny" from the beginning of the assistant's message if present
if message_content.lower().startswith("johnny"):
# Strip the leading "Johnny" from the message content
message_content = message_content[6:].strip()
# Append assistant's message to history
history_openai_format.append({"role": "assistant", "content": message_content})
# Use ElevenLabs TTS API settings and request for the latest assistant response
url = os.getenv("URL1")
headers = {
"Accept": "audio/mpeg",
"Content-Type": "application/json",
"xi-api-key": elevenlabs_api_key
}
data = {
"text": message_content,
"model_id": "eleven_multilingual_v2",
"voice_settings": {
"stability": 1.0,
"similarity_boost": 1.0,
"excitement": 0.9,
"speed": 1.1,
"volume": 80,
"pitch": 2.0,
"breathiness": 0.8,
"voice_id": os.getenv("VOICE_ID")
}
}
# Send the request to ElevenLabs API
response = requests.post(url, json=data, headers=headers)
"""
# Error handling causing problems
# Return the response content if successful, otherwise print error details
if response.status_code == 200:
# Return only the audio of the latest assistant message
return response.content
else:
print("Error with ElevenLabs API:", response.status_code, response.text)
raise Exception(f"Failed to generate speech, status code: {response.status_code}, response: {response.text}")
"""
# Prepare the transcript for the Textbox output
# Read the existing transcript from file if exists or initialize an empty transcript
transcript_file_path = f"jcTSS-{current_user_id}-transcript.txt"
if os.path.exists(transcript_file_path):
with open(transcript_file_path, "r", encoding="UTF-8") as file:
transcript = file.read()
else:
transcript = "Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"
# Append latest user and assistant messages to the transcript
#user_input = f"I'm {name}. " + input_text if name else input_text
user_input = input_text
transcript += f"GUEST: {user_input}\n"
transcript += f"JOHNNY: {message_content}\n\n"
# Write the updated transcript to the history file
with open(transcript_file_path, "w", encoding="UTF-8") as file:
file.write(transcript)
# Write the user and assistant messages to the history file after the exchange
with open(user_hist_file, "a+", encoding="UTF-8") as file:
file.write("\n\nDate/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
for message in history_openai_format[-2:]: # Last 2 messages include the user and assistant responses
file.write(f"\n{message['role'].title()}: {message['content']}")
# Return the binary audio data and the transcript
return response.content, transcript
##########LAUNCH THE GRADIO APP
# Define the Gradio interface with inputs for name and user text
iface = gr.Interface(
fn=generate_speech,
inputs=[
gr.Textbox(label="Your Name (REQUIRED):", placeholder="Enter your FIRST NAME"),
gr.Textbox(label="Your question or comment for Johnny:")
],
outputs=[gr.Audio(autoplay=True, label="Johnny's response:"), gr.Textbox(label="Transcript", max_lines=12, autoscroll="True", show_copy_button="True")],
live=False,
allow_flagging="never"
)
# Launch the interface
iface.launch(show_api=False)