# Page-header residue captured with this file: "Spaces: Sleeping" (not part of the program).
| # @title web interface demo | |
| import random | |
| import gradio as gr | |
| import time | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import faiss | |
| from sklearn.preprocessing import normalize | |
| from transformers import AutoTokenizer, AutoModelForQuestionAnswering | |
| from sentence_transformers import SentenceTransformer, util | |
| from pythainlp import Tokenizer | |
| import pickle | |
| import re | |
| from pythainlp.tokenize import sent_tokenize | |
# Key into MODEL_DICT used when load_model() is called without a model name.
DEFAULT_MODEL = 'wangchanberta-hyp'

# Checkpoint name handed to SentenceTransformer by load_embedding_model().
DEFAULT_SENTENCE_EMBEDDING_MODEL = 'intfloat/multilingual-e5-base'

# Short alias -> checkpoint identifier passed to from_pretrained() in load_model().
MODEL_DICT = {
    'wangchanberta': 'powerpuf-bot/wangchanberta-xet_ref-params',
    'wangchanberta-hyp': 'powerpuf-bot/wangchanberta-xet_hyp-params',
}

# Default pickle file read by load_embeddings().
EMBEDDINGS_PATH = 'data/embeddings.pkl'

# Default Excel workbook read by load_data().
DATA_PATH = 'data/dataset.xlsx'
def load_data(path=DATA_PATH):
    """Load the QA table and attach the context column from a second sheet.

    Args:
        path: Excel workbook that contains a 'Default' sheet and an
            'mdeberta' sheet with a 'Context' column.

    Returns:
        The DataFrame read from the 'Default' sheet, with a 'Context'
        column copied from the 'mdeberta' sheet.
    """
    # Ask for both sheets in one call so the workbook is opened and parsed
    # once; with a list, read_excel returns a dict keyed by sheet name.
    sheets = pd.read_excel(path, sheet_name=['Default', 'mdeberta'])
    df = sheets['Default']
    # Column assignment aligns on the row index, exactly as before.
    df['Context'] = sheets['mdeberta']['Context']
    print(len(df))
    print('Load data done')
    return df
def load_model(model_name=DEFAULT_MODEL):
    """Load the question-answering model and its tokenizer.

    Args:
        model_name: Key of MODEL_DICT naming the checkpoint to load.

    Returns:
        A ``(model, tokenizer)`` tuple, both created with ``from_pretrained``
        from the same checkpoint identifier.

    Raises:
        KeyError: If ``model_name`` is not a key of MODEL_DICT.
    """
    checkpoint = MODEL_DICT[model_name]
    qa_model = AutoModelForQuestionAnswering.from_pretrained(checkpoint)
    qa_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    print('Load model done')
    return qa_model, qa_tokenizer
def load_embedding_model(model_name=DEFAULT_SENTENCE_EMBEDDING_MODEL):
    """Create the sentence-embedding model.

    Args:
        model_name: Checkpoint name passed to ``SentenceTransformer``.

    Returns:
        The constructed ``SentenceTransformer`` instance.
    """
    # No device argument is passed (an earlier explicit CUDA branch was
    # disabled), so device selection is left to SentenceTransformer.
    encoder = SentenceTransformer(model_name)
    print('Load sentence embedding model done')
    return encoder
def set_index(vector):
    """Build a flat L2 FAISS index over ``vector``, on GPU 0 when usable.

    Args:
        vector: 2-D array of embeddings; ``vector.shape[1]`` is used as the
            index dimensionality. In this file the caller passes the output
            of ``prepare_sentences_vector``.

    Returns:
        The populated index: a GPU index when CUDA is available and the
        installed faiss build exposes GPU support, otherwise the CPU
        ``IndexFlatL2``.
    """
    index = faiss.IndexFlatL2(vector.shape[1])
    # torch can report CUDA while the CPU-only faiss build is installed, in
    # which case faiss has no StandardGpuResources; stay on CPU instead of
    # raising AttributeError.
    if torch.cuda.is_available() and hasattr(faiss, 'StandardGpuResources'):
        res = faiss.StandardGpuResources()
        index = faiss.index_cpu_to_gpu(res, 0, index)
    index.add(vector)
    return index
