Spaces:
Sleeping
Sleeping
| # Import necessary modules | |
| import gradio as gr # For creating the interactive user interface | |
| import os # For accessing environment variables | |
| import pandas as pd # For easier data handling | |
| from openai import OpenAI # For sending the quotes to OpenAI for tagging | |
| import openpyxl # Requirement for reading Excel files into pandas Dataframes | |
| import json # For conversion of OpenAI responses into json/dictionary objects so the contents can be extracted | |
| from dotenv import load_dotenv # For loading environment variables in local environment | |
| from collections import Counter # For tabulating tag occurrences | |
| import logging | |
| import time | |
| import random | |
| from datetime import datetime | |
| from typing import Generator | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Configure the root logger so INFO-level messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# force=True makes basicConfig replace any handlers already attached to the root logger.
logging.basicConfig(level=logging.INFO, force=True)

# Load environment variables from local .env file if it exists; otherwise this does nothing
load_dotenv()

# Import prompt for requesting the tags from OpenAI.
# The encoding is given explicitly so the text is decoded the same way on every
# platform instead of depending on the locale's default encoding.
with open("prompts/prompt_030725.txt", "r", encoding="utf-8") as prompt_file:
    PROMPT = prompt_file.read()
logger.info(f"Loaded prompt: {PROMPT}")

# Import user instructions for display on screen
with open("user_instructions.txt", "r", encoding="utf-8") as user_instruction_file:
    INSTRUCTIONS = user_instruction_file.read()

# Initialising the OpenAI client; credentials are read from environment variables
client = OpenAI(
    api_key=os.getenv('OPENAI_KEY'),
    organization=os.getenv('ORG_KEY'),
    project=os.getenv('PROJ_KEY')
)
logger.info("Initialised OpenAI client")
# Function to send the prompt with quote and tag list to OpenAI and get the tags for that quote back
def tag_quote(quote: str, tags_list: list) -> list:
    """
    Generate a list of tags for a quote, chosen from a list of allowed tags.

    The quote and the allowed tags are inserted into PROMPT and sent to the
    "gpt-4o-mini" chat model with a JSON response format. The "tags" entry of
    the parsed JSON reply is then filtered so that only tags present in
    tags_list are kept, and each tag is kept at most once.

    Args:
        quote (str): The quote or text to be analyzed.
        tags_list (list): A list of potential tags to match against the quote.

    Returns:
        list: The tags from the model's reply that appear in tags_list, in the
            order the model returned them, without repeats.

    Raises:
        KeyError: If the parsed reply has no "tags" key.
        Exception: Anything raised by the OpenAI client or by json.loads is
            propagated unchanged.
    """
    logger.info(f"Tagging quote {quote}")
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
            {"role": "user", "content": PROMPT.format(tags_list=tags_list, quote=quote)}
        ]
    )
    tags = json.loads(response.choices[0].message.content)['tags']
    # A lone tag returned as a bare string would otherwise be iterated
    # character by character and filtered out entirely.
    if isinstance(tags, str):
        tags = [tags]
    valid_tags = []
    for tag in tags:
        if tag in valid_tags:
            # The model repeated a tag; keeping it would inflate the counts.
            continue
        if tag in tags_list:
            valid_tags.append(tag)
        else:  # filter out any hallucinated tags
            logger.warning(f"Invalid tag {tag} found and has been filtered out.")
    return valid_tags
def translate_quote(quote: str) -> str:
    """
    Ask the "gpt-4o-mini" chat model for an English translation of a quote.

    Args:
        quote (str): The text to translate.

    Returns:
        str: The content of the model's first reply choice, returned as-is.
    """
    logger.info(f"Translating quote {quote}")
    instruction = f"Translate the following quote into English. Do not return anything other than the translated quote. {quote}"
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": instruction}],
    )
    translated = completion.choices[0].message.content
    # Log the raw reply so translations can be traced in the application log.
    logger.info("Content")
    logger.info(translated)
    return translated
