# Command-line entry point: see the `if __name__ == "__main__":` block at the bottom.
| """ |
| API REQUEST PARALLEL PROCESSOR |
| |
| Using the OpenAI API to process lots of text quickly takes some care. |
| If you trickle in a million API requests one by one, they'll take days to complete. |
| If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors. |
| To maximize throughput, parallel requests need to be throttled to stay under rate limits. |
| |
| This script parallelizes requests to the OpenAI API while throttling to stay under rate limits. |
| |
| Features: |
| - Streams requests from file, to avoid running out of memory for giant jobs |
| - Makes requests concurrently, to maximize throughput |
| - Throttles request and token usage, to stay under rate limits |
| - Retries failed requests, making up to {max_attempts} attempts in total per request, to avoid missing data |
| - Logs errors, to diagnose problems with requests |
| |
| Example command to call script: |
| ``` |
| python examples/api_request_parallel_processor.py \ |
| --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \ |
| --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \ |
| --request_url https://api.openai.com/v1/embeddings \ |
| --max_requests_per_minute 1500 \ |
| --max_tokens_per_minute 6250000 \ |
| --token_encoding_name cl100k_base \ |
| --max_attempts 5 \ |
| --logging_level 20 |
| ``` |
| |
| Inputs: |
| - requests_filepath : str |
| - path to the file containing the requests to be processed |
| - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field |
| - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} |
| - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) |
| - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl |
| - the code to generate the example file is appended to the bottom of this script |
| - save_filepath : str, optional |
| - path to the file where the results will be saved |
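| - file will be a jsonl file, where each line is an array with the original request plus the API response (followed by the request's metadata, if it had any); for a request that fails after all attempts, the list of error messages takes the place of the response |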
| - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}] |
| - if omitted, results will be saved to {requests_filename}_results.jsonl |
| - request_url : str, optional |
| - URL of the API endpoint to call |
| - if omitted, will default to "https://api.openai.com/v1/embeddings" |
| - api_key : str, optional |
| - API key to use |
| - if omitted, the script will attempt to read it from the ANTHROPIC_API_KEY environment variable |
| - max_requests_per_minute : float, optional |
| - target number of requests to make per minute (will make fewer if limited by tokens) |
| - leave headroom by setting this to 50% or 75% of your limit |
| - if requests are limiting you, try batching multiple embeddings or completions into one request |
| - if omitted, will default to 1,500 |
| - max_tokens_per_minute : float, optional |
| - target number of tokens to use per minute (will use less if limited by requests) |
| - leave headroom by setting this to 50% or 75% of your limit |
| - if omitted, will default to 125,000 |
| - token_encoding_name : str, optional |
| - name of the token encoding used, as defined in the `tiktoken` package |
| - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`) |
| - max_attempts : int, optional |
| - maximum number of attempts per request (the first try plus any retries) before giving up |
| - if omitted, will default to 5 |
| - logging_level : int, optional |
| - level of logging to use; higher numbers will log fewer messages |
| - 40 = ERROR; will log only when requests fail after all retries |
| - 30 = WARNING; will log when requests hit rate limits or other errors |
| - 20 = INFO; will log when requests start and the status at finish |
| - 10 = DEBUG; will log various things as the loop runs to see when they occur |
| - if omitted, will default to 20 (INFO). |
| |
| The script is structured as follows: |
| - Imports |
| - Define main() |
| - Initialize things |
| - In main loop: |
| - Get next request if one is not already waiting for capacity |
| - Update available token & request capacity |
| - If enough capacity available, call API |
| - The loop pauses if a rate limit error is hit |
| - The loop breaks when no tasks remain |
| - Define dataclasses |
| - StatusTracker (stores script metadata counters; only one instance is created) |
| - APIRequest (stores API inputs, outputs, metadata; one method to call API) |
| - Define functions |
| - api_endpoint_from_url (extracts API endpoint from request URL) |
| - append_to_jsonl (writes to results file) |
| - num_tokens_consumed_from_request (bigger function to infer token usage from request) |
| - task_id_generator_function (yields 0, 1, 2, ...) |
| - Run main() |
| """ |
|
|
| |
| import aiohttp |
| import argparse |
| import asyncio |
| import json |
| import logging |
| import os |
| import re |
| import tiktoken |
| import time |
| from dataclasses import ( |
| dataclass, |
| field, |
| ) |
|
|
|
|
async def process_api_requests_from_file(
    requests_filepath: str,
    save_filepath: str,
    request_url: str,
    api_key: str,
    max_requests_per_minute: float,
    max_tokens_per_minute: float,
    token_encoding_name: str,
    max_attempts: int,
    logging_level: int,
):
    """Processes API requests in parallel, throttling to stay under rate limits.

    Requests are streamed one line at a time from ``requests_filepath`` (JSONL).
    Each one is dispatched as its own asyncio task once enough request and token
    capacity has accumulated. Capacity refills continuously at the given
    per-minute rates. Results, or the errors of requests that exhaust their
    attempts, are appended to ``save_filepath``.

    Args:
        requests_filepath: JSONL file, one request body per line. An optional
            ``"metadata"`` key is removed from the body and saved with the
            result. Blank lines are skipped.
        save_filepath: JSONL file that results are appended to.
        request_url: Endpoint URL; must be accepted by ``api_endpoint_from_url``.
        api_key: Sent as the ``x-api-key`` header, or as ``api-key`` when the
            URL contains ``/deployments``.
        max_requests_per_minute: Request budget, refilled continuously.
        max_tokens_per_minute: Token budget, refilled continuously.
        token_encoding_name: ``tiktoken`` encoding used to estimate token usage.
        max_attempts: Attempts per request before it is recorded as failed.
        logging_level: Level passed to ``logging.basicConfig``.
    """
    # constants
    seconds_to_pause_after_rate_limit_error = 15
    # A short sleep each iteration lets request tasks run and avoids a hot spin.
    seconds_to_sleep_each_loop = 0.001

    # initialize logging
    logging.basicConfig(level=logging_level)
    logging.debug(f"Logging initialized at level {logging_level}")

    # infer API endpoint and construct request header
    api_endpoint = api_endpoint_from_url(request_url)
    request_header = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    if "/deployments" in request_url:
        # URLs containing /deployments authenticate with an "api-key" header instead.
        request_header = {"api-key": f"{api_key}"}

    # initialize trackers
    queue_of_requests_to_retry = asyncio.Queue()
    task_id_generator = task_id_generator_function()
    status_tracker = StatusTracker()
    next_request = None  # request waiting for capacity, if any
    # The event loop only keeps weak references to tasks. Hold strong references
    # here so an in-flight request cannot be garbage-collected mid-execution.
    in_flight_tasks = set()

    # initialize available capacity counts
    available_request_capacity = max_requests_per_minute
    available_token_capacity = max_tokens_per_minute
    last_update_time = time.time()

    # initialize flags
    file_not_finished = True
    logging.debug("Initialization complete.")

    with open(requests_filepath, encoding="utf-8") as file:
        # Iterate lazily so the whole file is never held in memory.
        requests = iter(file)
        logging.debug("File opened. Entering main loop")
        async with aiohttp.ClientSession() as session:
            while True:
                # get next request (if one is not already waiting for capacity)
                if next_request is None:
                    if not queue_of_requests_to_retry.empty():
                        next_request = queue_of_requests_to_retry.get_nowait()
                        logging.debug(
                            f"Retrying request {next_request.task_id}: {next_request}"
                        )
                    elif file_not_finished:
                        # Skip blank lines (json.loads rejects them); None means EOF.
                        line = next((ln for ln in requests if ln.strip()), None)
                        if line is None:
                            logging.debug("Read file exhausted")
                            file_not_finished = False
                        else:
                            request_json = json.loads(line)
                            next_request = APIRequest(
                                task_id=next(task_id_generator),
                                request_json=request_json,
                                token_consumption=num_tokens_consumed_from_request(
                                    request_json, api_endpoint, token_encoding_name
                                ),
                                attempts_left=max_attempts,
                                # Removed from the body so it is not sent to the API.
                                metadata=request_json.pop("metadata", None),
                            )
                            status_tracker.num_tasks_started += 1
                            status_tracker.num_tasks_in_progress += 1
                            logging.debug(
                                f"Reading request {next_request.task_id}: {next_request}"
                            )

                # update available capacity (token-bucket refill, capped at the per-minute maximum)
                current_time = time.time()
                seconds_since_update = current_time - last_update_time
                available_request_capacity = min(
                    available_request_capacity
                    + max_requests_per_minute * seconds_since_update / 60.0,
                    max_requests_per_minute,
                )
                available_token_capacity = min(
                    available_token_capacity
                    + max_tokens_per_minute * seconds_since_update / 60.0,
                    max_tokens_per_minute,
                )
                last_update_time = current_time

                # if enough capacity available, call API
                if next_request:
                    next_request_tokens = next_request.token_consumption
                    # Capacity never exceeds max_tokens_per_minute, so a larger request
                    # would wait forever. Let it go once the bucket is full instead;
                    # the budget then goes negative and delays later requests.
                    required_tokens = min(next_request_tokens, max_tokens_per_minute)
                    if (
                        available_request_capacity >= 1
                        and available_token_capacity >= required_tokens
                    ):
                        # update counters
                        available_request_capacity -= 1
                        available_token_capacity -= next_request_tokens
                        next_request.attempts_left -= 1

                        # call API
                        task = asyncio.create_task(
                            next_request.call_api(
                                session=session,
                                request_url=request_url,
                                request_header=request_header,
                                retry_queue=queue_of_requests_to_retry,
                                save_filepath=save_filepath,
                                status_tracker=status_tracker,
                            )
                        )
                        in_flight_tasks.add(task)
                        task.add_done_callback(in_flight_tasks.discard)
                        next_request = None  # reset next_request to empty

                # if all tasks are finished, break
                if status_tracker.num_tasks_in_progress == 0:
                    break

                # main loop sleeps briefly so concurrent tasks can run
                await asyncio.sleep(seconds_to_sleep_each_loop)

                # if a rate limit error was hit recently, pause to cool down
                seconds_since_rate_limit_error = (
                    time.time() - status_tracker.time_of_last_rate_limit_error
                )
                if (
                    seconds_since_rate_limit_error
                    < seconds_to_pause_after_rate_limit_error
                ):
                    remaining_seconds_to_pause = (
                        seconds_to_pause_after_rate_limit_error
                        - seconds_since_rate_limit_error
                    )
                    # Log before sleeping so the reported resume time is still in the future.
                    logging.warning(
                        f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}"
                    )
                    await asyncio.sleep(remaining_seconds_to_pause)

    # after finishing, log final status
    logging.info(
        f"""Parallel processing complete. Results saved to {save_filepath}"""
    )
    if status_tracker.num_tasks_failed > 0:
        logging.warning(
            f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}."
        )
    if status_tracker.num_rate_limit_errors > 0:
        logging.warning(
            f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate."
        )
|
|
|
|
| |
|
|
|
|
@dataclass
class StatusTracker:
    """Stores metadata about the script's progress. Only one instance is created."""

    num_tasks_started: int = 0  # requests read from the input file
    num_tasks_in_progress: int = 0  # the main loop exits when this drops to 0
    num_tasks_succeeded: int = 0
    num_tasks_failed: int = 0  # requests that used up all their attempts
    num_rate_limit_errors: int = 0
    num_api_errors: int = 0  # error responses other than rate limits
    num_other_errors: int = 0  # exceptions raised while calling the API
    # time.time() timestamp (a float) of the most recent rate limit error; 0 = none yet
    time_of_last_rate_limit_error: float = 0
|
|
|
|
@dataclass
class APIRequest:
    """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call."""

    task_id: int  # sequential id assigned when the request is read
    request_json: dict  # request body, posted as JSON
    token_consumption: int  # estimated tokens debited from the token budget per attempt
    attempts_left: int  # decremented by the caller just before each call_api
    metadata: dict  # optional caller data saved alongside the result (may be None)
    result: list = field(default_factory=list)  # errors collected from failed attempts

    @staticmethod
    def _is_rate_limit_error(http_status: int, error_info) -> bool:
        """Return True if an error response indicates the rate limit was exceeded.

        Checks, in order: HTTP status 429, an error ``type`` of
        ``"rate_limit_error"``, or an error message containing ``"Rate limit"``.
        ``error_info`` may be a dict or any other value (stringified).
        """
        if http_status == 429:
            return True
        if isinstance(error_info, dict):
            if error_info.get("type") == "rate_limit_error":
                return True
            message = error_info.get("message") or ""
        else:
            message = error_info
        return "Rate limit" in str(message)

    async def call_api(
        self,
        session: aiohttp.ClientSession,
        request_url: str,
        request_header: dict,
        retry_queue: asyncio.Queue,
        save_filepath: str,
        status_tracker: StatusTracker,
    ):
        """Make one attempt at this request and record the outcome.

        A response whose JSON body contains an ``"error"`` key counts as a failed
        attempt. So does any exception raised while sending or decoding. On
        failure the error is appended to ``self.result``, and the request is put
        back on ``retry_queue`` while ``attempts_left`` is non-zero. Once attempts
        run out, the request and its stringified errors are appended to
        ``save_filepath``.

        A successful response is appended to ``save_filepath`` as
        ``[request_json, response]``, with ``metadata`` added when it is truthy.

        Rate limit errors update ``status_tracker.time_of_last_rate_limit_error``,
        which the main loop reads to pause dispatching.
        """
        logging.info(f"Starting request #{self.task_id}")
        error = None
        try:
            async with session.post(
                url=request_url, headers=request_header, json=self.request_json
            ) as http_response:
                response = await http_response.json()
                if "error" in response:
                    logging.warning(
                        f"Request {self.task_id} failed with error {response['error']}"
                    )
                    error = response
                    if self._is_rate_limit_error(http_response.status, response["error"]):
                        status_tracker.time_of_last_rate_limit_error = time.time()
                        status_tracker.num_rate_limit_errors += 1
                    else:
                        status_tracker.num_api_errors += 1
        except Exception as e:  # network failures, non-JSON bodies, etc. are retried too
            logging.warning(f"Request {self.task_id} failed with Exception {e}")
            status_tracker.num_other_errors += 1
            error = e
        if error:
            self.result.append(error)
            if self.attempts_left:
                retry_queue.put_nowait(self)
            else:
                logging.error(
                    f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}"
                )
                data = (
                    [self.request_json, [str(e) for e in self.result], self.metadata]
                    if self.metadata
                    else [self.request_json, [str(e) for e in self.result]]
                )
                append_to_jsonl(data, save_filepath)
                status_tracker.num_tasks_in_progress -= 1
                status_tracker.num_tasks_failed += 1
        else:
            data = (
                [self.request_json, response, self.metadata]
                if self.metadata
                else [self.request_json, response]
            )
            append_to_jsonl(data, save_filepath)
            status_tracker.num_tasks_in_progress -= 1
            status_tracker.num_tasks_succeeded += 1
            logging.debug(f"Request {self.task_id} saved to {save_filepath}")
|
|
|
|
| |
|
|
|
|
def api_endpoint_from_url(request_url):
    """Extract the API endpoint from the request URL.

    Accepts standard URLs of the form ``https://<host>/v1/<endpoint>`` and Azure
    OpenAI URLs of the form
    ``https://<host>/openai/deployments/<deployment>/<endpoint>``. Any query
    string (``?...``) or fragment (``#...``) is dropped.

    Examples:
        "https://api.anthropic.com/v1/messages" -> "messages"
        "https://api.openai.com/v1/chat/completions" -> "chat/completions"

    Raises:
        ValueError: if the URL matches neither form.
    """
    match = re.search(r"^https://[^/]+/v1/(.+?)(?:[?#]|$)", request_url)
    if match is None:
        # Azure OpenAI deployment URL.
        match = re.search(
            r"^https://[^/]+/openai/deployments/[^/]+/(.+?)(?:[?#]|$)", request_url
        )
    if match:
        return match[1]
    raise ValueError(f"Invalid API URL: {request_url}")
|
|
|
|
def append_to_jsonl(data, filename: str) -> None:
    """Serialize ``data`` with ``json.dumps`` and append it as one line to ``filename``.

    The file is opened in append mode, so it is created if missing and existing
    lines are kept.
    """
    line = f"{json.dumps(data)}\n"
    with open(filename, "a") as out_file:
        out_file.write(line)
|
|
|
|
def num_tokens_consumed_from_request(
    request_json: dict,
    api_endpoint: str,
    token_encoding_name: str,
):
    """Estimate the prompt tokens a request consumes, for client-side throttling.

    Tokens are counted with the ``tiktoken`` encoding named
    ``token_encoding_name``. Special-token strings in the text are counted as
    ordinary text rather than raising.

    Supported endpoints:
        - ``"messages"``: the optional top-level ``system`` prompt plus every
          message's ``content``. Either may be a string or a list of content
          blocks; only blocks with a string ``"text"`` field are counted.
        - ``"embeddings"``: ``input``, either a string or a list of strings.

    Raises:
        TypeError: if an embeddings ``input`` is neither a string nor a list of strings.
        NotImplementedError: for any other endpoint.
    """
    encoding = tiktoken.get_encoding(token_encoding_name)

    def _text_tokens(text):
        # disallowed_special=() stops tiktoken raising on strings like "<|endoftext|>".
        return len(encoding.encode(text, disallowed_special=()))

    def _content_tokens(content):
        # Count a string directly, or the text blocks of a list of content blocks.
        if isinstance(content, str):
            return _text_tokens(content)
        return sum(
            _text_tokens(block["text"])
            for block in content
            if isinstance(block, dict) and isinstance(block.get("text"), str)
        )

    if api_endpoint == "messages":
        system_prompt = request_json.get("system")
        num_tokens = _content_tokens(system_prompt) if system_prompt else 0
        for message in request_json["messages"]:
            num_tokens += _content_tokens(message["content"])
        return num_tokens
    if api_endpoint == "embeddings":
        embedding_input = request_json["input"]
        if isinstance(embedding_input, str):
            return _text_tokens(embedding_input)
        if isinstance(embedding_input, list) and all(
            isinstance(item, str) for item in embedding_input
        ):
            return sum(_text_tokens(item) for item in embedding_input)
        raise TypeError(
            'Expecting either string or list of strings for "input" field in embedding request'
        )
    raise NotImplementedError(
        f'API endpoint "{api_endpoint}" not implemented in this script'
    )
|
|
def task_id_generator_function():
    """Yield consecutive integer task ids forever, starting from 0."""
    upcoming = 0
    while True:
        current, upcoming = upcoming, upcoming + 1
        yield current
|
|
|
|
| |
|
|
|
|
if __name__ == "__main__":
    # parse command line arguments
    parser = argparse.ArgumentParser()
    # Required: deriving the default save path below needs an input path.
    parser.add_argument("--requests_filepath", required=True)
    parser.add_argument("--save_filepath", default=None)
    parser.add_argument("--request_url", default="https://api.openai.com/v1/embeddings")
    parser.add_argument("--api_key", default=os.getenv("ANTHROPIC_API_KEY"))
    # Rates are floats (as documented); integer values are still accepted.
    parser.add_argument("--max_requests_per_minute", type=float, default=3_000 * 0.5)
    parser.add_argument("--max_tokens_per_minute", type=float, default=250_000 * 0.5)
    parser.add_argument("--token_encoding_name", default="cl100k_base")
    parser.add_argument("--max_attempts", type=int, default=5)
    parser.add_argument("--logging_level", type=int, default=logging.INFO)
    args = parser.parse_args()

    if args.save_filepath is None:
        # "<name>.jsonl" -> "<name>_results.jsonl". Unlike str.replace, this can never
        # produce the input path itself (e.g. for "requests.json"), so results are
        # never appended to the file being read.
        root, _extension = os.path.splitext(args.requests_filepath)
        args.save_filepath = f"{root}_results.jsonl"

    # run script
    asyncio.run(
        process_api_requests_from_file(
            requests_filepath=args.requests_filepath,
            save_filepath=args.save_filepath,
            request_url=args.request_url,
            api_key=args.api_key,
            max_requests_per_minute=float(args.max_requests_per_minute),
            max_tokens_per_minute=float(args.max_tokens_per_minute),
            token_encoding_name=args.token_encoding_name,
            max_attempts=int(args.max_attempts),
            logging_level=int(args.logging_level),
        )
    )
|
|
|
|
| """ |
| APPENDIX |
| |
| The example requests file at openai-cookbook/examples/data/example_requests_to_parallel_process.jsonl contains 10,000 requests to text-embedding-ada-002. |
| |
| It was generated with the following code: |
| |
| ```python |
| import json |
| |
| filename = "data/example_requests_to_parallel_process.jsonl" |
| n_requests = 10_000 |
| jobs = [{"model": "text-embedding-ada-002", "input": str(x) + "\n"} for x in range(n_requests)] |
| with open(filename, "w") as f: |
| for job in jobs: |
| json_string = json.dumps(job) |
| f.write(json_string + "\n") |
| ``` |
| |
| As with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically). |
| """ |
|
|