Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import numpy as np | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from scipy import sparse | |
| import pickle | |
def scripts_rework(path, character):
    """Build the augmented question/answer/context set for generative training.

    Reads the script CSV at *path* (the ``person_scene`` and ``dialogue``
    columns are used) and, for every line spoken by *character* that directly
    follows another line of dialogue in the same scene, emits rows with:

    * ``answer``   - the character's line,
    * ``question`` - the line immediately before it,
    * ``context``  - up to four earlier lines of the same scene, oldest
      first, concatenated into one string without a separator.

    Each utterance is augmented: one row per progressively shorter context
    window, ending with a row whose context is the empty string.  Character
    lines that open a scene (no preceding dialogue) produce no rows.

    The result is pickled to ``data/scripts_reworked.pkl``; the ``data``
    directory must already exist.

    Args:
        path: CSV location, handed to ``pandas.read_csv``.
        character: ``person_scene`` value whose lines become answers.
    """
    df = pd.read_csv(path)

    # Number the scenes with a cumulative count of "Scene" marker rows.  The
    # previous per-row ``df.iloc[i]["scene_count"] = ...`` assigned into a
    # temporary copy, so the column was never actually filled in.
    df["scene_count"] = df["person_scene"].eq("Scene").cumsum()
    df = df.dropna().reset_index(drop=True)

    speakers = df["person_scene"].tolist()
    lines = df["dialogue"].tolist()
    scenes = df["scene_count"].tolist()

    def is_scene_dialogue(pos, current):
        """Return True if row *pos* is a dialogue line in the scene of *current*."""
        # ``pos >= 0`` comes first: a negative position would otherwise wrap
        # around to the end of the data.  Comparing scene numbers keeps the
        # boundary intact even if dropna() removed the marker row itself.
        return (
            pos >= 0
            and speakers[pos] != "Scene"
            and scenes[pos] == scenes[current]
        )

    rows = []
    for pos, speaker in enumerate(speakers):
        if speaker != character or not is_scene_dialogue(pos - 1, pos):
            continue

        # Walk backwards from the nearest line and stop at the scene
        # boundary, then restore chronological order.
        context = []
        for back in range(2, 6):
            if not is_scene_dialogue(pos - back, pos):
                break
            context.append(lines[pos - back])
        context.reverse()

        answer = lines[pos]
        question = lines[pos - 1]
        # Augmentation: full window first, then drop the oldest line each
        # time, finishing with an empty context.
        for start in range(len(context) + 1):
            rows.append(
                {
                    "answer": answer,
                    "question": question,
                    "context": "".join(context[start:]),
                }
            )

    # Explicit columns keep the filter below valid when nothing matched.
    scripts = pd.DataFrame(rows, columns=["answer", "question", "context"])
    scripts = scripts[scripts["question"] != ""].reset_index(drop=True)
    scripts.to_pickle("data/scripts_reworked.pkl")
| # =================================================== | |
def scripts_rework_ranking(path, character):
    """Build the question/answer/context table used as the RAG retrieval base.

    Reads the script CSV at *path* (the ``person_scene`` and ``dialogue``
    columns are used) and, for every line spoken by *character* that directly
    follows another line of dialogue in the same scene, emits one row with:

    * ``answer``   - the character's line,
    * ``question`` - the line immediately before it,
    * ``context``  - a list of up to three earlier lines of the same scene,
      oldest first (possibly empty).

    Character lines that open a scene (no preceding dialogue) produce no
    rows.  The result is pickled to ``data/scripts.pkl``; the ``data``
    directory must already exist.

    Args:
        path: CSV location, handed to ``pandas.read_csv``.
        character: ``person_scene`` value whose lines become answers.
    """
    df = pd.read_csv(path)

    # Number the scenes with a cumulative count of "Scene" marker rows.  The
    # previous per-row ``df.iloc[i]["scene_count"] = ...`` assigned into a
    # temporary copy, so the column was never actually filled in.
    df["scene_count"] = df["person_scene"].eq("Scene").cumsum()
    df = df.dropna().reset_index(drop=True)

    speakers = df["person_scene"].tolist()
    lines = df["dialogue"].tolist()
    scenes = df["scene_count"].tolist()

    def in_same_scene(pos, current):
        """Return True if row *pos* is a dialogue line in the scene of *current*."""
        # ``pos >= 0`` comes first: a negative position would otherwise wrap
        # around to the end of the data.  Comparing scene numbers keeps the
        # boundary intact even if dropna() removed the marker row itself.
        return (
            pos >= 0
            and speakers[pos] != "Scene"
            and scenes[pos] == scenes[current]
        )

    rows = []
    for pos, speaker in enumerate(speakers):
        if speaker != character or not in_same_scene(pos - 1, pos):
            continue

        # Walk backwards from the nearest line and stop at the scene
        # boundary, then restore chronological order.
        context = []
        for back in range(2, 5):
            if not in_same_scene(pos - back, pos):
                break
            context.append(lines[pos - back])
        context.reverse()

        rows.append(
            {
                "answer": lines[pos],
                "question": lines[pos - 1],
                "context": context,
            }
        )

    # Explicit columns keep the filter below valid when nothing matched.
    scripts = pd.DataFrame(rows, columns=["answer", "question", "context"])
    scripts = scripts[scripts["question"] != ""].reset_index(drop=True)
    scripts.to_pickle("data/scripts.pkl")
