import csv
import os
import zipfile

import openai
import pandas as pd

from embedding_tools import create_embedding_from_repo
from utilities import api_keys, clean_text, constants, prompt_constants

openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')

# Conversation history shared across QueryEmbeddingsSimple calls
# (module-level so follow-up questions keep context).
messages = []


def extract_zip_to_directory(zip_path, extract_path):
    """Extracts a zip file to a specified directory."""
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)


def find_code_files(directory, extensions):
    """Recursively finds files with specified extensions in a directory.

    `extensions` must be a tuple/list of suffixes (e.g. ('.py', '.js')).
    Passing a bare string would iterate its characters and match almost
    everything, since str.endswith is tested per element.
    """
    file_paths = []
    for root, _dirs, files in os.walk(directory):
        for file in files:
            if any(file.endswith(ext) for ext in extensions):
                file_paths.append(os.path.join(root, file))
    return file_paths


def read_file(file_path):
    """Attempts to read a file as UTF-8 text; returns None (and logs) on failure.

    The broad except is deliberate: binary or mis-encoded files are skipped
    rather than aborting the whole repository scan.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        print(f"Skipped {file_path}: {e}")
        return None


def CreateCSV(text_chunks):
    """Writes each text chunk as a single-column row to constants.GIT_CSV_PATH."""
    with open(constants.GIT_CSV_PATH, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Write each chunk as its own row, echoing progress to stdout.
        for i, chunk in enumerate(text_chunks):
            print(str(i) + ": " + chunk)
            csv_writer.writerow([chunk])
    print(constants.GIT_CSV_PATH + " saved")


###
def create_chunks(transcript, length):
    """Breaks transcript into chunks based on specified length and whitespace.

    Targets chunks of roughly `length` characters, backing each cut point up
    to the nearest whitespace so words are not split. Any trailing remainder
    (or a segment with no whitespace) ends up in the final chunk.
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # BUG FIX: the original condition was `transcript[end_idx]`, which is
        # truthy for every character, so end_idx always walked back to
        # start_idx and the whole transcript came out as one chunk. Back up
        # only until a whitespace character is found.
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks


def CreateEmbeddings(zip_input_path, git_txt_output_path):
    """Chunks .pdf files under 'extracted_repo', writes them to the CSV, and
    builds flat embeddings.

    Returns the embeddings DataFrame loaded from constants.GIT_PKL_PATH.
    Note: zip extraction is currently disabled (commented out), so the
    'extracted_repo' directory must already exist.
    """
    extract_path = "extracted_repo"
    # extract_zip_to_directory(zip_input_path, extract_path)
    # BUG FIX: ('.pdf') is just the string '.pdf'; a one-element tuple is
    # required, otherwise find_code_files matches on single characters.
    file_paths = find_code_files(extract_path, ('.pdf',))
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # BUG FIX: the original wrote `text_chunks.append=(...)`, an
            # invalid attribute assignment that raises AttributeError.
            # create_chunks returns a list, so extend keeps the chunks flat.
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    # shutil.rmtree(extract_path)  # Clean up extracted files
    return df


###
def create_chunks_orig(transcript, length):
    """Original variant of create_chunks, kept for backward compatibility.

    Identical contract: split transcript into ~`length`-char chunks, backing
    cut points up to whitespace.
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # Same whitespace fix as create_chunks (see note there).
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks


# NOTE: the original file defined CreateCSV a second time here, byte-identical
# to the definition above; the duplicate has been removed.


def CreateEmbeddingsOrig(zip_input_path, git_txt_output_path):
    """Extracts a repo zip, chunks its source files, and builds flat embeddings.

    If a pickle already exists at constants.GIT_PKL_PATH it is loaded, the
    embeddings are (re)built from the existing CSV, and the cached DataFrame
    is returned without re-extracting the zip.
    """
    if os.path.exists(constants.GIT_PKL_PATH):
        df = pd.read_pickle(constants.GIT_PKL_PATH)
        create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
        return df
    extract_path = "extracted_repo"
    extract_zip_to_directory(zip_input_path, extract_path)
    file_paths = find_code_files(
        extract_path,
        ('.py', '.js', '.ts', '.json', '.html', '.css', '.cpp', '.c', '.java', '.yaml', '.sql'),
    )
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # BUG FIX: same invalid `.append=(...)` assignment as in
            # CreateEmbeddings; use extend to flatten the per-file chunks.
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    # shutil.rmtree(extract_path)  # Clean up extracted files
    return df


def Completion(messages):
    """Sends the message list to GPT-4 and returns the cleaned reply text.

    Uses the legacy openai.ChatCompletion API, consistent with the rest of
    this module.
    """
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
    )
    cleaned_text = clean_text.RemoveRole(response["choices"][0]["message"]["content"])
    return cleaned_text


def QueryEmbeddingsSimple(query):
    """Answers a query using the best-matching embedding as context.

    Appends the context, prompt, query, and assistant reply to the module-level
    `messages` history, then returns (answer_text, DataFrame of the history).
    """
    global messages
    best_answer = create_embedding_from_repo.QueryEmbeddingsFlat(query)
    prompt = prompt_constants.GIT_EXPERT_PROMPT
    context_text = f"Using this context: {best_answer}"
    messages.append({"role": "system", "content": context_text})
    prompt_text = f"Using this prompt: {prompt}"
    messages.append({"role": "system", "content": prompt_text})
    query_text = f"Answer this question: {query}"
    messages.append({"role": "user", "content": query_text})
    system_message = Completion(messages)
    messages.append({"role": "assistant", "content": system_message})
    print("system_message: ")
    print(system_message)
    df = pd.DataFrame(messages)
    return system_message, df