# ai-kit / chat_bot / simple_chat_git.py
# Author: Kim Adams — commit a5d5d8d ("cleanup")
import os
import openai
import csv
import zipfile
import pandas as pd
from utilities import constants, api_keys, clean_text, prompt_constants
from embedding_tools import create_embedding_from_repo
# Configure the OpenAI client with the key held by the project key manager.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Module-level chat history; mutated by QueryEmbeddingsSimple across calls.
messages=[]
def extract_zip_to_directory(zip_path, extract_path):
    """Unpack the entire zip archive at *zip_path* into *extract_path*."""
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_path)
def find_code_files(directory, extensions):
    """Walk *directory* recursively and return paths of files whose names
    end with any of the given *extensions*."""
    matches = []
    for root, _dirs, names in os.walk(directory):
        # str.endswith accepts a tuple of suffixes; tuple(str) degrades to
        # per-character suffixes exactly like the original any(...) loop did.
        matches.extend(
            os.path.join(root, name)
            for name in names
            if name.endswith(tuple(extensions))
        )
    return matches
def read_file(file_path):
    """Return the UTF-8 text of *file_path*, or None when it cannot be read.

    Unreadable files (binary content, missing paths, permission errors) are
    reported to stdout and skipped rather than raising — deliberate
    best-effort behavior for bulk repo scans.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
    except Exception as e:
        print(f"Skipped {file_path}: {e}")
        return None
    return content
def CreateCSV(text_chunks, csv_path=None):
    """Write each text chunk as a single-column CSV row.

    Args:
        text_chunks: iterable of strings; each becomes one row.
        csv_path: destination path; defaults to constants.GIT_CSV_PATH so
            existing callers are unaffected (generalized from the previously
            hard-coded path).
    """
    if csv_path is None:
        csv_path = constants.GIT_CSV_PATH
    with open(csv_path, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Echo each chunk with its index while writing (matches prior output).
        for i, chunk in enumerate(text_chunks):
            print(str(i) + ": " + chunk)
            csv_writer.writerow([chunk])
    print(csv_path + " saved")
###
def create_chunks(transcript, length):
    """Break *transcript* into chunks of roughly *length* characters,
    cutting at whitespace so chunks end on word boundaries.

    Each tentative cut index is walked backwards to the nearest whitespace
    character. If a segment contains no whitespace after start_idx it is
    skipped and its text is absorbed into the following/final chunk.
    Returns a list of chunk strings ([] for an empty transcript).
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # BUG FIX: the original tested `transcript[end_idx]` — a one-char
        # string, which is always truthy — so end_idx always walked back to
        # start_idx and the whole transcript came out as a single chunk.
        # Stop at the nearest whitespace boundary instead.
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
def CreateEmbeddings(zip_input_path, git_txt_output_path):
    """Chunk PDF files under the extracted repo dir, write them to the repo
    CSV, build flat embeddings, and return the resulting pickle DataFrame.

    NOTE(review): the extract step is commented out, so this currently
    assumes "extracted_repo" is already populated — confirm before enabling.
    """
    extract_path = "extracted_repo"
    #extract_zip_to_directory(zip_input_path, extract_path)
    # BUG FIX: ('.pdf') is just the string '.pdf', so endswith matched the
    # single characters '.', 'p', 'd', 'f'; a one-element tuple is required.
    file_paths = find_code_files(extract_path, ('.pdf',))
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # BUG FIX: original wrote `text_chunks.append=(...)`, which
            # raises AttributeError; extend flattens per-file chunk lists
            # so CreateCSV receives plain strings.
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    #shutil.rmtree(extract_path) # Clean up extracted files
    return df
###
def create_chunks_orig(transcript, length):
    """Break *transcript* into chunks of roughly *length* characters,
    cutting at whitespace so chunks end on word boundaries.

    Legacy duplicate of create_chunks kept for reference. Segments with no
    whitespace after start_idx are skipped and absorbed by the tail chunk.
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # BUG FIX: `transcript[end_idx]` (a one-char string) is always
        # truthy, so the original collapsed everything into one chunk;
        # stop at the nearest whitespace boundary instead.
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
def CreateCSV(text_chunks, csv_path=None):
    """Write each text chunk as a single-column CSV row.

    Duplicate definition (shadows the earlier CreateCSV at import time).

    Args:
        text_chunks: iterable of strings; each becomes one row.
        csv_path: destination path; defaults to constants.GIT_CSV_PATH so
            existing callers are unaffected (generalized from the previously
            hard-coded path).
    """
    if csv_path is None:
        csv_path = constants.GIT_CSV_PATH
    with open(csv_path, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Echo each chunk with its index while writing (matches prior output).
        for i, chunk in enumerate(text_chunks):
            print(str(i) + ": " + chunk)
            csv_writer.writerow([chunk])
    print(csv_path + " saved")
def CreateEmbeddingsOrig(zip_input_path, git_txt_output_path):
    """Extract the repo zip, chunk its source files, write them to the repo
    CSV, build flat embeddings, and return the resulting pickle DataFrame.

    If the embeddings pickle already exists, it is loaded and returned
    (while still refreshing the flat embeddings from the existing CSV).
    """
    if os.path.exists(constants.GIT_PKL_PATH):
        df = pd.read_pickle(constants.GIT_PKL_PATH)
        create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
        return df
    extract_path = "extracted_repo"
    extract_zip_to_directory(zip_input_path, extract_path)
    file_paths = find_code_files(extract_path, ('.py', '.js', '.ts', '.json', '.html', '.css', '.cpp', '.c', '.java', '.yaml', '.sql'))
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # BUG FIX: original wrote `text_chunks.append=(...)`, which
            # raises AttributeError; extend flattens per-file chunk lists
            # so CreateCSV receives plain strings.
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    #shutil.rmtree(extract_path) # Clean up extracted files
    return df
def Completion(messages):
    """Send the chat history to GPT-4 and return the role-stripped reply.

    NOTE(review): uses the legacy `openai.ChatCompletion` interface —
    pinned openai<1.0 is presumably required; verify the environment.
    """
    api_response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages
    )
    raw_reply = api_response["choices"][0]["message"]["content"]
    return clean_text.RemoveRole(raw_reply)
def QueryEmbeddingsSimple(query):
    """Answer *query* against the repo embeddings, accumulating history.

    Looks up the best-matching embedded context, appends system
    context/prompt entries and the user question to the module-level
    `messages` list, requests a completion, records the assistant reply,
    and returns (reply_text, history_as_DataFrame).
    """
    global messages
    best_answer = create_embedding_from_repo.QueryEmbeddingsFlat(query)
    prompt = prompt_constants.GIT_EXPERT_PROMPT
    messages.append({"role": "system", "content": f"Using this context: {best_answer}"})
    messages.append({"role": "system", "content": f"Using this prompt: {prompt}"})
    messages.append({"role": "user", "content": f"Answer this question: {query}"})
    system_message = Completion(messages)
    messages.append({"role": "assistant", "content": system_message})
    print("system_message: ")
    print(system_message)
    return system_message, pd.DataFrame(messages)