def get_embeddings(embedding_model, text_list):
    """Encode text with the given embedding model.

    Args:
        embedding_model: Object exposing an ``encode`` method.
        text_list: Value forwarded unchanged to ``encode`` (callers in this
            file pass either a single string or a list of strings).

    Returns:
        Whatever ``embedding_model.encode(text_list)`` returns.
    """
    vectors = embedding_model.encode(text_list)
    return vectors
def prepare_sentences_vector(encoded_list):
    """Stack embeddings into one float32 matrix and normalize it.

    Args:
        encoded_list: Iterable of arrays; each is reshaped to a single row.

    Returns:
        The result of ``sklearn.preprocessing.normalize`` (called with its
        default arguments) on the stacked float32 matrix.
    """
    rows = [vec.reshape(1, -1) for vec in encoded_list]
    matrix = np.vstack(rows).astype('float32')
    return normalize(matrix)
def store_embeddings(df, embeddings, file_path='embeddings.pkl'):
    """Pickle the question column together with its embeddings.

    Args:
        df: Table whose ``'Question'`` entry is stored under ``'sentences'``.
        embeddings: Object stored under ``'embeddings'``.
        file_path: Destination file. Defaults to ``'embeddings.pkl'`` in the
            current working directory (the previously hard-coded location).
            NOTE: ``load_embeddings`` defaults to ``'data/embeddings.pkl'``,
            so pass a matching path explicitly for a store/load round trip.
    """
    payload = {'sentences': df['Question'], 'embeddings': embeddings}
    with open(file_path, "wb") as fOut:
        pickle.dump(payload, fOut, protocol=pickle.HIGHEST_PROTOCOL)
    print('Store embeddings done')
def load_embeddings(file_path=EMBEDDINGS_PATH):
    """Load pre-computed embeddings from a pickle file.

    Args:
        file_path: Pickle file holding a dict with an ``'embeddings'`` entry.

    Returns:
        The object stored under ``'embeddings'``.

    Raises:
        KeyError: If the pickled dict has no ``'embeddings'`` entry.
    """
    # SECURITY: pickle.load can execute arbitrary code contained in the
    # file; only point this at embedding files you produced or trust.
    with open(file_path, "rb") as fIn:
        stored_data = pickle.load(fIn)
    # Only the embeddings are used; the stored sentences are not needed here.
    stored_embeddings = stored_data['embeddings']
    print('Load (questions) embeddings done')
    return stored_embeddings
def model_pipeline(model, tokenizer, question, similar_context):
    """Extract an answer span for ``question`` from ``similar_context``.

    Args:
        model: Callable taking the tokenizer output as keyword arguments and
            returning an object with ``start_logits`` and ``end_logits``.
        tokenizer: Callable producing the encoded pair (with ``input_ids``)
            and offering ``decode``.
        question: Question text.
        similar_context: Text in which the answer is searched.

    Returns:
        The decoded tokens between the arg-max start and end positions
        (inclusive), with every ``'<unk>'`` replaced by ``'@'``. The slice is
        empty when the predicted end precedes the predicted start.
    """
    encoded = tokenizer(question, similar_context, return_tensors="pt")
    with torch.no_grad():  # inference only; no gradients needed
        qa_output = model(**encoded)
    first_token = qa_output.start_logits.argmax()
    last_token = qa_output.end_logits.argmax()
    answer_ids = encoded.input_ids[0, first_token:last_token + 1]
    decoded = tokenizer.decode(answer_ids)
    return decoded.replace('<unk>', '@')
def faiss_search(index, question_vector, k=1):
    """Query ``index`` for the ``k`` nearest entries to ``question_vector``.

    Args:
        index: Object exposing ``search(vector, k)``.
        question_vector: Query forwarded unchanged to ``index.search``.
        k: Number of neighbours requested.

    Returns:
        The ``(distances, indices)`` pair produced by ``index.search``.
    """
    hit_distances, hit_ids = index.search(question_vector, k)
    return hit_distances, hit_ids
