Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| from openai import OpenAI | |
| from datetime import datetime | |
| import threading | |
| import time | |
| import os.path | |
| ################# Start PERSONA-SPECIFIC VALUES ###################### | |
| coach_code = os.getenv("COACH_CODE") | |
| coach_name_short = os.getenv("COACH_NAME_SHORT") | |
| coach_name_upper = os.getenv("COACH_NAME_UPPER") | |
| coach_name_long = os.getenv("COACH_NAME_LONG") | |
| sys_prompt_new = os.getenv("PROMPT_NEW") | |
| sys_prompt_hist = os.getenv("PROMPT_HIST") | |
| theme=os.getenv("THEME") | |
| ################# End PERSONA-SPECIFIC VALUES ###################### | |
| ################# Start OpenAI-SPECIFIC VALUES ###################### | |
| # Initialize OpenAI API client with API key | |
| client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| # OpenAI model | |
| openai_model = os.getenv("OPENAI_MODEL") | |
| ################# End OpenAI-SPECIFIC VALUES ###################### | |
| # define file name prefix | |
| prefix = os.getenv("PREFIX") # "/data/" if in HF or "data/" if local | |
| tx = os.getenv("TX") | |
| overwrite = os.getenv("OVERWRITE") | |
| # Assistants API prompts | |
| bullet_instructions = os.getenv("PROMPT_BULLET") | |
| ################################################################## | |
| ########################### END OF HF SECRETS #################### | |
| # Get dateTime string to build a filename reflecting the UserID + Timestamp | |
| dt = datetime.now() | |
| dt_string = str(dt) | |
| # User inactivity check interval in seconds | |
| CHECK_INTERVAL = 10 | |
| # User inactivity timeout in seconds (initially set to 1 minute) | |
| USER_TIMEOUT = 1 * 60 | |
| # Keep track of the last interaction time, current user_id, and the last file modification time | |
| last_interaction_time = time.time() | |
| user_id = "NOTLOGGEDIN" | |
| last_file_mod_time = None | |
| # Function to determine whether user is new or returning, and generate the appropriate SYSTEM PROMPT | |
| def get_system_prompt(user_id): | |
| past_summary_bullet = "" | |
| pics_file = "" | |
| user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt" | |
| pics_file_path = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-pics.txt" | |
| # Create pics file if not exist | |
| if not os.path.exists(pics_file_path): | |
| pics_file = "The user has not shared any images." | |
| with open(pics_file_path, "w", encoding="UTF-8") as file: | |
| file.write(pics_file) | |
| if os.path.exists(pics_file_path): | |
| with open(pics_file_path, "r", encoding="UTF-8") as file: | |
| user_pics_content = file.read().strip() | |
| # Check if the bullet summary file exists and read from it | |
| if os.path.exists(user_summ_file_bullet): | |
| with open(user_summ_file_bullet, "r", encoding="UTF-8") as file: | |
| past_summary_bullet = file.read().strip() # Read the contents of the summary file | |
| # past_summary = past_summary_bullet + "\n" + past_summary_summ | |
| past_summary = past_summary_bullet | |
| # System prompt for returning users includes past summary | |
| return [ | |
| {"role": "system", "content": sys_prompt_hist + past_summary + "The user may have shared pictures with you. If so, each picture will have a title and detailed description and would be summarized here: " + user_pics_content} | |
| ] | |
| else: | |
| # System prompt for new users | |
| return [ | |
| {"role": "system", "content": "IDENTITY: " + sys_prompt_new + " If the user asks about sharing or uploading pictures, refer them to the 'DatingFinesse.com' website for instructions."} | |
| ] | |
| # Function that returns the modification time of the user's history file or None if it does not exist | |
| def get_user_hist_file_mod_time(user_id): | |
| user_hist_file = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}.txt" | |
| if os.path.exists(user_hist_file): | |
| return os.path.getmtime(user_hist_file) | |
| else: | |
| return None | |
| # Function to be run in the case of user timeout | |
| ############################## LAUNCH DUAL SUMMARIZER ASSISTANT | |
| def on_timeout(user_id): | |
| global last_file_mod_time | |
| current_mod_time = get_user_hist_file_mod_time(user_id) | |
| if current_mod_time is not None and (last_file_mod_time is None or current_mod_time > last_file_mod_time): | |
| last_file_mod_time = current_mod_time | |
| # Construct the filename for the user's bullet file | |
| user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt" | |
| # Construct the filename for the user's transcript file | |
| transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt" | |
| if os.path.exists(transcript_file_path): | |
| with open(transcript_file_path, "r", encoding="UTF-8") as file: | |
| transcript = file.read().strip() | |
| else: | |
| transcript = "" | |
| assistant = client.beta.assistants.create( | |
| name="Summarizer_Bullet", | |
| instructions=bullet_instructions + transcript, | |
| model="gpt-4o" | |
| ) | |
| assistant_id = assistant.id | |
| # Create a conversation thread with the assistant | |
| thread = client.beta.threads.create( | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": " ", # Start the thread with a greeting message. CANNOT BE EMPTY "" | |
| } | |
| ] | |
| ) | |
| # Start a run to get the assistant's reply | |
| run = client.beta.threads.runs.create( | |
| thread_id=thread.id, | |
| assistant_id=assistant.id | |
| # The 'instructions' parameter is commented out, but indicates what kind of responses are expected from the assistant. | |
| ) | |
| # Function to wait for the run to complete before fetching the response | |
| def wait_on_run(run, thread): | |
| while run.status == "queued" or run.status == "in_progress": | |
| # Poll for the run's completion status | |
| run = client.beta.threads.runs.retrieve( | |
| thread_id=thread.id, | |
| run_id=run.id, | |
| ) | |
| time.sleep(0.5) # Sleep for 0.5 second between each poll to avoid hammering the API | |
| return run | |
| # Call the function to wait for the run to complete | |
| run = wait_on_run(run, thread) | |
| # Get the messages from the thread | |
| messages = client.beta.threads.messages.list( | |
| thread_id=thread.id | |
| ) | |
| # Print out all messages in reverse order (to show the conversation flow correctly) | |
| for message in reversed(messages.data): | |
| try: | |
| print(message.content[0].text.value) | |
| except (IndexError, AttributeError) as e: | |
| print(f"An error occurred while accessing message content: {e}") | |
| # Open the file for appending messages | |
| with open(user_summ_file_bullet, "a", encoding="UTF-8") as file: | |
| # Add a separator between previous and new bullet points | |
| file.write("\n--- New Conversation ---") | |
| # Loop through each message and append its contents | |
| for message in reversed(messages.data): | |
| try: | |
| # Attempt to write message content | |
| file.write(message.content[0].text.value + "\n") | |
| except IndexError: | |
| # Handle cases where message.content list is not as expected | |
| file.write("\n") | |
| except AttributeError: | |
| # Handle cases where .text or .value attributes are not present | |
| file.write("An error occurred while accessing message text attributes.\n") | |
| client.beta.assistants.delete(assistant_id) | |
| # Delete the TRANSCRIPT file | |
| if os.path.exists(f"{prefix}{coach_code}-{user_id}-transcript.txt"): | |
| os.remove(f"{prefix}{coach_code}-{user_id}-transcript.txt") | |
| user_id = "NOTLOGGEDIN" | |
| ############################## END OF SUMMARIZER ASSISTANT | |
| ############################## USER INACTIVITY CHECK | |
| # Function to keep checking for user inactivity | |
| def check_for_inactivity(): | |
| global last_interaction_time, user_id, USER_TIMEOUT | |
| while True: | |
| time.sleep(CHECK_INTERVAL) | |
| current_time = time.time() | |
| if current_time - last_interaction_time > USER_TIMEOUT and os.path.exists(f"{prefix}{coach_code}-{user_id}-transcript.txt") and user_id is not None: | |
| on_timeout(user_id) | |
| # Reset the last interaction time (WHY???) | |
| # last_interaction_time = current_time | |
| # Initialize and start the inactivity check thread | |
| inactivity_thread = threading.Thread(target=check_for_inactivity) | |
| inactivity_thread.daemon = True | |
| inactivity_thread.start() | |
| #### UPDATE FILE | |
| def update_file(file_path, content): | |
| try: | |
| # Create a backup of the existing file | |
| backup_file_path = file_path + "-OLD" | |
| if os.path.exists(file_path): | |
| # If the backup file already exists, delete it | |
| if os.path.exists(backup_file_path): | |
| os.remove(backup_file_path) | |
| # Rename the original file to the backup file | |
| os.rename(file_path, backup_file_path) | |
| # Write the new content to the file | |
| with open(file_path, "w", encoding="UTF-8") as file: | |
| file.write(content) | |
| return "File updated successfully. Backup created as " + backup_file_path | |
| except Exception as e: | |
| return f"Error updating file: {str(e)}" | |
| ############### CHAT ################### | |
| def predict(user_input, history): | |
| max_length = 2000 | |
| if len(user_input) > max_length: | |
| user_input = "" | |
| global last_interaction_time, user_id, last_file_mod_time | |
| # ... [other global variables that are used] | |
| # Update the last interaction time | |
| last_interaction_time = time.time() | |
| user_hist_file = f"{prefix}{coach_code}-{user_id}.txt" | |
| user_summ_file_bullet = f"{prefix}{coach_code}-{user_id.upper().replace(' ', '')}-bullet.txt" | |
| pics_file_path = f"{prefix}{coach_code}-{user_id}-pics.txt" | |
| ####### READ ####### | |
| if user_input == tx + coach_code: | |
| try: | |
| # Prepare the transcript for the Textbox output | |
| if os.path.exists(user_summ_file_bullet): | |
| with open(user_summ_file_bullet, "r", encoding="UTF-8") as file: | |
| output = file.read() | |
| yield output | |
| return | |
| except FileNotFoundError: | |
| yield "File '" + user_summ_file_bullet + "' not found." | |
| return | |
| if user_input == tx + coach_code + "hist": | |
| try: | |
| # Prepare the transcript for the Textbox output | |
| if os.path.exists(user_hist_file): | |
| with open(user_hist_file, "r", encoding="UTF-8") as file: | |
| output = file.read() | |
| yield output | |
| return | |
| except FileNotFoundError: | |
| yield "File '" + user_hist_file + "' not found." | |
| return | |
| if user_input == tx + coach_code + "pics": | |
| try: | |
| # Prepare the transcript for the Textbox output | |
| if os.path.exists(pics_file_path): | |
| with open(pics_file_path, "r", encoding="UTF-8") as file: | |
| output = file.read() | |
| yield output | |
| return | |
| except FileNotFoundError: | |
| yield "File '" + pics_file_path + "' not found." | |
| return | |
| ####### UPDATE ####### | |
| if user_input.startswith(overwrite + "bull:"): | |
| try: | |
| file_path, content = user_input[12:].split("|", 1) | |
| file_path = user_summ_file_bullet.strip() | |
| content = content.strip() | |
| if file_path and content: | |
| result = update_file(file_path, content) | |
| yield result | |
| return | |
| else: | |
| yield "Invalid format. Please use 'update_file: <file_path> | <content>'." | |
| return | |
| except Exception as e: | |
| yield f"Error updating file: {str(e)}" | |
| return | |
| if user_input.startswith(overwrite + "pics:"): | |
| try: | |
| file_path, content = user_input[12:].split("|", 1) | |
| file_path = pics_file_path.strip() | |
| content = content.strip() | |
| if file_path and content: | |
| result = update_file(file_path, content) | |
| yield result | |
| return | |
| else: | |
| yield "Invalid format. Please use 'update_file: <file_path> | <content>'." | |
| return | |
| except Exception as e: | |
| yield f"Error updating file: {str(e)}" | |
| return | |
| if user_id == "NOTLOGGEDIN": | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.") | |
| if user_id == "": | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.") | |
| if user_id == None: | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.") | |
| if user_hist_file == prefix + coach_code + "-None.txt": | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.") | |
| if user_hist_file == prefix + coach_code + "-.txt": | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page and try again.") | |
| # Determine appropriate system prompt based on new or returning user | |
| # This is a placeholder line, replace it with your actual function logic that creates the prompt message | |
| system_prompt = get_system_prompt(user_id) | |
| history_openai_format = system_prompt | |
| for human, assistant in history: | |
| history_openai_format.append({"role": "user", "content": human }) | |
| history_openai_format.append({"role": "assistant", "content":assistant}) | |
| history_openai_format.append({"role": "user", "content": user_input}) | |
| completion = client.chat.completions.create( | |
| model=openai_model, | |
| messages= history_openai_format, | |
| temperature=1.0, | |
| frequency_penalty=0.4, | |
| presence_penalty=0.1, | |
| stream=True | |
| ) | |
| output_stream = "" | |
| for chunk in completion: | |
| if chunk.choices[0].delta.content is not None: | |
| output_stream = output_stream + (chunk.choices[0].delta.content) | |
| yield output_stream | |
| message_content = output_stream | |
| # Prepare the transcript for the Textbox output | |
| transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt" | |
| if os.path.exists(transcript_file_path): | |
| with open(transcript_file_path, "r", encoding="UTF-8") as file: | |
| transcript = file.read() | |
| else: | |
| transcript = "START OF CONVERSATION - Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n" | |
| # Append latest user and assistant messages to the transcript | |
| transcript += f"YOU: {user_input}\n" | |
| transcript += f"{coach_name_upper}: {message_content}\n\n" | |
| # Write the updated transcript to the file | |
| with open(transcript_file_path, "w", encoding="UTF-8") as file: | |
| file.write(transcript) | |
| # Get the last exchange (the user input and the assistant's message) | |
| last_exchange = f"User: {user_input}\nAssistant: {message_content}\n\n" | |
| # Write the last exchange to the history file | |
| with open(user_hist_file, "a+", encoding="UTF-8") as file: | |
| file.write(last_exchange) | |
| return message_content, transcript | |
| # Function to update transcript | |
| def update_transcript(transcript_update, user_id): | |
| transcript_file_path = f"{prefix}{coach_code}-{user_id}-transcript.txt" | |
| if os.path.exists(transcript_file_path): | |
| with open(transcript_file_path, "r", encoding="UTF-8") as file: | |
| transcript = file.read() | |
| else: | |
| transcript = "START OF CONVERSATION - Date/Time: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n" | |
| transcript += transcript_update | |
| with open(transcript_file_path, "w", encoding="UTF-8") as file: | |
| file.write(transcript) | |
| return transcript | |
| def get_user_summary(name): | |
| global user_id | |
| name_cap = str(name) | |
| user_id = name.strip().upper().replace(" ", "") | |
| name_cap = name_cap.title() | |
| summary = "Welcome! Please proceed to the '" + coach_name_long + "' tab (at top of chatbot) to chat." | |
| return summary, admin() | |
| def admin(): | |
| if user_id == "POIPOIPOI": | |
| # Set the directory path | |
| directory_path = "data" # different on huggingface | |
| # Get a list of all files in the directory | |
| file_list = os.listdir(directory_path) | |
| # Print the list of files | |
| for file_name in file_list: | |
| print(file_name) | |
| #GUI | |
| with gr.Blocks(theme) as demo: | |
| output_choice = gr.State('Text') # Set 'Text' as the default state value | |
| with gr.Tabs() as tabs: | |
| with gr.TabItem("Log In"): | |
| username_input = gr.Textbox(label="Username (REQUIRED):", placeholder="Enter your USERNAME", type="text") | |
| password_input = gr.Textbox(label="Emailed Code (REQUIRED):", placeholder="Enter CODE from your email", type="password") | |
| # The submit_name function will expect two inputs | |
| name = "" | |
| password = "" | |
| def submit_name(name, password): | |
| name = name + "7777" | |
| name = name.upper() | |
| pwd = password | |
| pwd = pwd.upper() | |
| if name != pwd: | |
| name = "NOTLOGGEDIN" | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page, log in, and try again.") | |
| if name == "": | |
| name = "NOTLOGGEDIN" | |
| raise gr.Error(f"You must be LOGGED IN to Chat. Refresh the page, log in, and try again.") | |
| else: | |
| name = name[:-4] | |
| user_summary = get_user_summary(name) | |
| return user_summary[0] if isinstance(user_summary, tuple) else user_summary | |
| submit_name_button = gr.Button("Log in") | |
| status_text = gr.Label(label="", container=False) | |
| submit_name_button.click( | |
| fn=submit_name, | |
| inputs=[username_input, password_input], | |
| outputs=[status_text] | |
| ) | |
| # Attach submit event to both the username and password fields | |
| username_input.submit( | |
| fn=submit_name, | |
| inputs=[username_input, password_input], | |
| outputs=[status_text] | |
| ) | |
| # Make sure this is done for the password field as well | |
| password_input.submit( | |
| fn=submit_name, | |
| inputs=[username_input, password_input], | |
| outputs=[status_text] | |
| ) | |
| with gr.TabItem(coach_name_long): | |
| gr.ChatInterface( | |
| predict, | |
| submit_btn="Chat with " + coach_name_short, | |
| retry_btn=None, | |
| undo_btn=None, | |
| clear_btn=None, | |
| css=".gradio-container {height: 1600px !important;}" # Add custom CSS | |
| ) | |
| demo.launch(show_api=False) |