| import os |
| import openai |
| import string |
| import csv |
| import re |
| import pandas as pd |
| from utilities import constants, api_keys, clean_text, prompt_constants |
| from translators import translate_pdf_to_text |
| from embedding_tools import create_embedding |
| from chat_bot import process_policies |
|
|
# Run the project's environment setup script at import time.
# NOTE(review): os.system with a shell string — fixed command so no injection
# risk here, but subprocess.run(['bash', 'setup.sh']) is the preferred form.
os.system('bash setup.sh')


# Configure the OpenAI client with the key loaded from the project key store.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Module-level chat history shared (and mutated) by QueryEmbeddingsSimple;
# grows unboundedly across calls.
messages=[]
|
|
def flatten_json(json_obj, prefix=''):
    """Flatten a nested dict/list structure into (key, value) string pairs.

    Nested levels are joined with '_' (dict keys and list indices), e.g.
    {'b': {'c': 2}} -> [('b_c', '2')]. Values are stringified with embedded
    newlines replaced by spaces.

    Bug fix vs. the original: the separator between the top-level key and
    nested keys was dropped ({'b': {'c': 2}} produced 'bc_'), and leaf keys
    carried a dangling trailing '_'. The '_' is now appended only when the
    key is used as a prefix for recursion.

    Args:
        json_obj: dict, list, or scalar to flatten.
        prefix: key-path prefix to prepend (ends with '_' when non-empty).

    Returns:
        list[tuple[str, str]]: one (key_path, value) pair per scalar leaf.
    """
    items = []

    def flatten(fragment, current_prefix):
        if isinstance(fragment, dict):
            for key, value in fragment.items():
                new_key = f'{current_prefix}{key}'
                if isinstance(value, (dict, list)):
                    flatten(value, f'{new_key}_')
                else:
                    items.append((new_key, str(value).replace('\n', ' ')))
        elif isinstance(fragment, list):
            for index, item in enumerate(fragment):
                new_key = f'{current_prefix}{index}'
                if isinstance(item, (dict, list)):
                    flatten(item, f'{new_key}_')
                else:
                    items.append((new_key, str(item).replace('\n', ' ')))

    flatten(json_obj, prefix)
    return items
|
|
def make_readable_key(key):
    """Turn a flattened key like 'policies_0_name' into 'Policies (1) Name'.

    Bug fix vs. the original: the substitutions re.sub(r'\b0_', ...) etc.
    were dead code — underscores were already replaced with spaces on the
    line above, and '_' is a regex word character so '\b' never matched
    before the digit. The index rewrite now runs on the space-separated
    form and handles any zero-based index, not just 0/1/2.

    Args:
        key: flattened '_'-joined key path.

    Returns:
        A title-cased, human-readable label with list indices shown
        as one-based '(n)' markers.
    """
    key = key.replace('_', ' ')
    # Convert zero-based list indices (a digit run followed by a space)
    # into one-based '(n)' labels.
    key = re.sub(r'\b(\d+) ', lambda m: f'({int(m.group(1)) + 1}) ', key)
    key = ' '.join([word.capitalize() for word in key.split()])
    # Lowercase the helper words the original targeted.
    # NOTE(review): substring replace also hits words containing 'Is'
    # (e.g. 'Island' -> 'island') — confirm acceptable for the key vocabulary.
    key = key.replace('Is', 'is').replace('Has a', 'has a')
    return key
|
|
def process_list(items):
    """Render each 'key: value' string with a readable key and a
    newline-free value.

    Args:
        items: iterable of strings of the form 'raw_key: raw value'
               (must contain ': ' at least once).

    Returns:
        list[str]: 'Readable Key: value' entries, in input order.
    """
    def render(entry):
        raw_key, raw_value = entry.split(': ', 1)
        cleaned = raw_value.replace('\n', ' ')
        return f'{make_readable_key(raw_key)}: {cleaned}'

    return [render(entry) for entry in items]
|
|
def convert_flattened_to_readable(flattened_items):
    """Map flattened (key, value) pairs to 'Readable Key: value' strings.

    Args:
        flattened_items: iterable of (key, value) pairs as produced by
            flatten_json.

    Returns:
        list[str]: one readable line per pair, in input order.
    """
    return [
        f'{make_readable_key(pair_key)}: {pair_value}'
        for pair_key, pair_value in flattened_items
    ]
|
|
def convert_json_to_single_column_csv(json_obj):
    """Flatten *json_obj* and join the readable lines into one
    newline-separated string (one CSV-style value per line).

    Args:
        json_obj: dict/list/scalar JSON-like structure.

    Returns:
        str: readable 'Key: value' lines joined with '\n'.
    """
    readable = convert_flattened_to_readable(flatten_json(json_obj))
    return '\n'.join(readable)
