QueryHelper / gptManager.py
anumaurya114exp's picture
Upload 4 files
e7c1910
raw
history blame
2.7 kB
import re
from persistStorage import saveLog
class ChatgptManager:
    """Wrapper around a chat-completions client with a rough size guard.

    The client passed in must expose ``chat.completions.create(model=...,
    messages=..., temperature=...)`` and return an object whose
    ``choices[0].message.content`` holds the reply text.

    NOTE: the "token" count used here is a whitespace/word-boundary word
    count, not a real tokenizer count, so ``tokenLimit`` is only an
    approximation of the model's context budget.
    """

    # Matches a single word; compiled once because it is used on every
    # size check.
    _WORD_PATTERN = re.compile(r'\b\w+\b')

    def __init__(self, openAIClient, model="gpt-3.5-turbo-1106", tokenLimit=8000, throwError=False):
        """Store the client and configuration.

        Args:
            openAIClient: Client object used to request completions.
            model: Model name forwarded to the client on every request.
            tokenLimit: Maximum approximate token (word) count allowed for
                the combined message contents of one request.
            throwError: When true, exceptions raised while requesting a
                completion propagate to the caller; otherwise they are
                converted into an error string that is returned as the reply.
        """
        self.client = openAIClient
        self.tokenLimit = tokenLimit
        self.model = model
        self.throwError = throwError
        # Initialised here so the token helpers work before the first request.
        self.messages = []

    def _chatHistoryToGptMessages(self, chatHistory=None) -> list:
        """Convert a flat chat history into role-tagged message dicts.

        Entries at even positions are attributed to the user and entries at
        odd positions to the assistant.

        Args:
            chatHistory: Sequence of message texts, or ``None`` for no history.

        Returns:
            A new list of ``{"role": ..., "content": ...}`` dicts.
        """
        roles = ("user", "assistant")
        return [
            {"role": roles[index % 2], "content": text}
            for index, text in enumerate(chatHistory or [])
        ]

    def getResponseForUserInput(self, userInput, systemPrompt, chatHistory=None):
        """Request a reply for ``userInput`` and return its text.

        The request is built from the chat history, then the system prompt,
        then the user input (this ordering is kept from the original
        implementation). If adding the system prompt or the user input would
        exceed ``tokenLimit``, the whole history is dropped and the request
        is rebuilt without it.

        After the call ``self.messages`` holds the messages that were sent
        plus the assistant reply (or the error text).

        Args:
            userInput: Text of the new user message.
            systemPrompt: Text of the system message.
            chatHistory: Alternating user/assistant texts, or ``None``.

        Returns:
            The reply text, or an ``"Error while connecting with gpt ..."``
            string when the request fails and ``throwError`` is false.

        Raises:
            ValueError: If the system prompt or the user input exceeds the
                limit even without any history.
            Exception: Whatever the client raised, when ``throwError`` is
                true.
        """
        history = list(chatHistory) if chatHistory else []
        self.messages = self._chatHistoryToGptMessages(history)

        pending = (
            ({"role": "system", "content": systemPrompt}, "System Prompt Too long."),
            ({"role": "user", "content": userInput}, "Token Limit exceeding. With user input"),
        )
        for message, errorText in pending:
            if self.isTokeLimitExceeding(message):
                if not history:
                    raise ValueError(errorText)
                # Too large with history attached: retry once without it.
                return self.getResponseForUserInput(userInput=userInput, systemPrompt=systemPrompt)
            self.messages.append(message)

        try:
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=self.messages,
                temperature=0,
            )
            gptResponse = completion.choices[0].message.content
        except Exception as e:
            if self.throwError:
                # Surface the real failure instead of falling through with
                # an unassigned response.
                raise
            gptResponse = "Error while connecting with gpt " + str(e)[:50] + "..."
        self.messages.append({"role": "assistant", "content": gptResponse})
        return gptResponse

    def isTokeLimitExceeding(self, newMessage=None, truncate=True, throwError=True) -> bool:
        """Return whether the current messages plus ``newMessage`` exceed the limit.

        Args:
            newMessage: Optional message dict to include in the count.
            truncate: Accepted for backward compatibility; currently ignored.
            throwError: Accepted for backward compatibility; currently ignored.
        """
        return self.getTokenCount(newMessage=newMessage) > self.tokenLimit

    def getTokenCount(self, newMessage=None) -> int:
        """Return the approximate token count including ``newMessage``.

        The count is the number of words in the space-joined contents of
        ``self.messages`` (plus ``newMessage`` when given). A ``None``
        content is treated as empty text.
        """
        messages = list(self.messages)
        if newMessage is not None:
            messages.append(newMessage)
        combinedContent = " ".join(msg["content"] or "" for msg in messages)
        return len(self._WORD_PATTERN.findall(combinedContent))