"""Gradio chat front end for an OpenAI-backed "coach" persona.

The module reads its configuration from environment variables, keeps a set of
per-user text files (history, transcript, bullet summary, picture notes) under
``prefix``, streams chat completions to a Gradio ``ChatInterface`` and runs a
background thread that summarizes the current transcript after a period of
inactivity.

NOTE(review): ``user_id`` and ``last_interaction_time`` are module-level
globals, so every browser session served by this process shares one logged-in
user. That design is kept unchanged here; confirm it is intended.
"""

import os
import os.path
import threading
import time
from datetime import datetime

import gradio as gr
from openai import OpenAI

################# Start PERSONA-SPECIFIC VALUES ######################
coach_code = os.getenv("COACH_CODE")
coach_name_short = os.getenv("COACH_NAME_SHORT")
coach_name_upper = os.getenv("COACH_NAME_UPPER")
coach_name_long = os.getenv("COACH_NAME_LONG")
sys_prompt_new = os.getenv("PROMPT_NEW")
sys_prompt_hist = os.getenv("PROMPT_HIST")
theme = os.getenv("THEME")
################# End PERSONA-SPECIFIC VALUES ######################

################# Start OpenAI-SPECIFIC VALUES ######################
# Initialize OpenAI API client with API key
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# OpenAI model
openai_model = os.getenv("OPENAI_MODEL")
################# End OpenAI-SPECIFIC VALUES ######################

# File name prefix: "/data/" if in HF or "data/" if local
prefix = os.getenv("PREFIX")

# Secret prefixes for the read (tx) and overwrite admin chat commands
tx = os.getenv("TX")
overwrite = os.getenv("OVERWRITE")

# Assistants API prompts
bullet_instructions = os.getenv("PROMPT_BULLET")

##################################################################
########################### END OF HF SECRETS ####################

# Timestamp captured at start-up (not referenced elsewhere in this file;
# kept so the module-level names remain available)
dt = datetime.now()
dt_string = str(dt)

# User inactivity check interval in seconds
CHECK_INTERVAL = 10

# User inactivity timeout in seconds (initially set to 1 minute)
USER_TIMEOUT = 1 * 60

# Last interaction time, current user_id, and the history-file modification
# time that was last summarized
last_interaction_time = time.time()
user_id = "NOTLOGGEDIN"
last_file_mod_time = None

# Messages shown through gr.Error when login is missing or fails
_CHAT_LOGIN_ERROR = "You must be LOGGED IN to Chat. Refresh the page and try again."
_FORM_LOGIN_ERROR = "You must be LOGGED IN to Chat. Refresh the page, log in, and try again."


def get_system_prompt(user_id):
    """Build the system-prompt message list for a new or returning user.

    A user counts as "returning" when their bullet-summary file exists. As a
    side effect the user's pics file is created with a default sentence when
    it does not exist yet.

    Args:
        user_id: User identifier; it is upper-cased and stripped of spaces
            before being used in file names.

    Returns:
        A one-element list holding a ``{"role": "system", ...}`` message.
    """
    normalized_id = user_id.upper().replace(' ', '')
    user_summ_file_bullet = f"{prefix}{coach_code}-{normalized_id}-bullet.txt"
    pics_file_path = f"{prefix}{coach_code}-{normalized_id}-pics.txt"

    if os.path.exists(pics_file_path):
        with open(pics_file_path, "r", encoding="UTF-8") as file:
            user_pics_content = file.read().strip()
    else:
        # Create the pics file on first sight of the user
        user_pics_content = "The user has not shared any images."
        with open(pics_file_path, "w", encoding="UTF-8") as file:
            file.write(user_pics_content)

    if not os.path.exists(user_summ_file_bullet):
        # System prompt for new users
        return [
            {"role": "system", "content": "IDENTITY: " + sys_prompt_new + " If the user asks about sharing or uploading pictures, refer them to the 'DatingFinesse.com' website for instructions."}
        ]

    with open(user_summ_file_bullet, "r", encoding="UTF-8") as file:
        past_summary = file.read().strip()

    # System prompt for returning users includes past summary
    return [
        {"role": "system", "content": sys_prompt_hist + past_summary + "The user may have shared pictures with you. If so, each picture will have a title and detailed description and would be summarized here: " + user_pics_content}
    ]


