Spaces:
Runtime error
Runtime error
| import datetime | |
| import os | |
| import json | |
| import gradio as gr | |
| from prompts.llm import qa_prompt_template | |
| from prompts.condense_llm import condense_template | |
| from config import HISTORY_DIR | |
def web_citation(inputs, results, custom_websearch=False):
    """Fetch each search hit, summarise it, and build numbered citation links.

    Only results whose HEAD response advertises ``text/html`` are processed.
    Any result that cannot be fetched or parsed is skipped (best effort).

    Args:
        inputs: The user's question; forwarded to ``WebSummary.predict`` as
            ``question``.
        results: Iterable of search-result mappings. Each needs a ``"link"``
            key, plus a ``"title"`` key unless ``custom_websearch`` is true.
        custom_websearch: When true, the title is read from the page's first
            ``<title>`` element instead of ``result["title"]``.

    Returns:
        A ``(reference_results, display_append)`` tuple. ``reference_results``
        is a list of ``[summary, link]`` pairs (``summary`` is ``""`` when
        summarisation failed); ``display_append`` is a list of ``<a>`` tags
        numbered by the result's position in ``results``.
    """
    import html

    import requests
    from bs4 import BeautifulSoup
    from chains.summary import WebSummary

    # Without a timeout a single stalled host would block the whole call.
    timeout_seconds = 10

    reference_results = []
    display_append = []
    for idx, result in enumerate(results):
        try:
            link = result['link']
            head = requests.head(link, timeout=timeout_seconds)
            if "text/html" not in head.headers['Content-Type']:
                continue

            html_response = requests.get(link, timeout=timeout_seconds)
            soup = BeautifulSoup(html_response.content, "html.parser")
            if not custom_websearch:
                title = result["title"]
            else:
                title = soup.find_all('title')[0].get_text()

            try:
                web_summary = WebSummary()
                text = soup.get_text()
                lines = (line.strip() for line in text.splitlines())
                # break multi-headlines into a line each
                # NOTE(review): the separator is reproduced as it appears in
                # the extracted source; confirm it was not originally a
                # double space that got collapsed.
                chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
                # drop blank lines
                text = '\n'.join(chunk for chunk in chunks if chunk)
                summary = web_summary.predict(question=inputs, doc=text)
                print("Can access", link)
            except Exception:
                # Summarisation is optional: keep the citation, drop the text.
                summary = ""
                print("Cannot access ", link)

            # Title and link come from the open web, so escape them before
            # embedding in markup. Build the anchor before appending anything
            # so the two output lists can never get out of step.
            anchor = (
                f'<a href="{html.escape(str(link))}" target="_blank">'
                f'{idx + 1}. {html.escape(str(title))}</a>'
            )
            reference_results.append([summary, link])
            display_append.append(anchor)
        except Exception:
            # Unreachable host, missing header/key, no <title>, ...: skip this
            # result. ``Exception`` (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            continue
    return reference_results, display_append
def get_auth():
    """Load login credentials from ``auth.json`` in the working directory.

    The file is parsed as JSON and each of its entries is looked up for a
    ``"username"`` and a ``"password"``; entries where either is falsy are
    left out.

    Returns:
        A list of ``(username, password)`` tuples, or ``None`` when
        ``auth.json`` does not exist.
    """
    auth_file = "auth.json"
    if not os.path.exists(auth_file):
        return None

    with open(auth_file, "r", encoding='utf-8') as handle:
        accounts = json.load(handle)

    credentials = []
    for key in accounts:
        account = accounts[key]
        if account["username"] and account["password"]:
            credentials.append((account["username"], account["password"]))
    return credentials
def transcribe(current_model, audio):
    """Pass ``audio`` to the model's ``audio_response`` and return its result."""
    response = current_model.audio_response(audio)
    return response
def related_question(current_model):
    """Return whatever the model's ``related_question()`` method returns."""
    suggestions = current_model.related_question()
    return suggestions
def history_file_path(username, file_name):
    """Return the path for a new history file, creating its directory.

    The directory is ``HISTORY_DIR/<username>/<timestamp>`` where the
    timestamp is the current local time formatted as
    ``YYYY-MM-DD_HH-MM-SS``; it is created if missing.

    Args:
        username: Sub-directory of ``HISTORY_DIR`` to store the file under.
        file_name: Base name of the file; ``.json`` is appended.

    Returns:
        The full path ``.../<timestamp>/<file_name>.json`` (the file itself
        is not created).
    """
    stamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    target_dir = os.path.join(HISTORY_DIR, username, stamp)
    os.makedirs(target_dir, exist_ok=True)
    return os.path.join(target_dir, f"{file_name}.json")
def get_history_names(plain=False, user_name=""):
    """List the ids of the items ``query_item`` returns for ``user_name``.

    Args:
        plain: When true, return the ids as a plain list; otherwise wrap
            them in ``gr.update(choices=...)``.
        user_name: Value passed straight to ``cosmos_db.query_item``.

    Returns:
        A list of each item's ``"id"`` value, or a ``gr.update`` carrying
        that list as ``choices``.
    """
    from cosmos_db import query_item

    history_ids = []
    for item in query_item(user_name):
        history_ids.append(item["id"])

    if not plain:
        return gr.update(choices=history_ids)
    return history_ids
