# Import necessary modules
import gradio as gr  # For creating the interactive user interface
import os  # For accessing environment variables
import pandas as pd  # For easier data handling
from openai import OpenAI  # For sending the quotes to OpenAI for tagging
import openpyxl  # Requirement for reading Excel files into pandas Dataframes
import json  # For conversion of OpenAI responses into json/dictionary objects so the contents can be extracted
from dotenv import load_dotenv  # For loading environment variables in local environment
from collections import Counter  # For tabulating tag occurrences
import logging
import time
import random
from datetime import datetime
from typing import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO, force=True)

# Load environment variables from local .env file if it exists; otherwise this does nothing
load_dotenv()

# Import prompt for requesting the tags from OpenAI
with open("prompts/prompt_030725.txt", "r") as prompt_file:
    PROMPT = prompt_file.read()
logger.info(f"Loaded prompt: {PROMPT}")

# Import user instructions for display on screen
with open("user_instructions.txt", "r") as user_instruction_file:
    INSTRUCTIONS = user_instruction_file.read()

# Initialising the OpenAI client
client = OpenAI(
    api_key=os.getenv('OPENAI_KEY'),
    organization=os.getenv('ORG_KEY'),
    project=os.getenv('PROJ_KEY')
)
logger.info("Initialised OpenAI client")


# Function to send the prompt with quote and tag list to OpenAI and get the tags for that quote back
def tag_quote(quote: str, tags_list: list) -> list:
    """
    Generates a list of tags for a given quote based on a predefined list of potential tags.

    The quote and the tag list are inserted into PROMPT and sent to the
    "gpt-4o-mini" chat model with a JSON response format. The 'tags' entry of
    the JSON reply is then filtered so that only tags present in tags_list are
    returned; anything else is logged as a warning and dropped.

    Args:
        quote (str): The quote or text to be analyzed.
        tags_list (list): A list of potential tags to match against the quote.

    Returns:
        valid_tags: A list of tags that are relevant to the quote, as determined by the model.

    Raises:
        Exception: Errors from the OpenAI client, from JSON decoding, or a
            KeyError when the reply has no 'tags' entry are not caught here;
            process_quotes() records them per quote.
    """
    logger.info(f"Tagging quote {quote}")
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
            {"role": "user", "content": PROMPT.format(tags_list=tags_list, quote=quote)}
        ]
    )
    tags = json.loads(response.choices[0].message.content)['tags']
    valid_tags = []
    for tag in tags:
        # filter out any hallucinated tags
        if tag in tags_list:
            valid_tags.append(tag)
        else:
            logger.warning(f"Invalid tag {tag} found and has been filtered out.")
    return valid_tags


def translate_quote(quote: str) -> str:
    """
    Translates a quote to English.

    Args:
        quote (str): The quote to translate.

    Returns:
        str: The content of the model's reply, which the prompt asks to be the
            translated quote only.
    """
    logger.info(f"Translating quote {quote}")
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Translate the following quote into English. Do not return anything other than the translated quote. {quote}"}
        ]
    )
    logger.info("Content")
    logger.info(response.choices[0].message.content)
    return response.choices[0].message.content


def count_tags(tags_list: list, tags_col: pd.Series) -> pd.DataFrame:
    """
    Creates a DataFrame indicating number of occurrences of each tag from a DataFrame column containing lists of tags.

    All tags in tags_list appear in the output even if they never occur in
    tags_col (with a count of 0). Tags found in tags_col which are not in
    tags_list are counted under a key prefixed with "!".

    Args:
        tags_list (list): The list of tags given by the user.
        tags_col (pd.Series): A column of lists where each list contains tags.
            Entries that are plain strings (e.g. an error marker instead of a
            list) are skipped rather than being counted character by character.

    Returns:
        pd.DataFrame: A DataFrame with two columns, 'Tag' (str) and 'Count' (int).
    """
    # Initialise Counter hash table so that every user tag is present, even with zero hits
    tags_counter = Counter({tag: 0 for tag in tags_list})
    for sublist in tags_col:
        # A string here is not a list of tags; iterating it would count single characters
        if isinstance(sublist, str):
            continue
        for tag in sublist:
            # If the tag was not in the tags_list given by the user, prefix it with a !
            key = tag if tag in tags_list else f"!{tag}"
            tags_counter[key] += 1
    # Convert the tags_counter to a DataFrame and return it
    return pd.DataFrame(tags_counter.items(), columns=['Tag', 'Count'])


def _split_and_clean(raw: str, separator: str) -> list:
    """Split raw on separator, strip each item, and drop blanks and duplicates (first-seen order kept)."""
    if not raw:
        return []
    stripped = (item.strip() for item in raw.split(separator))
    return list(dict.fromkeys(item for item in stripped if item))