def get_user_hist_file_mod_time(user_id):
    """Return the mtime of the user's history file, or None if it is missing."""
    user_hist_file = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}.txt"
    try:
        return os.path.getmtime(user_hist_file)
    except OSError:
        return None


def _wait_on_run(run, thread):
    """Poll an Assistants run until it leaves the queued/in_progress states."""
    while run.status == "queued" or run.status == "in_progress":
        run = client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id,
        )
        time.sleep(0.5)  # Pause between polls to avoid hammering the API
    return run


############################## LAUNCH DUAL SUMMARIZER ASSISTANT
def on_timeout(user_id):
    """Summarize the user's transcript into their bullet file after a timeout.

    Nothing happens unless the user's history file exists and was modified
    after the modification time recorded by the previous summary. Otherwise a
    temporary assistant is created with ``bullet_instructions`` plus the
    transcript, its reply is appended to the bullet file, the assistant is
    deleted and the transcript file is removed.

    Args:
        user_id: Identifier of the user whose transcript is summarized.
    """
    global last_file_mod_time

    current_mod_time = get_user_hist_file_mod_time(user_id)
    if current_mod_time is None:
        return
    if last_file_mod_time is not None and current_mod_time <= last_file_mod_time:
        return
    # Recorded before the API calls (as in the original), so a failed
    # summary is not retried until the history file changes again.
    last_file_mod_time = current_mod_time

    user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt"
    transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt"

    try:
        with open(transcript_file_path, "r", encoding="UTF-8") as file:
            transcript = file.read().strip()
    except FileNotFoundError:
        transcript = ""

    assistant = client.beta.assistants.create(
        name="Summarizer_Bullet",
        instructions=bullet_instructions + transcript,
        model="gpt-4o"
    )
    try:
        # Create a conversation thread with the assistant
        thread = client.beta.threads.create(
            messages=[
                {
                    "role": "user",
                    "content": " ",  # CANNOT BE EMPTY ""
                }
            ]
        )

        # Start a run to get the assistant's reply and wait for it to finish
        run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant.id
        )
        run = _wait_on_run(run, thread)

        messages = client.beta.threads.messages.list(
            thread_id=thread.id
        )

        # Print out all messages in reverse order (to show the conversation flow correctly)
        for message in reversed(messages.data):
            try:
                print(message.content[0].text.value)
            except (IndexError, AttributeError) as e:
                print(f"An error occurred while accessing message content: {e}")

        with open(user_summ_file_bullet, "a", encoding="UTF-8") as file:
            # Add a separator between previous and new bullet points
            file.write("\n--- New Conversation ---")
            for message in reversed(messages.data):
                try:
                    file.write(message.content[0].text.value + "\n")
                except IndexError:
                    # message.content list is not as expected
                    file.write("\n")
                except AttributeError:
                    # .text or .value attributes are not present
                    file.write("An error occurred while accessing message text attributes.\n")
    finally:
        # Always remove the temporary assistant, even when a call above fails
        client.beta.assistants.delete(assistant.id)

    # Delete the TRANSCRIPT file
    try:
        os.remove(transcript_file_path)
    except FileNotFoundError:
        pass

    # NOTE(review): the original ended with `user_id = "NOTLOGGEDIN"`, which
    # only rebound this function's parameter and never touched the global
    # login state. It is omitted; the user stays logged in after a timeout.
############################## END OF SUMMARIZER ASSISTANT


############################## USER INACTIVITY CHECK
def check_for_inactivity():
    """Loop forever, calling on_timeout once the user has been idle too long.

    Every ``CHECK_INTERVAL`` seconds the loop checks whether more than
    ``USER_TIMEOUT`` seconds passed since the last interaction and whether a
    transcript file exists for the current global ``user_id``.
    """
    while True:
        time.sleep(CHECK_INTERVAL)
        idle_seconds = time.time() - last_interaction_time
        if (
            idle_seconds > USER_TIMEOUT
            and os.path.exists(f"{prefix}{coach_code}-{user_id}-transcript.txt")
            and user_id is not None
        ):
            try:
                on_timeout(user_id)
            except Exception as exc:
                # Keep the watcher thread alive; an uncaught exception here
                # would silently end all future inactivity handling.
                print(f"Inactivity summarizer failed for {user_id}: {exc}")


# Initialize and start the inactivity check thread
inactivity_thread = threading.Thread(target=check_for_inactivity)
inactivity_thread.daemon = True
inactivity_thread.start()


