import json
import os
import shutil
import zipfile
from typing import Any, List, Optional

import boto3
import gradio as gr
import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from loguru import logger
from pydantic import BaseModel, Field
from reqif.parser import ReqIFParser

load_dotenv()
API_KEY = os.getenv("OPENAI_API_KEY")


class Message(BaseModel):
    role: str
    content: str


class FileContext(BaseModel):
    processed: bool = False
    indexed_documents: Any = Field(default=None, exclude=True)
    index: Any = Field(default=None, exclude=True)


def extract_text_data(reqif_data):
    text_data = []
    for spec_object in reqif_data.core_content.req_if_content.spec_objects:
        for attribute in spec_object.attributes:
            # Check if the attribute value contains XHTML markup
            if '<' in str(attribute.value):
                # Use BeautifulSoup to parse the XHTML content and
                # extract the text content, removing tags
                soup = BeautifulSoup(str(attribute.value), 'html.parser')
                text_data.append(soup.get_text())
            else:
                # If no XHTML content, append the value directly
                text_data.append(str(attribute.value))
    return text_data


def reqif_file_processing(root, filename, user_id):
    reqif_file_path = os.path.join(root, filename)
    reqif_data = ReqIFParser().parse(reqif_file_path)
    text_data = extract_text_data(reqif_data)
    extracted_text = json.dumps('\n'.join(text_data))

    # Check if the data folder exists; if not, create it
    data_folder = 'data'
    if not os.path.exists(data_folder):
        os.makedirs(data_folder)

    # Write the extracted text to a file in the data folder.
    # Eventually these will be written to S3 and fetched per individual user.
    file_path = os.path.join(data_folder, f'{filename}.txt')
    with open(file_path, 'w') as file:
        file.write(extracted_text)


def make_completion(history: List[dict], nb_retries: int = 3, delay: int = 30) -> Optional[str]:
    """
    Send a request to the OpenAI Chat Completions API and return a response
    based on a list of previous messages.
    """
    if not history:
        logger.error("History is empty, cannot make LLM completion.")
        return "No prior conversation to base the response on."

    header = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }
    client = httpx.Client(headers=header)

    # Convert history to the expected format if not already done
    formatted_messages = [{"role": msg["role"], "content": msg["content"]} for msg in history]

    counter = 0
    keep_loop = True
    while keep_loop:
        logger.debug(f"Attempt {counter} for Chat/Completions")
        try:
            resp = client.post(
                url="https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4",
                    "top_p": 1.0,
                    "n": 1,
                    "stream": False,
                    "messages": formatted_messages
                },
                timeout=delay
            )
            if resp.status_code == 200:
                content = resp.json()["choices"][0]["message"]["content"]
                logger.debug(f"LLM Response: {content}")
                return content
            else:
                # API-level errors are not retried; only transport exceptions are
                logger.warning(f"API Error: {resp.text}")
                keep_loop = False
        except Exception as e:
            logger.error(f"Exception during API call: {e}")
        counter += 1
        if counter >= nb_retries:
            keep_loop = False
    return "Failed to get a response from the LLM."