def load_lasted_file_username(username):
    """Return the path of the user's most recent history entry.

    Entries under ``HISTORY_DIR/<username>`` are expected to start with a
    ``YYYY-MM-DD_HH-MM-SS`` timestamp (their first 19 characters). The
    newest timestamp wins.

    Args:
        username: Name of the user's sub-directory inside ``HISTORY_DIR``.

    Returns:
        ``HISTORY_DIR/<username>/<newest timestamp>``, or ``None`` when the
        user has no directory or it holds no timestamped entries.
    """
    stamp_format = '%Y-%m-%d_%H-%M-%S'

    if username not in os.listdir(HISTORY_DIR):
        return None

    stamps = []
    for entry in os.listdir(os.path.join(HISTORY_DIR, username)):
        try:
            stamps.append(datetime.datetime.strptime(entry[:19], stamp_format))
        except ValueError:
            # Stray entries (e.g. ".DS_Store") are not history snapshots and
            # must not make the lookup fail.
            continue

    # max() raises ValueError on an empty sequence; an empty user directory
    # simply means there is nothing to load.
    if not stamps:
        return None

    newest = max(stamps).strftime(stamp_format)
    return os.path.join(HISTORY_DIR, username, newest)
def load_chat_history(current_model, file_name):
    """Return the result of the model's ``load_history(file_name)``."""
    loaded = current_model.load_history(file_name)
    return loaded
def save_chat_history(current_model, chatbot, file_name):
    """Return the result of the model's ``save_history(chatbot, file_name)``."""
    saved = current_model.save_history(chatbot, file_name)
    return saved
def predict(chatbot, model, inputs, upload_files_btn, custom_websearch, local_db):
    """Stream the model's responses for one user input.

    Calls ``model.inference`` with ``streaming=True``, the module's QA and
    condense prompt templates, and the given arguments, then yields every
    item the call produces, in order.

    Yields:
        Each response produced by ``model.inference``.
    """
    inference_kwargs = dict(
        inputs=inputs,
        chatbot=chatbot,
        streaming=True,
        upload_files_btn=upload_files_btn,
        custom_websearch=custom_websearch,
        qa_prompt_template=qa_prompt_template,
        local_db=local_db,
        condense_prompt_template=condense_template,
    )
    responses = model.inference(**inference_kwargs)
    for item in responses:
        yield item
def set_user_identifier(current_model, *args):
    """Forward ``*args`` to the model and return its result.

    NOTE(review): the method invoked is spelled ``set_user_identifie``
    (no trailing "r"). That looks like a typo, but the model class is not
    visible here -- confirm the real method name before changing it.
    """
    outcome = current_model.set_user_identifie(*args)
    return outcome
def retry(chatbot, model, upload_files_btn, custom_websearch, local_db):
    """Re-run inference for the most recent user message.

    The model is first told to drop its last conversation turn. The user
    text of the last ``chatbot`` entry (``chatbot[-1][0]``) is then sent
    through ``model.inference`` again with ``streaming=True`` and the
    module's prompt templates.

    Yields:
        Each response produced by ``model.inference``. Nothing is yielded
        when ``chatbot`` is empty, because there is no message to retry.
    """
    model.delete_last_conversation()

    # With an empty chatbot there is no previous input; bail out instead of
    # reaching the inference call with ``inputs`` unbound.
    if len(chatbot) == 0:
        return

    inputs = chatbot[-1][0]
    responses = model.inference(
        inputs=inputs,
        chatbot=chatbot,
        streaming=True,
        upload_files_btn=upload_files_btn,
        custom_websearch=custom_websearch,
        qa_prompt_template=qa_prompt_template,
        local_db=local_db,
        condense_prompt_template=condense_template,
    )
    for response in responses:
        yield response
def reset(current_model):
    """Return the result of the model's ``reset_conversation()``."""
    outcome = current_model.reset_conversation()
    return outcome
def delete_chat_history(current_model, file_name):
    """Return the result of the model's ``delete_history(file_name)``."""
    outcome = current_model.delete_history(file_name)
    return outcome
def delete_first_conversation(current_model):
    """Return the result of the model's ``delete_first_conversation()``."""
    outcome = current_model.delete_first_conversation()
    return outcome
def delete_last_conversation(current_model, chatbot):
    """Remove the newest turn from both the chatbot list and the model.

    Args:
        current_model: Object whose ``delete_last_conversation()`` is called.
        chatbot: List of conversation turns; its last element is popped in
            place when the list is non-empty.

    Returns:
        The same ``chatbot`` list object, after the in-place pop.
    """
    has_turns = len(chatbot) > 0
    if has_turns:
        del chatbot[-1]
    # NOTE(review): the model is notified regardless of whether the chatbot
    # had anything to pop -- confirm against the original indentation.
    current_model.delete_last_conversation()
    return chatbot
def add_source_numbers(lst, source_name="Source", use_source=True):
    """Prefix each item with a 1-based ``[n]`` marker.

    Args:
        lst: Items to number. With ``use_source`` true each item is indexed
            as ``item[0]`` (text) and ``item[1]`` (source); otherwise the
            item itself is formatted.
        source_name: Label printed before the source on the second line.
        use_source: Whether to emit the ``<source_name>: <source>`` line.

    Returns:
        A list of formatted strings, one per input item.
    """
    numbered = []
    for number, item in enumerate(lst, start=1):
        if use_source:
            numbered.append(f'[{number}]\t "{item[0]}"\n{source_name}: {item[1]}')
        else:
            numbered.append(f'[{number}]\t "{item}"')
    return numbered
def add_details(lst, lst_src):
    """Wrap each text/source pair in a collapsible ``<details>`` element.

    The first four characters of the text plus the source form the
    ``<summary>``; the rest of the text goes into the ``<p>`` body. Pairing
    stops at the shorter of the two inputs.

    Args:
        lst: Sequence of text strings.
        lst_src: Sequence of sources, matched to ``lst`` by position.

    Returns:
        A list of HTML strings, one per pair.
    """
    return [
        f"<details><summary>{text[:4]}{source}</summary><p>{text[4:]}</p></details>"
        for text, source in zip(lst, lst_src)
    ]