def _read_quotes_excel(quotes_file_path: str) -> pd.DataFrame:
    """Read the Excel file using its first row verbatim as header, so duplicate names are not renamed."""
    # pd.read_excel would rename duplicates eg foo -> foo.1, causing a mismatch between
    # the column names typed by the user and the actual column names.
    # Extract the first row (the actual header) without treating it as a header.
    header_row = pd.read_excel(quotes_file_path, header=None, nrows=1).values[0]
    # Extract all the other rows of the Excel file as a DataFrame without header
    quotes_df = pd.read_excel(quotes_file_path, header=None, skiprows=1)
    # Set the extracted first row as the header
    quotes_df.columns = header_row
    return quotes_df


def _translate_or_error(quote: str) -> str:
    """Translate quote, returning an 'Error:...' marker instead of raising if the request fails."""
    try:
        return translate_quote(quote)
    except Exception as e:  # boundary around a network call: one failure must not discard all tagging results
        logger.error("Translation failed for quote %r: %s", quote, e)
        return f"Error:{e}"


# Function that takes in a list of tags and an Excel file of quotes, calls tag_quote() on each quote, and returns all the quotes and tags in a DataFrame
def process_quotes(quotes_file_path: str, quotes_col_name: str, retained_columns: str, tags_string: str) -> Generator[tuple[str, pd.DataFrame, pd.DataFrame, str], None, None]:
    """
    Processes quotes from an Excel file and assigns relevant tags to each quote.

    This function reads an Excel file containing quotes, validates the column
    names given, and applies the `tag_quote` function concurrently to assign
    tags to each quote. The tags are derived from a user-provided
    newline-separated string. It then counts tag usage, samples up to 2 quotes
    per tag (with English translations) and writes everything to "output.xlsx".

    Args:
        quotes_file_path (str): Path to the Excel file containing the quotes.
        quotes_col_name (str): The name of the column containing the quotes.
        retained_columns (str): Comma-separated names of the columns in the Excel file which are to be added to the output file.
        tags_string (str): A newline-separated string of potential tags. Blank lines and repeated tags are ignored.

    Yields:
        tuple: A 4-element tuple containing:
            - str: A progress indicator (or "Not running" if tagging is complete)
            - pd.DataFrame: (or None if tagging is incomplete) The quotes column and a
              'Tags' column holding the comma-joined tags of each quote, or an
              "Error:..." marker if tagging that quote failed.
            - pd.DataFrame: (or None if tagging is incomplete) The tag counts from
              count_tags(), plus 'Quote 1', 'Quote 2', 'Translated Quote 1' and
              'Translated Quote 2' columns.
            - str: A path to an Excel file containing sheets derived from the previous 2 DataFrames. (or None if tagging is incomplete)

    Raises:
        gr.Error: If no tags are given, if a specified column name does not
            exist or is not unique, or if a tag has the same name as the quotes
            column, a retained column or the 'Tags' column.
    """
    tags_list = _split_and_clean(tags_string, '\n')
    retained_cols_list = _split_and_clean(retained_columns, ',')
    if not tags_list:
        raise gr.Error("No tags provided, please enter at least one tag")

    # Transfer quotes data from Excel file into pandas DataFrame, handling potential duplicate column names
    quotes_df = _read_quotes_excel(quotes_file_path)

    # Verify that all column names given are found in the quotes DF exactly once each
    column_names = quotes_df.columns.tolist()
    for colname in retained_cols_list + [quotes_col_name]:
        count = column_names.count(colname)
        if count == 0:
            raise gr.Error(f"No columns with name {colname} found, check your inputs")
        elif count > 1:
            raise gr.Error(f"Multiple columns with name {colname} found, please rename these columns to something unique")

    # Each tag becomes a one-hot column, so a tag sharing its name with an output column would overwrite that data
    reserved_names = set(retained_cols_list) | {quotes_col_name, 'Tags'}
    clashing_tags = [tag for tag in tags_list if tag in reserved_names]
    if clashing_tags:
        raise gr.Error(f"Tags {clashing_tags} have the same name as an output column, please rename these tags or columns")

    quotes_data = quotes_df[quotes_col_name]
    total_quotes = len(quotes_data)

    # Tags per quote stay lists throughout; failures are tracked separately so that an
    # error message is never mistaken for a collection of tags further down.
    tags_results = [[] for _ in range(total_quotes)]
    tag_errors = {}

    # Threading execution of tag_quote with the executor's default number of workers:
    # several requests are sent to the LLM concurrently.
    with ThreadPoolExecutor() as executor:
        # Generate futures for each of the quotes and map them to the quote positions
        future_to_index = {
            executor.submit(tag_quote, quote, tags_list): i
            for i, quote in enumerate(quotes_data)
        }
        # Enumerate the completed futures (ordered as completed which may be different from submitted order)
        for completed, future in enumerate(as_completed(future_to_index), 1):
            i = future_to_index[future]
            try:
                # Insert the result at its quote's original position
                tags_results[i] = future.result()
            except Exception as e:  # any failure of a single request is reported in that quote's row
                logger.error("Tagging failed for quote at position %d: %s", i, e)
                tag_errors[i] = f"Error:{e}"
            # Update UI by yielding a status update (i is a position, hence iloc)
            yield (f"Tagged {completed}/{total_quotes} quotes: {quotes_data.iloc[i]}", None, None, None)

    quotes_df['Tags'] = tags_results

    # One hot encoding of tagged tags
    for tag in tags_list:
        quotes_df[tag] = [int(tag in quote_tags) for quote_tags in tags_results]
    logger.info("Quotes tagged")

    # Create hash table of tag occurrences using count_tags function
    tags_counter_df = count_tags(tags_list, quotes_df['Tags'])

    # Retrieve up to 2 quotes at random for each tag; columns are built as whole lists
    # and attached once, rather than grown cell by cell.
    sample_columns = {'Quote 1': [], 'Quote 2': [], 'Translated Quote 1': [], 'Translated Quote 2': []}
    for tag in tags_counter_df['Tag']:
        tagged_quotes_list = [
            quote for quote, quote_tags in zip(quotes_data, tags_results) if tag in quote_tags
        ]
        sample_quotes = random.sample(tagged_quotes_list, min(2, len(tagged_quotes_list)))
        translated_quotes = [_translate_or_error(quote) for quote in sample_quotes]
        # Pad to exactly 2 entries so every tag fills both quote columns
        padding = [None] * (2 - len(sample_quotes))
        sample_quotes += padding
        translated_quotes += padding
        sample_columns['Quote 1'].append(sample_quotes[0])
        sample_columns['Quote 2'].append(sample_quotes[1])
        sample_columns['Translated Quote 1'].append(translated_quotes[0])
        sample_columns['Translated Quote 2'].append(translated_quotes[1])
    for column_name, column_values in sample_columns.items():
        tags_counter_df[column_name] = column_values

    # Convert values in tags column from list to str; failed quotes show their error marker instead
    quotes_df['Tags'] = [
        tag_errors[i] if i in tag_errors else ", ".join(quote_tags)
        for i, quote_tags in enumerate(tags_results)
    ]

    # Return only the quotes column, the new tags column, and any other specified cols to retain
    output_df = quotes_df[retained_cols_list + [quotes_col_name, 'Tags'] + tags_list]

    # NOTE(review): this fixed path is shared by every session; concurrent runs would
    # overwrite each other's file - consider a per-request path.
    output_file_path = "output.xlsx"
    with pd.ExcelWriter(output_file_path) as writer:
        output_df.to_excel(writer, sheet_name='Coded Quotes', index=False)
        tags_counter_df.to_excel(writer, sheet_name='Tag Count', index=False)
    logger.info('Results written to Excel')

    yield ("Not running", output_df[[quotes_col_name, 'Tags']], tags_counter_df, output_file_path)


