Spaces:
Runtime error
Runtime error
| import os | |
| import csv | |
| import uuid | |
| import json | |
| import logging | |
| import pinecone | |
| import gradio as gr | |
| from PIL import Image | |
| from typing import Union | |
| from openai import Client | |
| from pinecone import Index | |
| from services import audio_model, gcp | |
# Get the TTS model on first start-up. This runs before the service imports
# below; presumably services.audio expects the model folder to be present
# when it is imported (TODO confirm).
if not os.path.exists('tts_model'):
    audio_model.download_model()

from services.audio import *
from services.video import *

# Vector store connection, configured through environment variables
pinecone.init(
    api_key=os.getenv('PINECONE_API_KEY'),
    environment=os.getenv('PINECONE_ENV'),
)
INDEX = Index(os.getenv('PINECONE_INDEX'))

# OpenAI client used for the embeddings; no arguments are passed explicitly
OPENAI_CLIENT = Client()

# Labels shown in the UI -> short codes used in file names and dictionary keys
TRANSLATE_LANGUAGES = {
    'español': 'es',
    'ingles': 'en',
    'portugués': 'pt',
}
TRANSLATE_GREET = {
    'Saludo': 'greeting',
    'Despedida': 'goodbye',
    'Error': 'error',
}
def add_data_table(table: list[list[str]], *data: str) -> tuple[Union[list[list[str]], str], ...]:
    """
    Adds the data to the table as a new row that starts with the delete marker.
    Some data consist of three values (greet tab), others of two, so the new row
    and the returned values differ depending on that.
    :param table: table to add the data to (it is updated in place)
    :param data: values of the new row
    :return: flat tuple with the updated table followed by the values used to reset the inputs:
        the first one is always an empty string; the greet tab keeps its other two values, and
        with two values the last one is kept only when it is one of the known languages
    """
    if len(data) == 3:  # It is the greet tab
        new_value = ('', *data[1:])
    elif data[-1] in ('español', 'ingles', 'portugués'):
        new_value = ('', data[-1])
    else:
        new_value = ('', '')

    row = ['❌', *data]
    # A first row made only of empty strings is the placeholder of an empty table:
    # replace it instead of appending after it
    if table and all(column == '' for column in table[0]):
        table[0] = row
    # Add the new data. This also covers a table without any row, where reading
    # table[0] would raise an IndexError
    else:
        table.append(row)
    return (table, *new_value)
def remove_data_table(table: list[list[str]], evt: gr.SelectData) -> list[list[str]]:
    """
    Removes the clicked row, but only when the click landed on the first column
    (the one holding the X). Clicks on any other column leave the table as it is.
    :param table: clicked table (it is updated in place)
    :param evt: the event; evt.index is read as (row, column) of the click
    :return: updated table
    """
    clicked_column = evt.index[1]
    if clicked_column == 0:
        if len(table) != 1:
            # More than one row: drop the clicked one
            del table[evt.index[0]]
        else:
            # Last remaining row: keep it, but blank every cell so the table
            # goes back to its default state
            table[0] = [''] * len(table[0])
    return table
def add_language(languages: list[str]) -> Union[gr.Error, tuple[gr.helpers, gr.helpers, gr.helpers]]:
    """
    Updates the three language dropdowns with the selected languages.
    :param languages: list of selected languages
    :return: three dropdown updates, each one offering the selected languages with the
        first of them preselected
    :raises gr.Error: if no language was selected
    """
    # `not languages` also covers None, which len() would reject with a TypeError
    # instead of showing the message below
    if not languages:
        raise gr.Error('Debe seleccionar al menos 1 idioma')
    # Every dropdown gets its own copy of the choices
    return tuple(
        gr.update(choices=list(languages), value=languages[0], interactive=True)
        for _ in range(3)
    )
def _is_blank_row(row: list[str]) -> bool:
    """Tells whether every cell of the row is an empty string (placeholder row of an empty table)."""
    return all(cell == '' for cell in row)