#### UPDATE FILE
def update_file(file_path, content):
    """Overwrite ``file_path`` with ``content``, keeping one "-OLD" backup.

    Args:
        file_path: Path of the file to write.
        content: New text content.

    Returns:
        A status string: a success message naming the backup path, or an
        error message when any exception was raised.
    """
    try:
        backup_file_path = file_path + "-OLD"
        if os.path.exists(file_path):
            # os.replace overwrites an existing backup in a single step
            os.replace(file_path, backup_file_path)

        with open(file_path, "w", encoding="UTF-8") as file:
            file.write(content)

        return "File updated successfully. Backup created as " + backup_file_path
    except Exception as e:
        return f"Error updating file: {str(e)}"


def _read_file_or_notice(path):
    """Return the text of ``path``, or a "not found" notice if it is missing."""
    try:
        with open(path, "r", encoding="UTF-8") as file:
            return file.read()
    except FileNotFoundError:
        return "File '" + path + "' not found."


def _apply_overwrite(payload, target_path):
    """Handle the text after an overwrite command and return a status string.

    ``payload`` must contain a "|"; the text before it is ignored (the target
    is always ``target_path``) and the text after it becomes the new content.
    """
    try:
        _ignored, content = payload.split("|", 1)
    except ValueError as e:
        return f"Error updating file: {str(e)}"
    file_path = target_path.strip()
    content = content.strip()
    if file_path and content:
        return update_file(file_path, content)
    return "Invalid format. Please use 'update_file: | '."


############### CHAT ###################
def predict(user_input, history):
    """Generator callback for the chat UI.

    Handles the secret read/overwrite commands first, then requires a
    logged-in user and streams the model's reply, yielding the accumulated
    text after every chunk. The finished exchange is appended to the user's
    transcript and history files.

    Args:
        user_input: The message typed by the user. Inputs longer than 2000
            characters are replaced by an empty string.
        history: Iterable of ``(user_message, assistant_message)`` pairs.

    Yields:
        The reply text accumulated so far, or a single command result.

    Raises:
        gr.Error: When no user is logged in.
    """
    global last_interaction_time

    max_length = 2000
    if len(user_input) > max_length:
        user_input = ""

    # Update the last interaction time
    last_interaction_time = time.time()

    user_hist_file = f"{prefix}{coach_code}-{user_id}.txt"
    user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt"
    pics_file_path = f"{prefix}{coach_code}-{user_id}-pics.txt"

    ####### READ #######
    # Commands are only active when the TX secret and coach code are set
    if tx is not None and coach_code is not None:
        read_targets = (
            ("", user_summ_file_bullet),
            ("hist", user_hist_file),
            ("pics", pics_file_path),
        )
        for suffix, path in read_targets:
            if user_input == tx + coach_code + suffix:
                yield _read_file_or_notice(path)
                return

    ####### UPDATE #######
    if overwrite is not None:
        write_targets = (
            ("bull:", user_summ_file_bullet),
            ("pics:", pics_file_path),
        )
        for marker, path in write_targets:
            command = overwrite + marker
            if user_input.startswith(command):
                # Slice by the real command length rather than a fixed 12
                yield _apply_overwrite(user_input[len(command):], path)
                return

    if user_id is None or user_id in ("NOTLOGGEDIN", "", "None"):
        raise gr.Error(_CHAT_LOGIN_ERROR)

    # Determine appropriate system prompt based on new or returning user
    history_openai_format = get_system_prompt(user_id)
    for human, assistant in history:
        history_openai_format.append({"role": "user", "content": human})
        history_openai_format.append({"role": "assistant", "content": assistant})
    history_openai_format.append({"role": "user", "content": user_input})

    completion = client.chat.completions.create(
        model=openai_model,
        messages=history_openai_format,
        temperature=1.0,
        frequency_penalty=0.4,
        presence_penalty=0.1,
        stream=True
    )

    output_stream = ""
    for chunk in completion:
        if not chunk.choices:
            # A chunk may carry no choices; skip it instead of raising IndexError
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            output_stream = output_stream + piece
            yield output_stream

    message_content = output_stream

    # Append latest user and assistant messages to the transcript file
    transcript = update_transcript(
        f"YOU: {user_input}\n{coach_name_upper}: {message_content}\n\n",
        user_id,
    )

    # Write the last exchange to the history file
    last_exchange = f"User: {user_input}\nAssistant: {message_content}\n\n"
    with open(user_hist_file, "a+", encoding="UTF-8") as file:
        file.write(last_exchange)

    # In a generator this value is only reachable via StopIteration.value;
    # code that simply iterates sees just the yielded strings.
    return message_content, transcript


