| import os |
| import openai |
| import csv |
| import zipfile |
| import pandas as pd |
| from utilities import constants, api_keys, clean_text, prompt_constants |
| from embedding_tools import create_embedding_from_repo |
|
|
# Module-level OpenAI auth: key is resolved once at import time.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Shared running chat transcript; mutated in place by QueryEmbeddingsSimple().
messages=[]
|
|
def extract_zip_to_directory(zip_path, extract_path):
    """Unpack every member of the archive at *zip_path* into *extract_path*."""
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(extract_path)
|
|
def find_code_files(directory, extensions):
    """Walk *directory* recursively and return paths of files whose name ends
    with any of the given *extensions*."""
    wanted = tuple(extensions)
    matches = []
    for dirpath, _subdirs, filenames in os.walk(directory):
        matches.extend(
            os.path.join(dirpath, name)
            for name in filenames
            if name.endswith(wanted)
        )
    return matches
|
|
def read_file(file_path):
    """Read a file as UTF-8 text and return its content.

    Returns None (after logging) when the file is missing/unreadable (OSError)
    or is not valid UTF-8 (UnicodeDecodeError) — i.e. binary files are
    skipped.  Narrowed from a blanket `except Exception`, which also hid
    genuine programming errors.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Skipped {file_path}: {e}")
        return None
|
|
|
|
def CreateCSV(text_chunks):
    """Write each text chunk as a one-column CSV row to constants.GIT_CSV_PATH.

    Each chunk is echoed to stdout with its index for progress visibility.
    NOTE(review): this definition is shadowed by an identical `CreateCSV`
    later in the file — consider deleting one copy.
    """
    # Explicit utf-8 so non-ASCII chunk text cannot crash on platforms
    # whose default encoding is not UTF-8 (e.g. Windows cp1252).
    with open(constants.GIT_CSV_PATH, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        for i, chunk in enumerate(text_chunks):
            print(str(i) + ": " + chunk)
            csv_writer.writerow([chunk])
    print(constants.GIT_CSV_PATH + " saved")
|
|
| |
def create_chunks(transcript, length):
    """Split *transcript* into chunks of at most *length* characters, breaking
    only at whitespace so words are never cut in half.

    Candidate split points are placed every *length* characters, then walked
    backwards to the nearest whitespace character; that whitespace itself is
    dropped.  A segment containing no whitespace is merged into the final
    chunk.  Returns a (possibly empty) list of chunk strings.
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # Walk back to the nearest whitespace.  The original tested
        # `transcript[end_idx]` (truthy for EVERY character), so it always
        # walked down to start_idx and returned the whole transcript as one
        # chunk; `.isspace()` is the intended break condition.
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
|
|
|
|
def CreateEmbeddings(zip_input_path, git_txt_output_path):
    """Build embeddings from .pdf files already present under ``extracted_repo``.

    Chunks each readable file, writes the chunks to constants.GIT_CSV_PATH,
    runs the flat-embedding pipeline, and returns the resulting DataFrame
    loaded from constants.GIT_PKL_PATH.

    NOTE(review): unlike CreateEmbeddingsOrig, this variant never extracts
    ``zip_input_path`` — it assumes ``extracted_repo`` already exists; confirm
    that is intentional.
    """
    extract_path = "extracted_repo"
    # ('.pdf') is just the string '.pdf' — the extension check would then
    # iterate its characters.  A one-element tuple is required.
    file_paths = find_code_files(extract_path, ('.pdf',))
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # extend, not append: create_chunks returns a list of chunks.
            # (The original `text_chunks.append=(...)` assigned to the bound
            # method and raised AttributeError at runtime.)
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    return df
| |
def create_chunks_orig(transcript, length):
    """Split *transcript* into chunks of at most *length* characters, breaking
    only at whitespace so words are never cut in half.

    NOTE(review): duplicate of `create_chunks` kept as a backup copy —
    consider removing one of the two.  Both carried the same broken break
    condition, fixed here for consistency.
    """
    total_length = len(transcript)
    print("total_length: ", total_length, " length: ", str(length))
    segment_length = length
    segment_indices = [i for i in range(segment_length - 1, total_length, segment_length)]
    text_chunks = []
    start_idx = 0
    for end_idx in segment_indices:
        # `.isspace()` is the intended test; the original's bare
        # `transcript[end_idx]` is truthy for every character, so no split
        # point was ever found and the whole transcript came back as one chunk.
        while end_idx > start_idx and not transcript[end_idx].isspace():
            end_idx -= 1
        if end_idx > start_idx:
            text_chunks.append(transcript[start_idx:end_idx])
            start_idx = end_idx + 1
    if start_idx < total_length:
        text_chunks.append(transcript[start_idx:])
    return text_chunks
