import gradio as gr
import os
import os.path
import threading
import time
from datetime import datetime

import requests
from openai import OpenAI

# ElevenLabs API key comes from the environment -- never hard-code secrets.
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")

# Seconds between inactivity checks by the watcher thread.
CHECK_INTERVAL = 10
# Seconds of user inactivity before the summarizer runs (4 minutes).
USER_TIMEOUT = 4 * 60

# Shared session state between the Gradio request handler and the watcher thread.
last_interaction_time = time.time()  # time.time() of the most recent user request
current_user_id = None               # normalized name of the active user, or None
last_file_mod_time = None            # last observed mtime of the user's history file


def get_system_prompt(user_id):
    """Return the initial system-prompt message list for *user_id*.

    A returning user (one whose summary file exists on disk) gets
    PROMPT1 + name + PROMPT2 + the stored summary; a new user gets
    PROMPT1 + name + PROMPT3.  The prompt fragments live in environment
    variables so the persona text stays out of the source.
    """
    user_summ_file = f"jcTSS-{user_id.lower().replace(' ', '')}-summ.txt"
    if os.path.exists(user_summ_file):
        with open(user_summ_file, "r", encoding="UTF-8") as file:
            past_summary = file.read().strip()
        return [{
            "role": "system",
            "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT2") + past_summary,
        }]
    # No summary file on disk -> first session for this user.
    return [{
        "role": "system",
        "content": os.getenv("PROMPT1") + user_id + os.getenv("PROMPT3"),
    }]


def get_user_hist_file_mod_time(user_id):
    """Return the mtime of the user's history file, or None if it does not exist."""
    user_hist_file = f"jcTSS-{user_id.lower().replace(' ', '')}.txt"
    if os.path.exists(user_hist_file):
        return os.path.getmtime(user_hist_file)
    return None


############################## LAUNCH SUMMARIZER ASSISTANT
def on_timeout(user_id):
    """Summarize an idle user's conversation and persist it to disk.

    Runs only when the user's history file exists AND was modified since
    the last summary (i.e. there is new conversation to summarize).
    Creates a throwaway OpenAI Assistant primed with the full history,
    waits for its reply, writes the reply to the user's summary file,
    then deletes the assistant and the per-session transcript file.

    Args:
        user_id: normalized user id (lowercase, no spaces) -- same value
            as the module-level ``current_user_id`` when called from the
            watcher thread.
    """
    global last_file_mod_time

    current_mod_time = get_user_hist_file_mod_time(user_id)
    # Skip unless the history file exists and changed since our last run.
    if current_mod_time is None or (
        last_file_mod_time is not None and current_mod_time <= last_file_mod_time
    ):
        return
    last_file_mod_time = current_mod_time

    # Per-user file names; user_id is already normalized by the caller.
    user_summ_file = f"jcTSS-{current_user_id}-summ.txt"  # summary destination
    user_hist_file = f"jcTSS-{current_user_id}.txt"       # full conversation history

    if not os.path.exists(user_hist_file):
        # Brand-new user: nothing to summarize yet.
        return

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    with open(user_hist_file, "r", encoding="UTF-8") as file:
        user_hist = file.read().strip()

    # One-shot assistant primed with the whole history via its instructions.
    assistant = client.beta.assistants.create(
        name="GP-Summarizer",
        instructions=os.getenv("ASST_PROMPT") + user_hist,
        model="gpt-3.5-turbo-1106",
    )

    # Start a thread; the empty user message just kicks off the run.
    thread = client.beta.threads.create(
        messages=[{"role": "user", "content": ""}]
    )
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id,
    )

    def wait_on_run(run, thread):
        """Poll until the run leaves the queued/in_progress states."""
        while run.status in ("queued", "in_progress"):
            run = client.beta.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id,
            )
            time.sleep(0.5)  # be gentle on the API while polling
        return run

    run = wait_on_run(run, thread)

    messages = client.beta.threads.messages.list(thread_id=thread.id)

    # messages.data arrives newest-first; reverse to show conversation order.
    for message in reversed(messages.data):
        print(message.content[0].text.value)

    # (Re)write the summary file: timestamp header + every message, oldest first.
    with open(user_summ_file, "w", encoding="UTF-8") as file:
        file.write("Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n")
        for message in reversed(messages.data):
            file.write(message.content[0].text.value)

    # Clean up: delete the throwaway assistant and this session's transcript.
    client.beta.assistants.delete(assistant.id)
    transcript_file = f"jcTSS-{current_user_id}-transcript.txt"
    if os.path.exists(transcript_file):
        os.remove(transcript_file)


############################## USER INACTIVITY CHECK
def check_for_inactivity():
    """Watcher loop: fire on_timeout() once the user has been idle too long."""
    global last_interaction_time
    while True:
        time.sleep(CHECK_INTERVAL)
        now = time.time()
        if now - last_interaction_time > USER_TIMEOUT and current_user_id is not None:
            on_timeout(current_user_id)
            # Reset so we don't re-trigger every CHECK_INTERVAL while idle.
            last_interaction_time = now