def process_and_index_file(uploaded_file, file_processed_state: FileContext):
    if uploaded_file is None:
        return "No file uploaded"

    filename = os.path.basename(uploaded_file.name)
    data_folder = 'data'
    if not os.path.exists(data_folder):
        os.makedirs(data_folder)

    if not file_processed_state.processed:
        # Stage the upload in a temp folder before moving it into data/
        temp_file_path = os.path.join('temp', filename)
        if not os.path.exists('temp'):
            os.makedirs('temp')
        shutil.copyfile(uploaded_file.name, temp_file_path)

        if filename.endswith('.txt'):
            shutil.move(temp_file_path, os.path.join(data_folder, filename))
            file_processed_state.processed = True
        elif filename.endswith('.reqif'):
            reqif_data = ReqIFParser().parse(temp_file_path)
            text_data = extract_text_data(reqif_data)
            extracted_text = '\n'.join(text_data)
            with open(os.path.join(data_folder, f"{filename}.txt"), 'w') as file:
                file.write(extracted_text)
            shutil.move(temp_file_path, os.path.join(data_folder, filename))
            file_processed_state.processed = True
        elif filename.endswith('.reqifz'):
            # A .reqifz archive bundles .reqif files; extract only the first
            # .reqif member, assuming there is only one of interest
            with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    if member.endswith('.reqif'):
                        zip_ref.extract(member, data_folder)
                        temp_file_path = os.path.join(data_folder, member)
                        filename = member
                        break
            if filename.endswith('.reqif'):
                reqif_data = ReqIFParser().parse(temp_file_path)
                text_data = extract_text_data(reqif_data)
                extracted_text = '\n'.join(text_data)
                with open(os.path.join(data_folder, f"{filename}.txt"), 'w') as file:
                    file.write(extracted_text)
                # The extracted .reqif already lives in the data folder,
                # so no move is needed here
                file_processed_state.processed = True

        if file_processed_state.processed:
            # (Re)build the vector index over everything in data/
            # and update the FileContext
            documents = SimpleDirectoryReader(data_folder).load_data()
            index = VectorStoreIndex.from_documents(documents)
            file_processed_state.indexed_documents = documents
            file_processed_state.index = index

    # Cleanup: remove everything in data/ that is not a .txt file,
    # including any directories extracted from archives
    for entry in os.listdir(data_folder):
        entry_path = os.path.join(data_folder, entry)
        if not entry.endswith('.txt'):
            if os.path.isdir(entry_path):
                # If the item is a directory, remove it and its contents
                shutil.rmtree(entry_path)
            else:
                # If the item is a file, just remove it
                os.remove(entry_path)


def predict(input, history):
    data_folder = 'data'
    response = None
    # If any context files have been loaded, answer via the RAG index;
    # otherwise fall back to a plain chat completion
    if os.path.isdir(data_folder) and os.listdir(data_folder):
        documents = SimpleDirectoryReader(data_folder).load_data()
        index = VectorStoreIndex.from_documents(documents)
        query_engine = index.as_query_engine()
        llm_response = query_engine.query(input)
        response = str(llm_response)
        print("RAG Response:", response)

    history.append({"role": "user", "content": input})
    if response:
        history.append({"role": "assistant", "content": response})
    else:
        response = make_completion(history)
        print("LLM Response:", response)
        history.append({"role": "assistant", "content": response})
[(history[i]["content"], history[i+1]["content"]) for i in range(0, len(history)-1, 2)] return messages, history def s3_upload(s3_checkbox, file_processed_state): if not s3_checkbox: return # Exit the function if the checkbox is not checked # This needs to be pulled dynamically somehow and integrated with this service. Perhaps we include this code within the main Saphira input. To be experimented with further. project = "SubmarineSpec" object_key = f'{project}.json' # Assuming you've already loaded the AWS credentials into your environment # and imported them here if needed aws_access_key = os.getenv('AWS_ACCESS_KEY_ID') aws_secret = os.getenv('AWS_SECRET_ACCESS_KEY') aws_region = os.getenv('AWS_DEFAULT_REGION') # Create an S3 client s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret, region_name=aws_region) bucket_name = 'saphira-userprojects' try: # Download the data from S3 response = s3.get_object(Bucket=bucket_name, Key=object_key) data = json.loads(response['Body'].read().decode('utf-8')) # Ensure decoding of the bytes object # Specify the data folder and check if it exists data_folder = 'data' if not os.path.exists(data_folder): os.makedirs(data_folder) # Write the data to a file in the data folder with open(os.path.join(data_folder, f"{project}.txt"), 'w') as file: # Assuming you want to write the JSON data as a string # If 'data' is a dictionary, you might want to format it as a string differently file.write(json.dumps(data)) # Convert the JSON data back into a string file_processed_state.processed = True except Exception as e: print(f"Error downloading from S3: {e}") file_processed_state.processed = False # Gradio interface with file input with gr.Blocks() as demo: chatbot = gr.Chatbot(label="SaphiraGPT") history_state = gr.State([]) file_processed_state = gr.State(FileContext()) s3_checkbox = gr.Checkbox(label="Load your project data into SaphiraGPT") with gr.Row(): txt = gr.Textbox(lines=1, show_label=False, placeholder="Enter text and press enter") with gr.Row(): file_input = gr.File(label="Select file for SaphiraGPT context") s3_checkbox.change(fn=s3_upload, inputs=[s3_checkbox, file_processed_state], outputs=None) file_input.change(fn=process_and_index_file, inputs=[file_input, file_processed_state], outputs=None) txt.submit(predict, inputs=[txt, history_state], outputs=[chatbot, file_processed_state]) demo.launch(share=True)