import re
from typing import Dict, List, Optional

# NOTE(review): saveLog is not referenced anywhere in this module's visible
# code; the import is kept in case other code relies on it -- confirm.
from persistStorage import saveLog  # noqa: F401


class ChatgptManager:
    """Thin wrapper around an OpenAI-style chat client.

    Builds the message list for a single request (prior chat history, then
    the system prompt, then the new user input), enforces a size limit on
    the combined message text, and returns the assistant's reply.

    The "token" limit is enforced with a word count (see ``getTokenCount``),
    not with a real tokenizer, so it is only an approximation.
    """

    # Compiled once: one match per run of word characters.
    _WORD_PATTERN = re.compile(r'\b\w+\b')

    def __init__(self, openAIClient, model="gpt-3.5-turbo-1106", tokenLimit=8000, throwError=False):
        """Store the client and request settings.

        Args:
            openAIClient: Client object; ``client.chat.completions.create``
                is called on it to obtain completions.
            model: Model name passed to the completion call.
            tokenLimit: Maximum allowed value of ``getTokenCount`` for the
                messages of one request.
            throwError: When true, exceptions raised by the completion call
                propagate to the caller; when false they are converted into
                an error string that is returned as the response.
        """
        self.client = openAIClient
        self.tokenLimit = tokenLimit
        self.model = model
        self.throwError = throwError
        # Initialised here so getTokenCount()/isTokeLimitExceeding() work
        # even before the first call to getResponseForUserInput().
        self.messages: List[Dict[str, str]] = []

    def _chatHistoryToGptMessages(self, chatHistory: Optional[List[str]] = None) -> List[Dict[str, str]]:
        """Convert a flat history list into role-tagged chat messages.

        Entries at even indexes are tagged ``"user"`` and entries at odd
        indexes ``"assistant"``.

        Args:
            chatHistory: Message contents in order; ``None`` or empty yields
                an empty list.

        Returns:
            A new list of ``{"role": ..., "content": ...}`` dicts.
        """
        if not chatHistory:
            return []
        return [
            {"role": "user" if index % 2 == 0 else "assistant", "content": content}
            for index, content in enumerate(chatHistory)
        ]

    def getResponseForUserInput(self, userInput, systemPrompt, chatHistory: Optional[List[str]] = None):
        """Send the user input to the model and return the reply text.

        ``self.messages`` is rebuilt on every call as: converted chat
        history, then the system prompt, then the user input. If adding the
        system prompt or the user input would exceed ``tokenLimit`` while a
        history is present, the history is dropped and the request is
        retried without it.

        Args:
            userInput: Content of the new user message.
            systemPrompt: Content of the system message.
            chatHistory: Prior message contents (alternating user/assistant,
                starting with user). Not modified.

        Returns:
            The assistant's reply, or -- when the completion call fails and
            ``throwError`` is false -- a short error description string.
            Either value is also appended to ``self.messages`` as an
            assistant message.

        Raises:
            ValueError: If the limit is exceeded even without any history.
            Exception: Whatever the completion call raised, when
                ``throwError`` is true.
        """
        # Copy so the caller's list is never aliased by this instance.
        history = list(chatHistory) if chatHistory else []
        self.messages = self._chatHistoryToGptMessages(history)

        systemMessage = {"role": "system", "content": systemPrompt}
        if self.isTokeLimitExceeding(systemMessage):
            if not history:
                raise ValueError("System Prompt Too long.")
            # Retry once with the history discarded to free up room.
            return self.getResponseForUserInput(userInput=userInput, systemPrompt=systemPrompt)
        self.messages.append(systemMessage)

        userMessage = {"role": "user", "content": userInput}
        if self.isTokeLimitExceeding(userMessage):
            if not history:
                raise ValueError("Token Limit exceeding. With user input")
            return self.getResponseForUserInput(userInput=userInput, systemPrompt=systemPrompt)
        self.messages.append(userMessage)

        try:
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=self.messages,
                temperature=0,
            )
            gptResponse = completion.choices[0].message.content
        except Exception as e:
            if self.throwError:
                # Surface the real failure instead of falling through with
                # gptResponse unassigned.
                raise
            gptResponse = "Error while connecting with gpt " + str(e)[:50] + "..."

        self.messages.append({"role": "assistant", "content": gptResponse})
        return gptResponse

    def isTokeLimitExceeding(self, newMessage=None, truncate=True, throwError=True):
        """Return whether the current messages (plus ``newMessage``) exceed the limit.

        Args:
            newMessage: Optional message dict to include in the count.
            truncate: Unused; accepted for backward compatibility.
            throwError: Unused; accepted for backward compatibility.

        Returns:
            ``True`` when ``getTokenCount(newMessage)`` is strictly greater
            than ``self.tokenLimit``, otherwise ``False``.
        """
        return self.getTokenCount(newMessage=newMessage) > self.tokenLimit

    def getTokenCount(self, newMessage=None):
        """Return the approximate token count of ``self.messages`` plus ``newMessage``.

        The count is the number of word-character runs in the ``"content"``
        values of all messages; it is a word count, not a model tokenizer
        count.

        Args:
            newMessage: Optional message dict counted in addition to the
                stored messages; ``self.messages`` is not modified.

        Returns:
            The word count as an ``int`` (0 when there are no messages).
        """
        messages = list(self.messages)
        if newMessage is not None:
            messages.append(newMessage)
        # Joining with a space keeps words of adjacent messages separate.
        combinedContent = " ".join(msg["content"] for msg in messages)
        return len(self._WORD_PATTERN.findall(combinedContent))