Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import base64 | |
| import shutil | |
| import requests | |
| import gradio as gr | |
| from datetime import datetime | |
| from huggingface_hub import hf_hub_download, HfApi | |
def get_main_data():
    """
    Returns the scores parameters and authors.

    Returns:
        A ``(scores_parameters, authors)`` tuple of two lists of strings:
        the names of the parameters that receive a score and the names of
        the authors.
    """
    # The accented characters of these labels had been mis-encoded (UTF-8
    # bytes decoded with the wrong codec); they are written here with the
    # intended Spanish spelling.
    scores_parameters = [
        'Personalidad', 'Intereses', 'Lenguaje/Estilo', 'Autenticidad', 'Habilidad de conversación',
        'Marca/Producto', 'Identificación', 'Experiencia de uso', 'Recomendación', 'Conversación orgánica'
    ]
    authors = ['Sofia', 'Eliza', 'Sindy', 'Carlos', 'Andres', 'Adriana', 'Carolina', 'Valeria']
    return scores_parameters, authors
def make_invisible():
    """
    Hides a row.

    Returns the ``gr.Row`` update that sets ``visible`` to False.
    """
    hidden_row = gr.Row.update(visible=False)
    return hidden_row
def make_visible():
    """
    Shows 2 rows.

    Returns a tuple with two ``gr.Row`` updates, each one setting
    ``visible`` to True.
    """
    first_row = gr.Row.update(visible=True)
    second_row = gr.Row.update(visible=True)
    return first_row, second_row
def play_searching(language: str):
    """
    Returns the path of the "searching" video for the given language.

    Args:
        language: Language label. 'Español' maps to the 'es' video and
            'English' to the 'en' video; any other value falls back to
            the 'pt' video.

    Returns:
        A path of the form ``videos/searching_<code>.mp4``.
    """
    # The Spanish label had been mis-encoded in the comparison, so it could
    # never match the properly spelled label.
    codes = {'Español': 'es', 'English': 'en'}
    code = codes.get(language, 'pt')
    return f'videos/searching_{code}.mp4'
def _query(payload, size_gpu: str, timeout: float = 300):
    """
    Returns the json from a post request. It is done to the BellaAPI

    The endpoint is read from the ``API_URL_<size_gpu>`` environment variable
    and the bearer token from the ``API_TOKEN`` environment variable.

    Args:
        payload: JSON-serializable body of the request.
        size_gpu: Suffix selecting which ``API_URL_<size_gpu>`` variable
            holds the endpoint.
        timeout: Seconds to wait for the server before giving up, so a
            stalled endpoint cannot block the caller forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If the ``API_URL_<size_gpu>`` variable is not set.
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url_variable = f'API_URL_{size_gpu}'
    api_url = os.getenv(url_variable)
    if not api_url:
        # Fail with an explicit message instead of posting to the URL "None"
        raise ValueError(f'Environment variable {url_variable} is not set')
    api_token = os.getenv('API_TOKEN')
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
    # Surface HTTP errors here rather than as a KeyError in the callers
    response.raise_for_status()
    return response.json()
def _download_media(url: str, type_media: str, timeout: float = 60) -> None:
    """
    Downloads a video or audio (depending on the type_media) that can be
    used inside a gr.Video or gr.Audio

    The media is written to ``video.mp4`` when ``type_media`` is 'video' and
    to ``audio.wav`` otherwise, in the current working directory.

    Args:
        url: Address of the media to download.
        type_media: 'video' to save a video; any other value saves an audio.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    name = 'video.mp4' if type_media == 'video' else 'audio.wav'
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check the status before opening the file so a failed download
        # neither truncates the previous media nor stores an error page
        response.raise_for_status()
        with open(name, "wb") as media_file:
            # iter_content undoes any transfer content-encoding, which
            # copying from response.raw does not do
            for chunk in response.iter_content(chunk_size=64 * 1024):
                media_file.write(chunk)
def init_chatbot(chatbot: list[tuple[str, str]], language: str):
    """
    Returns a greeting video, with its transcription and the user_id that
    will be used later in the other requests

    Args:
        chatbot: Chat history; the greeting is appended to it in place.
        language: Language label. 'Español' selects 'es' and 'English'
            selects 'en'; any other value falls back to 'pt'.

    Returns:
        A tuple with the path of the downloaded video ('video.mp4'), the
        updated chat history and the ``user_id`` given by the API.
    """
    # Select language. The Spanish label had been mis-encoded in the
    # comparison, so it could never match the properly spelled label.
    language_code = {'Español': 'es', 'English': 'en'}.get(language, 'pt')
    # Call API with the following json
    inputs = {"inputs": {
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'language': language_code,
        'get_video': True
    }}
    output = _query(inputs, 'small')
    # The greeting has no user message, hence the empty first element
    chatbot.append(('', output['answer']))
    _download_media(output['link_media'], 'video')
    return 'video.mp4', chatbot, output['user_id']
