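"""SaphiraGPT: a Gradio chat app that answers questions over uploaded
requirements files (.txt, .reqif, .reqifz) or project data pulled from S3,
using a llama_index RAG pipeline with a direct OpenAI chat completion as
the fallback."""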
import gradio as gr
import httpx
from loguru import logger
from typing import Any, List, Optional
from pydantic import BaseModel, Field
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from bs4 import BeautifulSoup
import os
import json
import boto3
from reqif.parser import ReqIFParser
import shutil
import zipfile
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("OPENAI_API_KEY")

class Message(BaseModel):
    role: str
    content: str
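
# NOTE: Message is currently unused; make_completion builds plain
# role/content dicts from the conversation history instead.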

class FileContext(BaseModel):
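    # Per-session ingestion state; the llama_index documents and index are
    # stored as Any and excluded from pydantic serialization.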
    processed: bool = False
    indexed_documents: Any = Field(default=None, exclude=True)
    index: Any = Field(default=None, exclude=True)

def extract_text_data(reqif_data):
    """Flatten a parsed ReqIF bundle into a list of plain-text attribute values."""
    text_data = []

    for spec_object in reqif_data.core_content.req_if_content.spec_objects:
        for attribute in spec_object.attributes:
            # If the attribute value contains XHTML content, strip the tags
            # with BeautifulSoup; otherwise append the value directly.
            if '<xhtml:div>' in str(attribute.value):
                soup = BeautifulSoup(str(attribute.value), 'html.parser')
                text_data.append(soup.get_text())
            else:
                text_data.append(str(attribute.value))

    return text_data

def reqif_file_processing(root, filename, user_id):
    """Parse a .reqif file and write its extracted text into the data folder.

    user_id is currently unused; eventually these files will be written to S3
    and fetched per individual user.
    """
    reqif_file_path = os.path.join(root, filename)
    reqif_data = ReqIFParser().parse(reqif_file_path)
    text_data = extract_text_data(reqif_data)
    extracted_text = '\n'.join(text_data)
    # Create the data folder if it does not exist yet
    data_folder = 'data'
    os.makedirs(data_folder, exist_ok=True)
    # Write the extracted text to a file in the data folder
    file_path = os.path.join(data_folder, f'{filename}.txt')
    with open(file_path, 'w') as file:
        file.write(extracted_text)

def make_completion(history: List[dict], nb_retries: int = 3, delay: int = 30) -> Optional[str]:
    """
    Sends a request to the ChatGPT API to retrieve a response based on a list of previous messages.
    """
    if not history:
        logger.error("History is empty, cannot make LLM completion.")
        return "No prior conversation to base the response on."

    header = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }

    # Normalize history into the role/content shape the API expects
    formatted_messages = [{"role": msg["role"], "content": msg["content"]} for msg in history]

    with httpx.Client(headers=header) as client:
        for attempt in range(1, nb_retries + 1):
            logger.debug(f"Attempt {attempt} for Chat/Completions")
            try:
                resp = client.post(
                    url="https://api.openai.com/v1/chat/completions",
                    json={
                        "model": "gpt-4",
                        "top_p": 1.0,
                        "n": 1,
                        "stream": False,
                        "messages": formatted_messages
                    },
                    timeout=delay
                )
                if resp.status_code == 200:
                    content = resp.json()["choices"][0]["message"]["content"]
                    logger.debug(f"LLM Response: {content}")
                    return content
                logger.warning(f"API Error: {resp.text}")
            except Exception as e:
                logger.error(f"Exception during API call: {e}")
    return "Failed to get a response from the LLM."

def process_and_index_file(uploaded_file, file_processed_state: FileContext):
    """Copy an uploaded .txt/.reqif/.reqifz file into the data folder and rebuild the index."""
    if uploaded_file is None:
        return

    data_folder = 'data'
    os.makedirs(data_folder, exist_ok=True)

    if not file_processed_state.processed:
        filename = os.path.basename(uploaded_file.name)
        temp_file_path = os.path.join('temp', filename)
        os.makedirs('temp', exist_ok=True)
        shutil.copyfile(uploaded_file.name, temp_file_path)

        if filename.endswith('.reqifz'):
            # Extract the first .reqif member of the archive and fall through
            # to the .reqif handling below.
            with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    if member.endswith('.reqif'):
                        zip_ref.extract(member, data_folder)
                        temp_file_path = os.path.join(data_folder, member)
                        filename = member
                        break

        if filename.endswith('.txt'):
            shutil.move(temp_file_path, os.path.join(data_folder, filename))
            file_processed_state.processed = True
        elif filename.endswith('.reqif'):
            # Convert the ReqIF content to plain text alongside the original file
            reqif_data = ReqIFParser().parse(temp_file_path)
            text_data = extract_text_data(reqif_data)
            extracted_text = '\n'.join(text_data)
            with open(os.path.join(data_folder, f"{filename}.txt"), 'w') as file:
                file.write(extracted_text)
            dest_path = os.path.join(data_folder, filename)
            if temp_file_path != dest_path:
                shutil.move(temp_file_path, dest_path)
            file_processed_state.processed = True

        if file_processed_state.processed:
            # Index everything currently in the data folder and stash the
            # results on the FileContext
            documents = SimpleDirectoryReader(data_folder).load_data()
            index = VectorStoreIndex.from_documents(documents)
            file_processed_state.indexed_documents = documents
            file_processed_state.index = index

    # Cleanup: remove everything in the data folder that is not a .txt file,
    # including extracted directories such as 'media'
    for file in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file)
        if not file.endswith('.txt'):
            if os.path.isdir(file_path):
                shutil.rmtree(file_path)
            else:
                os.remove(file_path)

def predict(input, history):
    """Answer with RAG over the data folder when documents exist, else fall back to the chat API."""
    data_folder = 'data'
    response = None
    # Rebuild the index from the data folder on every query; if nothing has
    # been ingested yet, fall back to a plain chat completion below.
    if os.path.isdir(data_folder) and os.listdir(data_folder):
        documents = SimpleDirectoryReader(data_folder).load_data()
        index = VectorStoreIndex.from_documents(documents)
        query_engine = index.as_query_engine()
        llm_response = query_engine.query(input)
        response = str(llm_response)
        print("RAG Response:", response)

    history.append({"role": "user", "content": input})
    if response:
        history.append({"role": "assistant", "content": response})
    else:
        response = make_completion(history)
        print("LLM Response:", response)
        history.append({"role": "assistant", "content": response})
    # Pair up (user, assistant) turns for the gr.Chatbot component
    messages = [(history[i]["content"], history[i + 1]["content"]) for i in range(0, len(history) - 1, 2)]
    return messages, history

def s3_upload(s3_checkbox, file_processed_state):
    if not s3_checkbox:
        return  # Exit the function if the checkbox is not checked

    # The project name should eventually be pulled dynamically and integrated
    # with this service, perhaps via the main Saphira input; to be
    # experimented with further.
    project = "SubmarineSpec"
    object_key = f'{project}.json'

    # AWS credentials are read from the environment (see load_dotenv above)
    aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
    aws_secret = os.getenv('AWS_SECRET_ACCESS_KEY')
    aws_region = os.getenv('AWS_DEFAULT_REGION')

    # Create an S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret,
        region_name=aws_region,
    )
    bucket_name = 'saphira-userprojects'

    try:
        # Download the project data from S3 and decode the JSON body
        response = s3.get_object(Bucket=bucket_name, Key=object_key)
        data = json.loads(response['Body'].read().decode('utf-8'))

        # Ensure the data folder exists
        data_folder = 'data'
        os.makedirs(data_folder, exist_ok=True)

        # Write the JSON data back out as a string in the data folder
        with open(os.path.join(data_folder, f"{project}.txt"), 'w') as file:
            file.write(json.dumps(data))

        file_processed_state.processed = True

    except Exception as e:
        print(f"Error downloading from S3: {e}")
        file_processed_state.processed = False

# Gradio interface with file input
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="SaphiraGPT")
    history_state = gr.State([])
    file_processed_state = gr.State(FileContext())
    s3_checkbox = gr.Checkbox(label="Load your project data into SaphiraGPT")
    with gr.Row():
        txt = gr.Textbox(lines=1, show_label=False, placeholder="Enter text and press enter")
    with gr.Row():
        file_input = gr.File(label="Select file for SaphiraGPT context")
    s3_checkbox.change(fn=s3_upload, inputs=[s3_checkbox, file_processed_state], outputs=None)
    file_input.change(fn=process_and_index_file, inputs=[file_input, file_processed_state], outputs=None)
    # predict returns (chat pairs, updated history); route the history back
    # into history_state rather than file_processed_state.
    txt.submit(predict, inputs=[txt, history_state], outputs=[chatbot, history_state])

demo.launch(share=True)