| """ |
| API REQUEST PARALLEL PROCESSOR |
| |
| Using the OpenAI API to process lots of text quickly takes some care. |
| If you trickle in a million API requests one by one, they'll take days to complete. |
| If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors. |
| To maximize throughput, parallel requests need to be throttled to stay under rate limits. |
| |
| This script sends requests from a JSONL file to the OpenAI, Anthropic, or OpenRouter (meta/google) API and saves each response; see Features below for which throughput controls are in place. |
| |
| Features: |
| - Streams requests from file, to avoid running out of memory for giant jobs |
| - Sends requests one at a time (process_api_requests_from_file is sequential; requests are not made concurrently) |
| - Accepts requests-per-minute and tokens-per-minute settings; check process_api_requests_from_file for which limits it actually enforces |
| - Retries failed requests up to max_attempts times, to avoid missing data |
| - Logs errors, to diagnose problems with requests |
| |
| Example command to call script: |
| ``` |
| python examples/api_request_parallel_processor.py \ |
| --vendor_name openai \ |
| --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \ |
| --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \ |
| --request_url https://api.openai.com/v1/embeddings \ |
| --max_requests_per_minute 1500 \ |
| --max_tokens_per_minute 6250000 \ |
| --token_encoding_name cl100k_base \ |
| --max_attempts 5 \ |
| --logging_level 20 |
| ``` |
| |
| Inputs: |
| - requests_filepath : str |
| - path to the file containing the requests to be processed |
| - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field |
| - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} |
| - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) |
| - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl |
| - the code to generate the example file is appended to the bottom of this script |
| - save_filepath : str, optional |
| - path to the file where the results will be saved |
| - file will be a jsonl file, where each line is an array with the original request plus the API response |
| - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}] |
| - if omitted, results will be saved to {requests_filename}_results.jsonl |
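| - vendor_name : str |
| - which API to call: "openai", "anthropic", "meta", or "google" (meta and google are sent to OpenRouter) |
| - selects the request headers, the default request_url, and the environment variable the API key is read from |
| - request_url : str, optional |
| - URL of the API endpoint to call |
| - if omitted, defaults to the vendor's endpoint: https://api.openai.com/v1/chat/completions (openai), https://api.anthropic.com/v1/messages (anthropic), or https://openrouter.ai/api/v1/chat/completions (meta, google) |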
| - api_key : str, optional |
| - API key to use |
| - if omitted, the script reads it from the vendor's environment variable: OPENAI_API_KEY (openai), ANTHROPIC_API_KEY (anthropic), or OPENROUTER_API_KEY (meta, google) |
| - max_requests_per_minute : float, optional |
| - target number of requests to make per minute (will make less if limited by tokens) |
| - leave headroom by setting this to 50% or 75% of your limit |
| - if requests are limiting you, try batching multiple embeddings or completions into one request |
| - if omitted, will default to 1,500 |
| - max_tokens_per_minute : float, optional |
| - target number of tokens to use per minute (will use less if limited by requests) |
| - leave headroom by setting this to 50% or 75% of your limit |
| - if omitted, will default to 125,000 |
| - token_encoding_name : str, optional |
| - name of the token encoding used, as defined in the `tiktoken` package |
| - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`) |
| - max_attempts : int, optional |
| - number of times to retry a failed request before giving up |
| - if omitted, will default to 5 |
| - logging_level : int, optional |
| - level of logging to use; higher numbers will log fewer messages |
| - 40 = ERROR; will log only when requests fail after all retries |
| - 30 = WARNING; will log when requests hit rate limits or other errors |
| - 20 = INFO; will log when requests start and the status at finish |
| - 10 = DEBUG; will log various things as the loop runs to see when they occur |
| - if omitted, will default to 20 (INFO). |
| |
| The script is structured as follows: |
| - Imports |
| - Define process_api_requests_from_file() |
| - Initialize things (logging, vendor-specific request headers, counters) |
| - For each request in the file: |
| - Call the API, retrying failed attempts up to max_attempts times |
| - Append the response (or the collected errors) to the results file |
| - Log a summary of failed requests and rate limit errors |
| - Define dataclasses |
| - StatusTracker (stores script metadata counters; only one instance is created) |
| - APIRequest (stores API inputs, outputs, metadata; one method to call API) |
| - Define functions |
| - api_endpoint_from_url (extracts API endpoint from request URL) |
| - append_to_jsonl (writes to results file) |
| - task_id_generator_function (yields 0, 1, 2, ...) |
| - Parse command-line arguments and run process_api_requests_from_file() |
| """ |
|
|
| |
| import aiohttp |
| import argparse |
| import asyncio |
| import json |
| import logging |
| import os |
| import re |
| import tiktoken |
| import time |
| from dataclasses import ( |
| dataclass, |
| field, |
| ) |
|
|
|
|
def _request_header_for_vendor(vendor_name, api_key):
    """Return the HTTP headers for ``vendor_name``; raise ValueError if it is unsupported."""
    if vendor_name == "openai":
        return {"Authorization": f"Bearer {api_key}"}
    if vendor_name == "anthropic":
        return {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }
    if vendor_name in ("meta", "google"):
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
    raise ValueError(
        f"Error. Invalid Model Input: vendor_name={vendor_name!r}; "
        "expected 'openai', 'anthropic', 'meta' or 'google'."
    )


def _is_rate_limit_error(status, response):
    """Return True if a failed response signals that a rate limit was hit.

    Checks for HTTP 429, an error ``type`` of "rate_limit_error", or a
    case-insensitive "rate limit" in the error message. The error payload may be
    a dict or a plain string.
    """
    if status == 429:
        return True
    error = response.get("error") if isinstance(response, dict) else None
    if isinstance(error, dict):
        message = str(error.get("message", ""))
        error_type = str(error.get("type", ""))
    else:
        message = "" if error is None else str(error)
        error_type = ""
    return "rate limit" in message.lower() or error_type == "rate_limit_error"


def _result_row(request, payload):
    """Build a results-file row: ``[request_json, payload]`` plus metadata when present."""
    row = [request.request_json, payload]
    if request.metadata:
        row.append(request.metadata)
    return row


async def process_api_requests_from_file(
    vendor_name: str,
    requests_filepath: str,
    save_filepath: str,
    request_url: str,
    api_key: str,
    max_requests_per_minute: float,
    max_tokens_per_minute: float,
    token_encoding_name: str,
    max_attempts: int,
    logging_level: int,
    seconds_to_pause_after_rate_limit_error: float = 15,
):
    """Process API requests from a JSONL file sequentially, saving each outcome.

    Requests are sent one at a time with aiohttp. Each request is attempted at
    most ``max_attempts`` times. Its final outcome is appended to
    ``save_filepath`` as one JSON array per line:
    ``[request, response]`` on success, or ``[request, [error strings]]`` on
    failure. A trailing ``metadata`` element is added when the request had one.

    Args:
        vendor_name: "openai", "anthropic", "meta" or "google"; selects the
            request headers.
        requests_filepath: JSONL file with one request object per line. An
            optional "metadata" key is removed from the request and saved
            alongside its result. Blank lines are skipped.
        save_filepath: JSONL file that results are appended to.
        request_url: Endpoint every request is POSTed to.
        api_key: Credential placed in the vendor-specific auth header.
        max_requests_per_minute: Request starts are spaced at least
            ``60 / max_requests_per_minute`` seconds apart; a value <= 0
            disables pacing.
        max_tokens_per_minute: Not enforced; kept for interface compatibility.
        token_encoding_name: Not used; kept for interface compatibility.
        max_attempts: Tries per request before it is recorded as failed.
        logging_level: Level passed to ``logging.basicConfig``.
        seconds_to_pause_after_rate_limit_error: After a rate-limit response,
            no further request is started for at least this many seconds.

    Raises:
        ValueError: If ``vendor_name`` is not supported.
    """
    logging.basicConfig(level=logging_level)
    logging.debug(f"Logging initialized at level {logging_level}")

    # Fail fast instead of sending every request without auth headers.
    request_header = _request_header_for_vendor(vendor_name, api_key)

    seconds_between_requests = (
        60.0 / max_requests_per_minute if max_requests_per_minute > 0 else 0.0
    )
    next_request_time = time.monotonic()  # earliest moment the next POST may start

    task_id_generator = task_id_generator_function()
    status_tracker = StatusTracker()

    async with aiohttp.ClientSession() as session:
        with open(requests_filepath, encoding="utf-8") as file:
            for line in file:
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline at EOF
                request_json = json.loads(line)
                request = APIRequest(
                    task_id=next(task_id_generator),
                    request_json=request_json,
                    token_consumption=0,
                    attempts_left=max_attempts,
                    metadata=request_json.pop("metadata", None),
                )
                status_tracker.num_tasks_started += 1
                logging.debug(f"Processing request {request.task_id}: {request}")

                saved = False
                while request.attempts_left > 0:
                    # Pace request starts and honour any rate-limit back-off.
                    delay = next_request_time - time.monotonic()
                    if delay > 0:
                        await asyncio.sleep(delay)
                    next_request_time = time.monotonic() + seconds_between_requests

                    error = None
                    rate_limited = False
                    try:
                        async with session.post(
                            url=request_url,
                            headers=request_header,
                            json=request.request_json,
                        ) as http_response:
                            status = http_response.status
                            # content_type=None: parse JSON whatever Content-Type is sent.
                            response = await http_response.json(content_type=None)
                    except Exception as e:  # network failures, timeouts, non-JSON bodies
                        logging.warning(f"Request {request.task_id} failed with Exception {e}")
                        status_tracker.num_other_errors += 1
                        error = e
                    else:
                        is_error_body = isinstance(response, dict) and "error" in response
                        if status >= 400 or is_error_body:
                            detail = response["error"] if is_error_body else response
                            logging.warning(
                                f"Request {request.task_id} failed with error {detail}"
                            )
                            error = response
                            rate_limited = _is_rate_limit_error(status, response)
                            if rate_limited:
                                status_tracker.num_rate_limit_errors += 1
                                status_tracker.time_of_last_rate_limit_error = time.time()
                            else:
                                status_tracker.num_api_errors += 1

                    if error is None:
                        append_to_jsonl(_result_row(request, response), save_filepath)
                        status_tracker.num_tasks_succeeded += 1
                        logging.debug(f"Request {request.task_id} saved to {save_filepath}")
                        saved = True
                        break

                    request.result.append(error)
                    request.attempts_left -= 1
                    if rate_limited:
                        # Retrying immediately would just burn the remaining attempts.
                        next_request_time = max(
                            next_request_time,
                            time.monotonic() + seconds_to_pause_after_rate_limit_error,
                        )

                if not saved:
                    logging.error(
                        f"Request {request.request_json} failed after all attempts. Saving errors: {request.result}"
                    )
                    errors = [str(e) for e in request.result]
                    append_to_jsonl(_result_row(request, errors), save_filepath)
                    status_tracker.num_tasks_failed += 1

    logging.info(f"Sequential processing complete. Results saved to {save_filepath}")
    if status_tracker.num_tasks_failed > 0:
        logging.warning(
            f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}."
        )
    if status_tracker.num_rate_limit_errors > 0:
        logging.warning(
            f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate."
        )
| |
|
|
|
|
@dataclass
class StatusTracker:
    """Counters describing the script's progress. Only one instance is created."""

    num_tasks_started: int = 0  # requests read from the input file
    num_tasks_in_progress: int = 0  # decremented by APIRequest.call_api when a request finishes
    num_tasks_succeeded: int = 0  # requests whose response was saved
    num_tasks_failed: int = 0  # requests that used up all attempts
    num_rate_limit_errors: int = 0  # error responses classified as rate limits
    num_api_errors: int = 0  # other error responses returned by the API
    num_other_errors: int = 0  # exceptions raised while sending/parsing a request
    # Wall-clock time (time.time()) of the latest rate-limit error; 0.0 means none yet.
    time_of_last_rate_limit_error: float = 0.0
|
|
|
|
@dataclass
class APIRequest:
    """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call."""

    task_id: int  # identifier used in log messages
    request_json: dict  # body POSTed to the API
    token_consumption: int  # token estimate for this request; not read by call_api
    attempts_left: int  # read (never decremented) by call_api to decide whether to re-queue
    metadata: dict  # optional caller data saved with the result; may be None (then omitted)
    result: list = field(default_factory=list)  # errors from failed attempts, in order

    @staticmethod
    def _is_rate_limit_error(error):
        """Return True if an API ``error`` payload (dict or string) reports a rate limit.

        Matches an error ``type`` of "rate_limit_error" or a case-insensitive
        "rate limit" in the message, so both OpenAI ("Rate limit reached ...")
        and Anthropic-style errors are recognized.
        """
        if isinstance(error, dict):
            if error.get("type") == "rate_limit_error":
                return True
            message = str(error.get("message", ""))
        else:
            message = str(error)
        return "rate limit" in message.lower()

    async def call_api(
        self,
        session: "aiohttp.ClientSession",
        request_url: str,
        request_header: dict,
        retry_queue: asyncio.Queue,
        save_filepath: str,
        status_tracker: "StatusTracker",
    ):
        """POST this request to ``request_url`` and record the outcome.

        On success the response is appended to ``save_filepath``. On failure the
        error is appended to ``self.result``. The request is then put on
        ``retry_queue`` if ``self.attempts_left`` is truthy. Otherwise all
        collected errors are written to ``save_filepath`` and the request counts
        as failed.

        NOTE(review): this method never decrements ``attempts_left``; a caller
        must, or a persistently failing request is re-queued indefinitely.
        """
        logging.info(f"Starting request #{self.task_id}")
        error = None
        try:
            async with session.post(
                url=request_url, headers=request_header, json=self.request_json
            ) as http_response:
                # content_type=None: parse JSON whatever Content-Type is sent.
                response = await http_response.json(content_type=None)
        except Exception as e:  # network failures, timeouts, non-JSON bodies
            logging.warning(f"Request {self.task_id} failed with Exception {e}")
            status_tracker.num_other_errors += 1
            error = e
        else:
            if isinstance(response, dict) and "error" in response:
                logging.warning(
                    f"Request {self.task_id} failed with error {response['error']}"
                )
                error = response
                # Classify exactly once so no error is counted in two buckets.
                if self._is_rate_limit_error(response["error"]):
                    status_tracker.time_of_last_rate_limit_error = time.time()
                    status_tracker.num_rate_limit_errors += 1
                else:
                    status_tracker.num_api_errors += 1

        if error is not None:
            self.result.append(error)
            if self.attempts_left:
                retry_queue.put_nowait(self)
            else:
                logging.error(
                    f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}"
                )
                data = [self.request_json, [str(e) for e in self.result]]
                if self.metadata:
                    data.append(self.metadata)
                append_to_jsonl(data, save_filepath)
                status_tracker.num_tasks_in_progress -= 1
                status_tracker.num_tasks_failed += 1
        else:
            data = [self.request_json, response]
            if self.metadata:
                data.append(self.metadata)
            append_to_jsonl(data, save_filepath)
            status_tracker.num_tasks_in_progress -= 1
            status_tracker.num_tasks_succeeded += 1
            logging.debug(f"Request {self.task_id} saved to {save_filepath}")
|
|
|
|
| |
|
|
|
|
def api_endpoint_from_url(request_url, vendor_name):
    """Extract the API endpoint path (e.g. "chat/completions") from ``request_url``.

    The vendor selects the expected URL layout:

    - "openai": ``https://<host>/v<N>/<endpoint>``
    - "anthropic": ``https://<host>/v1/<endpoint>``
    - "meta" / "google": ``https://<host>/api/v1/<endpoint>``

    If that layout does not match, an Azure-style
    ``https://<host>/openai/deployments/<deployment>/<endpoint>[?query]`` URL is
    tried as a fallback.

    Raises:
        ValueError: If ``vendor_name`` is unsupported, or the URL matches
            neither the vendor layout nor the fallback.
    """
    vendor_patterns = {
        "openai": r"^https://[^/]+/v\d+/(.+)$",
        "anthropic": r"^https://[^/]+/v1/(.+)$",
        "meta": r"^https://[^/]+/api/v1/(.+)$",
        "google": r"^https://[^/]+/api/v1/(.+)$",
    }
    if vendor_name not in vendor_patterns:
        raise ValueError(
            f"Error. Invalid Model Input: vendor_name={vendor_name!r}; "
            f"expected one of {sorted(vendor_patterns)}."
        )
    match = re.search(vendor_patterns[vendor_name], request_url)
    if match is None:
        match = re.search(
            r"^https://[^/]+/openai/deployments/[^/]+/(.+?)(\?|$)", request_url
        )
    if match is None:
        raise ValueError(f"Could not extract an API endpoint from URL {request_url!r}")
    return match[1]
|
|
|
|
def append_to_jsonl(data, filename: str) -> None:
    """Serialize ``data`` to JSON and append it to ``filename`` as a single line."""
    serialized = json.dumps(data)  # serialize first so a bad payload never touches the file
    with open(filename, "a") as sink:
        print(serialized, file=sink)
|
|
|
|
|
|
def task_id_generator_function():
    """Yield consecutive integer task ids forever, beginning with 0."""
    current = -1
    while True:
        current += 1
        yield current
|
|
|
|
| |
|
|
|
|
if __name__ == "__main__":
    # Per-vendor defaults: (environment variable holding the API key, endpoint URL).
    vendor_defaults = {
        "openai": ("OPENAI_API_KEY", "https://api.openai.com/v1/chat/completions"),
        "anthropic": ("ANTHROPIC_API_KEY", "https://api.anthropic.com/v1/messages"),
        "meta": ("OPENROUTER_API_KEY", "https://openrouter.ai/api/v1/chat/completions"),
        "google": ("OPENROUTER_API_KEY", "https://openrouter.ai/api/v1/chat/completions"),
    }

    parser = argparse.ArgumentParser()
    parser.add_argument("--vendor_name", required=True, choices=sorted(vendor_defaults))
    parser.add_argument("--requests_filepath", required=True)
    parser.add_argument("--save_filepath", default=None)
    parser.add_argument("--request_url", default=None)
    parser.add_argument("--api_key", default=None)
    parser.add_argument("--max_requests_per_minute", type=float, default=3_000 * 0.5)
    parser.add_argument("--max_tokens_per_minute", type=float, default=250_000 * 0.5)
    parser.add_argument("--token_encoding_name", default="cl100k_base")
    parser.add_argument("--max_attempts", type=int, default=5)
    parser.add_argument("--logging_level", type=int, default=logging.INFO)
    args = parser.parse_args()

    api_key_env_var, default_request_url = vendor_defaults[args.vendor_name]
    # Explicit --api_key / --request_url take precedence over the vendor defaults.
    if args.api_key is None:
        args.api_key = os.getenv(api_key_env_var)
    if args.request_url is None:
        args.request_url = default_request_url
    if not args.api_key:
        parser.error(f"no API key: pass --api_key or set {api_key_env_var}")

    if args.save_filepath is None:
        # Always differs from the input path, so results are never appended to
        # the file that is being read.
        root, ext = os.path.splitext(args.requests_filepath)
        args.save_filepath = f"{root}_results{ext or '.jsonl'}"

    result = process_api_requests_from_file(
        vendor_name=args.vendor_name,
        requests_filepath=args.requests_filepath,
        save_filepath=args.save_filepath,
        request_url=args.request_url,
        api_key=args.api_key,
        max_requests_per_minute=float(args.max_requests_per_minute),
        max_tokens_per_minute=float(args.max_tokens_per_minute),
        token_encoding_name=args.token_encoding_name,
        max_attempts=int(args.max_attempts),
        logging_level=int(args.logging_level),
    )
    # A coroutine function returns a coroutine that still has to be driven by an
    # event loop; a plain function has already done its work by now.
    if asyncio.iscoroutine(result):
        asyncio.run(result)
|
|
|
|
| """ |
| APPENDIX |
| |
| The example requests file at openai-cookbook/examples/data/example_requests_to_parallel_process.jsonl contains 10,000 requests to text-embedding-ada-002. |
| |
| It was generated with the following code: |
| |
| ```python |
| import json |
| |
| filename = "data/example_requests_to_parallel_process.jsonl" |
| n_requests = 10_000 |
| jobs = [{"model": "text-embedding-ada-002", "input": str(x) + "\n"} for x in range(n_requests)] |
| with open(filename, "w") as f: |
| for job in jobs: |
| json_string = json.dumps(job) |
| f.write(json_string + "\n") |
| ``` |
| |
| As with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically). |
| """ |
|
|