|
|
def CreateCSV(text_chunks):
    """Write each text chunk as a one-column CSV row to constants.GIT_CSV_PATH.

    Each chunk is echoed to stdout with its index for progress visibility.
    NOTE(review): this is a duplicate definition that shadows the earlier
    `CreateCSV` — consider deleting one copy.
    """
    # Explicit utf-8 so non-ASCII chunk text cannot crash on platforms
    # whose default encoding is not UTF-8 (e.g. Windows cp1252).
    with open(constants.GIT_CSV_PATH, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        for i, chunk in enumerate(text_chunks):
            print(str(i) + ": " + chunk)
            csv_writer.writerow([chunk])
    print(constants.GIT_CSV_PATH + " saved")
|
|
def CreateEmbeddingsOrig(zip_input_path, git_txt_output_path):
    """Extract *zip_input_path*, chunk every recognized source file, write the
    chunks to constants.GIT_CSV_PATH, build flat embeddings, and return the
    resulting DataFrame from constants.GIT_PKL_PATH.

    If constants.GIT_PKL_PATH already exists, the cached pickle is returned
    (embeddings are still rebuilt from the existing CSV first).
    """
    if os.path.exists(constants.GIT_PKL_PATH):
        df = pd.read_pickle(constants.GIT_PKL_PATH)
        create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
        return df

    extract_path = "extracted_repo"
    extract_zip_to_directory(zip_input_path, extract_path)
    file_paths = find_code_files(extract_path, ('.py', '.js', '.ts', '.json', '.html', '.css', '.cpp', '.c', '.java', '.yaml', '.sql'))
    text_chunks = []
    for file_path in file_paths:
        content = read_file(file_path)
        if content:
            # extend, not append: create_chunks returns a list of chunks.
            # (The original `text_chunks.append=(...)` assigned to the bound
            # method and raised AttributeError at runtime.)
            text_chunks.extend(create_chunks(content, constants.EMBEDDING_CHUNK_LENGTH))
    CreateCSV(text_chunks)
    print("\n**done")
    create_embedding_from_repo.CreateEmbeddingsFlat(constants.GIT_CSV_PATH, constants.GIT_PKL_PATH)
    df = pd.read_pickle(constants.GIT_PKL_PATH)
    return df
|
|
|
|
def Completion(messages):
    """Send *messages* to the gpt-4 chat endpoint and return the reply text
    with role prefixes stripped by clean_text.RemoveRole."""
    reply = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
    )
    raw_content = reply["choices"][0]["message"]["content"]
    return clean_text.RemoveRole(raw_content)
|
|
def QueryEmbeddingsSimple(query):
    """Answer *query* against the repo embeddings.

    Appends context, prompt, the question, and the model's reply to the
    module-level `messages` history, prints the reply, and returns
    (reply_text, DataFrame of the full message history).
    """
    global messages

    best_answer = create_embedding_from_repo.QueryEmbeddingsFlat(query)
    prompt = prompt_constants.GIT_EXPERT_PROMPT

    # Feed retrieved context, the expert prompt, and the user's question
    # into the running conversation before asking the model.
    messages.append({"role": "system", "content": f"Using this context: {best_answer}"})
    messages.append({"role": "system", "content": f"Using this prompt: {prompt}"})
    messages.append({"role": "user", "content": f"Answer this question: {query}"})

    system_message = Completion(messages)
    messages.append({"role": "assistant", "content": system_message})

    print("system_message: ")
    print(system_message)
    return system_message, pd.DataFrame(messages)