# Function to update transcript
def update_transcript(transcript_update, user_id):
    """Append text to the user's transcript file and return the full text.

    When the transcript file does not exist, it is started with a
    "START OF CONVERSATION" header carrying the current date and time.

    Args:
        transcript_update: Text to append.
        user_id: User identifier used verbatim in the file name.

    Returns:
        The complete transcript text after the update.
    """
    transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt"

    if os.path.exists(transcript_file_path):
        with open(transcript_file_path, "r", encoding="UTF-8") as file:
            transcript = file.read()
    else:
        transcript = "START OF CONVERSATION - Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"

    transcript += transcript_update

    with open(transcript_file_path, "w", encoding="UTF-8") as file:
        file.write(transcript)

    return transcript


def get_user_summary(name):
    """Log ``name`` in as the current global user and build the welcome text.

    Args:
        name: User name; it is stripped, upper-cased and has its spaces
            removed before being stored in the global ``user_id``.

    Returns:
        A ``(summary, admin_result)`` tuple; ``admin_result`` is whatever
        ``admin()`` returns (``None`` in this file).
    """
    global user_id
    user_id = name.strip().upper().replace(" ", "")
    summary = "Welcome! Please proceed to the '" + coach_name_long + "' tab (at top of chatbot) to chat."
    return summary, admin()


def admin():
    """Print the names of the files in the data directory for the admin user.

    Does nothing unless the current global ``user_id`` is "POIPOIPOI".
    """
    if user_id != "POIPOIPOI":
        return

    directory_path = "data"  # different on huggingface
    try:
        file_list = os.listdir(directory_path)
    except OSError as exc:
        # A missing or unreadable directory should not break the login callback
        print(f"Could not list '{directory_path}': {exc}")
        return

    for file_name in file_list:
        print(file_name)


def submit_name(name, password):
    """Validate the login form and log the user in.

    The emailed code is accepted when it equals the username followed by
    "7777", compared case-insensitively.

    NOTE(review): the code is derivable from the username, so this is not a
    real secret; flagged for review, behavior unchanged.

    Args:
        name: Text of the username field.
        password: Text of the emailed-code field.

    Returns:
        The welcome message to show in the status label.

    Raises:
        gr.Error: When the username is blank or the code does not match.
    """
    # Check for a blank username BEFORE appending the suffix; the original
    # compared against "" afterwards, which could never be true and let an
    # empty username log in with the code "7777".
    if not name or not name.strip():
        raise gr.Error(_FORM_LOGIN_ERROR)

    expected_code = (name + "7777").upper()
    if expected_code != (password or "").upper():
        raise gr.Error(_FORM_LOGIN_ERROR)

    summary, _admin_result = get_user_summary(name)
    return summary


# GUI
with gr.Blocks(theme) as demo:
    output_choice = gr.State('Text')  # Set 'Text' as the default state value

    with gr.Tabs() as tabs:
        with gr.TabItem("Log In"):
            username_input = gr.Textbox(label="Username (REQUIRED):", placeholder="Enter your USERNAME", type="text")
            password_input = gr.Textbox(label="Emailed Code (REQUIRED):", placeholder="Enter CODE from your email", type="password")

            # Placeholders from the original script (not read anywhere in this file)
            name = ""
            password = ""

            submit_name_button = gr.Button("Log in")
            status_text = gr.Label(label="", container=False)

            # The button and pressing Enter in either field all trigger the same login
            submit_name_button.click(
                fn=submit_name,
                inputs=[username_input, password_input],
                outputs=[status_text]
            )
            username_input.submit(
                fn=submit_name,
                inputs=[username_input, password_input],
                outputs=[status_text]
            )
            password_input.submit(
                fn=submit_name,
                inputs=[username_input, password_input],
                outputs=[status_text]
            )

        with gr.TabItem(coach_name_long):
            gr.ChatInterface(
                predict,
                submit_btn="Chat with " + coach_name_short,
                retry_btn=None,
                undo_btn=None,
                clear_btn=None,
                css=".gradio-container {height: 1600px !important;}"  # Add custom CSS
            )

demo.launch(show_api=False)