|
|
|
|
def flatten_json_to_single_column(json_obj, prefix=''):
    """Recursively flatten *json_obj* into a list of 'key_path: value' strings.

    Dict keys and list indices are joined with '_' to build the key path,
    e.g. {'d': [3]} -> ['d_0: 3'].

    Fix vs. the original: removed the leftover debug print that fired on
    every call AND every recursion level, spamming stdout with the full
    sub-structure each time.

    Args:
        json_obj: dict, list, or scalar to flatten.
        prefix: key path accumulated so far; ends with '_' on recursive calls.

    Returns:
        list[str]: one 'key: value' entry per scalar leaf.
    """
    items = []
    if isinstance(json_obj, dict):
        for key, value in json_obj.items():
            if isinstance(value, dict):
                items.extend(flatten_json_to_single_column(value, f'{prefix}{key}_'))
            elif isinstance(value, list):
                for index, item in enumerate(value):
                    items.extend(flatten_json_to_single_column(item, f'{prefix}{key}_{index}_'))
            else:
                items.append(f'{prefix}{key}: {value}')
    elif isinstance(json_obj, list):
        for index, item in enumerate(json_obj):
            if isinstance(item, dict):
                items.extend(flatten_json_to_single_column(item, f'{prefix}{index}_'))
            else:
                # NOTE(review): a list nested directly inside a list is
                # stringified whole here rather than recursed — preserved as-is.
                items.append(f'{prefix}{index}: {item}')
    else:
        # Scalar reached via recursion: prefix ends with '_', strip it.
        items.append(f'{prefix[:-1]}: {json_obj}')
    return items
|
|
def convert_json_to_single_column_csv_old(json_obj):
    """Older variant: flatten *json_obj* via flatten_json_to_single_column
    and join the resulting lines with newlines.

    Args:
        json_obj: dict/list/scalar JSON-like structure.

    Returns:
        str: 'key_path: value' lines joined with '\n'.
    """
    return '\n'.join(flatten_json_to_single_column(json_obj))
|
|
def write_flat_text_to_csv(flat_text, csv_path):
    """Write each entry of *flat_text* as a one-column CSV row.

    Args:
        flat_text: iterable of strings; each element becomes one row.
            (Passing a plain string would iterate characters.)
        csv_path: destination file path; overwritten if it exists.
    """
    with open(csv_path, 'w', newline='') as out_file:
        writer = csv.writer(out_file)
        writer.writerows([entry] for entry in flat_text)
|
|
|
|
| |
def CreateEmbeddings(policy_input, policy_output):
    """OCR a policy PDF, flatten the extracted schema to readable lines,
    write them as a one-column CSV, and build embeddings from that CSV.

    Args:
        policy_input: path to the source policy PDF.
        policy_output: path for the flattened one-column CSV.

    Returns:
        Whatever CreateEmbeddingsFlatPolicy returns (presumably a
        DataFrame — confirm), or None when no text was extracted.
    """
    # Step 1: OCR the PDF to plain text (also written to POLICY_TXT_PATH).
    content=process_policies.convert_pdf_to_text_ocr(policy_input, constants.POLICY_TXT_PATH)
    print("**1. content after convert_pdf_to_text_ocr:", content)
    df=None
    if content:
        # Step 2: extract a structured schema from the OCR text.
        schema=process_policies.create_schema(content)
        print ("**2. schema:", schema)
        # Step 3: flatten to 'key: value' lines and make the keys readable.
        flat_txt = flatten_json_to_single_column(schema)
        readable= process_list(flat_txt)
        print ("**3. human readable:", readable)
        for item in readable:
            print(item)

        # Step 4: persist the readable lines as a one-column CSV.
        print ("**4. flatten to csv and write:", readable)
        write_flat_text_to_csv(readable,policy_output)
        # Step 5: build embeddings from the CSV and pickle them.
        print ("**5. create embeddings & write pkl:")
        df=create_embedding.CreateEmbeddingsFlatPolicy(policy_output, constants.POLICY_PKL_PATH)
    print("return:", df)
    return df
