import json
import logging
import os
from pathlib import Path

import openai
from dotenv import load_dotenv
from openai.error import (APIError, RateLimitError, ServiceUnavailableError,
                          Timeout, APIConnectionError, InvalidRequestError)
from tenacity import (before_sleep_log, retry, retry_if_exception_type,
                      stop_after_delay, wait_random_exponential,
                      stop_after_attempt)
from tiktoken import Encoding, encoding_for_model

logger = logging.getLogger(__name__)

# Load environment variables (e.g. the OpenAI API key) from a .env file.
load_dotenv()

# Maximum number of tokens (prompt plus completion) that the target chat
# model accepts (4,096 is the context window of the original gpt-3.5-turbo,
# for example).
MAX_MODEL_TOKEN_COUNT = 4096
# Number of tokens reserved for the model's completion.
MAX_RESPONSE_TOKEN_COUNT = 512
# Directory in which the JSON response files are stored.
RESPONSES_DIRECTORY_PATH = Path('../openai-api-responses-new')


def get_openai_model_encoding(model_id: str) -> Encoding:
    """Get the encoding (tokenizer) for the OpenAI model."""
    # For the chat models (e.g. 'gpt-3.5-turbo'), encoding_for_model() returns
    # the cl100k_base encoding.
    return encoding_for_model(model_id)


def get_max_chapter_segment_token_count(prompt: str, model_id: str) -> int:
    """
    Calculate the maximum number of tokens that a chapter segment may contain
    given the prompt.
    """
    encoding = get_openai_model_encoding(model_id)
    # encode_ordinary() ignores special tokens, so it counts only the prompt
    # text itself.
    prompt_token_count = len(encoding.encode_ordinary(prompt))
    # Budget for the chapter segment: the model's context window minus the
    # tokens reserved for the response and the prompt, minus a small allowance
    # (8 + 1 tokens here) for the extra tokens the chat format wraps around
    # the messages.
    max_chapter_segment_token_count = (MAX_MODEL_TOKEN_COUNT
                                       - MAX_RESPONSE_TOKEN_COUNT
                                       - prompt_token_count - 8 - 1)
    return max_chapter_segment_token_count
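# Illustrative example (hypothetical prompt length): with the defaults above
# and a 1,500-token prompt, get_max_chapter_segment_token_count() returns
# 4096 - 512 - 1500 - 8 - 1 = 2075 tokens for the chapter segment itself.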


# Transient API failures are retried with exponential back-off; invalid
# requests are handled inside the function rather than retried.
@retry(retry=retry_if_exception_type((APIError, Timeout, RateLimitError,
                                      ServiceUnavailableError,
                                      APIConnectionError)),
       wait=wait_random_exponential(max=60), stop=stop_after_attempt(10),
       before_sleep=before_sleep_log(logger, logging.WARNING))
def save_openai_api_response(prompt_messages):
    """
    Use a prompt to make a request to the OpenAI API and return the response
    data.
    """
    # The first prompt message carries the API key and model id; pop them off
    # before the messages are sent to the API.
    openai.api_key = prompt_messages[0]['api_key']
    model_id = prompt_messages[0]['model_id']
    prompt_messages[0].pop('api_key')
    prompt_messages[0].pop('model_id')

    try:
        logger.info('Calling OpenAI API...')
        response = openai.ChatCompletion.create(
            model=model_id, messages=prompt_messages, temperature=0
        )
        finish_reason = response.choices[0].finish_reason
        if finish_reason != 'stop':
            logger.error(f'`finish_reason` is `{finish_reason}`.')

        save_data = {
            'model': response.model,
            'usage': response.usage,
            'finish_reason': finish_reason,
            'prompt_messages': prompt_messages,
            'response': response.choices[0].message.content
        }
    except InvalidRequestError:
        # Invalid requests (for example, prompts that exceed the model's
        # context window) cannot be fixed by retrying, so return a placeholder
        # response instead.
        logger.error('InvalidRequestError encountered. Returning a '
                     'placeholder response.')
        save_data = {
            'model': None,
            'usage': None,
            'finish_reason': 'invalid_request',
            'prompt_messages': prompt_messages,
            'response': ' '
        }

    return save_data
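# Note: despite its name, save_openai_api_response() only builds and returns
# the response data; callers are expected to write the returned dict to a JSON
# file, which load_response_text() below then reads back. See the usage sketch
# at the end of the module.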


def load_response_text(save_path):
    """
    Load the response text from a JSON file containing response data from the
    OpenAI API.
    """
    with open(save_path, 'r') as save_file:
        save_data = json.load(save_file)
    return save_data['response']
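

# Minimal usage sketch (illustrative only): it assumes OPENAI_API_KEY is set
# in the environment (e.g. via the .env file loaded above), targets
# 'gpt-3.5-turbo', and uses a made-up prompt and file name. The real pipeline
# that builds prompt_messages and chooses save paths lives elsewhere.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    example_prompt_messages = [
        {'role': 'user',
         'content': 'Summarize the following chapter segment: ...',
         # The API key and model id ride along in the first message and are
         # popped off by save_openai_api_response().
         'api_key': os.getenv('OPENAI_API_KEY'),
         'model_id': 'gpt-3.5-turbo'}
    ]
    response_data = save_openai_api_response(example_prompt_messages)
    # Persist the response data so that load_response_text() can read it back.
    RESPONSES_DIRECTORY_PATH.mkdir(parents=True, exist_ok=True)
    example_save_path = RESPONSES_DIRECTORY_PATH / 'example-response.json'
    with open(example_save_path, 'w') as save_file:
        json.dump(response_data, save_file, indent=4)
    print(load_response_text(example_save_path))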