def create_segment_index(vector):
    """Build a CPU flat L2 FAISS index over the given segment embeddings.

    Args:
        vector: 2-D array; ``vector.shape[1]`` is the index dimensionality.

    Returns:
        A ``faiss.IndexFlatL2`` with all rows of ``vector`` added.
    """
    dim = vector.shape[1]
    flat_index = faiss.IndexFlatL2(dim)
    flat_index.add(vector)
    return flat_index
def predict_faiss(model, tokenizer, embedding_model, df, question, index):
    """Answer ``question`` purely by nearest-neighbour lookup in ``index``.

    Args:
        model: Unused here; kept so the signature matches ``predict``.
        tokenizer: Unused here; kept so the signature matches ``predict``.
        embedding_model: Model used to embed the question.
        df: Table with an ``'Answer'`` column, indexed by the ids the index
            returns.
        question: User question; surrounding whitespace is stripped.
        index: Search index queried with the question embedding.

    Returns:
        dict with ``user_question``, ``answer`` (the 'Answer' entry of the
        closest hit), ``totaltime`` (seconds, 3 decimals) and ``score`` (the
        closest distance, 4 decimals).
    """
    started = time.time()
    question = question.strip()
    query = prepare_sentences_vector([get_embeddings(embedding_model, question)])
    distances, indices = faiss_search(index, query)
    best_answer = df['Answer'][indices[0][0]]
    elapsed = time.time() - started
    return {
        "user_question": question,
        "answer": best_answer,
        "totaltime": round(elapsed, 3),
        "score": round(distances[0][0], 4),
    }
def predict(model, tokenizer, embedding_model, df, question, index):
    """Answer ``question`` by retrieval and locate a span to highlight.

    The question is embedded and looked up in ``index``; the 'Context' text
    of the closest row is cleaned, split into sentences with pythainlp's
    ``sent_tokenize`` (crfcut engine), and the sentence closest to the
    question is given to the QA model. The QA answer (or the sentence itself
    when the answer has at most 2 characters) is then located inside the
    cleaned context.

    Args:
        model: QA model forwarded to ``model_pipeline``.
        tokenizer: Tokenizer forwarded to ``model_pipeline``.
        embedding_model: Model used to embed the question and the sentences.
        df: Table with 'Context' and 'Answer' columns, indexed by the ids the
            index returns.
        question: User question; surrounding whitespace is stripped.
        index: Search index queried with the question embedding.

    Returns:
        dict with ``user_question``, ``answer`` (the 'Answer' entry of the
        closest row), ``totaltime`` (seconds, 3 decimals), ``distance``
        (closest distance, 4 decimals) and ``highlight_start`` /
        ``highlight_end``: offsets of the located span within the cleaned
        context, or ``0, 0`` (an empty span) when nothing could be located.
    """
    t = time.time()
    question = question.strip()
    question_vector = prepare_sentences_vector([get_embeddings(embedding_model, question)])
    distances, indices = faiss_search(index, question_vector)
    best_row = indices[0][0]

    mostSimContext = df['Context'][best_row]
    # Keep only what follows the first run of 10 whitespace characters, when
    # such a run exists; then trim and collapse all whitespace to one space.
    matches = re.search(r'(?<=\s{10}).*', mostSimContext, flags=re.DOTALL)
    if matches:
        mostSimContext = matches.group(0)
    mostSimContext = re.sub(r'\s+', ' ', mostSimContext.strip())

    # If the tokenizer yields no segments, treat the whole context as one
    # segment; stacking an empty embedding list would raise in np.vstack.
    segments = sent_tokenize(mostSimContext, engine="crfcut") or [mostSimContext]
    segment_embeddings = prepare_sentences_vector(get_embeddings(embedding_model, segments))
    segment_index = create_segment_index(segment_embeddings)
    _distances, _indices = faiss_search(segment_index, question_vector)
    mostSimSegment = segments[_indices[0][0]]

    Answer = model_pipeline(model, tokenizer, question, mostSimSegment)
    if len(Answer) <= 2:
        Answer = mostSimSegment

    # Find the start and end offsets of the answer within mostSimContext.
    start_index = mostSimContext.find(Answer)
    if start_index < 0 and Answer != mostSimSegment:
        # The decoded answer is not guaranteed to be a verbatim substring
        # (model_pipeline rewrites '<unk>' to '@', for instance). Fall back
        # to the selected segment instead of using find()'s -1 as an offset.
        Answer = mostSimSegment
        start_index = mostSimContext.find(Answer)
    if start_index < 0:
        start_index = end_index = 0  # nothing locatable: empty span
    else:
        end_index = start_index + len(Answer)

    print(f"answer {len(Answer)} => {Answer} || startIndex =>{start_index} || endIndex =>{end_index}")
    print(f"mostSimContext{len(mostSimContext)}=>{mostSimContext}\nsegments{len(segments)}=>{segments}\nmostSimSegment{len(mostSimSegment)}=>{mostSimSegment}")
    _time = time.time() - t
    output = {
        "user_question": question,
        "answer": df['Answer'][best_row],
        "totaltime": round(_time, 3),
        "distance": round(distances[0][0], 4),
        "highlight_start": start_index,
        "highlight_end": end_index,
    }
    return output