def get_answer_text(
    question: str, chatbot: list[tuple[str, str]], user_id: str, checkbox: bool, size_gpu: str,
    detect_language: bool
):
    """
    Sends a written question to the API and returns the updated elements
    (media, chat history and the value that cleans the text input)
    """
    # Only the literal False disables the flag; in that case None is sent
    language_flag = True if detect_language is not False else None
    # Build the body of the request and send it to the API
    request_body = {
        'text': question,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': language_flag,
    }
    api_reply = _query({'inputs': request_body}, size_gpu)
    # The empty string is the value used to clean the text input
    return _update_elements(question, chatbot, api_reply, checkbox, '')
def get_answer_audio(
    audio_path, chatbot: list[tuple[str, str]], user_id: str, checkbox: bool, size_gpu: str,
    detect_language: bool
):
    """
    Sends a recorded question to the API and returns the updated elements
    (media, chat history and the value that cleans the audio input)
    """
    # Read the audio file and encode its bytes as a Base64 string
    with open(audio_path, 'rb') as recording:
        audio_b64 = base64.b64encode(recording.read()).decode('utf-8')
    # Only the literal False disables the flag; in that case None is sent
    language_flag = True if detect_language is not False else None
    # Build the body of the request and send it to the API
    request_body = {
        'is_audio': True,
        'audio': audio_b64,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': language_flag,
    }
    api_reply = _query({'inputs': request_body}, size_gpu)
    # The API returns the transcription of the audio under 'question'
    transcription = api_reply['question']
    return _update_elements(transcription, chatbot, api_reply, checkbox, None)
def _update_elements(question, chatbot, output, checkbox, clean):
    """
    Appends the interaction to the chat, downloads the answer's media and
    returns the video, the audio, the chat and the value that cleans the
    text or audio input
    """
    chatbot.append((question, output['answer']))
    media_url = output['link_media']
    if not checkbox:
        # Audio answer: the waiting video is returned next to the audio
        _download_media(media_url, 'audio')
        return 'videos/waiting.mp4', 'audio.wav', chatbot, clean
    # Video answer: no separate audio is returned
    _download_media(media_url, 'video')
    return 'video.mp4', None, chatbot, clean
def save_scores(author: str, history: list, opinion: str, *score_values):
    """
    Saves the scores and chat's info into the json file

    The file ``data.json`` is downloaded from the dataset repo named by the
    ``DATASET_NAME`` environment variable (using the ``HUB_TOKEN`` token),
    the new session is appended to its ``sessions`` list and the file is
    uploaded back to the same repo.

    Args:
        author: Author of the evaluation.
        history: Chat history as a sequence of (message, answer) pairs.
        opinion: Free-text opinion.
        *score_values: One score per parameter, in the order returned by
            ``get_main_data``.

    Returns:
        The confirmation message 'Done'.

    Raises:
        gr.Error: If any of the scores is None. This is checked before the
            file is downloaded, so nothing is saved in that case.
    """
    # Get the parameters for each score
    score_parameters, _ = get_main_data()

    # Get the score of each parameter
    scores = {}
    for parameter, score in zip(score_parameters, score_values):
        # Check the score is a valid value if not, raise Error
        if score is None:
            raise gr.Error('Asegúrese de haber seleccionado al menos 1 opción en cada categoría')
        scores[parameter] = score

    # Get all the messages; a missing history is treated as an empty chat
    chat = [
        {'message': conversation[0], 'answer': conversation[1]}
        for conversation in (history or [])
    ]

    # Save the info
    session = dict(
        opinion=opinion,
        scores=scores,
        chat=chat,
        author=author,
        date=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    )

    dataset_name = os.environ.get('DATASET_NAME')
    hub_token = os.environ.get('HUB_TOKEN')

    # Open the file, add the new info and save it
    hf_hub_download(
        repo_id=dataset_name,
        repo_type='dataset',
        filename="data.json",
        token=hub_token,
        local_dir="./"
    )

    # Read with the same encoding used for writing; the platform default
    # encoding may not be able to decode the non-ASCII text stored in it
    with open('data.json', 'r', encoding='utf-8') as infile:
        past_sessions = json.load(infile)

    # Add the new info (creating the list if the file does not have it yet)
    past_sessions.setdefault('sessions', []).append(session)
    with open('data.json', 'w', encoding='utf-8') as outfile:
        json.dump(past_sessions, outfile, indent=4, ensure_ascii=False)

    # Save the updated file
    api = HfApi(token=hub_token)
    api.upload_file(
        path_or_fileobj="data.json",
        path_in_repo="data.json",
        repo_id=dataset_name,
        repo_type='dataset'
    )

    # Return a confirmation message
    return 'Done'