| --- |
| license: mit |
| language: |
| - en |
| base_model: |
| - meta-llama/Llama-3.1-8B |
| pipeline_tag: reinforcement-learning |
| library_name: adapter-transformers |
| --- |
| |
| import os |
| import tkinter as tk |
| from tkinter import filedialog, messagebox |
| import PyPDF2 |
| import re |
| import json |
| import torch |
| import ollama |
| from openai import OpenAI |
| import argparse |
|
|
| # ANSI escape codes for colors |
| PINK = '\033[95m' |
| CYAN = '\033[96m' |
| YELLOW = '\033[93m' |
| NEON_GREEN = '\033[92m' |
| RESET_COLOR = '\033[0m' |
|
|
| # Function to open a file and return its contents as a string |
| def open_file(filepath): |
| with open(filepath, 'r', encoding='utf-8') as infile: |
| return infile.read() |
| |
| # Function to convert PDF to text and append to vault.txt |
| def convert_pdf_to_text(): |
| file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")]) |
| if file_path: |
| base_directory = os.path.join("local-rag", "text_parse") |
| file_name = os.path.basename(file_path) |
| output_file_name = os.path.splitext(file_name)[0] + ".txt" |
| file_output_path = os.path.join(base_directory, output_file_name) |
| |
| if not os.path.exists(base_directory): |
| os.makedirs(base_directory) |
| print(f"Directory '{base_directory}' created.") |
| |
| with open(file_path, 'rb') as pdf_file: |
| pdf_reader = PyPDF2.PdfReader(pdf_file) |
| text = '' |
| for page_num in range(len(pdf_reader.pages)): |
| page = pdf_reader.pages[page_num] |
| if page.extract_text(): |
| text += page.extract_text() + " " |
| |
| text = re.sub(r'\s+', ' ', text).strip() |
| sentences = re.split(r'(?<=[.!?]) +', text) |
| chunks = [] |
| current_chunk = "" |
| for sentence in sentences: |
| if len(current_chunk) + len(sentence) + 1 < 1000: |
| current_chunk += (sentence + " ").strip() |
| else: |
| chunks.append(current_chunk) |
| current_chunk = sentence + " " |
| if current_chunk: |
| chunks.append(current_chunk) |
| |
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
| temp_file.write(output_file_name + "\n") |
| for chunk in chunks: |
| temp_file.write(chunk.strip() + "\n") |
| |
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
| vault_file.write("\n") |
| for chunk in chunks: |
| vault_file.write(chunk.strip() + "\n") |
| |
| if not os.path.exists(file_output_path): |
| with open(file_output_path, "w", encoding="utf-8") as f: |
| for chunk in chunks: |
| f.write(chunk.strip() + "\n") |
| f.write("====================NOT FINISHED====================\n") |
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
| else: |
| print(f"File '{file_output_path}' already exists.") |
| |
| print(f"PDF content appended to vault.txt with each chunk on a separate line.") |
| # Call the second part after the PDF conversion is done |
| |
| input_value = input("Enter your question:") |
| process_text_files(input_value) |
| |
| # Function to upload a text file and append to vault.txt |
| def upload_txtfile(): |
| file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")]) |
| if file_path: |
| # Define the base directory |
| base_directory = os.path.join("local-rag", "text_parse") |
| |
| # Get the file name without the directory and extension |
| file_name = os.path.basename(file_path) |
| output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt |
| |
| |
| # Construct the output file path in the base directory |
| file_output_path = os.path.join(base_directory, output_file_name) |
| |
| # Create base directory if it doesn't exist |
| if not os.path.exists(base_directory): |
| os.makedirs(base_directory) |
| print(f"Directory '{base_directory}' created.") |
| |
| |
| with open(file_path, 'r', encoding="utf-8") as txt_file: |
| text = txt_file.read() |
| |
| # Normalize whitespace and clean up text |
| text = re.sub(r'\s+', ' ', text).strip() |
| |
| # Split text into chunks by sentences, respecting a maximum chunk size |
| sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation |
| chunks = [] |
| current_chunk = "" |
| for sentence in sentences: |
| # Check if the current sentence plus the current chunk exceeds the limit |
| if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space |
| current_chunk += (sentence + " ").strip() |
| else: |
| # When the chunk exceeds 1000 characters, store it and start a new one |
| chunks.append(current_chunk) |
| current_chunk = sentence + " " |
| if current_chunk: # Don't forget the last chunk! |
| chunks.append(current_chunk) |
| |
| # Clear temp.txt and write the new content |
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
| temp_file.write(output_file_name + "\n") # Write the output file name as the first line |
| for chunk in chunks: |
| # Write each chunk to its own line |
| temp_file.write(chunk.strip() + "\n") # Each chunk on a new line |
| |
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
| vault_file.write("\n") # Add a new line to separate content |
| for chunk in chunks: |
| # Write each chunk to its own line |
| vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks |
| |
| # Create the file in the directory if it doesn't exist |
| if not os.path.exists(file_output_path): |
| with open(file_output_path, "w") as f: |
| f.write("") # Create an empty file |
| f.write("====================NOT FINISHED====================\n") |
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
| else: |
| print(f"File '{file_output_path}' already exists.") |
| |
| print(f"Text file content appended to vault.txt with each chunk on a separate line.") |
| |
| input_value = input("Enter your question:") |
| process_text_files(input_value) |
| else: |
| print("No file selected.") |
| |
| # Function to upload a JSON file and append to vault.txt |
| def upload_jsonfile(): |
| file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")]) |
| if file_path: |
| |
| # Define the base directory |
| base_directory = os.path.join("local-rag", "text_parse") |
| |
| # Get the file name without the directory and extension |
| file_name = os.path.basename(file_path) |
| output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt |
| |
|
|
| # Construct the output file path in the base directory |
| file_output_path = os.path.join(base_directory, output_file_name) |
| |
| # Create base directory if it doesn't exist |
| if not os.path.exists(base_directory): |
| os.makedirs(base_directory) |
| print(f"Directory '{base_directory}' created.") |
| |
| |
| |
| |
| with open(file_path, 'r', encoding="utf-8") as json_file: |
| data = json.load(json_file) |
| |
| # Flatten the JSON data into a single string |
| text = json.dumps(data, ensure_ascii=False) |
| |
| # Normalize whitespace and clean up text |
| text = re.sub(r'\s+', ' ', text).strip() |
| |
| # Split text into chunks by sentences, respecting a maximum chunk size |
| sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation |
| chunks = [] |
| current_chunk = "" |
| for sentence in sentences: |
| # Check if the current sentence plus the current chunk exceeds the limit |
| if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space |
| current_chunk += (sentence + " ").strip() |
| else: |
| # When the chunk exceeds 1000 characters, store it and start a new one |
| chunks.append(current_chunk) |
| current_chunk = sentence + " " |
| if current_chunk: # Don't forget the last chunk! |
| chunks.append(current_chunk) |
| |
| # Clear temp.txt and write the new content |
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
| temp_file.write(output_file_name + "\n") # Write the output file name as the first line |
| for chunk in chunks: |
| # Write each chunk to its own line |
| temp_file.write(chunk.strip() + "\n") # Each chunk on a new line |
| |
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
| vault_file.write("\n") # Add a new line to separate content |
| for chunk in chunks: |
| # Write each chunk to its own line |
| vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks |
| |
| if not os.path.exists(file_output_path): |
| with open(file_output_path, "w", encoding="utf-8") as f: |
| for chunk in chunks: |
| f.write(chunk.strip() + "\n") # Each chunk on a new line |
| f.write("====================NOT FINISHED====================\n") |
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
| else: |
| print(f"File '{file_output_path}' already exists.") |
| |
| |
| |
| print(f"JSON file content appended to vault.txt with each chunk on a separate line.") |
| |
| input_value = input("Enter your question:") |
| process_text_files(input_value) |
| |
| def summarize(): |
| summary_window = tk.Toplevel(root) |
| summary_window.title("Text Summarizer") |
| summary_window.geometry("400x200") |
| |
| # Create a label for the window |
| label = tk.Label(summary_window, text="Choose an option to summarize text:") |
| label.pack(pady=10) |
| |
| # Create two buttons: one for uploading a .txt file, and one for pasting text directly |
| upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file) |
| upload_button.pack(pady=5) |
| |
| paste_button = tk.Button(summary_window, text="Paste your text", command=lambda: open_paste_window(summary_window)) |
| paste_button.pack(pady=5) |
| |
| # Function to upload a .txt file and summarize |
| def summarize_from_file(): |
| file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")]) |
| if file_path: |
| # Define the base directory where the file will be saved |
| base_directory = os.path.join("local-rag", "text_sum") |
| |
| file_name = os.path.basename(file_path) |
| |
| # Create the directory if it doesn't exist |
| if not os.path.exists(base_directory): |
| os.makedirs(base_directory) |
| print(f"Directory '{base_directory}' created.") |
| |
| summary_content = [] |
| if os.path.exists(file_name): |
| with open(file_name, "r", encoding='utf-8') as sum_file: |
| summary_content = sum_file.readlines() |
| |
| summary_embeddings = [] |
| for content in summary_content: |
| response = ollama.embeddings(model='mxbai-embed-large', prompt=content) |
| summary_embeddings.append(response["embedding"]) |
| |
| summary_embeddings_tensor = torch.tensor(summary_embeddings) |
| print("Embeddings for each line in the vault:") |
| print(summary_embeddings_tensor) |
| |
| conversation_history = [] |
| system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document" |
| user_input = "Summarize this paragraph" |
| |
| response = ollama_chat(user_input, system_message, summary_embeddings_tensor, summary_content, args.model, conversation_history) |
| |
| messagebox.showinfo("Summary", response) # Replace with actual summarizing logic |
| else: |
| messagebox.showerror("Error", "No file selected!") |
| |
| # Function to open a window for pasting text and summarizing |
| def open_paste_window(parent_window): |
| # Create a new window for pasting text |
| paste_window = tk.Toplevel(parent_window) |
| paste_window.title("Paste Your Text") |
| paste_window.geometry("400x300") |
| |
| # Create a label and text box for the pasted text |
| label = tk.Label(paste_window, text="Paste your text below:") |
| label.pack(pady=5) |
| |
| input_textbox = tk.Text(paste_window, height=8, width=40) |
| input_textbox.pack(pady=5) |
| |
| # Function to handle the "Submit" button click |
| def submit_text(): |
| pasted_text = input_textbox.get("1.0", tk.END).strip() |
| if pasted_text: |
| |
| system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document" |
| user_input = "Summarize this paragraph:" |
| new_value = user_input + pasted_text |
| messages = [ |
| { |
| "system", |
| system_message, |
| }, |
| {"human", new_value}, |
| ] |
| response = client.chat.completions.create(model=args.model, messages=messages) |
| |
| response_value = response.choices[0].message.content |
| |
|
|
| messagebox.showinfo("Summary", response_value) # Replace with actual summarizing logic |
| paste_window.destroy() # Close the window |
| else: |
| messagebox.showerror("Error", "No text entered!") |
| |
| # Add Submit and Cancel buttons |
| submit_button = tk.Button(paste_window, text="Submit", command=submit_text) |
| submit_button.pack(side=tk.LEFT, padx=10, pady=10) |
| |
| cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy) |
| cancel_button.pack(side=tk.RIGHT, padx=10, pady=10) |
| |
|
|
| # Function to get relevant context from the vault based on user input |
| def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3): |
| if vault_embeddings.nelement() == 0: |
| return [] |
| input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"] |
| cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings) |
| top_k = min(top_k, len(cos_scores)) |
| top_indices = torch.topk(cos_scores, k=top_k)[1].tolist() |
| relevant_context = [vault_content[idx].strip() for idx in top_indices] |
| return relevant_context |
| |
| # Function to interact with the Ollama model |
| def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history): |
| relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3) |
| if relevant_context: |
| context_str = "\n".join(relevant_context) |
| print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR) |
| else: |
| print(CYAN + "No relevant context found." + RESET_COLOR) |
| |
| user_input_with_context = user_input |
| if relevant_context: |
| user_input_with_context = context_str + "\n\n" + user_input |
| |
| conversation_history.append({"role": "user", "content": user_input_with_context}) |
| messages = [{"role": "system", "content": system_message}, *conversation_history] |
| |
| response = client.chat.completions.create(model=ollama_model, messages=messages) |
| conversation_history.append({"role": "assistant", "content": response.choices[0].message.content}) |
| |
| return response.choices[0].message.content |
| |
| # Function to process text files, check for NOT FINISHED flag, and compute embeddings |
| def process_text_files(user_input): |
| text_parse_directory = os.path.join("local-rag", "text_parse") |
| temp_file_path = os.path.join("local-rag", "temp.txt") |
| |
| if not os.path.exists(text_parse_directory): |
| print(f"Directory '{text_parse_directory}' does not exist.") |
| return False |
| |
| if not os.path.exists(temp_file_path): |
| print("temp.txt does not exist.") |
| return False |
| |
| with open(temp_file_path, 'r', encoding='utf-8') as temp_file: |
| first_line = temp_file.readline().strip() |
| |
| text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')] |
| |
| if f"{first_line}" not in text_files: |
| print(f"No matching file found for '{first_line}.txt' in text_parse directory.") |
| return False |
| |
| file_path = os.path.join(text_parse_directory, f"{first_line}") |
| with open(file_path, 'r', encoding='utf-8') as f: |
| lines = f.readlines() |
| |
| lines = [line.strip() for line in lines] |
| |
| if len(lines) >= 2 and lines[-1] == "====================NOT FINISHED====================": |
| print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.") |
| |
| vault_content = [] |
| if os.path.exists(temp_file_path): |
| with open(temp_file_path, "r", encoding='utf-8') as vault_file: |
| vault_content = vault_file.readlines() |
| |
| vault_embeddings = [] |
| for content in vault_content: |
| response = ollama.embeddings(model='mxbai-embed-large', prompt=content) |
| vault_embeddings.append(response["embedding"]) |
| |
| vault_embeddings_tensor = torch.tensor(vault_embeddings) |
| print("Embeddings for each line in the vault:") |
| print(vault_embeddings_tensor) |
| |
| with open(os.path.join(text_parse_directory, f"{first_line}_embedding.pt"), "wb") as tensor_file: |
| torch.save(vault_embeddings_tensor, tensor_file) |
| |
| with open(file_path, 'w', encoding='utf-8') as f: |
| f.writelines(lines[:-1]) |
| |
| else: |
| print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.") |
| |
| tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt") |
| if os.path.exists(tensor_file_path): |
| vault_embeddings_tensor = torch.load(tensor_file_path) |
| print("Loaded Vault Embedding Tensor:") |
| print(vault_embeddings_tensor) |
| |
| vault_content = [] |
| if os.path.exists(temp_file_path): |
| with open(temp_file_path, "r", encoding='utf-8') as vault_file: |
| vault_content = vault_file.readlines() |
| |
| conversation_history = [] |
| system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text" |
| response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history) |
| |
| print (response) |
| |
| return response |
| |
| # Create the main window |
| root = tk.Tk() |
| root.title("Upload .pdf, .txt, or .json") |
|
|
| # Create a button to open the file dialog for PDF |
| pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text) |
| pdf_button.pack(pady=15) |
| |
| # Create a button to open the file dialog for text file |
| txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile) |
| txt_button.pack(pady=15) |
|
|
| # Create a button to open the file dialog for JSON file |
| json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile) |
| json_button.pack(pady=15) |
| |
| # Create a button to open the summerizer |
| json_button = tk.Button(root, text="Summarize This!", command=summarize) |
| json_button.pack(pady=15) |
| |
| # Configuration for the Ollama API client |
| client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3') |
| |
| # Parse command-line arguments |
| parser = argparse.ArgumentParser(description="Ollama Chat") |
| parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)") |
| args = parser.parse_args() |
| |
| # Run the main event loop |
| root.mainloop() |