def highlight_text(text, start_index, end_index):
    """Wrap ``text[start_index:end_index]`` in ``<mark>`` tags.

    Args:
        text: String to decorate.
        start_index: Offset of the first highlighted character; values below
            0 are clamped to 0.
        end_index: Offset one past the last highlighted character; values
            beyond ``len(text)`` are clamped to ``len(text)``.

    Returns:
        ``text`` with the span wrapped in ``<mark>…</mark>``. When the clamped
        range is empty or inverted, ``text`` is returned unchanged so that no
        unbalanced or stray tag is ever emitted.
    """
    start_index = max(start_index, 0)
    end_index = min(end_index, len(text))
    if start_index >= end_index:
        return text
    # Slicing builds the result in one step instead of appending per character.
    return (
        f"{text[:start_index]}<mark>{text[start_index:end_index]}</mark>"
        f"{text[end_index:]}"
    )
def chat_interface(question, history):
    """Chat callback: answer ``question`` and return it with a highlight.

    Args:
        question: Text entered by the user.
        history: Previous chat turns; not used.

    Returns:
        The answer text with the highlighted span wrapped by ``highlight_text``.
    """
    # model, tokenizer, embedding_model, df and index are module globals.
    result = predict(model, tokenizer, embedding_model, df, question, index)
    # NOTE(review): predict computes the highlight offsets against the
    # cleaned context text, but they are applied here to the 'answer' text;
    # confirm the two strings line up.
    return highlight_text(
        result["answer"],
        result["highlight_start"],
        result["highlight_end"],
    )
# Sample questions passed as `examples` to the chat interface.
examples = [
    'ขอเลขที่บัญชีของบริษัทหน่อย',
    'บริษัทตั้งอยู่ที่ถนนอะไร',
    'ขอช่องทางติดตามข่าวสารทาง Line หน่อย',
    'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 ในแต่ละแพลตฟอร์ม',
    'อยากทราบความถี่ในการดึงข้อมูลของ DXT360 บน Twitter',
    # Disabled example: 'ช่องทางติดตามข่าวสารของเรา',
]

# The interface object is created at module level; the globals that
# chat_interface reads are only assigned under the __main__ guard.
interface = gr.ChatInterface(
    fn=chat_interface,
    examples=examples,
)
if __name__ == "__main__":
    # chat_interface reads df, model, tokenizer, embedding_model and index as
    # module globals, so all of them are assigned before the UI is launched.
    df = load_data()
    model, tokenizer = load_model('wangchanberta-hyp')
    embedding_model = load_embedding_model()
    # Stored embeddings -> float32, normalized matrix -> search index.
    stored_embeddings = load_embeddings(EMBEDDINGS_PATH)
    index = set_index(prepare_sentences_vector(stored_embeddings))
    interface.launch()