import json
from typing import Dict, List, Optional

import openai
import tiktoken

from callback_handler import BaseCallbackHandler
def call_openai(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    stream: str = "no",
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
    callback: Optional[BaseCallbackHandler] = None,
) -> Dict:
    """
    Call the OpenAI chat completion API with a list of messages and an
    optional list of function definitions (see the OpenAI API reference).

    Args:
        messages: messages passed to OpenAI; a list of dicts with the keys
            role (one of system, user, assistant, function) and content.
        functions: function definitions passed to OpenAI.
        stream: one of "no", "sentence", or "token"; the granularity at
            which output is forwarded to the callback.
        model: name of the OpenAI model.
        temperature: sampling temperature of the model.
        callback: callback handler instance; mandatory when streaming.

    Returns:
        The final response dict with usage statistics and choices.
    """
    current_state = None
    prompt_tokens = token_count(messages=messages, functions=functions)
    if functions is None:
        # Negative start offsets the stream chunks that carry no content.
        completion_tokens = -2
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
        )
    else:
        completion_tokens = -1
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
            functions=functions,
        )
    for chunk in response:
        completion_tokens += 1
        choice = chunk["choices"][0]
        delta = choice["delta"]
        finish_reason = choice["finish_reason"]
        if finish_reason is not None:
            # Stream finished: assemble a non-streaming-style response dict.
            if finish_reason == "function_call":
                completion_tokens += 6
            final_response = {
                "usage": {
                    "completion_tokens": completion_tokens,
                    "prompt_tokens": prompt_tokens,
                },
                "choices": [],
            }
            if current_state == "function":
                final_response["choices"].append({
                    "finish_reason": "function_call",
                    "message": {
                        "content": None,
                        "function_call": {
                            "arguments": function_arg,
                            "name": function_name,
                        },
                        "role": "assistant",
                    },
                })
            if current_state == "user":
                final_response["choices"].append({
                    "finish_reason": "stop",
                    "message": {
                        "content": message_all,
                        "role": "assistant",
                    },
                })
            if callback:
                callback.on_llm_end(response=final_response)
            return final_response
        else:
            if current_state is None:
                # The first delta decides whether the model is calling a
                # function or writing a regular assistant message.
                if "function_call" in delta:
                    current_state = "function"
                    function_name = delta["function_call"]["name"]
                    function_arg = ""
                    # if stream != "no":
                    #     s = f" - {function_name}"
                    #     callback.on_llm_new_token(token=s)
                else:
                    current_state = "user"
                    message_stream = ""
                    message_all = ""
            elif current_state == "function":
                function_arg += delta["function_call"]["arguments"]
            elif current_state == "user":
                token = delta["content"]
                message_all += token
                if stream == "token":
                    callback.on_llm_new_token(token=token)
                if stream == "sentence":
                    message_stream += token
                    # Flush the buffer at sentence boundaries.
                    if "." in token or "!" in token or "?" in token or "\n" in token:
                        if message_stream[-1] == "\n":
                            callback.on_llm_new_token(token=message_stream[:-1])
                        else:
                            callback.on_llm_new_token(token=message_stream)
                        message_stream = ""
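# Illustrative sketch of a handler that call_openai can drive. The concrete
# BaseCallbackHandler interface lives in callback_handler (not shown here);
# the two method names below simply mirror the calls made in call_openai,
# and PrintCallbackHandler itself is a hypothetical example class.
class PrintCallbackHandler(BaseCallbackHandler):
    """Print streamed tokens/sentences and the final usage statistics."""

    def on_llm_new_token(self, token: str) -> None:
        print(token, flush=True)

    def on_llm_end(self, response: Dict) -> None:
        print("usage:", response["usage"])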
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by a list of messages."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # print("Warning: model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")
    if model in {
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
    }:
        tokens_per_message = 3
        tokens_per_name = 1
    elif model == "gpt-3.5-turbo-0301":
        tokens_per_message = 4  # every message follows <|start|>{role/name}\n{content}<|end|>\n
        tokens_per_name = -1  # if there's a name, the role is omitted
    elif "gpt-3.5-turbo" in model:
        # print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
        return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
    elif "gpt-4" in model:
        # print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
        return num_tokens_from_messages(messages, model="gpt-4-0613")
    else:
        raise NotImplementedError(
            f"num_tokens_from_messages() is not implemented for model {model}. "
            "See https://github.com/openai/openai-python/blob/main/chatml.md "
            "for information on how messages are converted to tokens."
        )
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            if key == "function_call":
                # A function_call value is itself a dict (name + arguments).
                num_tokens += tokens_per_name
                for v in value.values():
                    num_tokens += len(encoding.encode(v))
            if value is not None and key != "function_call":
                num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
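# Worked example of the 0613 accounting above: a single message like
# [{"role": "user", "content": "hi"}] comes out to
# 3 (per-message) + 1 ("user") + 1 ("hi") + 3 (reply priming) = 8 tokens,
# assuming cl100k_base encodes "user" and "hi" as one token each.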
def num_tokens_from_functions(functions, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by a list of functions."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # print("Warning: model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")
    num_tokens = 0
    for function in functions:
        function_tokens = len(encoding.encode(function["name"]))
        function_tokens += len(encoding.encode(function["description"]))
        if "parameters" in function:
            parameters = function["parameters"]
            if "properties" in parameters:
                for properties_key in parameters["properties"]:
                    function_tokens += len(encoding.encode(properties_key))
                    v = parameters["properties"][properties_key]
                    for field in v:
                        if field == "type":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v["type"]))
                        elif field == "description":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v["description"]))
                        elif field == "enum":
                            function_tokens -= 3
                            for o in v["enum"]:
                                function_tokens += 3
                                function_tokens += len(encoding.encode(o))
                        else:
                            # print(f"Warning: not supported field: {field}")
                            pass
                function_tokens += 16
        num_tokens += function_tokens
    num_tokens += 16
    return num_tokens
def token_count(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    model: str = "gpt-3.5-turbo-0613",
) -> int:
    """Return the total prompt token count for messages plus optional functions."""
    tokens_used = num_tokens_from_messages(messages=messages, model=model)
    if functions is not None:
        tokens_used += num_tokens_from_functions(functions=functions, model=model)
    return tokens_used
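if __name__ == "__main__":
    # Minimal usage sketch. It assumes openai.api_key is configured elsewhere
    # and reuses the illustrative PrintCallbackHandler defined above.
    msgs = [{"role": "user", "content": "Write one sentence about the sea."}]
    print("prompt tokens:", token_count(messages=msgs))
    call_openai(messages=msgs, stream="sentence", callback=PrintCallbackHandler())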