| # =================================================== | |
def encode(texts, model, contexts=None, do_norm=True):
    """Encode a question and its context for cosine-similarity search.

    Args:
        texts: Question text handed to ``model.encode``.
        model: Any object exposing ``encode(text)``.
        contexts: Context as one string, or a list/tuple of strings that is
            concatenated without a separator.  ``None`` (the default) is
            treated as an empty context so the default call no longer passes
            ``None`` to the model.
        do_norm: Accepted for backward compatibility; currently unused.

    Returns:
        A numpy array holding the context embedding followed by the question
        embedding, concatenated along the last axis.
    """
    if contexts is None:
        contexts = ""
    elif isinstance(contexts, (list, tuple)):
        contexts = "".join(contexts)

    question_vectors = model.encode(texts)
    context_vectors = model.encode(contexts)
    return np.concatenate(
        [np.asarray(context_vectors), np.asarray(question_vectors)],
        axis=-1,
    )
def encode_rag(texts, model, contexts=None, do_norm=True):
    """Encode a question and its context for cosine-similarity search.

    Args:
        texts: Question text handed to ``model.encode``.
        model: Any object exposing ``encode(text)``.
        contexts: Iterable of strings (or a single string) that is
            concatenated without a separator.  ``None`` (the default) is
            treated as an empty context; previously the default call raised
            ``TypeError`` inside ``str.join``.
        do_norm: Accepted for backward compatibility; currently unused.

    Returns:
        A numpy array holding the context embedding followed by the question
        embedding, concatenated along the last axis.
    """
    joined_context = "" if contexts is None else "".join(contexts)

    question_vectors = model.encode(texts)
    context_vectors = model.encode(joined_context)
    return np.concatenate(
        [np.asarray(context_vectors), np.asarray(question_vectors)],
        axis=-1,
    )
| # =================================================== | |
def encode_df_save(model):
    """Vectorize the reworked scripts and store them as the retrieval base.

    Loads ``data/scripts.pkl``, encodes every row's question together with
    its context via ``encode`` and pickles the resulting list of vectors to
    ``data/scripts_vectors.pkl`` (one entry per row, in row order).

    Args:
        model: Embedding model forwarded unchanged to ``encode``.
    """
    reworked = pd.read_pickle("data/scripts.pkl")

    vectors = []
    for _, record in reworked.iterrows():
        context = record["context"]
        # List contexts are flattened to one string before encoding; any
        # other value is forwarded as-is.
        if type(context) is list:
            context = "".join(context)
        vectors.append(
            encode(texts=record["question"], model=model, contexts=context)
        )

    with open("data/scripts_vectors.pkl", "wb") as sink:
        pickle.dump(vectors, sink)