def create_chatbot(
        client: str, name: str, messages_table: list[list[str]], random_table: list[list[str]],
        questions_table: list[list[str]], image: Image
) -> gr.helpers:
    """
    Creation of the chatbot. It creates all the audios, videos and csv files for the given tables
    (greetings, goodbyes, errors and random) and uploads them to GCP, and it creates the
    vectorstore with the given questions and answers.
    Rows whose cells are all empty are skipped in the three tables: that is the placeholder an
    empty table holds, and it has no valid message type or language.
    If the creation of a video fails, a warning is shown, the error is logged and the function
    returns at that point, so the videos are not uploaded and the vectorstore is not filled.
    :param client: name of the client (Nosotras, Visit Orlando, etc.)
    :param name: name of the chatbot (Bella, Roomie, etc.). Not used yet
    :param messages_table: table with the greetings, goodbyes and errors messages
    :param random_table: table with the random data about the client
    :param questions_table: table with the questions and answers for each question
    :param image: image used as base for the videos
    :return: updates the value of a button (lets the user know if the process is done or there was an error)
    """
    # Set up general info
    client_name = client.lower().replace(' ', '-')
    # TODO: use `name`

    # Group messages by their type (greeting, goodbye or error) and language
    messages = dict()
    for row in messages_table:
        if _is_blank_row(row):
            continue
        msg = row[1]
        type_msg = TRANSLATE_GREET[row[2]]
        language_msg = TRANSLATE_LANGUAGES[row[-1]]
        os.makedirs(f'assets/{client_name}/{type_msg}s', exist_ok=True)
        messages.setdefault(type_msg, {}).setdefault(language_msg, []).append(msg)

    # Create CSV files (greeting, goodbye and error), one message per row
    for type_msg, by_language in messages.items():
        for language, msgs in by_language.items():
            path_csv = f'assets/{client_name}/{type_msg}s/{language}.csv'
            with open(path_csv, mode='w', encoding='utf-8', newline='') as outfile:
                csv.writer(outfile).writerows([msg] for msg in msgs)

    # Create the audios (greeting, goodbye and error)
    path_audios = f'assets/{client_name}/media/audio'
    os.makedirs(path_audios, exist_ok=True)
    for type_msg, by_language in messages.items():
        for language, msgs in by_language.items():
            for i, msg in enumerate(msgs):
                get_audio(msg, language, f'{path_audios}/{type_msg}_{language}_{i}')

    # Group random audios by their language
    random_msgs = dict()
    for row in random_table:
        if _is_blank_row(row):
            continue
        _, msg, language = row
        random_msgs.setdefault(TRANSLATE_LANGUAGES[language], []).append(msg)

    # Create the random audios
    for language, msgs in random_msgs.items():
        for i, msg in enumerate(msgs):
            get_audio(msg, language, f'{path_audios}/random_{language}_{i}')

    # Save image
    os.makedirs(f'assets/{client_name}/media/image', exist_ok=True)
    image.save(f'assets/{client_name}/media/image/base.png')

    # Upload files and audios to bucket in GCP
    gcp.upload_folder(client_name, f'assets/{client_name}')

    # Create videos for the generated audios and the waiting video (it is muted)
    path_videos = f'assets/{client_name}/media/video'
    os.makedirs(path_videos, exist_ok=True)
    list_audios = os.listdir(path_audios) + ['waiting.wav']
    for audio_file in list_audios:
        name_file = audio_file.split('.')[0]
        link_audio = gcp.get_link_file(client_name, 'audio', audio_file)
        link_image = gcp.get_link_file(client_name, 'image', 'base.png')
        try:
            get_video(link_audio, link_image, f'{path_videos}/{name_file}')
        except Exception as e:
            # gr.Error only reaches the user when it is raised, which would skip the button
            # update below; gr.Warning shows the message and lets the function return
            gr.Warning(f'Problema con la creación del video, hable con el administrador. Error: {e}')
            logging.exception(e)
            return gr.update(value='ERROR!', interactive=False)

    # Upload videos to GCP
    gcp.upload_folder(client_name, path_videos)

    # Set up vectorstore: the question is embedded, the answer is stored as metadata
    vectors = []
    for row in questions_table:
        if _is_blank_row(row):
            continue
        _, question, context = row
        vectors.append({
            "id": str(uuid.uuid4()),
            "values": _get_embedding(question),
            "metadata": {'Text': context},
        })
    # Nothing to send when the table had no questions
    if vectors:
        INDEX.upsert(vectors=vectors, namespace=f'{client_name}-context')

    # Change text in the button
    return gr.update(value='Chatbot created!!!', interactive=False)