def count_tags(tags_list: list, tags_col: pd.Series) -> pd.DataFrame:
    """
    Count how often each tag occurs in a column containing lists of tags.

    Every tag in tags_list appears in the output, with a count of 0 if it never
    occurs in tags_col. Tags that occur in tags_col but are not in tags_list
    are reported with a "!" prefix. Entries of tags_col that are not a
    list, tuple or set (for example an error-message string recorded for a
    quote whose tagging failed, or a missing value) contain no tags and are
    skipped rather than being iterated character by character.

    Args:
        tags_list (list): The list of tags given by the user.
        tags_col (pd.Series): A column whose entries are collections of tags.

    Returns:
        pd.DataFrame: A DataFrame with two columns. "Tag" holds each tag from
            tags_list (in the given order) followed by any "!"-prefixed
            unexpected tags in order of first appearance. "Count" holds the
            number of occurrences of that tag across the entries of tags_col.
    """
    # Seed every expected tag with zero so unused tags still get a row.
    tags_counter = Counter({tag: 0 for tag in tags_list})
    for entry in tags_col:
        if not isinstance(entry, (list, tuple, set, frozenset)):
            continue
        for tag in entry:
            # Tags outside the user's list are flagged with a "!" prefix.
            label = tag if tag in tags_list else f"!{tag}"
            tags_counter[label] += 1
    return pd.DataFrame(list(tags_counter.items()), columns=['Tag', 'Count'])
def _split_unique(text: str, separator: str) -> list:
    """Split text on separator, strip each piece, drop blanks and repeats (order kept)."""
    pieces = (piece.strip() for piece in (text or "").split(separator))
    return list(dict.fromkeys(piece for piece in pieces if piece))


def _translate_or_none(quote):
    """Translate a sample quote, returning None (and logging) if the translation call fails."""
    try:
        return translate_quote(quote)
    except Exception:
        # Translations are a convenience in the summary sheet; a failed call
        # should not discard the tagging work that has already been done.
        logger.exception("Translation of a sample quote failed")
        return None