| # =================================================== | |
def cosine_sim(answer_true_vectros, answer_generated_vectors) -> float:
    """Return the cosine similarity between a reference and a generated answer.

    Used for model evaluation.  Both inputs are converted to sparse CSR
    matrices and compared with ``cosine_similarity``; the similarity matrix
    is flattened and only its first entry is returned, i.e. the score of the
    first generated vector against the first reference vector.

    Args:
        answer_true_vectros: Embedding(s) of the reference answer.
        answer_generated_vectors: Embedding(s) of the generated answer.

    Returns:
        A single similarity score (a numpy float), not a list.
    """
    data_emb = sparse.csr_matrix(answer_true_vectros)
    query_emb = sparse.csr_matrix(answer_generated_vectors)
    return cosine_similarity(query_emb, data_emb).flatten()[0]
| # =================================================== | |
def cosine_sim_rag(data_vectors, query_vectors) -> list:
    """Rank the stored vectors by cosine similarity to the query.

    Args:
        data_vectors: Embeddings of the retrieval base, one row per script.
        query_vectors: Embedding of the query.  NOTE(review): the similarity
            matrix is flattened, so positions only map back to rows of
            *data_vectors* when a single query vector is passed.

    Returns:
        A list of ``(score, [position])`` tuples sorted by descending score,
        where ``position`` is the index into the flattened similarity array.
    """
    data_emb = sparse.csr_matrix(data_vectors)
    query_emb = sparse.csr_matrix(query_vectors)
    similarity = cosine_similarity(query_emb, data_emb).flatten()

    # Enumerate every position explicitly.  ``np.argwhere(similarity)`` only
    # lists the NON-ZERO entries, so a single exact-zero score shifted all
    # later indices onto the wrong scores and truncated the result.
    positions = [[pos] for pos in range(len(similarity))]
    return sorted(zip(similarity, positions), reverse=True)
| # =================================================== | |
def generate_response(
    model,
    tokenizer,
    question,
    context,
    top_p,
    temperature,
    rag_answer="",
):
    """Sample an answer from the generative model for a question in context.

    The prompt is ``context:<rag_answer><context joined></s>question: <question>``.
    It is tokenized with ``return_tensors="pt"`` and passed to
    ``model.generate`` with sampling enabled.

    Args:
        model: Object exposing ``generate(input_ids, **kwargs)``.
        tokenizer: Object exposing ``encode`` and ``decode``.
        question: Question text appended after the ``</s>`` separator.
        context: Iterable of strings joined without a separator.
        top_p: Nucleus-sampling threshold forwarded to ``generate``.
        temperature: Sampling temperature forwarded to ``generate``.
        rag_answer: Optional retrieved answer placed before the context.

    Returns:
        The decoded text of the first returned sequence.  If it still
        contains ``</s>``, everything from that marker on is cut and the
        remainder is stripped; otherwise the text is returned unchanged.
    """
    prompt = "".join(
        ["context:", rag_answer, "".join(context), "</s>", "question: ", question]
    )
    prompt_ids = tokenizer.encode(prompt, return_tensors="pt")

    sampling_options = {
        "do_sample": True,
        "max_length": 1000,
        "top_p": top_p,
        "temperature": temperature,
        "repetition_penalty": 2.0,
        "top_k": 50,
        "no_repeat_ngram_size": 4,
    }
    generated = model.generate(prompt_ids, **sampling_options)

    # The first token of the sequence is skipped before decoding
    # (presumably the decoder start token - confirm against the model used).
    text = tokenizer.decode(generated[0][1:], skip_special_tokens=True)

    head, marker, _ = text.partition("</s>")
    return head.strip() if marker else text
| # =================================================== | |
def top_candidates(score_lst_sorted, initial_data, top=1):
    """Split a sorted cosine-similarity ranking into top scores and indices.

    Args:
        score_lst_sorted: Sequence of ``(score, [index, ...])`` entries,
            already sorted best-first.
        initial_data: Not used by this function.
        top: Number of leading entries to return.

    Returns:
        A ``(scores, indexes)`` tuple of two lists, each limited to the
        first *top* entries.
    """
    ranked_scores = []
    ranked_indexes = []
    for entry in score_lst_sorted:
        ranked_scores.append(entry[0])
        # Each entry stores its index wrapped in a list; unwrap it.
        ranked_indexes.append(entry[1][0])
    return ranked_scores[:top], ranked_indexes[:top]