| import os
|
| import tkinter as tk
|
| from tkinter import filedialog, messagebox
|
| import PyPDF2
|
| import re
|
| import json
|
| import torch
|
| import ollama
|
| from openai import OpenAI
|
| import argparse
|
|
|
|
|
| PINK = '\033[95m'
|
| CYAN = '\033[96m'
|
| YELLOW = '\033[93m'
|
| NEON_GREEN = '\033[92m'
|
| RESET_COLOR = '\033[0m'
|
|
|
|
|
| def open_file(filepath):
|
| with open(filepath, 'r', encoding='utf-8') as infile:
|
| return infile.read()
|
|
|
|
|
| def convert_pdf_to_text():
|
| file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")])
|
| if file_path:
|
| base_directory = os.path.join("local-rag", "text_parse")
|
| file_name = os.path.basename(file_path)
|
| output_file_name = os.path.splitext(file_name)[0] + ".txt"
|
| file_output_path = os.path.join(base_directory, output_file_name)
|
|
|
| if not os.path.exists(base_directory):
|
| os.makedirs(base_directory)
|
| print(f"Directory '{base_directory}' created.")
|
|
|
| with open(file_path, 'rb') as pdf_file:
|
| pdf_reader = PyPDF2.PdfReader(pdf_file)
|
| text = ''
|
| for page_num in range(len(pdf_reader.pages)):
|
| page = pdf_reader.pages[page_num]
|
| if page.extract_text():
|
| text += page.extract_text() + " "
|
|
|
| text = re.sub(r'\s+', ' ', text).strip()
|
| sentences = re.split(r'(?<=[.!?]) +', text)
|
| chunks = []
|
| current_chunk = ""
|
| for sentence in sentences:
|
| if len(current_chunk) + len(sentence) + 1 < 1000:
|
| current_chunk += (sentence + " ").strip()
|
| else:
|
| chunks.append(current_chunk)
|
| current_chunk = sentence + " "
|
| if current_chunk:
|
| chunks.append(current_chunk)
|
|
|
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
|
| temp_file.write(output_file_name + "\n")
|
| for chunk in chunks:
|
| temp_file.write(chunk.strip() + "\n")
|
|
|
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
|
| vault_file.write("\n")
|
| for chunk in chunks:
|
| vault_file.write(chunk.strip() + "\n")
|
|
|
| if not os.path.exists(file_output_path):
|
| with open(file_output_path, "w", encoding="utf-8") as f:
|
| for chunk in chunks:
|
| f.write(chunk.strip() + "\n")
|
| f.write("====================NOT FINISHED====================\n")
|
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
|
| else:
|
| print(f"File '{file_output_path}' already exists.")
|
|
|
| print(f"PDF content appended to vault.txt with each chunk on a separate line.")
|
|
|
|
|
| input_value = input("Enter your question:")
|
| process_text_files(input_value)
|
|
|
|
|
| def upload_txtfile():
|
| file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
|
| if file_path:
|
|
|
| base_directory = os.path.join("local-rag", "text_parse")
|
|
|
|
|
| file_name = os.path.basename(file_path)
|
| output_file_name = os.path.splitext(file_name)[0] + ".txt"
|
|
|
|
|
|
|
| file_output_path = os.path.join(base_directory, output_file_name)
|
|
|
|
|
| if not os.path.exists(base_directory):
|
| os.makedirs(base_directory)
|
| print(f"Directory '{base_directory}' created.")
|
|
|
|
|
| with open(file_path, 'r', encoding="utf-8") as txt_file:
|
| text = txt_file.read()
|
|
|
|
|
| text = re.sub(r'\s+', ' ', text).strip()
|
|
|
|
|
| sentences = re.split(r'(?<=[.!?]) +', text)
|
| chunks = []
|
| current_chunk = ""
|
| for sentence in sentences:
|
|
|
| if len(current_chunk) + len(sentence) + 1 < 1000:
|
| current_chunk += (sentence + " ").strip()
|
| else:
|
|
|
| chunks.append(current_chunk)
|
| current_chunk = sentence + " "
|
| if current_chunk:
|
| chunks.append(current_chunk)
|
|
|
|
|
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
|
| temp_file.write(output_file_name + "\n")
|
| for chunk in chunks:
|
|
|
| temp_file.write(chunk.strip() + "\n")
|
|
|
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
|
| vault_file.write("\n")
|
| for chunk in chunks:
|
|
|
| vault_file.write(chunk.strip() + "\n")
|
|
|
|
|
| if not os.path.exists(file_output_path):
|
| with open(file_output_path, "w") as f:
|
| f.write("")
|
| f.write("====================NOT FINISHED====================\n")
|
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
|
| else:
|
| print(f"File '{file_output_path}' already exists.")
|
|
|
| print(f"Text file content appended to vault.txt with each chunk on a separate line.")
|
|
|
| input_value = input("Enter your question:")
|
| process_text_files(input_value)
|
| else:
|
| print("No file selected.")
|
|
|
|
|
| def upload_jsonfile():
|
| file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")])
|
| if file_path:
|
|
|
|
|
| base_directory = os.path.join("local-rag", "text_parse")
|
|
|
|
|
| file_name = os.path.basename(file_path)
|
| output_file_name = os.path.splitext(file_name)[0] + ".txt"
|
|
|
|
|
|
|
| file_output_path = os.path.join(base_directory, output_file_name)
|
|
|
|
|
| if not os.path.exists(base_directory):
|
| os.makedirs(base_directory)
|
| print(f"Directory '{base_directory}' created.")
|
|
|
|
|
|
|
|
|
| with open(file_path, 'r', encoding="utf-8") as json_file:
|
| data = json.load(json_file)
|
|
|
|
|
| text = json.dumps(data, ensure_ascii=False)
|
|
|
|
|
| text = re.sub(r'\s+', ' ', text).strip()
|
|
|
|
|
| sentences = re.split(r'(?<=[.!?]) +', text)
|
| chunks = []
|
| current_chunk = ""
|
| for sentence in sentences:
|
|
|
| if len(current_chunk) + len(sentence) + 1 < 1000:
|
| current_chunk += (sentence + " ").strip()
|
| else:
|
|
|
| chunks.append(current_chunk)
|
| current_chunk = sentence + " "
|
| if current_chunk:
|
| chunks.append(current_chunk)
|
|
|
|
|
| with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
|
| temp_file.write(output_file_name + "\n")
|
| for chunk in chunks:
|
|
|
| temp_file.write(chunk.strip() + "\n")
|
|
|
| with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
|
| vault_file.write("\n")
|
| for chunk in chunks:
|
|
|
| vault_file.write(chunk.strip() + "\n")
|
|
|
| if not os.path.exists(file_output_path):
|
| with open(file_output_path, "w", encoding="utf-8") as f:
|
| for chunk in chunks:
|
| f.write(chunk.strip() + "\n")
|
| f.write("====================NOT FINISHED====================\n")
|
| print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
|
| else:
|
| print(f"File '{file_output_path}' already exists.")
|
|
|
|
|
|
|
| print(f"JSON file content appended to vault.txt with each chunk on a separate line.")
|
|
|
| input_value = input("Enter your question:")
|
| process_text_files(input_value)
|
|
|
| def summarize():
|
| summary_window = tk.Toplevel(root)
|
| summary_window.title("Text Summarizer")
|
| summary_window.geometry("400x200")
|
|
|
|
|
| label = tk.Label(summary_window, text="Choose an option to summarize text:")
|
| label.pack(pady=10)
|
|
|
|
|
| upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file)
|
| upload_button.pack(pady=5)
|
|
|
| paste_button = tk.Button(summary_window, text="Paste your text", command=lambda: open_paste_window(summary_window))
|
| paste_button.pack(pady=5)
|
|
|
|
|
| def summarize_from_file():
|
| file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
|
| if file_path:
|
|
|
| base_directory = os.path.join("local-rag", "text_sum")
|
|
|
| file_name = os.path.basename(file_path)
|
|
|
|
|
| if not os.path.exists(base_directory):
|
| os.makedirs(base_directory)
|
| print(f"Directory '{base_directory}' created.")
|
|
|
| summary_content = []
|
| if os.path.exists(file_name):
|
| with open(file_name, "r", encoding='utf-8') as sum_file:
|
| summary_content = sum_file.readlines()
|
|
|
| summary_embeddings = []
|
| for content in summary_content:
|
| response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
|
| summary_embeddings.append(response["embedding"])
|
|
|
| summary_embeddings_tensor = torch.tensor(summary_embeddings)
|
| print("Embeddings for each line in the vault:")
|
| print(summary_embeddings_tensor)
|
|
|
| conversation_history = []
|
| system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
|
| user_input = "Summarize this paragraph"
|
|
|
| response = ollama_chat(user_input, system_message, summary_embeddings_tensor, summary_content, args.model, conversation_history)
|
|
|
| messagebox.showinfo("Summary", response)
|
| else:
|
| messagebox.showerror("Error", "No file selected!")
|
|
|
|
|
| def open_paste_window(parent_window):
|
|
|
| paste_window = tk.Toplevel(parent_window)
|
| paste_window.title("Paste Your Text")
|
| paste_window.geometry("400x300")
|
|
|
|
|
| label = tk.Label(paste_window, text="Paste your text below:")
|
| label.pack(pady=5)
|
|
|
| input_textbox = tk.Text(paste_window, height=8, width=40)
|
| input_textbox.pack(pady=5)
|
|
|
|
|
| def submit_text():
|
| pasted_text = input_textbox.get("1.0", tk.END).strip()
|
| if pasted_text:
|
|
|
| system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
|
| user_input = "Summarize this paragraph:"
|
| new_value = user_input + pasted_text
|
| messages = [
|
| {
|
| "system",
|
| system_message,
|
| },
|
| {"human", new_value},
|
| ]
|
| response = client.chat.completions.create(model=args.model, messages=messages)
|
|
|
| response_value = response.choices[0].message.content
|
|
|
|
|
| messagebox.showinfo("Summary", response_value)
|
| paste_window.destroy()
|
| else:
|
| messagebox.showerror("Error", "No text entered!")
|
|
|
|
|
| submit_button = tk.Button(paste_window, text="Submit", command=submit_text)
|
| submit_button.pack(side=tk.LEFT, padx=10, pady=10)
|
|
|
| cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy)
|
| cancel_button.pack(side=tk.RIGHT, padx=10, pady=10)
|
|
|
|
|
|
|
| def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3):
|
| if vault_embeddings.nelement() == 0:
|
| return []
|
| input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"]
|
| cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings)
|
| top_k = min(top_k, len(cos_scores))
|
| top_indices = torch.topk(cos_scores, k=top_k)[1].tolist()
|
| relevant_context = [vault_content[idx].strip() for idx in top_indices]
|
| return relevant_context
|
|
|
|
|
| def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history):
|
| relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3)
|
| if relevant_context:
|
| context_str = "\n".join(relevant_context)
|
| print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR)
|
| else:
|
| print(CYAN + "No relevant context found." + RESET_COLOR)
|
|
|
| user_input_with_context = user_input
|
| if relevant_context:
|
| user_input_with_context = context_str + "\n\n" + user_input
|
|
|
| conversation_history.append({"role": "user", "content": user_input_with_context})
|
| messages = [{"role": "system", "content": system_message}, *conversation_history]
|
|
|
| response = client.chat.completions.create(model=ollama_model, messages=messages)
|
| conversation_history.append({"role": "assistant", "content": response.choices[0].message.content})
|
|
|
| return response.choices[0].message.content
|
|
|
|
|
| def process_text_files(user_input):
|
| text_parse_directory = os.path.join("local-rag", "text_parse")
|
| temp_file_path = os.path.join("local-rag", "temp.txt")
|
|
|
| if not os.path.exists(text_parse_directory):
|
| print(f"Directory '{text_parse_directory}' does not exist.")
|
| return False
|
|
|
| if not os.path.exists(temp_file_path):
|
| print("temp.txt does not exist.")
|
| return False
|
|
|
| with open(temp_file_path, 'r', encoding='utf-8') as temp_file:
|
| first_line = temp_file.readline().strip()
|
|
|
| text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')]
|
|
|
| if f"{first_line}" not in text_files:
|
| print(f"No matching file found for '{first_line}.txt' in text_parse directory.")
|
| return False
|
|
|
| file_path = os.path.join(text_parse_directory, f"{first_line}")
|
| with open(file_path, 'r', encoding='utf-8') as f:
|
| lines = f.readlines()
|
|
|
| lines = [line.strip() for line in lines]
|
|
|
| if len(lines) >= 2 and lines[-1] == "====================NOT FINISHED====================":
|
| print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.")
|
|
|
| vault_content = []
|
| if os.path.exists(temp_file_path):
|
| with open(temp_file_path, "r", encoding='utf-8') as vault_file:
|
| vault_content = vault_file.readlines()
|
|
|
| vault_embeddings = []
|
| for content in vault_content:
|
| response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
|
| vault_embeddings.append(response["embedding"])
|
|
|
| vault_embeddings_tensor = torch.tensor(vault_embeddings)
|
| print("Embeddings for each line in the vault:")
|
| print(vault_embeddings_tensor)
|
|
|
| with open(os.path.join(text_parse_directory, f"{first_line}_embedding.pt"), "wb") as tensor_file:
|
| torch.save(vault_embeddings_tensor, tensor_file)
|
|
|
| with open(file_path, 'w', encoding='utf-8') as f:
|
| f.writelines(lines[:-1])
|
|
|
| else:
|
| print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.")
|
|
|
| tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt")
|
| if os.path.exists(tensor_file_path):
|
| vault_embeddings_tensor = torch.load(tensor_file_path)
|
| print("Loaded Vault Embedding Tensor:")
|
| print(vault_embeddings_tensor)
|
|
|
| vault_content = []
|
| if os.path.exists(temp_file_path):
|
| with open(temp_file_path, "r", encoding='utf-8') as vault_file:
|
| vault_content = vault_file.readlines()
|
|
|
| conversation_history = []
|
| system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text"
|
| response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history)
|
|
|
| print (response)
|
|
|
| return response
|
|
|
|
|
| root = tk.Tk()
|
| root.title("Upload .pdf, .txt, or .json")
|
|
|
|
|
| pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text)
|
| pdf_button.pack(pady=15)
|
|
|
|
|
| txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile)
|
| txt_button.pack(pady=15)
|
|
|
|
|
| json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile)
|
| json_button.pack(pady=15)
|
|
|
|
|
| json_button = tk.Button(root, text="Summarize This!", command=summarize)
|
| json_button.pack(pady=15)
|
|
|
|
|
| client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3')
|
|
|
|
|
| parser = argparse.ArgumentParser(description="Ollama Chat")
|
| parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)")
|
| args = parser.parse_args()
|
|
|
|
|
| root.mainloop()
|
|
|