# Function that takes in a list of tags and an Excel file of quotes, calls tag_quote() on each quote, and returns all the quotes and tags in a DataFrame
def process_quotes(quotes_file_path: str, quotes_col_name: str, retained_columns: str, tags_string: str) -> Generator[tuple[str, pd.DataFrame, pd.DataFrame, str], None, None]:
    """
    Tag every quote in an Excel file and yield progress updates and results.

    The first row of the file is used as the header. Each value in the quotes
    column is passed to tag_quote (concurrently, via a thread pool). After all
    quotes are tagged, the tags are one-hot encoded, counted with count_tags,
    up to two random example quotes per tag are translated with
    translate_quote, and everything is written to "output.xlsx".

    A quote whose tagging raised an exception gets no tags; its 'Tags' cell in
    the output holds "Error:<message>" instead.

    Args:
        quotes_file_path (str): Path to the Excel file containing the quotes.
        quotes_col_name (str): The name of the column containing the quotes.
        retained_columns (str): Comma-separated names of further columns to
            copy into the output. Blank and repeated names are ignored.
        tags_string (str): A newline-separated string of potential tags.
            Blank lines and repeated tags are ignored.

    Yields:
        tuple: A 4-element tuple containing:
            - str: A progress message, or "Not running" once tagging is complete.
            - pd.DataFrame: The quotes column and a 'Tags' column of
              comma-joined tags (None while tagging is in progress).
            - pd.DataFrame: One row per tag with 'Tag', 'Count', 'Quote 1',
              'Quote 2', 'Translated Quote 1' and 'Translated Quote 2'
              (None while tagging is in progress).
            - str: Path of the Excel file with sheets 'Coded Quotes' and
              'Tag Count' (None while tagging is in progress).

    Raises:
        gr.Error: If no tags are given, if the file has no header row or no
            data rows, if a named column is missing or not unique, or if a tag
            has the same name as a column used in the output.
    """
    tags_list = _split_unique(tags_string, '\n')
    if not tags_list:
        raise gr.Error("No tags provided, please enter at least one tag")
    # The quotes column is always part of the output, so it is not repeated here.
    retained_cols_list = [
        colname for colname in _split_unique(retained_columns, ',') if colname != quotes_col_name
    ]

    # A tag becomes a one-hot column named after itself; refuse names that
    # would overwrite a column that is part of the output.
    reserved_names = set(retained_cols_list) | {quotes_col_name, 'Tags'}
    clashing_tags = [tag for tag in tags_list if tag in reserved_names]
    if clashing_tags:
        raise gr.Error(f"Tags {clashing_tags} have the same name as columns used in the output, please rename them")

    # Transfer quotes data from Excel file into pandas DataFrame, handling potential duplicate column names in the Excel file
    # pd.read_excel will rename duplicates eg foo -> foo.1, causing a mismatch between quotes_col_name and the actual column name
    # so the header row and the data rows are read separately, both without a header.
    header_row = pd.read_excel(quotes_file_path, header=None, nrows=1)
    if header_row.empty:
        raise gr.Error("The uploaded file is empty, check your inputs")
    quotes_df = pd.read_excel(quotes_file_path, header=None, skiprows=1)
    if quotes_df.empty:
        raise gr.Error("No quotes found below the header row, check your inputs")
    quotes_df.columns = header_row.values[0]

    # Verify that all column names given are found in the quotes DF exactly once each
    column_names = quotes_df.columns.tolist()
    for colname in retained_cols_list + [quotes_col_name]:
        count = column_names.count(colname)
        if count == 0:
            raise gr.Error(f"No columns with name {colname} found, check your inputs")
        elif count > 1:
            raise gr.Error(f"Multiple columns with name {colname} found, please rename these columns to something unique")

    quotes_data = quotes_df[quotes_col_name]
    total_quotes = len(quotes_data)
    tags_results = [None] * total_quotes
    # Error text per row position for quotes whose tagging failed
    failures = {}

    # Threaded execution of tag_quote: requests are sent to the LLM concurrently.
    with ThreadPoolExecutor() as executor:
        # Generate futures for each of the quotes and map them to the quote indices
        future_to_index = {
            executor.submit(tag_quote, quote, tags_list): i for i, quote in enumerate(quotes_data)
        }
        # Futures are handled in completion order, which may differ from submission order
        for completed, future in enumerate(as_completed(future_to_index), 1):
            i = future_to_index[future]
            try:
                tags_results[i] = future.result()
            except Exception as e:
                # Keep the tags a (empty) list so encoding and counting stay
                # consistent; the error text is shown in the 'Tags' cell later.
                logger.exception(f"Tagging failed for quote at row {i}")
                tags_results[i] = []
                failures[i] = f"Error:{e}"
            # Update UI by yielding a status update
            yield (f"Tagged {completed}/{total_quotes} quotes: {quotes_data.iloc[i]}", None, None, None)

    quotes_df['Tags'] = pd.Series(tags_results, index=quotes_df.index, dtype=object)

    # One hot encoding of tagged tags
    for tag in tags_list:
        quotes_df[tag] = [int(tag in row_tags) for row_tags in tags_results]
    logger.info("Quotes tagged")

    # Create table of tag occurrences using count_tags function
    tags_counter_df = count_tags(tags_list, quotes_df['Tags'])

    # Retrieve up to 2 quotes at random for each tag and put them in the tags counter DF
    known_tags = set(tags_list)
    sample_columns = {
        'Quote 1': [],
        'Quote 2': [],
        'Translated Quote 1': [],
        'Translated Quote 2': [],
    }
    for tag in tags_counter_df['Tag']:
        if tag in known_tags:
            tagged_quotes_list = quotes_df.loc[quotes_df[tag] == 1, quotes_col_name].tolist()
        else:
            # Labels that are not user tags have no one-hot column to look up
            tagged_quotes_list = []
        sample_quotes = random.sample(tagged_quotes_list, min(2, len(tagged_quotes_list)))
        translated_quotes = [_translate_or_none(quote) for quote in sample_quotes]
        # Pad to exactly two entries so every row fills both columns
        padding = [None] * (2 - len(sample_quotes))
        sample_quotes = sample_quotes + padding
        translated_quotes = translated_quotes + padding
        sample_columns['Quote 1'].append(sample_quotes[0])
        sample_columns['Quote 2'].append(sample_quotes[1])
        sample_columns['Translated Quote 1'].append(translated_quotes[0])
        sample_columns['Translated Quote 2'].append(translated_quotes[1])
    for column_name, values in sample_columns.items():
        tags_counter_df[column_name] = values

    # Convert values in tags column from list to str; failed rows show their error text
    quotes_df['Tags'] = [
        failures[i] if i in failures else ", ".join(row_tags)
        for i, row_tags in enumerate(tags_results)
    ]

    # Return only the quotes column, the new tags column, and any other specified cols to retain
    output_df = quotes_df[retained_cols_list + [quotes_col_name, 'Tags'] + tags_list]
    output_file_path = "output.xlsx"
    with pd.ExcelWriter(output_file_path) as writer:
        output_df.to_excel(writer, sheet_name='Coded Quotes', index=False)
        tags_counter_df.to_excel(writer, sheet_name='Tag Count', index=False)
    logger.info('Results written to Excel')
    yield ("Not running", output_df[[quotes_col_name, 'Tags']], tags_counter_df, output_file_path)