def check_auth(username: str, password: str):
    """
    Authenticate the user.

    Verifies the user's credentials against the values stored in the environment variables.
    User may authenticate with permanent username and password (APP_USERNAME / APP_PASSWORD)
    or temporary username and password (TEMP_USERNAME / TEMP_PASSWORD).
    Temporary credentials are only valid before the expiry time set in
    TEMP_EXPIRY_TIME_SG_ISO_8601; a trailing "Z" in that value is read as a +08:00 offset.
    If the expiry variable is missing or cannot be parsed, temporary credentials are rejected.

    Returns True or False depending on authentication success.
    """
    # Check permanent credentials
    if username == os.getenv('APP_USERNAME') and password == os.getenv('APP_PASSWORD'):
        return True

    # Check temporary credentials
    if username == os.getenv('TEMP_USERNAME') and password == os.getenv('TEMP_PASSWORD'):
        expiry_raw = os.getenv('TEMP_EXPIRY_TIME_SG_ISO_8601')
        if not expiry_raw:
            # Without an expiry the temporary login cannot be validated; deny rather than crash
            logger.warning("TEMP_EXPIRY_TIME_SG_ISO_8601 is not set; temporary credentials rejected")
            return False
        try:
            expiry_timestamp = datetime.fromisoformat(expiry_raw.replace("Z", "+08:00")).timestamp()
        except ValueError:
            logger.warning("TEMP_EXPIRY_TIME_SG_ISO_8601 is not a valid ISO 8601 time; temporary credentials rejected")
            return False
        return time.time() < expiry_timestamp

    # Invalid credentials
    return False


# Define user interface structure
demo = gr.Interface(
    fn=process_quotes,
    inputs=[
        gr.File(label="Quotes Excel File"),
        gr.Textbox(label="Name of quotes column"),
        gr.Textbox(label="Names of columns(eg respondentID) to retain in output, separated by commas"),
        gr.Textbox(label="List of tags, each tag on a new line"),
    ],
    outputs=[
        gr.Textbox(label="Progress", value="Not running"),
        gr.Dataframe(headers=["Quote", "Tags"], column_widths=["70%", "30%"], scale=2, label='Coded Quotes'),
        gr.Dataframe(headers=["Tag", "Count"], label='Tag Count'),
        gr.File(label="Output data in file format")
    ],
    title="Automated Research Code Tagger",
    description=INSTRUCTIONS
)

demo.launch(share=True, auth=check_auth, ssr_mode=False)