Spaces:
Runtime error
Runtime error
| import time | |
| import json | |
| from openai import OpenAI | |
| import gradio as gr | |
| class Backend: | |
| def __init__(self): | |
| self.return_instruction = """ Please only return in the following Json format: | |
| {{ | |
| "Answer": "", | |
| "Reference Sentences": [""] | |
| }}""" | |
| self.chat_history = [] | |
| def load_agent(self, openai_api_key, assistant_id): | |
| client = OpenAI(api_key=openai_api_key) | |
| assistant = client.beta.assistants.retrieve(assistant_id=assistant_id) | |
| return client, assistant | |
| def update_file(self, file_path): | |
| file = open(file_path, 'rb') | |
| file = self.client.files.create(file=file, purpose='assistants') | |
| return file | |
| def create_thread(self): | |
| thread = self.client.beta.threads.create() | |
| return thread | |
| def delate_thread(self, thread): | |
| self.client.beta.threads.delete(thread.id) | |
| def create_message(self, question, thread, file): | |
| message = self.client.beta.threads.messages.create( | |
| thread_id=thread.id, | |
| role="user", | |
| content= question + self.return_instruction, | |
| file_ids=[file.id] | |
| ) | |
| return message | |
| def delate_message(self, message): | |
| self.client.beta.threads.messages.delete(message.id) | |
| def create_run(self, thread, assistant): | |
| run = self.client.beta.threads.runs.create( | |
| thread_id=thread.id, | |
| assistant_id=assistant.id, | |
| # instructions="""Please read PDF and answer the qusetions asked by users with professional knowledge.""" | |
| ) | |
| return run | |
| def delate_run(self, run): | |
| self.client.beta.threads.runs.delete(run.id) | |
| def get_massage(self, thread): | |
| messages = self.client.beta.threads.messages.list( | |
| thread_id=thread.id | |
| ) | |
| return messages | |
| def phrase_massage(self, question, messages): | |
| mess = json.loads(messages.json()) | |
| output = mess['data'][0]['content'][0]['text']['value'] | |
| print(output) | |
| try: | |
| output = output.split("{")[1:] | |
| output = "{" + "".join(output) | |
| output = output.split("}")[:-1] | |
| output = "".join(output) + "}" | |
| print(output) | |
| output = eval(output) | |
| answer = output['Answer'] | |
| reference = output['Reference Sentences'] | |
| except: | |
| self.detete_message(message) | |
| answer = output | |
| reference = [] | |
| reference = ' '.join(reference) | |
| reference = self.processing_html(reference) | |
| self.chat_history.append([question, answer]) | |
| return self.chat_history, reference | |
| def phrase_massage_1(self, question, messages): | |
| mess = json.loads(messages.json()) | |
| output = mess['data'][0]['content'][0]['text']['value'] | |
| self.chat_history.append([question, output]) | |
| return self.chat_history | |
| def processing_html(self, text): | |
| return f'<center><p> {text} </p></center>' | |
| def submit_passage(self, openai_key, assistant_id, file): | |
| # Create a new conversation | |
| self.client, self.assistant = self.load_agent(openai_key, assistant_id) | |
| # Update file | |
| self.file = self.update_file(file.name) | |
| # Create a new conversation | |
| self.thread = self.create_thread() | |
| gr.Info("Upload successful. Please can now chat with the assistant. Enjoy!") | |
| def submit_question(self, question): | |
| # print(question) | |
| # print(self.thread.id) | |
| # print(self.file.id) | |
| # Create a new message | |
| self.message = self.create_message(question, self.thread, self.file) | |
| # Create a new run | |
| run = self.create_run(self.thread, self.assistant) | |
| # Wait for the run to complete | |
| while True: | |
| run = self.client.beta.threads.runs.retrieve(thread_id=self.thread.id, run_id=run.id) | |
| if run.status not in ["queued", "in_progress"]: | |
| break | |
| time.sleep(1) | |
| # Get the answer | |
| messages = self.get_massage(self.thread) | |
| answer, reference = self.phrase_massage(question, messages) | |
| return answer, reference | |
| def submit_question_another(self, question): | |
| # Create a new message | |
| self.message = self.create_message(question, self.thread, self.file) | |
| # Create a new run | |
| run = self.create_run(self.thread, self.assistant) | |
| # Wait for the run to complete | |
| while True: | |
| run = self.client.beta.threads.runs.retrieve(thread_id=self.thread.id, run_id=run.id) | |
| if run.status not in ["queued", "in_progress"]: | |
| break | |
| time.sleep(1) | |
| # Get the answer | |
| messages = self.get_massage(self.thread) | |
| answer = self.phrase_massage_1(question, messages) | |
| return answer |