def _credential_matches(supplied, expected) -> bool:
    """Compare a supplied credential with the configured one in constant time.

    Returns False when the configured value is unset or empty, or when the
    supplied value is not a string.
    """
    import hmac

    if not expected or not isinstance(supplied, str):
        return False
    # Encode to bytes: compare_digest rejects str arguments containing non-ASCII characters.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def check_auth(username: str, password: str) -> bool:
    """
    Authenticate the user.

    Verifies the user's credentials against the values stored in environment
    variables. Two credential pairs are accepted:

    - Permanent: APP_USERNAME / APP_PASSWORD.
    - Temporary: TEMP_USERNAME / TEMP_PASSWORD, valid only while the current
      time is before TEMP_EXPIRY_TIME_SG_ISO_8601. A trailing "Z" or a missing
      UTC offset in that value is treated as +08:00.

    A credential pair whose environment variables are unset or empty never
    matches. A missing or unparseable expiry time disables the temporary
    credentials instead of raising.

    Args:
        username (str): The username entered by the user.
        password (str): The password entered by the user.

    Returns:
        bool: True if authentication succeeds, False otherwise.
    """
    from datetime import timedelta, timezone

    # Check permanent credentials (both comparisons always run, so timing does
    # not reveal which of the two fields was wrong)
    username_ok = _credential_matches(username, os.getenv('APP_USERNAME'))
    password_ok = _credential_matches(password, os.getenv('APP_PASSWORD'))
    if username_ok and password_ok:
        return True

    # Check temporary credentials
    temp_username_ok = _credential_matches(username, os.getenv('TEMP_USERNAME'))
    temp_password_ok = _credential_matches(password, os.getenv('TEMP_PASSWORD'))
    if not (temp_username_ok and temp_password_ok):
        return False

    raw_expiry = os.getenv('TEMP_EXPIRY_TIME_SG_ISO_8601')
    if not raw_expiry:
        return False
    try:
        expiry = datetime.fromisoformat(raw_expiry.replace("Z", "+08:00"))
    except ValueError:
        logging.getLogger().warning(
            "TEMP_EXPIRY_TIME_SG_ISO_8601 is not a valid ISO 8601 timestamp; temporary login denied"
        )
        return False
    if expiry.tzinfo is None:
        # Without an offset, timestamp() would use the server's local timezone;
        # apply the same +08:00 offset that a "Z" suffix is mapped to.
        expiry = expiry.replace(tzinfo=timezone(timedelta(hours=8)))
    return time.time() < expiry.timestamp()
# Define user interface structure
# Input widgets, in the order process_quotes receives its arguments
input_components = [
    gr.File(label="Quotes Excel File"),
    gr.Textbox(label="Name of quotes column"),
    gr.Textbox(label="Names of columns(eg respondentID) to retain in output, separated by commas"),
    gr.Textbox(label="List of tags, each tag on a new line"),
]

# Output widgets, in the order of the 4-tuples yielded by process_quotes
output_components = [
    gr.Textbox(label="Progress", value="Not running"),
    gr.Dataframe(headers=["Quote", "Tags"], column_widths=["70%", "30%"], scale=2, label='Coded Quotes'),
    gr.Dataframe(headers=["Tag", "Count"], label='Tag Count'),
    gr.File(label="Output data in file format"),
]

demo = gr.Interface(
    fn=process_quotes,
    inputs=input_components,
    outputs=output_components,
    title="Automated Research Code Tagger",
    description=INSTRUCTIONS,
)

# Start the app; logins are checked by check_auth
demo.launch(share=True, auth=check_auth, ssr_mode=False)