def save_prompts(client: str, context_prompt: str, prompts_table: list[list[str]]) -> None:
    """
    Saves all the prompts (standalone and one for each language) and uploads them to Google Cloud Storage.
    Rows whose cells are all empty are skipped: that is the placeholder an empty table holds,
    and it has no language to name the file after.
    :param client: name of the client
    :param context_prompt: standalone prompt used to search into the vectorstore
    :param prompts_table: table with the prompt of each language
    :return: None
    """
    client_name = client.lower().replace(' ', '-')
    path_prompts = f'assets/{client_name}/prompts'
    os.makedirs(path_prompts, exist_ok=True)

    # Save standalone prompt. It is the same for all languages
    with open(f'{path_prompts}/prompt_standalone_q.txt', mode='w', encoding='utf-8') as outfile:
        outfile.write(context_prompt)

    # Save the prompt of each language
    for row in prompts_table:
        if all(cell == '' for cell in row):
            continue
        _, prompt, language = row
        language_prompt = TRANSLATE_LANGUAGES[language]
        with open(f'{path_prompts}/prompt_{language_prompt}.txt', mode='w', encoding='utf-8') as outfile:
            outfile.write(prompt)

    gcp.upload_folder(client_name, path_prompts)
def generate_json(client: str, languages: list[str], max_num_questions: int, chatbot_name: str) -> gr.helpers:
    """
    Creates a json file with the environment variables used in the API
    :param client: name of the client; lowercased and with spaces replaced by '-' it becomes
        CLIENT_NAME and the name of the folder where the file is saved
    :param languages: selected languages; saved in LANGUAGES as comma separated short codes
    :param max_num_questions: value saved in MAX_NUM_QUESTIONS
    :param chatbot_name: value saved in CHATBOT_NAME
    :return: gradio file with the value as the path of the json file
    """
    # Format the name and the languages
    short_languages = ','.join(TRANSLATE_LANGUAGES[language] for language in languages)
    client_name = client.lower().replace(' ', '-')

    # The model comes from the OPENAI_MODEL environment variable; the remaining values
    # are fixed here, and OPENAI_API_KEY is only a reminder text, not a real key
    variables = {
        'CLIENT_NAME': client_name, 'MODEL_OPENAI': os.getenv('OPENAI_MODEL'), 'LANGUAGES': short_languages,
        'MAX_NUM_QUESTIONS': max_num_questions, 'NUM_VECTORS_CONTEXT': 10, 'THRESHOLD_RECYCLE': 0.97,
        'OPENAI_API_KEY': 'Check OpenAI for this', 'CHATBOT_NAME': chatbot_name, 'HAS_ROADMAP': 0,
        'SAVE_ANSWERS': 0, 'USE_RECYCLED_DATA': 1
    }

    path_json = f"assets/{client_name}/chatbot_variables.json"
    # The folder of the client may not exist yet if nothing else was saved before
    os.makedirs(os.path.dirname(path_json), exist_ok=True)
    with open(path_json, mode='w', encoding='utf-8') as outfile:
        json.dump(variables, outfile, indent=4)
    return gr.update(value=path_json, label='Output file', interactive=True)
def _get_embedding(sentence: str) -> list[float]:
    """
    Asks the 'text-embedding-ada-002' model for the embedding of a text
    :param sentence: text to embed (a word, a sentence or a paragraph)
    :return: embedding of the first item of the response, as a list of floats
    """
    embeddings = OPENAI_CLIENT.embeddings.create(model='text-embedding-ada-002', input=sentence)
    first_item = embeddings.data[0]
    return first_item.embedding