File size: 12,696 Bytes
132a30e 530b312 f72d32b cf24a85 d4c6b6c 530b312 d4c6b6c f72d32b 6ef2d0a f72d32b 6ef2d0a f72d32b 6ef2d0a f72d32b 6ef2d0a 698100a 01d333a a7246ad 698100a 01d333a a10389f 8ea0861 70f9fdc a10389f 698100a 70f9fdc 8ea0861 7ff0301 3a986c3 d4c6b6c a7246ad d4c6b6c 530b312 eecafb7 6ef2d0a f72d32b 6ef2d0a a7246ad 7ff0301 530b312 d4c6b6c f72d32b 698100a 530b312 7ff0301 530b312 e26210c f72d32b f2c4a3b f72d32b 6ef2d0a 530b312 6ef2d0a f72d32b 8ea0861 6ef2d0a f72d32b 530b312 d4c6b6c a7246ad 698100a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
import gradio as gr
import os
from openai import OpenAI
from datetime import datetime
import threading
import time
import os.path
import requests
# Set the ElevenLabs API key using an environment variable
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
# User inactivity check interval in seconds
CHECK_INTERVAL = 10
# User inactivity timeout in seconds (initially set to 4 minutes)
USER_TIMEOUT = 4 * 60
# Keep track of the last interaction time, current user_id, and the last file modification time
last_interaction_time = time.time()
current_user_id = None
last_file_mod_time = None
# Function to determine whether user is new or returning, and generate the appropriate SYSTEM PROMPT
def get_system_prompt(user_id):
user_summ_file = f"jcTSS-{user_id.lower().replace(' ', '')}-summ.txt"
if os.path.exists(user_summ_file):
with open(user_summ_file, "r", encoding="UTF-8") as file:
past_summary = file.read().strip() # Read the contents of the summary file
# System prompt for returning users includes past summary
return [
{"role": "system", "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT2") + past_summary}
]
else:
# System prompt for new users
return [
{"role": "system", "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT3")}
]
# Function that returns the modification time of the user's history file or None if it does not exist
def get_user_hist_file_mod_time(user_id):
user_hist_file = f"jcTSS-{user_id.lower().replace(' ', '')}.txt"
if os.path.exists(user_hist_file):
return os.path.getmtime(user_hist_file)
else:
return None
# Function to be run in the case of user timeout
############################## LAUNCH SUMMARIZER ASSISTANT
def on_timeout(user_id):
global last_file_mod_time
current_mod_time = get_user_hist_file_mod_time(user_id)
if current_mod_time is not None and (last_file_mod_time is None or current_mod_time > last_file_mod_time):
#print(f"User with ID {user_id} has been inactive, but the history file was updated. Running timeout script.")
last_file_mod_time = current_mod_time
# Insert your timeout script here
# Initialize the OpenAI API client with the provided API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # This is your API key, be careful not to share it publicly.
# Construct the filename for the user's summary file & user's history file
user_summ_file = "jcTSS-" + current_user_id + "-summ.txt" # Filename where the user history summary will be stored.
user_hist_file = "jcTSS-" + current_user_id + ".txt" # Filename where the user history is stored.
if os.path.exists(user_hist_file):
with open(user_hist_file, "r", encoding="UTF-8") as file:
user_hist = file.read().strip() # Remove leading/trailing whitespace
assistant = client.beta.assistants.create(
name="GP-Summarizer",
instructions=os.getenv("ASST_PROMPT") + user_hist,
model="gpt-3.5-turbo-1106"
)
####################################################
# Function to read last N lines of the file
#def LastNlines(fname, N):
# opening file using with() method so that file get closed after completing work
#with open(fname) as file:
# loop to read last N lines and print it
#for line in (file.readlines() [-N:]):
#print(line, end ='')
#################################
assistant_id=assistant.id
# Create a conversation thread with the assistant
thread = client.beta.threads.create(
messages=[
{
"role": "user",
"content": "", # Start the thread with a greeting message
}
]
)
# Start a run to get the assistant's reply
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
# The 'instructions' parameter is commented out, but indicates what kind of responses are expected from the assistant.
)
# Function to wait for the run to complete before fetching the response
def wait_on_run(run, thread):
while run.status == "queued" or run.status == "in_progress":
# Poll for the run's completion status
run = client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
time.sleep(0.5) # Sleep for 0.5 second between each poll to avoid hammering the API
return run
# Call the function to wait for the run to complete
run = wait_on_run(run, thread)
# Get the messages from the thread
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
# Print out all messages in reverse order (to show the conversation flow correctly)
for message in reversed(messages.data):
print(message.content[0].text.value)
# create user summary file exists
with open(user_summ_file, "w", encoding="UTF-8") as file:
file.write("Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n")
for message in reversed(messages.data):
file.write(message.content[0].text.value)
new_user="False"
client.beta.assistants.delete(assistant_id)
# Delete the TRANSCRIPT file
if os.path.exists(f"jcTSS-{current_user_id}-transcript.txt"):
os.remove(f"jcTSS-{current_user_id}-transcript.txt")
else:
#print("New User")
new_user="True"
############################## USER INACTIVITY CHECK
# Function to keep checking for user inactivity
def check_for_inactivity():
global last_interaction_time, current_user_id, USER_TIMEOUT
while True:
time.sleep(CHECK_INTERVAL)
current_time = time.time()
if current_time - last_interaction_time > USER_TIMEOUT and current_user_id is not None:
on_timeout(current_user_id)
# Reset the last interaction time
last_interaction_time = current_time
# Initialize and start the inactivity check thread
inactivity_thread = threading.Thread(target=check_for_inactivity)
inactivity_thread.daemon = True
inactivity_thread.start()
# Get dateTime strubg to build a filename reflecting the UserID + Timestamp
dt = datetime.now()
dt_string = str(dt)
# Initialize OpenAI API client with API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Define a generate_speech function that updates the last_interaction_time
# Update the generate_speech function to reset the last_file_mod_time when the user interacts with the server
def generate_speech(name, input_text):
global last_interaction_time, current_user_id, last_file_mod_time
# Check if the name field is empty and return a message if it is
if not name.strip():
return None, "Please enter your FIRST NAME to continue."
# Update the last interaction time and file mod time
last_interaction_time = time.time()
# Assign the current user_id and check if user has changed NAME
user_id = name.lower().replace(" ", "")
if current_user_id is not None and user_id != current_user_id:
user_id = current_user_id
return None, "Do not change your name DURING a session."
else:
current_user_id = user_id
user_hist_file = f"jcTSS-{user_id}.txt"
last_file_mod_time = get_user_hist_file_mod_time(user_id)
# Check if the user's history file exists and read or create accordingly
if os.path.exists(user_hist_file):
with open(user_hist_file, "r", encoding="UTF-8") as file:
user_hist = file.read().strip()
else:
with open(user_hist_file, "w", encoding="UTF-8") as file:
file.write("User ID: " + user_id)
# Determine appropriate system prompt based on new or returning user
history_openai_format = get_system_prompt(current_user_id)
# Append user message to history with the name included
#input_text1 = f"I'm {name}. " + input_text if name else input_text
input_text1 = input_text
history_openai_format.append({"role": "user", "content": input_text1})
# Build completion with OpenAI using the accumulated history
completion = client.chat.completions.create(
model="gpt-3.5-turbo-1106",
messages=history_openai_format
)
# Extract generated text (response by the assistant) from completion
message_content = completion.choices[0].message.content.strip()
# Remove "Johnny" from the beginning of the assistant's message if present
if message_content.lower().startswith("johnny"):
# Strip the leading "Johnny" from the message content
message_content = message_content[6:].strip()
# Append assistant's message to history
history_openai_format.append({"role": "assistant", "content": message_content})
# Use ElevenLabs TTS API settings and request for the latest assistant response
url = os.getenv("URL1")
headers = {
"Accept": "audio/mpeg",
"Content-Type": "application/json",
"xi-api-key": elevenlabs_api_key
}
data = {
"text": message_content,
"model_id": "eleven_multilingual_v2",
"voice_settings": {
"stability": 1.0,
"similarity_boost": 1.0,
"excitement": 0.9,
"speed": 1.1,
"volume": 80,
"pitch": 2.0,
"breathiness": 0.8,
"voice_id": os.getenv("VOICE_ID")
}
}
# Send the request to ElevenLabs API
response = requests.post(url, json=data, headers=headers)
"""
# Error handling causing problems
# Return the response content if successful, otherwise print error details
if response.status_code == 200:
# Return only the audio of the latest assistant message
return response.content
else:
print("Error with ElevenLabs API:", response.status_code, response.text)
raise Exception(f"Failed to generate speech, status code: {response.status_code}, response: {response.text}")
"""
# Prepare the transcript for the Textbox output
# Read the existing transcript from file if exists or initialize an empty transcript
transcript_file_path = f"jcTSS-{current_user_id}-transcript.txt"
if os.path.exists(transcript_file_path):
with open(transcript_file_path, "r", encoding="UTF-8") as file:
transcript = file.read()
else:
transcript = "Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"
# Append latest user and assistant messages to the transcript
#user_input = f"I'm {name}. " + input_text if name else input_text
user_input = input_text
transcript += f"GUEST: {user_input}\n"
transcript += f"JOHNNY: {message_content}\n\n"
# Write the updated transcript to the history file
with open(transcript_file_path, "w", encoding="UTF-8") as file:
file.write(transcript)
# Write the user and assistant messages to the history file after the exchange
with open(user_hist_file, "a+", encoding="UTF-8") as file:
file.write("\n\nDate/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
for message in history_openai_format[-2:]: # Last 2 messages include the user and assistant responses
file.write(f"\n{message['role'].title()}: {message['content']}")
# Return the binary audio data and the transcript
return response.content, transcript
##########LAUNCH THE GRADIO APP
# Define the Gradio interface with inputs for name and user text
iface = gr.Interface(
fn=generate_speech,
inputs=[
gr.Textbox(label="Your Name (REQUIRED):", placeholder="Enter your FIRST NAME"),
gr.Textbox(label="Your question or comment for Johnny:")
],
outputs=[gr.Audio(autoplay=True, label="Johnny's response:"), gr.Textbox(label="Transcript", max_lines=12, autoscroll="True", show_copy_button="True")],
live=False,
allow_flagging="never"
)
# Launch the interface
iface.launch(show_api=False) |