# Daemon thread so the watcher never blocks interpreter shutdown.
inactivity_thread = threading.Thread(target=check_for_inactivity, daemon=True)
inactivity_thread.start()
# OpenAI client used by the Gradio request handler.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def generate_speech(name, input_text):
    """Gradio handler: answer *input_text* as "Johnny" and speak the reply.

    Args:
        name: visitor's first name (required); normalized into a
            file-safe user id that keys all per-user files.
        input_text: the visitor's question or comment.

    Returns:
        A ``(audio_bytes, transcript_text)`` tuple for the two Gradio
        outputs, or ``(None, error_message)`` on input-validation failure.

    Side effects: updates the module-level session state
    (``last_interaction_time``, ``current_user_id``,
    ``last_file_mod_time``), appends the exchange to the per-user
    history file, and rewrites the per-session transcript file.
    """
    global last_interaction_time, current_user_id, last_file_mod_time

    # A name is mandatory -- it keys every per-user file.
    if not name.strip():
        return None, "Please enter your FIRST NAME to continue."

    # Mark the session as active for the inactivity watcher.
    last_interaction_time = time.time()

    # Normalize the name into a file-safe user id.
    user_id = name.lower().replace(" ", "")
    if current_user_id is not None and user_id != current_user_id:
        # Changing names mid-session would split the history across files.
        return None, "Do not change your name DURING a session."
    current_user_id = user_id

    user_hist_file = f"jcTSS-{user_id}.txt"
    last_file_mod_time = get_user_hist_file_mod_time(user_id)

    # First contact: seed the history file so later appends have a header.
    if not os.path.exists(user_hist_file):
        with open(user_hist_file, "w", encoding="UTF-8") as file:
            file.write("User ID: " + user_id)

    # System prompt differs for new vs. returning users.
    history_openai_format = get_system_prompt(current_user_id)
    history_openai_format.append({"role": "user", "content": input_text})

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        messages=history_openai_format,
    )
    message_content = completion.choices[0].message.content.strip()

    # The model sometimes prefixes its own name; strip a leading "Johnny".
    if message_content.lower().startswith("johnny"):
        message_content = message_content[len("johnny"):].strip()

    history_openai_format.append({"role": "assistant", "content": message_content})

    # ElevenLabs text-to-speech request for the assistant's reply.
    url = os.getenv("URL1")  # full TTS endpoint URL, supplied via environment
    headers = {
        "Accept": "audio/mpeg",
        "Content-Type": "application/json",
        "xi-api-key": elevenlabs_api_key,
    }
    # NOTE(review): ElevenLabs' documented voice_settings are stability,
    # similarity_boost, style and use_speaker_boost; the extra keys below
    # (excitement, speed, volume, pitch, breathiness, voice_id) do not
    # appear in the public API reference -- confirm they are honored
    # rather than silently ignored (voice selection normally lives in the
    # endpoint URL, not in voice_settings).
    data = {
        "text": message_content,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 1.0,
            "similarity_boost": 1.0,
            "excitement": 0.9,
            "speed": 1.1,
            "volume": 80,
            "pitch": 2.0,
            "breathiness": 0.8,
            "voice_id": os.getenv("VOICE_ID"),
        },
    }
    response = requests.post(url, json=data, headers=headers)
    # NOTE(review): non-200 responses are returned to the Audio widget as-is;
    # explicit status handling was removed earlier ("causing problems").
    # Consider at least logging response.status_code on failure.

    # Load (or start) this session's running transcript.
    transcript_file_path = f"jcTSS-{current_user_id}-transcript.txt"
    if os.path.exists(transcript_file_path):
        with open(transcript_file_path, "r", encoding="UTF-8") as file:
            transcript = file.read()
    else:
        transcript = "Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"

    # Append the latest exchange and persist the transcript.
    transcript += f"GUEST: {input_text}\n"
    transcript += f"JOHNNY: {message_content}\n\n"
    with open(transcript_file_path, "w", encoding="UTF-8") as file:
        file.write(transcript)

    # Append this exchange (last user + assistant messages) to the history file.
    with open(user_hist_file, "a+", encoding="UTF-8") as file:
        file.write("\n\nDate/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for message in history_openai_format[-2:]:
            file.write(f"\n{message['role'].title()}: {message['content']}")

    # Binary MP3 audio for the Audio widget, plus the full transcript text.
    return response.content, transcript


########## LAUNCH THE GRADIO APP
# Gradio UI: name + question in, spoken reply + transcript out.
iface = gr.Interface(
    fn=generate_speech,
    inputs=[
        gr.Textbox(label="Your Name (REQUIRED):", placeholder="Enter your FIRST NAME"),
        gr.Textbox(label="Your question or comment for Johnny:"),
    ],
    outputs=[
        gr.Audio(autoplay=True, label="Johnny's response:"),
        gr.Textbox(label="Transcript", max_lines=12, autoscroll=True, show_copy_button=True),
    ],
    live=False,
    allow_flagging="never",
)

iface.launch(show_api=False)