|
|
def CreateEmbeddingsOriginal(input_path, output_path):
    """Build (or refresh) embeddings for a PDF transcript and return the
    embeddings DataFrame loaded from the pickle at constants.PKL_PATH.

    Args:
        input_path: path to the source PDF.
        output_path: path for the extracted transcript text.

    Returns:
        pandas.DataFrame loaded from constants.PKL_PATH.
    """
    print("in createembeddingsoriginal, input_path:", input_path, "output_path:", output_path)
    if os.path.exists(constants.PKL_PATH):
        # NOTE(review): this branch loads df BEFORE regenerating the
        # embeddings, so the freshly written pickle is never returned —
        # confirm whether returning the stale df is intentional.
        df = pd.read_pickle(constants.PKL_PATH)
        create_embedding.CreateEmbeddingsFlat(constants.CSV_PATH, constants.PKL_PATH)
        return df

    # No cached pickle: extract the transcript, chunk it, write the CSV,
    # then create embeddings and load the resulting pickle.
    transcript= translate_pdf_to_text.TranslatePdfToText(input_path, output_path)
    text_chunks=CreateChunks(transcript, constants.EMBEDDING_CHUNK_LENGTH)
    CreateCSV(text_chunks)
    create_embedding.CreateEmbeddingsFlat(constants.CSV_PATH, constants.PKL_PATH)
    df = pd.read_pickle(constants.PKL_PATH)
    return df
|
|
|
|
def CreateCSV(text_chunks):
    """Write *text_chunks* to constants.CSV_PATH as a one-column CSV,
    echoing each chunk with its index to stdout.

    Args:
        text_chunks: iterable of strings; each element becomes one row.
    """
    with open(constants.CSV_PATH, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)

        for index, chunk in enumerate(text_chunks):
            print(f'{index}: {chunk}')
            csv_writer.writerow([chunk])
        print(constants.CSV_PATH + " saved")
|
|
def CreateChunks(transcript, length):
    """Split *transcript* into chunks of at most *length* characters,
    breaking only at whitespace or punctuation.

    Each target boundary is walked backwards to the nearest whitespace or
    punctuation character. A whitespace boundary is dropped (it only
    separated words); a punctuation boundary is KEPT at the end of the
    chunk — the original silently discarded it, losing transcript text
    (e.g. 'abc.def' chunked at 4 lost the '.').

    If a segment contains no break character at all, it merges into the
    following chunk rather than being split mid-word.

    Args:
        transcript: the text to split.
        length: maximum chunk size in characters; must be >= 1.

    Returns:
        list[str]: the chunks, in order.

    Raises:
        ValueError: if length < 1 (previously surfaced as an opaque
            range() error or silently returned the whole transcript).
    """
    if length < 1:
        raise ValueError("length must be >= 1")
    boundary_chars = string.whitespace + string.punctuation
    total_length = len(transcript)
    text_chunks = []
    start_idx = 0
    for end_idx in range(length - 1, total_length, length):
        # Walk back to the nearest safe break point within the segment.
        while end_idx > start_idx and transcript[end_idx] not in boundary_chars:
            end_idx -= 1
        if end_idx > start_idx:
            if transcript[end_idx] in string.punctuation:
                # Keep the punctuation character with its chunk.
                text_chunks.append(transcript[start_idx:end_idx + 1])
            else:
                # Whitespace separator: drop it.
                text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
|
|
def Completion(messages):
    """Send *messages* to the chat model and return the cleaned reply text.

    Args:
        messages: list of {'role': ..., 'content': ...} dicts in the
            OpenAI chat format. (Shadows the module-level `messages`.)

    Returns:
        str: the assistant reply after clean_text.RemoveRole processing.
    """
    # NOTE(review): openai.ChatCompletion is the pre-1.0 OpenAI SDK
    # interface; newer SDK versions removed it — confirm the pinned version.
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages
    )
    cleaned_text= clean_text.RemoveRole(response["choices"][0]["message"]["content"])
    return cleaned_text
|
|
def QueryEmbeddingsSimple(query):
    """Answer *query* using the best embedding match as context, via chat.

    Appends the context, prompt, question, and the model's reply to the
    module-level `messages` history (which grows with every call).

    Args:
        query: the user's question.

    Returns:
        tuple: (assistant reply string, DataFrame of the full message history).
    """
    print ("in QueryEmbeddingsSimple")
    global messages
    # Retrieve the closest stored text for this query from the embeddings.
    best_answer= create_embedding.QueryEmbeddingsFlat(query)
    prompt= prompt_constants.USAA_AR_EXPERT_PROMPT

    # System message 1: the retrieved context.
    context_text = f"Using this context: {best_answer}"
    messages.append({"role": "system", "content": context_text})

    # System message 2: the domain-expert prompt.
    prompt_text = f"Using this prompt: {prompt}"
    messages.append({"role": "system", "content": prompt_text})

    # User message: the actual question.
    query_text = f"Answer this question: {query}"
    messages.append({"role": "user", "content": query_text})

    # Get the model's reply and record it in the shared history.
    system_message = Completion(messages)
    messages.append({"role": "assistant", "content": system_message})


    print("system_message: ")
    print(system_message)
    df = pd.DataFrame(messages)
    return system_message, df