diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/block_builder.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/block_builder.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c3c8bf6a2c2b20884648fc4bce7e35d759e4e6fc Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/block_builder.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/delegating_block_builder.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/delegating_block_builder.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2c83c6222ebd9aa96eb0424c028e3ffcbb64d75b Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/delegating_block_builder.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/equalize.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/equalize.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5ab20eae11b7df513fb0c22701e8adc1d1a1e2b2 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/equalize.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/logging.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/logging.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c7321946ac204eda607cf648bd4608c21112b688 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/logging.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/output_buffer.cpython-311.pyc 
b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/output_buffer.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e2caa3cea0c45d3fb700d021e2fe8f150131c885 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/output_buffer.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/plan.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/plan.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b7d873ad8b7deb9bd902f6ccd3b7d93bb9cda1fe Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/plan.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/remote_fn.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/remote_fn.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f5921d1535b3942c8c8c8e67429b29fc49d0330b Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/remote_fn.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/row.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/row.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..70ada3e93bf7918aff730bf83238f99f52a1e9ca Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/row.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/stats.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/stats.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..870614fc9283a1e2ff3628b95bb04a9327c6d970 Binary files /dev/null and 
b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/stats.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/table_block.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/table_block.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..33545ac3ca0aab0b05984b05dd18f46ec1702005 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/table_block.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/util.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/util.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..571b63703821d769bc24e854169eea5030f79101 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/util.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/autoscaling_requester.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/autoscaling_requester.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8e04ca0f4f4d5181b8bc7da28c8877b93a6ad941 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/autoscaling_requester.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/execution_callback.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/execution_callback.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..760e99df0c1b497e4b105601c663b2b3a4fdd50a Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/execution_callback.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/legacy_compat.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/legacy_compat.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8a241d392cf737f885d1e731bb7a12ba3bde4856 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/legacy_compat.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/resource_manager.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/resource_manager.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..04b7b8799b6d28652a319bacf16e89da7b5420c7 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/resource_manager.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..afdf6a5b028f7f836078a72c4d18afadcfac24fc Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor_state.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor_state.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..9e728df746b183e16435b9cdcd51efb3869c08a6 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor_state.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/util.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/util.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..32b9f129f6babc0e7c2387e5678a76490ca44cd9 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/util.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/__init__.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..47f57a9c1439d2b6445563adb695f40c02544f2d Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/__init__.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/autoscaler.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/autoscaler.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..96dc8f630c82c2298f0f760450c34743e5fd67e5 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/autoscaler.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/default_autoscaler.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/default_autoscaler.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..740e413549d0af66769ac92237f67e18036bb273 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/default_autoscaler.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py new file mode 100644 index 0000000000000000000000000000000000000000..4422c8798eac423f7b3e0276a1b9575d5d906e96 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py @@ -0,0 +1,129 @@ +from collections import defaultdict, deque +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, List, Optional + +from .bundle_queue import BundleQueue + +if TYPE_CHECKING: + from ray.data._internal.execution.interfaces import RefBundle + + +@dataclass +class _Node: + value: "RefBundle" + next: Optional["_Node"] = None + prev: Optional["_Node"] = None + + +class FIFOBundleQueue(BundleQueue): + """A bundle queue that follows a first-in-first-out policy.""" + + def __init__(self): + # We manually implement a linked list because we need to remove elements + # efficiently, and Python's built-in data structures have O(n) removal time. + self._head: Optional[_Node] = None + self._tail: Optional[_Node] = None + # We use a dictionary to keep track of the nodes corresponding to each bundle. + # This allows us to remove a bundle from the queue in O(1) time. We need a list + # because a bundle can be added to the queue multiple times. Nodes in each list + # are insertion-ordered. + self._bundle_to_nodes: Dict["RefBundle", List[_Node]] = defaultdict(deque) + + self._nbytes = 0 + self._num_bundles = 0 + + def __len__(self) -> int: + return self._num_bundles + + def __contains__(self, bundle: "RefBundle") -> bool: + return bundle in self._bundle_to_nodes + + def add(self, bundle: "RefBundle") -> None: + """Add a bundle to the end (right) of the queue.""" + new_node = _Node(value=bundle, next=None, prev=self._tail) + # Case 1: The queue is empty. 
+ if self._head is None: + assert self._tail is None + self._head = new_node + self._tail = new_node + # Case 2: The queue has at least one element. + else: + self._tail.next = new_node + self._tail = new_node + + self._bundle_to_nodes[bundle].append(new_node) + + self._nbytes += bundle.size_bytes() + self._num_bundles += 1 + + def pop(self) -> "RefBundle": + """Return the first (left) bundle in the queue.""" + # Case 1: The queue is empty. + if not self._head: + raise IndexError("You can't pop from an empty queue") + + bundle = self._head.value + self.remove(bundle) + + return bundle + + def peek(self) -> Optional["RefBundle"]: + """Return the first (left) bundle in the queue without removing it.""" + if self._head is None: + return None + + return self._head.value + + def remove(self, bundle: "RefBundle"): + """Remove a bundle from the queue. + + If there are multiple instances of the bundle in the queue, this method only + removes the first one. + """ + # Case 1: The queue is empty. + if bundle not in self._bundle_to_nodes: + raise ValueError(f"The bundle {bundle} is not in the queue.") + + node = self._bundle_to_nodes[bundle].popleft() + if not self._bundle_to_nodes[bundle]: + del self._bundle_to_nodes[bundle] + + # Case 2: The bundle is the only element in the queue. + if self._head is self._tail: + self._head = None + self._tail = None + # Case 3: The bundle is the first element in the queue. + elif node is self._head: + self._head = node.next + self._head.prev = None + # Case 4: The bundle is the last element in the queue. + elif node is self._tail: + self._tail = node.prev + self._tail.next = None + # Case 5: The bundle is in the middle of the queue. + else: + node.prev.next = node.next + node.next.prev = node.prev + + self._nbytes -= bundle.size_bytes() + assert self._nbytes >= 0, ( + "Expected the total size of objects in the queue to be non-negative, but " + f"got {self._nbytes} bytes instead." 
+ ) + + self._num_bundles -= 1 + + return node.value + + def clear(self): + self._head = None + self._tail = None + self._bundle_to_nodes.clear() + self._nbytes = 0 + self._num_bundles = 0 + + def estimate_size_bytes(self) -> int: + return self._nbytes + + def is_empty(self): + return not self._bundle_to_nodes and self._head is None and self._tail is None diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/__init__.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..44060571b03077768b97fb3ee13f0de3464d473b Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/__init__.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/common.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/common.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3ee75ebc0a509f3409626db3c8e8b6d3b2e8a4cf Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/common.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/execution_options.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/execution_options.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..49997aa239544f746d42e5f7f3f989db032ce2ed Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/execution_options.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/executor.cpython-311.pyc 
b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/executor.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5b43ed51c15ebba3835591aab34299a7b1eee57c Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/executor.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/op_runtime_metrics.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/op_runtime_metrics.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0f87d69a0ecaa5afab2a7279e3fa9a384159363a Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/op_runtime_metrics.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/physical_operator.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/physical_operator.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bde4be337d48a54b0cbaaf669cef5b4a049b4805 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/physical_operator.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/ref_bundle.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/ref_bundle.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..83573454ec388ee5d05b964bd94ed82755d6b483 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/ref_bundle.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/task_context.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/task_context.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..db9ba3891e191977072673d9c2f188c256fe4624 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/task_context.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/transform_fn.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/transform_fn.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7ec42414c7fa7b96e011686708be902140b88ca2 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/transform_fn.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/optimizers.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/optimizers.py new file mode 100644 index 0000000000000000000000000000000000000000..a7c2b68c06feac94761695904001d2ead6a0bde4 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/optimizers.py @@ -0,0 +1,94 @@ +from typing import List, Optional, Type + +from ray.data._internal.logical.interfaces import ( + LogicalPlan, + Optimizer, + PhysicalPlan, + Rule, +) +from ray.data._internal.logical.rules.inherit_batch_format import InheritBatchFormatRule +from ray.data._internal.logical.rules.inherit_target_max_block_size import ( + InheritTargetMaxBlockSizeRule, +) +from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule +from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule +from ray.data._internal.logical.rules.set_read_parallelism import 
SetReadParallelismRule +from ray.data._internal.logical.rules.zero_copy_map_fusion import ( + EliminateBuildOutputBlocks, +) +from ray.data._internal.planner.planner import Planner +from ray.util.annotations import DeveloperAPI + +_LOGICAL_RULES = [ + ReorderRandomizeBlocksRule, + InheritBatchFormatRule, +] + +_PHYSICAL_RULES = [ + InheritTargetMaxBlockSizeRule, + SetReadParallelismRule, + OperatorFusionRule, + EliminateBuildOutputBlocks, +] + + +@DeveloperAPI +def register_logical_rule(cls: Type[Rule], insert_index: Optional[int] = None): + if cls in _LOGICAL_RULES: + return + + if insert_index is None: + _LOGICAL_RULES.append(cls) + else: + _LOGICAL_RULES.insert(insert_index, cls) + + +@DeveloperAPI +def get_logical_rules() -> List[Type[Rule]]: + return list(_LOGICAL_RULES) + + +@DeveloperAPI +def register_physical_rule(cls: Type[Rule], insert_index: Optional[int] = None): + if cls in _PHYSICAL_RULES: + return + + if insert_index is None: + _PHYSICAL_RULES.append(cls) + else: + _PHYSICAL_RULES.insert(insert_index, cls) + + +@DeveloperAPI +def get_physical_rules() -> List[Type[Rule]]: + return list(_PHYSICAL_RULES) + + +class LogicalOptimizer(Optimizer): + """The optimizer for logical operators.""" + + @property + def rules(self) -> List[Rule]: + return [rule_cls() for rule_cls in _LOGICAL_RULES] + + +class PhysicalOptimizer(Optimizer): + """The optimizer for physical operators.""" + + @property + def rules(self) -> List[Rule]: + return [rule_cls() for rule_cls in _PHYSICAL_RULES] + + +def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan: + """Get the physical execution plan for the provided logical plan. + + This process has 3 steps: + (1) logical optimization: optimize logical operators. + (2) planning: convert logical to physical operators. + (3) physical optimization: optimize physical operators. 
+ """ + optimized_logical_plan = LogicalOptimizer().optimize(logical_plan) + logical_plan._dag = optimized_logical_plan.dag + physical_plan = Planner().plan(optimized_logical_plan) + return PhysicalOptimizer().optimize(physical_plan) diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_batch_format.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_batch_format.py new file mode 100644 index 0000000000000000000000000000000000000000..2dd265cd08b119250d08dcf71691e1b3701fb08a --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_batch_format.py @@ -0,0 +1,42 @@ +from collections import deque +from typing import Iterable + +from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule +from ray.data._internal.logical.operators.all_to_all_operator import AbstractAllToAll +from ray.data._internal.logical.operators.map_operator import MapBatches + + +class InheritBatchFormatRule(Rule): + """For AbstractAllToAll based operator, apply this rule + to inherit batch_format from upstream operator by traversing + the entire DAG.""" + + def apply(self, plan: LogicalPlan) -> LogicalPlan: + optimized_dag: LogicalOperator = self._apply(plan.dag) + new_plan = LogicalPlan(dag=optimized_dag, context=plan.context) + return new_plan + + def _apply(self, op: LogicalOperator): + # Post-order traversal. 
+ nodes: Iterable[LogicalOperator] = deque() + for node in op.post_order_iter(): + nodes.appendleft(node) + + while len(nodes) > 0: + current_op = nodes.pop() + + if isinstance(current_op, AbstractAllToAll): + # traversal up the DAG until we find MapBatches with batch_format + # or we reach to source op and do nothing + upstream_op = current_op.input_dependencies[0] + while upstream_op.input_dependencies: + if ( + isinstance(upstream_op, MapBatches) + and upstream_op._batch_format + ): + current_op._batch_format = upstream_op._batch_format + break + upstream_op = upstream_op.input_dependencies[0] + + # just return the default op + return op diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_target_max_block_size.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_target_max_block_size.py new file mode 100644 index 0000000000000000000000000000000000000000..298ff6c4edbff9cfab6ea14418dc61b81c93e1e8 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_target_max_block_size.py @@ -0,0 +1,30 @@ +from typing import Optional + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.logical.interfaces import PhysicalPlan, Rule + + +class InheritTargetMaxBlockSizeRule(Rule): + """For each op that has overridden the default target max block size, + propagate to upstream ops until we reach an op that has also overridden the + target max block size.""" + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + self._propagate_target_max_block_size_to_upstream_ops(plan.dag) + return plan + + def _propagate_target_max_block_size_to_upstream_ops( + self, dag: PhysicalOperator, target_max_block_size: Optional[int] = None + ): + if dag.target_max_block_size is not None: + # Set the target block size to inherit for + # upstream ops. 
+ target_max_block_size = dag.target_max_block_size + elif target_max_block_size is not None: + # Inherit from downstream op. + dag.set_target_max_block_size(target_max_block_size) + + for upstream_op in dag.input_dependencies: + self._propagate_target_max_block_size_to_upstream_ops( + upstream_op, target_max_block_size + ) diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/limit_pushdown.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/limit_pushdown.py new file mode 100644 index 0000000000000000000000000000000000000000..a13378eb991eb6bdc7e389cf6842ae3df91e66a2 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/limit_pushdown.py @@ -0,0 +1,133 @@ +import copy +from collections import deque +from typing import Iterable, List + +from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule +from ray.data._internal.logical.operators.one_to_one_operator import ( + AbstractOneToOne, + Limit, +) +from ray.data._internal.logical.operators.read_operator import Read + + +class LimitPushdownRule(Rule): + """Rule for pushing down the limit operator. + + When a limit operator is present, we apply the limit on the + most upstream operator that supports it. Notably, we move the + Limit operator downstream from Read op, any other non-OneToOne operator, + or any operator which could potentially change the number of output rows. + + In addition, we also fuse consecutive Limit operators into a single + Limit operator, i.e. `Limit[n] -> Limit[m]` becomes `Limit[min(n, m)]`. 
+ """ + + def apply(self, plan: LogicalPlan) -> LogicalPlan: + optimized_dag = self._apply_limit_pushdown(plan.dag) + optimized_dag = self._apply_limit_fusion(optimized_dag) + return LogicalPlan(dag=optimized_dag, context=plan.context) + + def _apply_limit_pushdown(self, op: LogicalOperator) -> LogicalOperator: + """Given a DAG of LogicalOperators, traverse the DAG and push down + Limit operators, i.e. move Limit operators as far upstream as possible. + + Returns a new LogicalOperator with the Limit operators pushed down.""" + # Post-order traversal. + nodes: Iterable[LogicalOperator] = deque() + for node in op.post_order_iter(): + nodes.appendleft(node) + + while len(nodes) > 0: + current_op = nodes.pop() + + # If we encounter a Limit op, move it upstream until it reaches: + # - Read operator + # - A non-AbstractOneToOne operator (e.g. AbstractAllToAll) + # - An AbstractOneToOne operator that could change the number of output rows + + # TODO(scottjlee): in our current abstraction, we have Read extend + # AbstractMap (with no input dependency), which extends AbstractOneToOne. + # So we have to explicitly separate the Read op in its own check. + # We should remove this case once we refactor Read op to no longer + # be an AbstractOneToOne op. + if isinstance(current_op, Limit): + limit_op_copy = copy.copy(current_op) + + # Traverse up the DAG until we reach the first operator that meets + # one of the conditions above, which will serve as the new input + # into the Limit operator. 
+ new_input_into_limit = current_op.input_dependency + ops_between_new_input_and_limit: List[LogicalOperator] = [] + while ( + isinstance(new_input_into_limit, AbstractOneToOne) + and not isinstance(new_input_into_limit, Read) + and not getattr(new_input_into_limit, "can_modify_num_rows", False) + ): + new_input_into_limit_copy = copy.copy(new_input_into_limit) + ops_between_new_input_and_limit.append(new_input_into_limit_copy) + new_input_into_limit = new_input_into_limit.input_dependency + + # Link the Limit operator and its newly designated input op from above. + limit_op_copy._input_dependencies = [new_input_into_limit] + new_input_into_limit._output_dependencies = [limit_op_copy] + + # Build the chain of operator dependencies between the new + # input and the Limit operator, using copies of traversed operators. + ops_between_new_input_and_limit.append(limit_op_copy) + for idx in range(len(ops_between_new_input_and_limit) - 1): + curr_op, up_op = ( + ops_between_new_input_and_limit[idx], + ops_between_new_input_and_limit[idx + 1], + ) + curr_op._input_dependencies = [up_op] + up_op._output_dependencies = [curr_op] + # Add the copied operator to the list of nodes to be traversed. + nodes.append(curr_op) + + # Link the Limit operator to its new input operator. + for limit_output_op in current_op.output_dependencies: + limit_output_op._input_dependencies = [ + ops_between_new_input_and_limit[0] + ] + last_op = ops_between_new_input_and_limit[0] + last_op._output_dependencies = current_op.output_dependencies + + return current_op + + def _apply_limit_fusion(self, op: LogicalOperator) -> LogicalOperator: + """Given a DAG of LogicalOperators, traverse the DAG and fuse all + back-to-back Limit operators, i.e. + Limit[n] -> Limit[m] becomes Limit[min(n, m)]. + + Returns a new LogicalOperator with the Limit operators fusion applied.""" + + # Post-order traversal. 
+ nodes: Iterable[LogicalOperator] = deque() + for node in op.post_order_iter(): + nodes.appendleft(node) + + while len(nodes) > 0: + current_op = nodes.pop() + + # If we encounter two back-to-back Limit operators, fuse them. + if isinstance(current_op, Limit): + upstream_op = current_op.input_dependency + if isinstance(upstream_op, Limit): + new_limit = min(current_op._limit, upstream_op._limit) + fused_limit_op = Limit(upstream_op.input_dependency, new_limit) + + # Link the fused Limit operator to its input and output ops, i.e.: + # `upstream_input -> limit_upstream -> limit_downstream -> downstream_output` # noqa: E501 + # becomes `upstream_input -> fused_limit -> downstream_output` + fused_limit_op._input_dependencies = upstream_op.input_dependencies + fused_limit_op._output_dependencies = current_op.output_dependencies + + # Replace occurrences of the upstream Limit operator in + # output_dependencies with the newly fused Limit operator. + upstream_input = upstream_op.input_dependency + upstream_input._output_dependencies = [fused_limit_op] + + for current_output in current_op.output_dependencies: + current_output._input_dependencies = [fused_limit_op] + nodes.append(fused_limit_op) + return current_op diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/operator_fusion.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/operator_fusion.py new file mode 100644 index 0000000000000000000000000000000000000000..f287e2a98f6e998f8166acaf14f5148a2a912cb6 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/operator_fusion.py @@ -0,0 +1,491 @@ +import itertools +from typing import List, Optional, Tuple + +from ray.data._internal.compute import ( + ActorPoolStrategy, + ComputeStrategy, + TaskPoolStrategy, +) +from ray.data._internal.execution.interfaces import ( + PhysicalOperator, + RefBundle, + TaskContext, +) +from ray.data._internal.execution.operators.actor_pool_map_operator import 
( + ActorPoolMapOperator, +) +from ray.data._internal.execution.operators.base_physical_operator import ( + AllToAllOperator, +) +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.task_pool_map_operator import ( + TaskPoolMapOperator, +) +from ray.data._internal.logical.interfaces import PhysicalPlan, Rule +from ray.data._internal.logical.operators.all_to_all_operator import ( + AbstractAllToAll, + RandomShuffle, + Repartition, +) +from ray.data._internal.logical.operators.map_operator import ( + AbstractMap, + AbstractUDFMap, +) +from ray.data._internal.stats import StatsDict +from ray.data.context import DataContext + +# Scheduling strategy can be inherited from upstream operator if not specified. +INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"] + + +class OperatorFusionRule(Rule): + """Fuses linear chains of compatible physical operators.""" + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + self._op_map = plan.op_map.copy() + # Do DFS fusion on compatible pairwise operators in two passes. + # In the first pass, only fuse back-to-back map operators together. + fused_dag = self._fuse_map_operators_in_dag(plan.dag) + + # Now that we have fused together all back-to-back map operators, + # we fuse together MapOperator -> AllToAllOperator pairs. + fused_dag = self._fuse_all_to_all_operators_in_dag(fused_dag) + + # Update output dependencies after fusion. + # TODO(hchen): Instead of updating the depdencies manually, + # we need a better abstraction for manipulating the DAG. 
+ self._remove_output_depes(fused_dag) + self._update_output_depes(fused_dag) + + new_plan = PhysicalPlan(fused_dag, self._op_map, plan.context) + return new_plan + + def _remove_output_depes(self, op: PhysicalOperator) -> None: + for input in op._input_dependencies: + input._output_dependencies = [] + self._remove_output_depes(input) + + def _update_output_depes(self, op: PhysicalOperator) -> None: + for input in op._input_dependencies: + input._output_dependencies.append(op) + self._update_output_depes(input) + + def _fuse_map_operators_in_dag(self, dag: PhysicalOperator) -> MapOperator: + """Starting at the given operator, traverses up the DAG of operators + and recursively fuses compatible MapOperator -> MapOperator pairs. + Returns the current (root) operator after completing upstream operator fusions. + """ + upstream_ops = dag.input_dependencies + while ( + len(upstream_ops) == 1 + and isinstance(dag, MapOperator) + and isinstance(upstream_ops[0], MapOperator) + and self._can_fuse(dag, upstream_ops[0]) + ): + # Fuse operator with its upstream op. + dag = self._get_fused_map_operator(dag, upstream_ops[0]) + upstream_ops = dag.input_dependencies + + # Done fusing back-to-back map operators together here, + # move up the DAG to find the next map operators to fuse. + dag._input_dependencies = [ + self._fuse_map_operators_in_dag(upstream_op) for upstream_op in upstream_ops + ] + return dag + + def _fuse_all_to_all_operators_in_dag( + self, dag: AllToAllOperator + ) -> AllToAllOperator: + """Starting at the given operator, traverses up the DAG of operators + and recursively fuses compatible MapOperator -> AllToAllOperator pairs. + + Also, sets the target block size of the immediately upstream map op to + match the shuffle block size. We use a larger block size for shuffles + because tiny blocks are bad for I/O performance. + + Returns the current (root) operator after completing upstream operator fusions. 
+ """ + upstream_ops = dag.input_dependencies + while ( + len(upstream_ops) == 1 + and isinstance(dag, AllToAllOperator) + and isinstance(upstream_ops[0], MapOperator) + and self._can_fuse(dag, upstream_ops[0]) + ): + # Fuse operator with its upstream op. + dag = self._get_fused_all_to_all_operator(dag, upstream_ops[0]) + upstream_ops = dag.input_dependencies + + # Done fusing MapOperator -> AllToAllOperator together here, + # move up the DAG to find the next pair of operators to fuse. + dag._input_dependencies = [ + self._fuse_all_to_all_operators_in_dag(upstream_op) + for upstream_op in upstream_ops + ] + return dag + + def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: + """Returns whether the provided downstream operator can be fused with the given + upstream operator. + + We currently support fusing two operators if the following are all true: + * We are fusing either MapOperator -> MapOperator or + MapOperator -> AllToAllOperator. + * They either use the same compute configuration, or the upstream operator + uses a task pool while the downstream operator uses an actor pool. + * If both operators involve callable classes, the callable classes are + the same class AND constructor args are the same for both. + * They have compatible remote arguments. 
+ """ + if not up_op.supports_fusion() or not down_op.supports_fusion(): + return False + + # We currently only support fusing for the following cases: + # - TaskPoolMapOperator -> TaskPoolMapOperator/ActorPoolMapOperator + # - TaskPoolMapOperator -> AllToAllOperator + # (only RandomShuffle and Repartition LogicalOperators are currently supported) + if not ( + ( + isinstance(up_op, TaskPoolMapOperator) + and isinstance(down_op, (TaskPoolMapOperator, ActorPoolMapOperator)) + ) + or ( + isinstance(up_op, TaskPoolMapOperator) + and isinstance(down_op, AllToAllOperator) + ) + ): + return False + + down_logical_op = self._op_map[down_op] + up_logical_op = self._op_map[up_op] + + if up_op.get_additional_split_factor() > 1: + return False + + # If the downstream operator takes no input, it cannot be fused with + # the upstream operator. + if not down_logical_op._input_dependencies: + return False + + # We currently only support fusing for the following cases: + # - AbstractMap -> AbstractMap + # - AbstractMap -> RandomShuffle + # - AbstractMap -> Repartition (shuffle=True) + if not ( + ( + isinstance(up_logical_op, AbstractMap) + and isinstance(down_logical_op, AbstractMap) + ) + or ( + isinstance(up_logical_op, AbstractMap) + and isinstance(down_logical_op, RandomShuffle) + ) + or ( + isinstance(up_logical_op, AbstractMap) + and isinstance(down_logical_op, Repartition) + ) + ): + return False + + # Do not fuse Repartition operator if shuffle is disabled + # (i.e. using split shuffle). + if isinstance(down_logical_op, Repartition) and not down_logical_op._shuffle: + return False + + if isinstance(down_logical_op, AbstractMap) and isinstance( + up_logical_op, AbstractMap + ): + if ( + self._fuse_compute_strategy( + up_logical_op._compute, + down_logical_op._compute, + ) + is None + ): + return False + + # Only fuse if the ops' remote arguments are compatible. 
+ if not _are_remote_args_compatible( + getattr(up_logical_op, "_ray_remote_args", {}), + getattr(down_logical_op, "_ray_remote_args", {}), + ): + return False + + # Do not fuse if either op specifies a `_ray_remote_args_fn`, + # since it is not known whether the generated args will be compatible. + if getattr(up_logical_op, "_ray_remote_args_fn", None) or getattr( + down_logical_op, "_ray_remote_args_fn", None + ): + return False + + if not self._can_merge_target_max_block_size( + up_op.target_max_block_size, + down_op.target_max_block_size, + up_op.data_context, + ): + return False + + # Otherwise, ops are compatible for fusion. + return True + + def _fuse_compute_strategy( + self, up_compute: ComputeStrategy, down_compute: ComputeStrategy + ) -> Optional[ComputeStrategy]: + """Fuse the compute strategies of the upstream and downstream operators. + Returns None if they are not compatible. + + Task->Task and Task->Actor are allowed. + Actor->Actor and Actor->Task are not allowed. + """ + if isinstance(up_compute, ActorPoolStrategy): + return None + assert isinstance(up_compute, TaskPoolStrategy) + if isinstance(down_compute, TaskPoolStrategy): + # For Task->Task, the sizes must match. + if up_compute.size != down_compute.size: + return None + return down_compute + else: + assert isinstance(down_compute, ActorPoolStrategy) + # For Task->Actor, if Task's size is set, it must match Actor's max_size. + if up_compute.size is not None and up_compute.size != down_compute.max_size: + return None + return down_compute + + def _can_merge_target_max_block_size( + self, + up_target_max_block_size: Optional[int], + down_target_max_block_size: Optional[int], + data_context: DataContext, + ): + # If the upstream op overrode the target max block size, only fuse if + # they are equal. 
+ if up_target_max_block_size is not None: + if down_target_max_block_size is None: + down_target_max_block_size = data_context.target_max_block_size + if up_target_max_block_size != down_target_max_block_size: + return False + return True + + def _get_merged_target_max_block_size( + self, + up_target_max_block_size: Optional[int], + down_target_max_block_size: Optional[int], + ): + if up_target_max_block_size is not None: + # If the upstream op overrode the target max block size, we can + # only merge if the downstream op matches or uses the default. + assert ( + down_target_max_block_size is None + or down_target_max_block_size == up_target_max_block_size + ) + return up_target_max_block_size + else: + # Upstream op inherits the downstream op's target max block size, + # because the downstream op is the one that outputs the final + # blocks. + return down_target_max_block_size + + def _get_fused_map_operator( + self, down_op: MapOperator, up_op: MapOperator + ) -> MapOperator: + assert self._can_fuse(down_op, up_op), ( + "Current rule supports fusing MapOperator->MapOperator, but received: " + f"{type(up_op).__name__} -> {type(down_op).__name__}" + ) + + # Fuse operator names. + name = up_op.name + "->" + down_op.name + + down_logical_op = self._op_map.pop(down_op) + up_logical_op = self._op_map.pop(up_op) + assert isinstance(down_logical_op, AbstractMap) + assert isinstance(up_logical_op, AbstractMap) + + # Merge minimum block sizes. 
+ down_min_rows_per_bundled_input = down_logical_op._min_rows_per_bundled_input + up_min_rows_per_bundled_input = up_logical_op._min_rows_per_bundled_input + if ( + down_min_rows_per_bundled_input is not None + and up_min_rows_per_bundled_input is not None + ): + min_rows_per_bundled_input = max( + down_min_rows_per_bundled_input, up_min_rows_per_bundled_input + ) + elif up_min_rows_per_bundled_input is not None: + min_rows_per_bundled_input = up_min_rows_per_bundled_input + else: + min_rows_per_bundled_input = down_min_rows_per_bundled_input + + target_max_block_size = self._get_merged_target_max_block_size( + up_op.target_max_block_size, down_op.target_max_block_size + ) + + compute = self._fuse_compute_strategy( + up_logical_op._compute, down_logical_op._compute + ) + assert compute is not None + ray_remote_args = up_logical_op._ray_remote_args + ray_remote_args_fn = ( + up_logical_op._ray_remote_args_fn or down_logical_op._ray_remote_args_fn + ) + # Make the upstream operator's inputs the new, fused operator's inputs. + input_deps = up_op.input_dependencies + assert len(input_deps) == 1 + input_op = input_deps[0] + + # Fused physical map operator. + assert up_op.data_context is down_op.data_context + op = MapOperator.create( + up_op.get_map_transformer().fuse(down_op.get_map_transformer()), + input_op, + up_op.data_context, + target_max_block_size=target_max_block_size, + name=name, + compute_strategy=compute, + min_rows_per_bundle=min_rows_per_bundled_input, + ray_remote_args=ray_remote_args, + ray_remote_args_fn=ray_remote_args_fn, + ) + op.set_logical_operators(*up_op._logical_operators, *down_op._logical_operators) + for map_task_kwargs_fn in itertools.chain( + up_op._map_task_kwargs_fns, down_op._map_task_kwargs_fns + ): + op.add_map_task_kwargs_fn(map_task_kwargs_fn) + + # Build a map logical operator to be used as a reference for further fusion. 
+ # TODO(Scott): This is hacky, remove this once we push fusion to be purely based + # on a lower-level operator spec. + if isinstance(up_logical_op, AbstractUDFMap): + input_op = up_logical_op.input_dependency + else: + # Bottom out at the source logical op (e.g. Read()). + input_op = up_logical_op + if isinstance(down_logical_op, AbstractUDFMap): + logical_op = AbstractUDFMap( + name, + input_op, + down_logical_op._fn, + down_logical_op._fn_args, + down_logical_op._fn_kwargs, + down_logical_op._fn_constructor_args, + down_logical_op._fn_constructor_kwargs, + min_rows_per_bundled_input, + compute, + ray_remote_args_fn, + ray_remote_args, + ) + else: + # The downstream op is AbstractMap instead of AbstractUDFMap. + logical_op = AbstractMap( + name, + input_op, + min_rows_per_bundled_input=min_rows_per_bundled_input, + ray_remote_args_fn=ray_remote_args_fn, + ray_remote_args=ray_remote_args, + ) + self._op_map[op] = logical_op + # Return the fused physical operator. + return op + + def _get_fused_all_to_all_operator( + self, down_op: AllToAllOperator, up_op: MapOperator + ) -> AllToAllOperator: + assert self._can_fuse(down_op, up_op), ( + "Current rule supports fusing MapOperator -> AllToAllOperator" + f", but received: {type(up_op).__name__} -> {type(down_op).__name__}" + ) + + # Fuse operator names. + name = up_op.name + "->" + down_op.name + + down_logical_op = self._op_map.pop(down_op) + up_logical_op = self._op_map.pop(up_op) + assert isinstance(down_logical_op, AbstractAllToAll) + assert isinstance(up_logical_op, AbstractMap) + + # Fuse transformation functions. 
+ ray_remote_args = up_logical_op._ray_remote_args + down_transform_fn = down_op.get_transformation_fn() + up_map_transformer = up_op.get_map_transformer() + + def fused_all_to_all_transform_fn( + blocks: List[RefBundle], ctx: TaskContext + ) -> Tuple[List[RefBundle], StatsDict]: + """To fuse MapOperator->AllToAllOperator, we store the map function + in the TaskContext so that it may be used by the downstream + AllToAllOperator's transform function.""" + ctx.upstream_map_transformer = up_map_transformer + ctx.upstream_map_ray_remote_args = ray_remote_args + return down_transform_fn(blocks, ctx) + + # Make the upstream operator's inputs the new, fused operator's inputs. + input_deps = up_op.input_dependencies + assert len(input_deps) == 1 + input_op = input_deps[0] + + target_max_block_size = self._get_merged_target_max_block_size( + up_op.target_max_block_size, down_op.target_max_block_size + ) + + assert up_op.data_context is down_op.data_context + op = AllToAllOperator( + fused_all_to_all_transform_fn, + input_op, + up_op.data_context, + target_max_block_size=target_max_block_size, + num_outputs=down_op._num_outputs, + # Transfer over the existing sub-progress bars from + # the AllToAllOperator (if any) into the fused operator. + sub_progress_bar_names=down_op._sub_progress_bar_names, + name=name, + ) + # Bottom out at the source logical op (e.g. Read()). + input_op = up_logical_op + + if isinstance(down_logical_op, RandomShuffle): + logical_op = RandomShuffle( + input_op, + name=name, + ray_remote_args=ray_remote_args, + ) + elif isinstance(down_logical_op, Repartition): + logical_op = Repartition( + input_op, + num_outputs=down_logical_op._num_outputs, + shuffle=down_logical_op._shuffle, + ) + self._op_map[op] = logical_op + # Return the fused physical operator. 
+ return op + + +def _are_remote_args_compatible(prev_args, next_args): + """Check if Ray remote arguments are compatible for merging.""" + prev_args = _canonicalize(prev_args) + next_args = _canonicalize(next_args) + remote_args = next_args.copy() + for key in INHERITABLE_REMOTE_ARGS: + # NOTE: We only carry over inheritable value in case + # of it not being provided in the remote args + if key in prev_args and key not in remote_args: + remote_args[key] = prev_args[key] + + if prev_args != remote_args: + return False + return True + + +def _canonicalize(remote_args: dict) -> dict: + """Returns canonical form of given remote args.""" + remote_args = remote_args.copy() + if "num_cpus" not in remote_args or remote_args["num_cpus"] is None: + remote_args["num_cpus"] = 1 + if "num_gpus" not in remote_args or remote_args["num_gpus"] is None: + remote_args["num_gpus"] = 0 + resources = remote_args.get("resources", {}) + for k, v in list(resources.items()): + if v is None or v == 0.0: + del resources[k] + remote_args["resources"] = resources + return remote_args diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/set_read_parallelism.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/set_read_parallelism.py new file mode 100644 index 0000000000000000000000000000000000000000..0f9bb1b56ada4375eec9061dc47d30cd1862bd98 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/set_read_parallelism.py @@ -0,0 +1,132 @@ +import logging +import math +from typing import Optional, Tuple, Union + +from ray import available_resources as ray_available_resources +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.logical.interfaces import PhysicalPlan, Rule +from ray.data._internal.logical.operators.read_operator import Read +from ray.data._internal.util import 
_autodetect_parallelism +from ray.data.context import WARN_PREFIX, DataContext +from ray.data.datasource.datasource import Datasource, Reader + +logger = logging.getLogger(__name__) + + +def compute_additional_split_factor( + datasource_or_legacy_reader: Union[Datasource, Reader], + parallelism: int, + mem_size: int, + target_max_block_size: int, + cur_additional_split_factor: Optional[int] = None, +) -> Tuple[int, str, int, Optional[int]]: + ctx = DataContext.get_current() + detected_parallelism, reason, _ = _autodetect_parallelism( + parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size + ) + num_read_tasks = len( + datasource_or_legacy_reader.get_read_tasks(detected_parallelism) + ) + expected_block_size = None + if mem_size: + expected_block_size = mem_size / num_read_tasks + logger.debug( + f"Expected in-memory size {mem_size}," f" block size {expected_block_size}" + ) + size_based_splits = round(max(1, expected_block_size / target_max_block_size)) + else: + size_based_splits = 1 + if cur_additional_split_factor: + size_based_splits *= cur_additional_split_factor + logger.debug(f"Size based split factor {size_based_splits}") + estimated_num_blocks = num_read_tasks * size_based_splits + logger.debug(f"Blocks after size splits {estimated_num_blocks}") + + available_cpu_slots = ray_available_resources().get("CPU", 1) + if ( + parallelism != -1 + and num_read_tasks >= available_cpu_slots * 4 + and num_read_tasks >= 5000 + ): + logger.warning( + f"{WARN_PREFIX} The requested number of read blocks of {parallelism} " + "is more than 4x the number of available CPU slots in the cluster of " + f"{available_cpu_slots}. This can " + "lead to slowdowns during the data reading phase due to excessive " + "task creation. Reduce the value to match with the available " + "CPU slots in the cluster, or set override_num_blocks to -1 for Ray Data " + "to automatically determine the number of read tasks blocks." 
+ "You can ignore this message if the cluster is expected to autoscale." + ) + + # Add more output splitting for each read task if needed. + # TODO(swang): For parallelism=-1 (user did not explicitly set + # parallelism), and if the following operator produces much larger blocks, + # we should scale down the target max block size here instead of using + # splitting, which can have higher memory usage. + if estimated_num_blocks < detected_parallelism and estimated_num_blocks > 0: + k = math.ceil(detected_parallelism / estimated_num_blocks) + estimated_num_blocks = estimated_num_blocks * k + return detected_parallelism, reason, estimated_num_blocks, k + + return detected_parallelism, reason, estimated_num_blocks, None + + +class SetReadParallelismRule(Rule): + """ + This rule sets the read op's task parallelism based on the target block + size, the requested parallelism, the number of read files, and the + available resources in the cluster. + + If the parallelism is lower than requested, this rule also sets a split + factor to split the output blocks of the read task, so that the following + operator will have the desired parallelism. + """ + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + ops = [plan.dag] + + while len(ops) > 0: + op = ops.pop(0) + if isinstance(op, InputDataBuffer): + continue + logical_op = plan.op_map[op] + if isinstance(logical_op, Read): + self._apply(op, logical_op) + ops += op.input_dependencies + + return plan + + def _apply(self, op: PhysicalOperator, logical_op: Read): + ( + detected_parallelism, + reason, + estimated_num_blocks, + k, + ) = compute_additional_split_factor( + logical_op._datasource_or_legacy_reader, + logical_op._parallelism, + logical_op._mem_size, + op.actual_target_max_block_size, + op._additional_split_factor, + ) + + if logical_op._parallelism == -1: + assert reason != "" + logger.debug( + f"Using autodetected parallelism={detected_parallelism} " + f"for operator {logical_op.name} to satisfy {reason}." 
+ ) + logical_op.set_detected_parallelism(detected_parallelism) + + if k is not None: + logger.debug( + f"To satisfy the requested parallelism of {detected_parallelism}, " + f"each read task output is split into {k} smaller blocks." + ) + + if k is not None: + op.set_additional_split_factor(k) + + logger.debug(f"Estimated num output blocks {estimated_num_blocks}") diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/zero_copy_map_fusion.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/zero_copy_map_fusion.py new file mode 100644 index 0000000000000000000000000000000000000000..6495f64f10a49e781c75b1b5061ba78e66715ad2 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/zero_copy_map_fusion.py @@ -0,0 +1,88 @@ +from abc import abstractmethod +from typing import List + +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.map_transformer import ( + BuildOutputBlocksMapTransformFn, + MapTransformFn, + MapTransformFnDataType, +) +from ray.data._internal.logical.interfaces.optimizer import Rule +from ray.data._internal.logical.interfaces.physical_plan import PhysicalPlan + + +class ZeroCopyMapFusionRule(Rule): + """Base abstract class for all zero-copy map fusion rules. + + A zero-copy map fusion rule is a rule that optimizes the transform_fn chain of + a fused MapOperator. The optimization is usually done by removing unnecessary + data conversions. + + This base abstract class defines the common util functions. And subclasses + should implement the `_optimize` method for the concrete optimization + strategy. 
+ """ + + def apply(self, plan: PhysicalPlan) -> PhysicalPlan: + self._traverse(plan.dag) + return plan + + def _traverse(self, op): + """Traverse the DAG and apply the optimization to each MapOperator.""" + if isinstance(op, MapOperator): + map_transformer = op.get_map_transformer() + transform_fns = map_transformer.get_transform_fns() + new_transform_fns = self._optimize(transform_fns) + # Physical operators won't be shared, + # so it's safe to modify the transform_fns in place. + map_transformer.set_transform_fns(new_transform_fns) + + for input_op in op.input_dependencies: + self._traverse(input_op) + + @abstractmethod + def _optimize(self, transform_fns: List[MapTransformFn]) -> List[MapTransformFn]: + """Optimize the transform_fns chain of a MapOperator. + + Args: + transform_fns: The old transform_fns chain. + Returns: + The optimized transform_fns chain. + """ + ... + + +class EliminateBuildOutputBlocks(ZeroCopyMapFusionRule): + """This rule eliminates unnecessary BuildOutputBlocksMapTransformFn, + if the previous fn already outputs blocks. + + This happens for the "Read -> Map/Write" fusion. + """ + + def _optimize(self, transform_fns: List[MapTransformFn]) -> List[MapTransformFn]: + # For the following subsquence, + # 1. Any MapTransformFn with block output. + # 2. BuildOutputBlocksMapTransformFn + # 3. Any MapTransformFn with block input. + # We drop the BuildOutputBlocksMapTransformFn in the middle. 
+ new_transform_fns = [] + + for i in range(len(transform_fns)): + cur_fn = transform_fns[i] + drop = False + if ( + i > 0 + and i < len(transform_fns) - 1 + and isinstance(cur_fn, BuildOutputBlocksMapTransformFn) + ): + prev_fn = transform_fns[i - 1] + next_fn = transform_fns[i + 1] + if ( + prev_fn.output_type == MapTransformFnDataType.Block + and next_fn.input_type == MapTransformFnDataType.Block + ): + drop = True + if not drop: + new_transform_fns.append(cur_fn) + + return new_transform_fns diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/util.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/util.py new file mode 100644 index 0000000000000000000000000000000000000000..af6f2420a2696f2be39350044c2b15dda886d74a --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/logical/util.py @@ -0,0 +1,112 @@ +import json +import re +import threading +from typing import Dict + +from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag +from ray.data._internal.logical.interfaces import LogicalOperator +from ray.data._internal.logical.operators.map_operator import AbstractUDFMap +from ray.data._internal.logical.operators.read_operator import Read +from ray.data._internal.logical.operators.write_operator import Write + +# The dictionary for the operator name and count. +_recorded_operators = dict() +_recorded_operators_lock = threading.Lock() + +# The white list of operator names allowed to be recorded. 
+_op_name_white_list = [ + # Read + "ReadBigQuery", + "ReadRange", + "ReadMongo", + "ReadParquet", + "ReadParquetBulk", + "ReadImage", + "ReadJSON", + "ReadCSV", + "ReadText", + "ReadNumpy", + "ReadTFRecord", + "ReadBinary", + "ReadTorch", + "ReadAvro", + "ReadWebDataset", + "ReadSQL", + "ReadDatabricksUC", + "ReadLance", + "ReadHuggingFace", + "ReadCustom", + # From + "FromArrow", + "FromItems", + "FromNumpy", + "FromPandas", + # Write + "WriteBigQuery", + "WriteParquet", + "WriteJSON", + "WriteCSV", + "WriteTFRecord", + "WriteNumpy", + "WriteMongo", + "WriteWebDataset", + "WriteSQL", + "WriteCustom", + # Map + "Map", + "MapBatches", + "Filter", + "FlatMap", + # All-to-all + "RandomizeBlockOrder", + "RandomShuffle", + "Repartition", + "Sort", + "Aggregate", + # N-ary + "Zip", + "Union", +] + + +def record_operators_usage(op: LogicalOperator): + """Record logical operator usage with Ray telemetry.""" + ops_dict = dict() + _collect_operators_to_dict(op, ops_dict) + ops_json_str = "" + with _recorded_operators_lock: + for op, count in ops_dict.items(): + _recorded_operators.setdefault(op, 0) + _recorded_operators[op] += count + ops_json_str = json.dumps(_recorded_operators) + + record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str) + + +def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]): + """Collect the logical operator name and count into `ops_dict`.""" + for child in op.input_dependencies: + _collect_operators_to_dict(child, ops_dict) + + op_name = op.name + + # Check read and write operator, and anonymize user-defined data source. + if isinstance(op, Read): + op_name = f"Read{op._datasource.get_name()}" + if op_name not in _op_name_white_list: + op_name = "ReadCustom" + elif isinstance(op, Write): + op_name = f"Write{op._datasink_or_legacy_datasource.get_name()}" + if op_name not in _op_name_white_list: + op_name = "WriteCustom" + elif isinstance(op, AbstractUDFMap): + # Remove the function name from the map operator name. 
+ # E.g., Map() -> Map + op_name = re.sub("\\(.*\\)$", "", op_name) + + # Anonymize any operator name if not in white list. + if op_name not in _op_name_white_list: + op_name = "Unknown" + + ops_dict.setdefault(op_name, 0) + ops_dict[op_name] += 1 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__init__.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_all_to_all_op.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_all_to_all_op.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5bb06116d7163a86ca1df13aebde850fd41787e5 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_all_to_all_op.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_arrow_op.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_arrow_op.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..12b8e06e0b24c73c8ce6a55d37267ddc3c03e294 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_arrow_op.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_items_op.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_items_op.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..440bd3ec273d107f06f6ec81bbfef6efdfd2092e Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_items_op.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_numpy_op.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_numpy_op.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7696c2c6ea2301bee6d36c86ea230d8f9bff8b47 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_numpy_op.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_udf_map_op.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_udf_map_op.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ae407a7b5d4c635c361f3052d0dac87253b74037 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_udf_map_op.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/planner.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/planner.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..20f14368876bbe89822414565bfb9fed13f5c617 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/planner.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/random_shuffle.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/random_shuffle.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ff21e97492b1bc0c98e715b6563e8ddde4b79559 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/random_shuffle.cpython-311.pyc differ diff --git 
a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/randomize_blocks.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/randomize_blocks.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f63ec0c5d498920c7c2fd6ee197c77820032c1ce Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/randomize_blocks.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/repartition.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/repartition.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..032c98e95c4c393f6f8174b712b380395018da57 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/repartition.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/sort.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/sort.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f20bf0f76961c23b26156cde4b43f7f5e1fda001 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/sort.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/aggregate.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/aggregate.py new file mode 100644 index 0000000000000000000000000000000000000000..2d32719dc8492808a9d1d3be234a659e9e01c708 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/aggregate.py @@ -0,0 +1,89 @@ +from typing import List, Optional, Tuple, Union + +from ray.data._internal.execution.interfaces import ( + AllToAllTransformFn, + RefBundle, + TaskContext, +) +from ray.data._internal.planner.exchange.aggregate_task_spec import 
( + SortAggregateTaskSpec, +) +from ray.data._internal.planner.exchange.pull_based_shuffle_task_scheduler import ( + PullBasedShuffleTaskScheduler, +) +from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import ( + PushBasedShuffleTaskScheduler, +) +from ray.data._internal.planner.exchange.sort_task_spec import SortKey, SortTaskSpec +from ray.data._internal.stats import StatsDict +from ray.data._internal.util import unify_block_metadata_schema +from ray.data.aggregate import AggregateFn +from ray.data.context import DataContext + + +def generate_aggregate_fn( + key: Optional[Union[str, List[str]]], + aggs: List[AggregateFn], + batch_format: str, + _debug_limit_shuffle_execution_to_num_blocks: Optional[int] = None, +) -> AllToAllTransformFn: + """Generate function to aggregate blocks by the specified key column or key + function. + """ + if len(aggs) == 0: + raise ValueError("Aggregate requires at least one aggregation") + + def fn( + refs: List[RefBundle], + ctx: TaskContext, + ) -> Tuple[List[RefBundle], StatsDict]: + blocks = [] + metadata = [] + for ref_bundle in refs: + blocks.extend(ref_bundle.block_refs) + metadata.extend(ref_bundle.metadata) + if len(blocks) == 0: + return (blocks, {}) + unified_schema = unify_block_metadata_schema(metadata) + for agg_fn in aggs: + agg_fn._validate(unified_schema) + + num_mappers = len(blocks) + + sort_key = SortKey(key) + + if key is None: + num_outputs = 1 + boundaries = [] + else: + # Use same number of output partitions. + num_outputs = num_mappers + sample_bar = ctx.sub_progress_bar_dict[ + SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME + ] + # Sample boundaries for aggregate key. 
+ boundaries = SortTaskSpec.sample_boundaries( + blocks, sort_key, num_outputs, sample_bar + ) + + agg_spec = SortAggregateTaskSpec( + boundaries=boundaries, + key=sort_key, + aggs=aggs, + batch_format=batch_format, + ) + if DataContext.get_current().use_push_based_shuffle: + scheduler = PushBasedShuffleTaskScheduler(agg_spec) + else: + scheduler = PullBasedShuffleTaskScheduler(agg_spec) + + return scheduler.execute( + refs, + num_outputs, + ctx, + _debug_limit_execution_to_num_blocks=( + _debug_limit_shuffle_execution_to_num_blocks + ), + ) + + return fn diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__init__.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/__init__.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0cc5545e5888c4f23d025c24bff9b4d1a6fa6648 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/__init__.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/aggregate_task_spec.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/aggregate_task_spec.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d9d6f8574bf668aedfd3a53b9bb1ddd194d8e287 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/aggregate_task_spec.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/interfaces.cpython-311.pyc 
b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/interfaces.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0b907f3274b4e4511f3db171be5407bf58a68e03 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/interfaces.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/pull_based_shuffle_task_scheduler.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/pull_based_shuffle_task_scheduler.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8024df2ddeeedf1ecf59466b816797c0fbfc40be Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/pull_based_shuffle_task_scheduler.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/push_based_shuffle_task_scheduler.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/push_based_shuffle_task_scheduler.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..34c3008be25da0e4cfc1137287786bb92cb860fb Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/push_based_shuffle_task_scheduler.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/shuffle_task_spec.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/shuffle_task_spec.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b650b31f36bb56de83d728c80cfe21822d78ed26 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/shuffle_task_spec.cpython-311.pyc differ 
diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/sort_task_spec.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/sort_task_spec.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..cb5143011b49e91217b692ccf71836525a0e0f3b Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/sort_task_spec.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/split_repartition_task_scheduler.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/split_repartition_task_scheduler.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..71c932781eda0864405f5319d744f9b5d7f12c8f Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/__pycache__/split_repartition_task_scheduler.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py new file mode 100644 index 0000000000000000000000000000000000000000..a07a35302ba04298bb727003020c18099146183c --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py @@ -0,0 +1,110 @@ +from typing import List, Tuple, Union + +from ray.data._internal.aggregate import Count, _AggregateOnKeyBase +from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec +from ray.data._internal.planner.exchange.sort_task_spec import SortKey +from ray.data._internal.table_block import TableBlockAccessor +from ray.data.aggregate import AggregateFn +from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata, KeyType + + +class 
SortAggregateTaskSpec(ExchangeTaskSpec): + """ + The implementation for sort-based aggregate tasks. + + Aggregate is done in 2 steps: partial aggregate of individual blocks, and + final aggregate of sorted blocks. + + Partial aggregate (`map`): each block is sorted locally, then partitioned into + smaller blocks according to the boundaries. Each partitioned block is aggregated + separately, then passed to a final aggregate task. + + Final aggregate (`reduce`): each task would receive a block from every worker that + consists of items in a certain range. It then merges the sorted blocks and + aggregates on-the-fly. + """ + + def __init__( + self, + boundaries: List[KeyType], + key: SortKey, + aggs: List[AggregateFn], + batch_format: str, + ): + super().__init__( + map_args=[boundaries, key, aggs], + reduce_args=[key, aggs, batch_format], + ) + + @staticmethod + def map( + idx: int, + block: Block, + output_num_blocks: int, + boundaries: List[KeyType], + sort_key: SortKey, + aggs: List[AggregateFn], + ) -> List[Union[BlockMetadata, Block]]: + stats = BlockExecStats.builder() + + block = SortAggregateTaskSpec._prune_unused_columns(block, sort_key, aggs) + if sort_key.get_columns(): + partitions = BlockAccessor.for_block(block).sort_and_partition( + boundaries, + sort_key, + ) + else: + partitions = [block] + parts = [BlockAccessor.for_block(p).combine(sort_key, aggs) for p in partitions] + meta = BlockAccessor.for_block(block).get_metadata(exec_stats=stats.build()) + return parts + [meta] + + @staticmethod + def reduce( + key: SortKey, + aggs: List[AggregateFn], + batch_format: str, + *mapper_outputs: List[Block], + partial_reduce: bool = False, + ) -> Tuple[Block, BlockMetadata]: + normalized_blocks = TableBlockAccessor.normalize_block_types( + mapper_outputs, normalize_type=batch_format + ) + return BlockAccessor.for_block(normalized_blocks[0]).aggregate_combined_blocks( + list(normalized_blocks), key, aggs, finalize=not partial_reduce + ) + + @staticmethod + def 
_prune_unused_columns( + block: Block, + sort_key: SortKey, + aggs: Tuple[AggregateFn], + ) -> Block: + """Prune unused columns from block before aggregate.""" + prune_columns = True + columns = set() + key = sort_key.get_columns() + + if isinstance(key, str): + columns.add(key) + elif isinstance(key, list): + columns.update(key) + elif callable(key): + prune_columns = False + + for agg in aggs: + if isinstance(agg, _AggregateOnKeyBase) and isinstance(agg._key_fn, str): + columns.add(agg._key_fn) + elif not isinstance(agg, Count): + # Don't prune columns if any aggregate key is not string. + prune_columns = False + + block_accessor = BlockAccessor.for_block(block) + if ( + prune_columns + and isinstance(block_accessor, TableBlockAccessor) + and block_accessor.num_rows() > 0 + ): + return block_accessor.select(list(columns)) + else: + return block diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/interfaces.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/interfaces.py new file mode 100644 index 0000000000000000000000000000000000000000..568135165abcb075f4d6a8bc991f5503c99f0606 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/interfaces.py @@ -0,0 +1,130 @@ +import logging +from typing import Any, Dict, List, Optional, Tuple, Union + +import ray._private.worker +from ray.data._internal.execution.interfaces import RefBundle +from ray.data._internal.stats import StatsDict +from ray.data._internal.util import convert_bytes_to_human_readable_str +from ray.data.block import Block, BlockMetadata +from ray.data.context import DataContext + +logger = logging.getLogger(__name__) + + +class ExchangeTaskSpec: + """ + An interface to specify the exchange map and reduce tasks. + + Subclasses should implement the `map` and `reduce` static methods. + `map` method is to transform one input block into multiple output blocks. + `reduce` is to combine multiple map output blocks. 
Both methods are + single-task operations. See `ExchangeScheduler` for how to distribute + the operations across multiple tasks. + + Any custom arguments for `map` and `reduce` methods should be specified by + setting `map_args` and `reduce_args`. + + The concept here is similar to the exchange operator described in + "Volcano - An Extensible and Parallel Query Evaluation System" + (https://dl.acm.org/doi/10.1109/69.273032). + """ + + MAP_SUB_PROGRESS_BAR_NAME = "Shuffle Map" + REDUCE_SUB_PROGRESS_BAR_NAME = "Shuffle Reduce" + + def __init__(self, map_args: List[Any] = None, reduce_args: List[Any] = None): + self._map_args = map_args or [] + self._reduce_args = reduce_args or [] + assert isinstance(self._map_args, list) + assert isinstance(self._reduce_args, list) + + @staticmethod + def map( + idx: int, + block: Block, + output_num_blocks: int, + ) -> List[Union[BlockMetadata, Block]]: + """ + Map function to be run on each input block. + + Returns list of [BlockMetadata, Block1, Block2, ..., BlockN]. + """ + raise NotImplementedError + + @staticmethod + def reduce( + *mapper_outputs: List[Block], + partial_reduce: bool = False, + ) -> Tuple[Block, BlockMetadata]: + """ + Reduce function to be run for each output block. + + Args: + mapper_outputs: List of map output blocks to reduce. + partial_reduce: Whether should partially or fully reduce. + + Returns: + The reduced block and its metadata. + """ + raise NotImplementedError + + +class ExchangeTaskScheduler: + """ + An interface to schedule exchange tasks (`exchange_spec`) for multi-nodes + execution. + """ + + def __init__(self, exchange_spec: ExchangeTaskSpec): + """ + Args: + exchange_spec: The implementation of exchange tasks to execute. + """ + self._exchange_spec = exchange_spec + # If driver memory exceeds this threshold, warn the user. For now, this + # only applies to shuffle ops because most other ops are unlikely to use as + # much driver memory. 
+ self.warn_on_driver_memory_usage_bytes: Optional[ + int + ] = DataContext.get_current().warn_on_driver_memory_usage_bytes + + def execute( + self, + refs: List[RefBundle], + output_num_blocks: int, + map_ray_remote_args: Optional[Dict[str, Any]] = None, + reduce_ray_remote_args: Optional[Dict[str, Any]] = None, + warn_on_driver_memory_usage: Optional[int] = None, + ) -> Tuple[List[RefBundle], StatsDict]: + """ + Execute the exchange tasks on input `refs`. + """ + raise NotImplementedError + + def warn_on_high_local_memory_store_usage(self) -> None: + ray_core_worker = ray._private.worker.global_worker.core_worker + local_memory_store_bytes_used = ( + ray_core_worker.get_local_memory_store_bytes_used() + ) + self.warn_on_driver_memory_usage( + local_memory_store_bytes_used, + "More than " + f"{convert_bytes_to_human_readable_str(local_memory_store_bytes_used)} " + "of driver memory used to store Ray Data block data and metadata. " + "This job may exit if driver memory is insufficient.\n\n" + "This can happen when many tiny blocks are created. " + "Check the block size using Dataset.stats() and see " + "https://docs.ray.io/en/latest/data/performance-tips.html" + " for mitigation.", + ) + + def warn_on_driver_memory_usage( + self, memory_usage_bytes: int, log_str: str + ) -> None: + if self.warn_on_driver_memory_usage_bytes is None: + return + + if memory_usage_bytes > self.warn_on_driver_memory_usage_bytes: + logger.warning(log_str) + # Double the threshold to avoid verbose warnings. 
+ self.warn_on_driver_memory_usage_bytes = memory_usage_bytes * 2 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py new file mode 100644 index 0000000000000000000000000000000000000000..c409d5ce4bee253f9c5b91da7a7504a834970da2 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py @@ -0,0 +1,827 @@ +import logging +import math +from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union + +import ray +from ray._private.ray_constants import CALLER_MEMORY_USAGE_PER_OBJECT_REF +from ray.data._internal.execution.interfaces import RefBundle, TaskContext +from ray.data._internal.planner.exchange.interfaces import ( + ExchangeTaskScheduler, + ExchangeTaskSpec, +) +from ray.data._internal.progress_bar import ProgressBar +from ray.data._internal.remote_fn import cached_remote_fn +from ray.data._internal.stats import StatsDict +from ray.data._internal.util import convert_bytes_to_human_readable_str +from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata +from ray.data.context import DataContext +from ray.types import ObjectRef +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + +logger = logging.getLogger(__name__) + + +T = TypeVar("T") +U = TypeVar("U") + + +class _MergeTaskSchedule: + def __init__(self, output_num_blocks: int, num_merge_tasks_per_round: int): + self.output_num_blocks = output_num_blocks + self.num_merge_tasks_per_round = num_merge_tasks_per_round + self.num_reducers_per_merger = output_num_blocks // num_merge_tasks_per_round + self._num_mergers_with_extra_reducer = ( + output_num_blocks % num_merge_tasks_per_round + ) + + if self.num_reducers_per_merger == 0: + self.num_merge_tasks_per_round = self._num_mergers_with_extra_reducer + 
self.num_reducers_per_merger = 1 + self._num_mergers_with_extra_reducer = 0 + + def __repr__(self): + return ( + f" num merge tasks per round: {self.num_merge_tasks_per_round}\n" + f" num reduce tasks per merge task: {self.num_reducers_per_merger}\n" + " num merge tasks with extra reduce task: " + f"{self._num_mergers_with_extra_reducer}" + ) + + def get_num_reducers_per_merge_idx(self, merge_idx: int) -> int: + """ + Each intermediate merge task will produce outputs for a partition of P + final reduce tasks. This helper function returns P based on the merge + task index. + """ + assert merge_idx < self.num_merge_tasks_per_round + num_reducers_for_cur_merger = self.num_reducers_per_merger + if merge_idx < self._num_mergers_with_extra_reducer: + num_reducers_for_cur_merger += 1 + return num_reducers_for_cur_merger + + def get_merge_idx_for_reducer_idx(self, reducer_idx: int) -> int: + if ( + reducer_idx + < (self.num_reducers_per_merger + 1) * self._num_mergers_with_extra_reducer + ): + merge_idx = reducer_idx // (self.num_reducers_per_merger + 1) + else: + reducer_idx -= ( + self.num_reducers_per_merger + 1 + ) * self._num_mergers_with_extra_reducer + merge_idx = ( + self._num_mergers_with_extra_reducer + + reducer_idx // self.num_reducers_per_merger + ) + assert merge_idx < self.num_merge_tasks_per_round + return merge_idx + + def round_robin_reduce_idx_iterator(self): + """ + When there are multiple nodes, merge tasks are spread throughout the + cluster to improve load-balancing. Each merge task produces outputs for + a contiguous partition of reduce tasks. This method creates an iterator + that returns reduce task indices round-robin across the merge tasks. + This can be used to submit reduce tasks in a way that spreads the load + evenly across the cluster. 
+ """ + idx = 0 + round_idx = 0 + while idx < self.output_num_blocks: + for merge_idx in range(self.num_merge_tasks_per_round): + if merge_idx < self._num_mergers_with_extra_reducer: + reduce_idx = merge_idx * (self.num_reducers_per_merger + 1) + num_reducers_for_cur_merger = self.num_reducers_per_merger + 1 + else: + reduce_idx = self._num_mergers_with_extra_reducer * ( + self.num_reducers_per_merger + 1 + ) + merge_idx -= self._num_mergers_with_extra_reducer + reduce_idx += merge_idx * self.num_reducers_per_merger + num_reducers_for_cur_merger = self.num_reducers_per_merger + + if round_idx >= num_reducers_for_cur_merger: + continue + + reduce_idx += round_idx + yield reduce_idx + idx += 1 + round_idx += 1 + + +class _PushBasedShuffleStage: + def __init__( + self, + output_num_blocks: int, + num_rounds: int, + num_map_tasks_per_round: int, + merge_task_placement: List[str], + ): + # The number of rounds of map-merge tasks. Reducer tasks are given the + # outputs of the merge tasks as inputs. Reducer tasks receive one input + # per round. + self.num_rounds = num_rounds + # The number of map tasks per round of map-merge tasks. The map task + # produces one output per merge task in the same round. + self.num_map_tasks_per_round = num_map_tasks_per_round + + node_strategies = { + node_id: { + "scheduling_strategy": NodeAffinitySchedulingStrategy( + node_id, soft=True + ) + } + for node_id in set(merge_task_placement) + } + self._merge_task_options = [ + node_strategies[node_id] for node_id in merge_task_placement + ] + + self.merge_schedule = _MergeTaskSchedule( + output_num_blocks, len(merge_task_placement) + ) + + def get_estimated_num_refs(self) -> int: + # Number of intermediate blocks = Number of rounds x (map tasks per + # round * merge tasks per round). + num_intermediate_refs = self.num_rounds * ( + self.num_map_tasks_per_round * self.merge_schedule.num_merge_tasks_per_round + ) + # Number of input blocks + intermediate blocks + output blocks. 
+ num_refs_total = ( + (self.num_rounds * self.num_map_tasks_per_round) + + num_intermediate_refs + + self.merge_schedule.output_num_blocks + ) + return num_refs_total + + def get_merge_task_options(self, merge_idx): + return self._merge_task_options[merge_idx] + + def __repr__(self): + return ( + "num map tasks per round (num args per merge task): " + f"{self.num_map_tasks_per_round}\n" + f"num rounds (num args per reduce task): {self.num_rounds}\n" + "merge task placement: \n" + f"{self.merge_schedule}" + ) + + +class _PipelinedStageExecutor: + def __init__( + self, + stage_iter, + num_tasks_per_round: int, + max_concurrent_rounds: int = 1, + progress_bar: Optional[ProgressBar] = None, + ): + self._stage_iter = stage_iter + self._num_tasks_per_round = num_tasks_per_round + self._max_concurrent_rounds = max_concurrent_rounds + self._progress_bar = progress_bar + + self._rounds: List[List[ObjectRef]] = [] + self._task_idx = 0 + + self._submit_round() + + self._num_block_bytes_stored_at_driver = 0 + + def __iter__(self): + return self + + def __next__(self) -> List[BlockMetadata]: + """ + Submit one round of tasks. If we already have the max concurrent rounds + in flight, first wait for the oldest round of tasks to finish. + """ + prev_metadata = [] + if all(len(r) == 0 for r in self._rounds): + raise StopIteration + + if len(self._rounds) >= self._max_concurrent_rounds: + prev_metadata_refs = self._rounds.pop(0) + if prev_metadata_refs: + if self._progress_bar is not None: + prev_metadata = self._progress_bar.fetch_until_complete( + prev_metadata_refs + ) + # TODO(swang): Eagerly free the previous round's args. + # See https://github.com/ray-project/ray/issues/42145. 
+ else: + prev_metadata = ray.get(prev_metadata_refs) + + self._submit_round() + + return prev_metadata + + def _submit_round(self): + assert len(self._rounds) < self._max_concurrent_rounds + task_round = [] + for _ in range(self._num_tasks_per_round): + try: + task_round.append(next(self._stage_iter)) + except StopIteration: + break + self._rounds.append(task_round) + + +class _MapStageIterator: + def __init__(self, input_blocks_list, shuffle_map, map_args): + self._input_blocks_list = input_blocks_list + self._shuffle_map = shuffle_map + self._map_args = map_args + + self._mapper_idx = 0 + self._map_results = [] + + def __iter__(self): + return self + + def __next__(self): + if not self._input_blocks_list: + raise StopIteration + + block = self._input_blocks_list.pop(0) + # NOTE(swang): Results are shuffled between map and merge tasks, so + # there is no advantage to colocating specific map and merge tasks. + # Therefore, we do not specify a node affinity policy for map tasks + # in case the caller or Ray has a better scheduling strategy, e.g., + # based on data locality. + map_result = self._shuffle_map.remote( + self._mapper_idx, + block, + *self._map_args, + ) + metadata_ref = map_result.pop(-1) + self._map_results.append(map_result) + self._mapper_idx += 1 + return metadata_ref + + def pop_map_results(self) -> List[List[ObjectRef]]: + map_results = self._map_results + self._map_results = [] + return map_results + + +class _MergeStageIterator: + def __init__( + self, + map_stage_iter: _MapStageIterator, + shuffle_merge, + stage: _PushBasedShuffleStage, + reduce_args, + ): + self._map_stage_iter = map_stage_iter + self._shuffle_merge = shuffle_merge + self._stage = stage + self._reduce_args = reduce_args + + self._merge_idx = 0 + self._map_result_buffer = None + # Final outputs from the map-merge stage. + # This is a map from merge task index to a nested list of merge results + # (ObjectRefs). 
Each merge task index corresponds to a partition of P + # final reduce tasks. + self._all_merge_results = [ + [] for _ in range(self._stage.merge_schedule.num_merge_tasks_per_round) + ] + + def __next__(self): + if not self._map_result_buffer or not self._map_result_buffer[0]: + assert self._merge_idx == 0 + self._map_result_buffer = self._map_stage_iter.pop_map_results() + + if not self._map_result_buffer: + raise StopIteration + + # Shuffle the map results for the merge tasks. + merge_args = [map_result.pop(0) for map_result in self._map_result_buffer] + num_merge_returns = self._stage.merge_schedule.get_num_reducers_per_merge_idx( + self._merge_idx + ) + merge_result = self._shuffle_merge.options( + num_returns=1 + num_merge_returns, + **self._stage.get_merge_task_options(self._merge_idx), + ).remote( + *merge_args, + reduce_args=self._reduce_args, + ) + metadata_ref = merge_result.pop(-1) + self._all_merge_results[self._merge_idx].append(merge_result) + del merge_result + + self._merge_idx += 1 + self._merge_idx %= self._stage.merge_schedule.num_merge_tasks_per_round + return metadata_ref + + def pop_merge_results(self) -> List[List[ObjectRef]]: + """Return a nested list of merge task results. The list at index i + stores the outputs of the i-th merge task submitted during each + map-merge round. Each merge task returns a list of outputs because it + may produce outputs for multiple downstream reduce tasks. 
+ """ + all_merge_results = self._all_merge_results + self._all_merge_results = [] + return all_merge_results + + +class _ReduceStageIterator: + def __init__( + self, + stage: _PushBasedShuffleStage, + shuffle_reduce, + all_merge_results: List[List[List[ObjectRef]]], + ray_remote_args, + reduce_args: List[Any], + _debug_limit_execution_to_num_blocks: Optional[int], + ): + self._shuffle_reduce = shuffle_reduce + self._stage = stage + self._reduce_arg_blocks: List[Tuple[int, List[ObjectRef]]] = [] + self._ray_remote_args = ray_remote_args + self._reduce_args = reduce_args + + for reduce_idx in self._stage.merge_schedule.round_robin_reduce_idx_iterator(): + merge_idx = self._stage.merge_schedule.get_merge_idx_for_reducer_idx( + reduce_idx + ) + reduce_arg_blocks = [ + merge_results.pop(0) for merge_results in all_merge_results[merge_idx] + ] + self._reduce_arg_blocks.append((reduce_idx, reduce_arg_blocks)) + + assert len(self._reduce_arg_blocks) == stage.merge_schedule.output_num_blocks + + if _debug_limit_execution_to_num_blocks is not None: + self._reduce_arg_blocks = self._reduce_arg_blocks[ + :_debug_limit_execution_to_num_blocks + ] + logger.debug( + f"Limiting execution to {len(self._reduce_arg_blocks)} reduce tasks" + ) + + for merge_idx, merge_results in enumerate(all_merge_results): + assert all(len(merge_result) == 0 for merge_result in merge_results), ( + "Reduce stage did not process outputs from merge tasks at index: " + f"{merge_idx}" + ) + + self._reduce_results: List[Tuple[int, ObjectRef]] = [] + + def __iter__(self): + return self + + def __next__(self): + if not self._reduce_arg_blocks: + raise StopIteration + + reduce_idx, reduce_arg_blocks = self._reduce_arg_blocks.pop(0) + merge_idx = self._stage.merge_schedule.get_merge_idx_for_reducer_idx(reduce_idx) + # Submit one partition of reduce tasks, one for each of the P + # outputs produced by the corresponding merge task. 
+ # We also add the merge task arguments so that the reduce task + # is colocated with its inputs. + block, meta = self._shuffle_reduce.options( + **self._ray_remote_args, + **self._stage.get_merge_task_options(merge_idx), + num_returns=2, + ).remote(*self._reduce_args, *reduce_arg_blocks, partial_reduce=False) + self._reduce_results.append((reduce_idx, block)) + return meta + + def pop_reduce_results(self): + reduce_results = self._reduce_results + self._reduce_results = [] + return reduce_results + + +class PushBasedShuffleTaskScheduler(ExchangeTaskScheduler): + """ + Push-based shuffle merges intermediate map outputs on the reducer nodes + while other map tasks are executing. The merged outputs are merged again + during a final reduce stage. This works as follows: + + 1. Submit rounds of concurrent map and merge tasks until all map inputs + have been processed. In each round, we execute: + + M map tasks + Each produces N outputs. Each output contains P blocks. + N merge tasks + Takes 1 output from each of M map tasks. + Each produces P outputs. + Where M and N are chosen to maximize parallelism across CPUs. Note that + this assumes that all CPUs in the cluster will be dedicated to the + shuffle job. + + Map and merge tasks are pipelined so that we always merge the previous + round of map outputs while executing the next round of map tasks. + + 2. In the final reduce stage: + R reduce tasks + Takes 1 output from one of the merge tasks from every round. + + Notes: + N * P = R = total number of output blocks + M / N = merge factor - the ratio of map : merge tasks is to improve + pipelined parallelism. For example, if map takes twice as long to + execute as merge, then we should set this to 2. If pipeline bubbles + appear and the merge tasks are much longer than the map tasks, then + the merge factor should be decreased, and vice versa. + See paper at https://arxiv.org/abs/2203.05072 for more details. 
+ """ + + def execute( + self, + refs: List[RefBundle], + output_num_blocks: int, + task_ctx: TaskContext, + map_ray_remote_args: Optional[Dict[str, Any]] = None, + reduce_ray_remote_args: Optional[Dict[str, Any]] = None, + merge_factor: float = 2, + _debug_limit_execution_to_num_blocks: int = None, + ) -> Tuple[List[RefBundle], StatsDict]: + logger.debug("Using experimental push-based shuffle.") + # TODO: Preemptively clear the blocks list since we will incrementally delete + # the last remaining references as we submit the dependent map tasks during the + # map-merge stage. + + # TODO(swang): For jobs whose reduce work is heavier than the map work, + # we should support fractional merge factors. + # TODO(swang): For large jobs, we should try to choose the merge factor + # automatically, e.g., by running one test round of map and merge tasks + # and comparing their run times. + # TODO(swang): Add option to automatically reduce write amplification + # during map-merge stage, by limiting how many partitions can be + # processed concurrently. + input_blocks_list = [] + for ref_bundle in refs: + input_blocks_list.extend(ref_bundle.block_refs) + input_owned = all(b.owns_blocks for b in refs) + + if map_ray_remote_args is None: + map_ray_remote_args = {} + if reduce_ray_remote_args is None: + reduce_ray_remote_args = {} + # The placement strategy for reduce tasks is overwritten to colocate + # them with their inputs from the merge stage, so remove any + # pre-specified scheduling strategy here. + reduce_ray_remote_args = reduce_ray_remote_args.copy() + reduce_ray_remote_args.pop("scheduling_strategy", None) + + # Compute all constants used for task scheduling. 
+ num_cpus_per_node_map = _get_num_cpus_per_node_map() + stage = self._compute_shuffle_schedule( + num_cpus_per_node_map, + len(input_blocks_list), + merge_factor, + output_num_blocks, + ) + + caller_memory_usage = ( + stage.get_estimated_num_refs() * CALLER_MEMORY_USAGE_PER_OBJECT_REF + ) + self.warn_on_driver_memory_usage( + caller_memory_usage, + "Execution is estimated to use at least " + f"{convert_bytes_to_human_readable_str(caller_memory_usage)}" + " of driver memory. Ensure that the driver machine has at least " + "this much memory to ensure job completion.", + ) + + # TODO(swang): Use INFO level. Currently there is no easy way to set + # the logging level to DEBUG from a driver script, so just print + # verbosely for now. + # See https://github.com/ray-project/ray/issues/42002. + logger.debug(f"Push-based shuffle schedule:\n{stage}") + + map_fn = self._map_partition + merge_fn = self._merge + + def map_partition(*args, **kwargs): + return map_fn(self._exchange_spec.map, *args, **kwargs) + + def merge(*args, **kwargs): + return merge_fn(self._exchange_spec.reduce, *args, **kwargs) + + shuffle_map = cached_remote_fn(map_partition) + shuffle_map = shuffle_map.options( + **map_ray_remote_args, + num_returns=1 + stage.merge_schedule.num_merge_tasks_per_round, + ) + + if _debug_limit_execution_to_num_blocks is not None: + input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks] + logger.debug(f"Limiting execution to {len(input_blocks_list)} map tasks") + map_stage_iter = _MapStageIterator( + input_blocks_list, + shuffle_map, + [output_num_blocks, stage.merge_schedule, *self._exchange_spec._map_args], + ) + + sub_progress_bar_dict = task_ctx.sub_progress_bar_dict + bar_name = ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME + assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict + map_bar = sub_progress_bar_dict[bar_name] + map_stage_executor = _PipelinedStageExecutor( + map_stage_iter, stage.num_map_tasks_per_round, progress_bar=map_bar + 
) + + shuffle_merge = cached_remote_fn(merge) + merge_stage_iter = _MergeStageIterator( + map_stage_iter, shuffle_merge, stage, self._exchange_spec._reduce_args + ) + merge_stage_executor = _PipelinedStageExecutor( + merge_stage_iter, + stage.merge_schedule.num_merge_tasks_per_round, + max_concurrent_rounds=2, + ) + # Execute the map-merge stage. This submits tasks in rounds of M map + # tasks and N merge tasks each. Task execution between map and merge is + # pipelined, so that while executing merge for one round of inputs, we + # also execute the map tasks for the following round. + map_done = False + merge_done = False + map_stage_metadata = [] + merge_stage_metadata = [] + while not (map_done and merge_done): + try: + map_stage_metadata += next(map_stage_executor) + except StopIteration: + map_done = True + break + + try: + merge_stage_metadata += next(merge_stage_executor) + except StopIteration: + merge_done = True + break + + self.warn_on_high_local_memory_store_usage() + + all_merge_results = merge_stage_iter.pop_merge_results() + + if _debug_limit_execution_to_num_blocks is not None: + for merge_idx in range(len(all_merge_results)): + while len(all_merge_results[merge_idx]) < stage.num_rounds: + # Repeat the first merge task's results. + all_merge_results[merge_idx].append( + all_merge_results[merge_idx][0][:] + ) + + # Execute and wait for the reduce stage. 
+ bar_name = ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME + assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict + reduce_bar = sub_progress_bar_dict[bar_name] + + shuffle_reduce = cached_remote_fn(self._exchange_spec.reduce) + reduce_stage_iter = _ReduceStageIterator( + stage, + shuffle_reduce, + all_merge_results, + reduce_ray_remote_args, + self._exchange_spec._reduce_args, + _debug_limit_execution_to_num_blocks, + ) + + max_reduce_tasks_in_flight = output_num_blocks + ctx = DataContext.get_current() + if ctx.pipeline_push_based_shuffle_reduce_tasks: + # If pipelining is enabled, we should still try to utilize all + # cores. + max_reduce_tasks_in_flight = min( + max_reduce_tasks_in_flight, sum(num_cpus_per_node_map.values()) + ) + + reduce_stage_executor = _PipelinedStageExecutor( + reduce_stage_iter, + max_reduce_tasks_in_flight, + max_concurrent_rounds=2, + progress_bar=reduce_bar, + ) + reduce_stage_metadata = [] + while True: + try: + reduce_stage_metadata += next(reduce_stage_executor) + except StopIteration: + break + + self.warn_on_high_local_memory_store_usage() + + new_blocks = reduce_stage_iter.pop_reduce_results() + sorted_blocks = [ + (block[0], block[1], reduce_stage_metadata[i]) + for i, block in enumerate(new_blocks) + ] + sorted_blocks.sort(key=lambda x: x[0]) + + new_blocks, reduce_stage_metadata = [], [] + if sorted_blocks: + _, new_blocks, reduce_stage_metadata = zip(*sorted_blocks) + del sorted_blocks + + if _debug_limit_execution_to_num_blocks is not None: + output_num_blocks = min( + _debug_limit_execution_to_num_blocks, output_num_blocks + ) + + assert ( + len(new_blocks) == output_num_blocks + ), f"Expected {output_num_blocks} outputs, produced {len(new_blocks)}" + + output = [] + for block, meta in zip(new_blocks, reduce_stage_metadata): + output.append( + RefBundle( + [ + ( + block, + meta, + ) + ], + owns_blocks=input_owned, + ) + ) + stats = { + "map": map_stage_metadata, + "merge": merge_stage_metadata, + "reduce": 
reduce_stage_metadata, + } + + return (output, stats) + + @staticmethod + def _map_partition( + map_fn, + idx: int, + block: Block, + output_num_blocks: int, + schedule: _MergeTaskSchedule, + *map_args: List[Any], + ) -> List[Union[BlockMetadata, Block]]: + mapper_outputs = map_fn(idx, block, output_num_blocks, *map_args) + + # A merge task may produce results for multiple downstream reducer + # tasks. Therefore, each map task should give each merge task a + # partition of its outputs, where the length of the partition is equal + # to the number of reducers downstream to the merge task. + partition = [] + merge_idx = 0 + while merge_idx < schedule.num_merge_tasks_per_round and mapper_outputs: + output = mapper_outputs.pop(0) + partition.append(output) + + if len(partition) == schedule.get_num_reducers_per_merge_idx(merge_idx): + yield partition + + partition = [] + merge_idx += 1 + + assert not partition + assert len(mapper_outputs) == 1, ( + mapper_outputs, + "The last output should be a BlockMetadata", + ) + assert isinstance(mapper_outputs[0], BlockMetadata) + yield mapper_outputs[0] + + assert merge_idx == schedule.num_merge_tasks_per_round, ( + merge_idx, + schedule.num_merge_tasks_per_round, + ) + + @staticmethod + def _merge( + reduce_fn, + *all_mapper_outputs: List[List[Block]], + reduce_args: Optional[List[Any]] = None, + ) -> List[Union[BlockMetadata, Block]]: + """ + Returns list of [BlockMetadata, O1, O2, O3, ...output_num_blocks]. 
+ """ + assert ( + len({len(mapper_outputs) for mapper_outputs in all_mapper_outputs}) == 1 + ), "Received different number of map inputs" + stats = BlockExecStats.builder() + if not reduce_args: + reduce_args = [] + + num_rows = 0 + size_bytes = 0 + schema = None + for i, mapper_outputs in enumerate(zip(*all_mapper_outputs)): + block, meta = reduce_fn(*reduce_args, *mapper_outputs, partial_reduce=True) + yield block + + block = BlockAccessor.for_block(block) + num_rows += block.num_rows() + size_bytes += block.size_bytes() + schema = block.schema() + del block + + yield BlockMetadata( + num_rows=num_rows, + size_bytes=size_bytes, + schema=schema, + input_files=None, + exec_stats=stats.build(), + ) + + @staticmethod + def _compute_shuffle_schedule( + num_cpus_per_node_map: Dict[str, int], + num_input_blocks: int, + merge_factor: float, + num_output_blocks: int, + ) -> _PushBasedShuffleStage: + num_cpus_total = sum(v for v in num_cpus_per_node_map.values()) + logger.debug( + f"Found {num_cpus_total} CPUs available CPUs for push-based shuffle." + ) + num_tasks_per_map_merge_group = merge_factor + 1 + num_total_merge_tasks = math.ceil(num_input_blocks / merge_factor) + + num_merge_tasks_per_round = 0 + merge_task_placement = [] + leftover_cpus = 0 + # Compute the total number of merge tasks and their node placement. + # Each merge task should be grouped with `merge_factor` map tasks for + # pipelining. These groups should then be spread across nodes according + # to CPU availability for load-balancing. + num_input_blocks_remaining = num_input_blocks + for node, num_cpus in num_cpus_per_node_map.items(): + # First find how many merge tasks we should run on this node. + # We take the min of the number of CPUs on this node and the number + # of input blocks that we haven't scheduled yet, in case there are + # fewer input blocks than CPU slots on this node. 
+ num_cpu_slots = min(num_cpus, num_input_blocks_remaining) + num_merge_tasks_on_cur_node = round( + num_cpu_slots / num_tasks_per_map_merge_group + ) + # For small datasets, the number of tasks to run may be less than + # the total CPU slots available. + num_merge_tasks_on_cur_node = min( + num_merge_tasks_on_cur_node, num_total_merge_tasks + ) + for i in range(num_merge_tasks_on_cur_node): + merge_task_placement.append(node) + # We schedule `merge_factor` many map tasks for every merge + # task. Subtract from the number of input blocks remaining to + # account for cases where the number of map tasks is smaller + # than the available CPU slots. + num_input_blocks_remaining -= merge_factor + num_cpus -= num_tasks_per_map_merge_group + num_merge_tasks_per_round += num_merge_tasks_on_cur_node + + # Handle the case where a single node cannot fit a group of map and + # merge tasks, but we can spread the group across multiple distinct + # nodes. + leftover_cpus += num_cpus + if ( + leftover_cpus >= num_tasks_per_map_merge_group + and num_merge_tasks_per_round < num_total_merge_tasks + ): + merge_task_placement.append(node) + num_merge_tasks_per_round += 1 + leftover_cpus -= num_tasks_per_map_merge_group + num_input_blocks_remaining -= merge_factor + + num_input_blocks_remaining = max(0, num_input_blocks_remaining) + + if num_merge_tasks_per_round == 0: + # For small datasets, make sure we have at least one merge task. + for node, num_cpus in num_cpus_per_node_map.items(): + if num_cpus >= 1: + merge_task_placement.append(node) + num_merge_tasks_per_round = 1 + break + + assert num_merge_tasks_per_round == len(merge_task_placement) + assert num_merge_tasks_per_round > 0, num_merge_tasks_per_round + # Use the remaining CPUs to execute map tasks. + num_map_tasks_per_round = num_cpus_total - num_merge_tasks_per_round + num_map_tasks_per_round = min(num_map_tasks_per_round, num_input_blocks) + # Make sure there is at least one map task in each round. 
+ num_map_tasks_per_round = max(num_map_tasks_per_round, 1) + + num_rounds = math.ceil(num_input_blocks / num_map_tasks_per_round) + return _PushBasedShuffleStage( + num_output_blocks, + num_rounds, + num_map_tasks_per_round, + merge_task_placement, + ) + + +def _get_num_cpus_per_node_map() -> Dict[str, int]: + total_resources_by_node = ray.state.total_resources_per_node() + # Map from per-node resource name to number of CPUs available on that + # node. + num_cpus_per_node_map = {} + for node_id, resources in total_resources_by_node.items(): + num_cpus = int(resources.get("CPU", 0)) + if num_cpus == 0: + continue + num_cpus_per_node_map[node_id] = num_cpus + return num_cpus_per_node_map diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__init__.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/__init__.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a6216b7c5512240c3142c42a4a8d22f34e0d7ec6 Binary files /dev/null and b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/__init__.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/expression_evaluator.cpython-311.pyc b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/expression_evaluator.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8a6030c9b65fd4a1898b2b7d09f3f941c8ed8717 Binary files /dev/null and 
b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/__pycache__/expression_evaluator.cpython-311.pyc differ diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py new file mode 100644 index 0000000000000000000000000000000000000000..4af5146ae32315e6cddf24d51f1443b6537cc2e2 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py @@ -0,0 +1,232 @@ +import ast +import logging +from typing import Any, List, Union + +import pyarrow as pa +import pyarrow.compute as pc + +logger = logging.getLogger(__name__) + + +# NOTE: (srinathk) There are 3 distinct stages of handling passed in exprs: +# 1. Parsing it (as text) +# 2. Resolving unbound names (to schema) +# 3. Converting resolved expressions to PA ones +# Need to break up the abstraction provided by ExpressionEvaluator. + + +class ExpressionEvaluator: + def get_filters(self, expression: str) -> pc.Expression: + """Parse and evaluate the expression to generate a filter condition. + + Args: + expression: A string representing the filter expression to parse. + + Returns: + A PyArrow compute expression for filtering data. + + """ + try: + tree = ast.parse(expression, mode="eval") + return self._build_filter_condition(tree.body) + except SyntaxError as e: + raise ValueError(f"Invalid syntax in the expression: {expression}") from e + except Exception as e: + logger.exception(f"Error processing expression: {e}") + raise + + def _build_filter_condition(self, node) -> Union[pc.Expression, List[Any], str]: + """Recursively evaluate an AST node to build the filter condition. + + Args: + node: The AST node to evaluate, representing part of the expression. + + Returns: + The evaluated result for the node, which could be a + filter condition, list, or field name. 
+ """ + visitor = _ConvertToArrowExpressionVisitor() + return visitor.visit(node) + + +class _ConvertToArrowExpressionVisitor(ast.NodeVisitor): + def visit_Compare(self, node: ast.Compare) -> pc.Expression: + """Handle comparison operations (e.g., a == b, a < b, a in b). + + Args: + node: The AST node representing a comparison operation. + + Returns: + An expression representing the comparison. + """ + # Handle left operand + # TODO Validate columns + if isinstance(node.left, ast.Attribute): + left_expr = self.visit(node.left) # Visit and handle attributes + elif isinstance(node.left, ast.Name): + left_expr = self.visit(node.left) # Treat as a simple field + elif isinstance(node.left, ast.Constant): + left_expr = node.left.value # Constant values are used directly + else: + raise ValueError(f"Unsupported left operand type: {type(node.left)}") + + comparators = [self.visit(comp) for comp in node.comparators] + + op = node.ops[0] + if isinstance(op, ast.In): + return left_expr.isin(comparators[0]) + elif isinstance(op, ast.NotIn): + return ~left_expr.isin(comparators[0]) + elif isinstance(op, ast.Eq): + return left_expr == comparators[0] + elif isinstance(op, ast.NotEq): + return left_expr != comparators[0] + elif isinstance(op, ast.Lt): + return left_expr < comparators[0] + elif isinstance(op, ast.LtE): + return left_expr <= comparators[0] + elif isinstance(op, ast.Gt): + return left_expr > comparators[0] + elif isinstance(op, ast.GtE): + return left_expr >= comparators[0] + else: + raise ValueError(f"Unsupported operator type: {op}") + + def visit_BoolOp(self, node: ast.BoolOp) -> pc.Expression: + """Handle logical operations (e.g., a and b, a or b). + + Args: + node: The AST node representing a boolean operation. + + Returns: + An expression representing the logical operation. 
+ """ + conditions = [self.visit(value) for value in node.values] + combined_expr = conditions[0] + + for condition in conditions[1:]: + if isinstance(node.op, ast.And): + # Combine conditions with logical AND + combined_expr &= condition + elif isinstance(node.op, ast.Or): + # Combine conditions with logical OR + combined_expr |= condition + else: + raise ValueError( + f"Unsupported logical operator: {type(node.op).__name__}" + ) + + return combined_expr + + def visit_Name(self, node: ast.Name) -> pc.Expression: + """Handle variable (name) nodes and return them as pc.Expression. + + Even if the name contains periods, it's treated as a single string. + + Args: + node: The AST node representing a variable. + + Returns: + The variable wrapped as a pc.Expression. + """ + field_name = ( + node.id + ) # Directly use the field name as a string (even if it contains periods) + return pc.field(field_name) + + def visit_Attribute(self, node: ast.Attribute) -> object: + """Handle attribute access (e.g., np.nan). + + Args: + node: The AST node representing an attribute access. + + Returns: + object: The attribute value. + + Raises: + ValueError: If the attribute is unsupported. + """ + # Recursively visit the left side (base object or previous attribute) + if isinstance(node.value, ast.Attribute): + # If the value is an attribute, recursively resolve it + left_expr = self.visit(node.value) + return pc.field(f"{left_expr}.{node.attr}") + + elif isinstance(node.value, ast.Name): + # If the value is a name (e.g., "foo"), we can directly return the field + left_name = node.value.id # The base name, e.g., "foo" + return pc.field(f"{left_name}.{node.attr}") + + raise ValueError(f"Unsupported attribute: {node.attr}") + + def visit_List(self, node: ast.List) -> pc.Expression: + """Handle list literals. + + Args: + node: The AST node representing a list. + + Returns: + The list of elements wrapped as a pc.Expression. 
+ """ + elements = [self.visit(elt) for elt in node.elts] + return pa.array(elements) + + # TODO (srinathk) Note that visit_Constant does not return pc.Expression + # because to support function in() which takes in a List, the elements in the List + # needs to values instead of pc.Expression per pyarrow.dataset.Expression + # specification. May be down the road, we can update it as Arrow relaxes this + # constraint. + def visit_Constant(self, node: ast.Constant) -> object: + """Handle constant values (e.g., numbers, strings). + + Args: + node: The AST node representing a constant value. + + Returns: + object: The constant value itself (e.g., number, string, or boolean). + """ + return node.value # Return the constant value directly. + + def visit_Call(self, node: ast.Call) -> pc.Expression: + """Handle function calls (e.g., is_nan(a), is_valid(b)). + + Args: + node: The AST node representing a function call. + + Returns: + The corresponding expression based on the function called. + + Raises: + ValueError: If the function is unsupported or has incorrect arguments. 
+ """ + func_name = node.func.id + function_map = { + "is_nan": lambda arg: arg.is_nan(), + "is_null": lambda arg, nan_is_null=False: arg.is_null( + nan_is_null=nan_is_null + ), + "is_valid": lambda arg: arg.is_valid(), + "isin": lambda arg1, arg2: arg1.isin(arg2), + } + + if func_name in function_map: + # Visit all arguments of the function call + args = [self.visit(arg) for arg in node.args] + # Handle the "is_null" function with one or two arguments + if func_name == "is_null": + if len(args) == 1: + return function_map[func_name](args[0]) + elif len(args) == 2: + return function_map[func_name](args[0], args[1]) + else: + raise ValueError("is_null function requires one or two arguments.") + # Handle the "isin" function with exactly two arguments + elif func_name == "isin" and len(args) != 2: + raise ValueError("isin function requires two arguments.") + # Ensure the function has one argument (for functions like is_valid) + elif func_name != "isin" and len(args) != 1: + raise ValueError(f"{func_name} function requires exactly one argument.") + # Call the corresponding function with the arguments + return function_map[func_name](*args) + else: + raise ValueError(f"Unsupported function: {func_name}") diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_from_items_op.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_from_items_op.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_from_pandas_op.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_from_pandas_op.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py new file mode 100644 
index 0000000000000000000000000000000000000000..23984a81cbe88a68c26ebef95fb00de75ec7f002 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py @@ -0,0 +1,124 @@ +import logging +import warnings +from typing import Iterable, List + +import ray +from ray.data._internal.compute import TaskPoolStrategy +from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle +from ray.data._internal.execution.interfaces.task_context import TaskContext +from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.map_transformer import ( + BlockMapTransformFn, + BuildOutputBlocksMapTransformFn, + MapTransformer, + MapTransformFn, +) +from ray.data._internal.execution.util import memory_string +from ray.data._internal.logical.operators.read_operator import Read +from ray.data._internal.util import _warn_on_high_parallelism +from ray.data.block import Block, BlockMetadata +from ray.data.context import DataContext +from ray.data.datasource.datasource import ReadTask +from ray.experimental.locations import get_local_object_locations +from ray.util.debug import log_once + +TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024 # 1 MiB + +logger = logging.getLogger(__name__) + + +def cleaned_metadata(read_task: ReadTask, read_task_ref) -> BlockMetadata: + # NOTE: Use the `get_local_object_locations` API to get the size of the + # serialized ReadTask, instead of pickling. + # Because the ReadTask may capture ObjectRef objects, which cannot + # be serialized out-of-band. 
+ locations = get_local_object_locations([read_task_ref]) + task_size = locations[read_task_ref]["object_size"] + if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES and log_once( + f"large_read_task_{read_task.read_fn.__name__}" + ): + warnings.warn( + "The serialized size of your read function named " + f"'{read_task.read_fn.__name__}' is {memory_string(task_size)}. This size " + "relatively large. As a result, Ray might excessively " + "spill objects during execution. To fix this issue, avoid accessing " + f"`self` or other large objects in '{read_task.read_fn.__name__}'." + ) + + # Defensively compute the size of the block as the max size reported by the + # datasource and the actual read task size. This is to guard against issues + # with bad metadata reporting. + block_meta = read_task.metadata + if block_meta.size_bytes is None or task_size > block_meta.size_bytes: + block_meta.size_bytes = task_size + + return block_meta + + +def plan_read_op( + op: Read, + physical_children: List[PhysicalOperator], + data_context: DataContext, +) -> PhysicalOperator: + """Get the corresponding DAG of physical operators for Read. + + Note this method only converts the given `op`, but not its input dependencies. + See Planner.plan() for more details. + """ + assert len(physical_children) == 0 + + def get_input_data(target_max_block_size) -> List[RefBundle]: + parallelism = op.get_detected_parallelism() + assert ( + parallelism is not None + ), "Read parallelism must be set by the optimizer before execution" + read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism) + _warn_on_high_parallelism(parallelism, len(read_tasks)) + + ret = [] + for read_task in read_tasks: + read_task_ref = ray.put(read_task) + ref_bundle = RefBundle( + [ + ( + # TODO(chengsu): figure out a better way to pass read + # tasks other than ray.put(). 
+ read_task_ref, + cleaned_metadata(read_task, read_task_ref), + ) + ], + # `owns_blocks` is False, because these refs are the root of the + # DAG. We shouldn't eagerly free them. Otherwise, the DAG cannot + # be reconstructed. + owns_blocks=False, + ) + ret.append(ref_bundle) + return ret + + inputs = InputDataBuffer( + data_context, + input_data_factory=get_input_data, + ) + + def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]: + for read_task in blocks: + yield from read_task() + + # Create a MapTransformer for a read operator + transform_fns: List[MapTransformFn] = [ + # First, execute the read tasks. + BlockMapTransformFn(do_read), + ] + transform_fns.append(BuildOutputBlocksMapTransformFn.for_blocks()) + map_transformer = MapTransformer(transform_fns) + + return MapOperator.create( + map_transformer, + inputs, + data_context, + name=op.name, + target_max_block_size=None, + compute_strategy=TaskPoolStrategy(op._concurrency), + ray_remote_args=op._ray_remote_args, + ) diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/planner.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/planner.py new file mode 100644 index 0000000000000000000000000000000000000000..d7c3a1af3f92a0034d2b270d7f481f59537c282c --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/planner.py @@ -0,0 +1,185 @@ +from typing import Callable, Dict, List, Tuple, Type, TypeVar + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.logical.interfaces import ( + LogicalOperator, + LogicalPlan, + PhysicalPlan, +) +from ray.data.context import DataContext +from ray.util.annotations import DeveloperAPI + +LogicalOperatorType = TypeVar("LogicalOperatorType", bound=LogicalOperator) + +PlanLogicalOpFn = Callable[ + [LogicalOperatorType, List[PhysicalOperator], DataContext], PhysicalOperator +] + +# A list of registered plan functions for logical operators. 
+PLAN_LOGICAL_OP_FNS: List[Tuple[Type[LogicalOperator], PlanLogicalOpFn]] = [] + + +@DeveloperAPI +def register_plan_logical_op_fn( + logical_op_type: Type[LogicalOperator], + plan_fn: PlanLogicalOpFn, +): + """Register a plan function for a logical operator type.""" + PLAN_LOGICAL_OP_FNS.append((logical_op_type, plan_fn)) + + +def _register_default_plan_logical_op_fns(): + from ray.data._internal.execution.operators.aggregate_num_rows import ( + AggregateNumRows, + ) + from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer + from ray.data._internal.execution.operators.limit_operator import LimitOperator + from ray.data._internal.execution.operators.union_operator import UnionOperator + from ray.data._internal.execution.operators.zip_operator import ZipOperator + from ray.data._internal.logical.operators.all_to_all_operator import ( + AbstractAllToAll, + ) + from ray.data._internal.logical.operators.count_operator import Count + from ray.data._internal.logical.operators.from_operators import AbstractFrom + from ray.data._internal.logical.operators.input_data_operator import InputData + from ray.data._internal.logical.operators.map_operator import ( + AbstractUDFMap, + Filter, + Project, + ) + from ray.data._internal.logical.operators.n_ary_operator import Union, Zip + from ray.data._internal.logical.operators.one_to_one_operator import Limit + from ray.data._internal.logical.operators.read_operator import Read + from ray.data._internal.logical.operators.write_operator import Write + from ray.data._internal.planner.plan_all_to_all_op import plan_all_to_all_op + from ray.data._internal.planner.plan_read_op import plan_read_op + from ray.data._internal.planner.plan_udf_map_op import ( + plan_filter_op, + plan_project_op, + plan_udf_map_op, + ) + from ray.data._internal.planner.plan_write_op import plan_write_op + + register_plan_logical_op_fn(Read, plan_read_op) + + def plan_input_data_op( + logical_op: InputData, + physical_children: 
List[PhysicalOperator], + data_context: DataContext, + ) -> PhysicalOperator: + """Get the corresponding DAG of physical operators for InputData.""" + assert len(physical_children) == 0 + + return InputDataBuffer( + data_context, + input_data=logical_op.input_data, + input_data_factory=logical_op.input_data_factory, + ) + + register_plan_logical_op_fn(InputData, plan_input_data_op) + register_plan_logical_op_fn(Write, plan_write_op) + + def plan_from_op( + op: AbstractFrom, + physical_children: List[PhysicalOperator], + data_context: DataContext, + ) -> PhysicalOperator: + assert len(physical_children) == 0 + return InputDataBuffer(data_context, op.input_data) + + register_plan_logical_op_fn(AbstractFrom, plan_from_op) + # Filter is also a AbstractUDFMap, so it needs to resolve + # before the AbstractUDFMap plan + # TODO(rliaw): Break up plan_udf_map_op + register_plan_logical_op_fn(Filter, plan_filter_op) + register_plan_logical_op_fn(AbstractUDFMap, plan_udf_map_op) + register_plan_logical_op_fn(AbstractAllToAll, plan_all_to_all_op) + + def plan_zip_op(_, physical_children, data_context): + assert len(physical_children) == 2 + return ZipOperator(physical_children[0], physical_children[1], data_context) + + register_plan_logical_op_fn(Zip, plan_zip_op) + + def plan_union_op(_, physical_children, data_context): + assert len(physical_children) >= 2 + return UnionOperator(data_context, *physical_children) + + register_plan_logical_op_fn(Union, plan_union_op) + + def plan_limit_op(logical_op, physical_children, data_context): + assert len(physical_children) == 1 + return LimitOperator(logical_op._limit, physical_children[0], data_context) + + register_plan_logical_op_fn(Limit, plan_limit_op) + + def plan_count_op(logical_op, physical_children, data_context): + assert len(physical_children) == 1 + return AggregateNumRows( + [physical_children[0]], data_context, column_name=Count.COLUMN_NAME + ) + + register_plan_logical_op_fn(Count, plan_count_op) + + 
register_plan_logical_op_fn(Project, plan_project_op) + + +_register_default_plan_logical_op_fns() + + +class Planner: + """The planner to convert optimized logical to physical operators. + + Note that planner is only doing operators conversion. Physical optimization work is + done by physical optimizer. + """ + + def __init__(self): + self._physical_op_to_logical_op: Dict[PhysicalOperator, LogicalOperator] = {} + + def plan(self, logical_plan: LogicalPlan) -> PhysicalPlan: + """Convert logical to physical operators recursively in post-order.""" + physical_dag = self._plan(logical_plan.dag, logical_plan.context) + physical_plan = PhysicalPlan( + physical_dag, + self._physical_op_to_logical_op, + logical_plan.context, + ) + return physical_plan + + def _plan( + self, logical_op: LogicalOperator, data_context: DataContext + ) -> PhysicalOperator: + # Plan the input dependencies first. + physical_children = [] + for child in logical_op.input_dependencies: + physical_children.append(self._plan(child, data_context)) + + physical_op = None + for op_type, plan_fn in PLAN_LOGICAL_OP_FNS: + if isinstance(logical_op, op_type): + # We will call `set_logical_operators()` in the following for-loop, + # no need to do it here. + physical_op = plan_fn(logical_op, physical_children, data_context) + break + + if physical_op is None: + raise ValueError( + f"Found unknown logical operator during planning: {logical_op}" + ) + + # Traverse up the DAG, and set the mapping from physical to logical operators. + # At this point, all physical operators without logical operators set + # must have been created by the current logical operator. + queue = [physical_op] + while queue: + curr_physical_op = queue.pop() + # Once we find an operator with a logical operator set, we can stop. 
+ if curr_physical_op._logical_operators: + break + + curr_physical_op.set_logical_operators(logical_op) + queue.extend(physical_op.input_dependencies) + + self._physical_op_to_logical_op[physical_op] = logical_op + return physical_op diff --git a/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/random_shuffle.py b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/random_shuffle.py new file mode 100644 index 0000000000000000000000000000000000000000..88e5b255cd0e902ee078f3d3620529bb2fd903b2 --- /dev/null +++ b/.venv/lib/python3.11/site-packages/ray/data/_internal/planner/random_shuffle.py @@ -0,0 +1,84 @@ +from typing import Any, Dict, List, Optional, Tuple + +from ray.data._internal.execution.interfaces import ( + AllToAllTransformFn, + RefBundle, + TaskContext, +) +from ray.data._internal.execution.operators.map_transformer import MapTransformer +from ray.data._internal.planner.exchange.pull_based_shuffle_task_scheduler import ( + PullBasedShuffleTaskScheduler, +) +from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import ( + PushBasedShuffleTaskScheduler, +) +from ray.data._internal.planner.exchange.shuffle_task_spec import ShuffleTaskSpec +from ray.data._internal.stats import StatsDict +from ray.data.context import DataContext + + +def generate_random_shuffle_fn( + seed: Optional[int], + num_outputs: Optional[int] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + _debug_limit_shuffle_execution_to_num_blocks: Optional[int] = None, +) -> AllToAllTransformFn: + """Generate function to randomly shuffle each records of blocks.""" + + def fn( + refs: List[RefBundle], + ctx: TaskContext, + ) -> Tuple[List[RefBundle], StatsDict]: + num_input_blocks = sum(len(r.blocks) for r in refs) + + # If map_transformer is specified (e.g. from fusing + # MapOperator->AllToAllOperator), we pass a map function which + # is applied to each block before shuffling. 
def generate_random_shuffle_fn(
    seed: Optional[int],
    num_outputs: Optional[int] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    _debug_limit_shuffle_execution_to_num_blocks: Optional[int] = None,
) -> AllToAllTransformFn:
    """Generate an all-to-all function that randomly shuffles the rows of blocks.

    Args:
        seed: Optional random seed forwarded to the shuffle task spec.
        num_outputs: Optional number of output blocks; defaults to the number
            of input blocks.
        ray_remote_args: Remote args for the shuffle tasks; overridden by the
            fused upstream operator's remote args when one is present.
        _debug_limit_shuffle_execution_to_num_blocks: Debug-only cap on how
            many blocks the shuffle executes over.
    """

    def fn(
        refs: List[RefBundle],
        ctx: TaskContext,
    ) -> Tuple[List[RefBundle], StatsDict]:
        nonlocal ray_remote_args

        num_input_blocks = sum(len(bundle.blocks) for bundle in refs)

        # When an upstream map operator was fused into this all-to-all op
        # (MapOperator->AllToAllOperator), its transform is applied to each
        # block before shuffling.
        map_transformer: Optional[MapTransformer] = ctx.upstream_map_transformer
        upstream_map_fn = None
        if map_transformer:
            # Override the target block size with infinity so the upstream
            # map does not slice its output into smaller blocks: the shuffle
            # task would just fuse them back together, and the extra slicing
            # and re-fusing adds high memory overhead. Removable once dynamic
            # block splitting is supported for all-to-all ops.
            # See https://github.com/ray-project/ray/issues/40518.
            map_transformer.set_target_max_block_size(float("inf"))

            def upstream_map_fn(blocks):
                return map_transformer.apply_transform(blocks, ctx)

            # Inherit the ray_remote_args of the fused upstream operator.
            ray_remote_args = ctx.upstream_map_ray_remote_args

        shuffle_spec = ShuffleTaskSpec(
            ctx.target_max_block_size,
            random_shuffle=True,
            random_seed=seed,
            upstream_map_fn=upstream_map_fn,
        )

        if DataContext.get_current().use_push_based_shuffle:
            if num_outputs is not None:
                raise NotImplementedError(
                    "Push-based shuffle doesn't support setting num_blocks yet."
                )
            scheduler = PushBasedShuffleTaskScheduler(shuffle_spec)
        else:
            scheduler = PullBasedShuffleTaskScheduler(shuffle_spec)

        return scheduler.execute(
            refs,
            num_outputs or num_input_blocks,
            task_ctx=ctx,
            map_ray_remote_args=ray_remote_args,
            reduce_ray_remote_args=ray_remote_args,
            _debug_limit_execution_to_num_blocks=(
                _debug_limit_shuffle_execution_to_num_blocks
            ),
        )

    return fn


def generate_randomize_blocks_fn(
    op: RandomizeBlocks,
) -> AllToAllTransformFn:
    """Generate an all-to-all function that randomizes the order of blocks.

    Rows within each block are untouched; only the block order changes.
    """

    def fn(
        refs: List[RefBundle], context: TaskContext
    ) -> Tuple[List[RefBundle], StatsDict]:
        import random

        nonlocal op

        blocks_with_metadata = []
        for bundle in refs:
            blocks_with_metadata.extend(bundle.blocks)

        # Nothing to randomize: pass the input through with empty stats.
        if not blocks_with_metadata:
            return refs, {op._name: []}

        if op._seed is not None:
            random.seed(op._seed)
        # Outputs own their blocks only if every input bundle did.
        input_owned = all(bundle.owns_blocks for bundle in refs)
        random.shuffle(blocks_with_metadata)

        output = []
        meta_list = []
        for block, meta in blocks_with_metadata:
            meta_list.append(meta)
            output.append(
                RefBundle([(block, meta)], owns_blocks=input_owned)
            )
        return output, {op._name: meta_list}

    return fn
def generate_repartition_fn(
    num_outputs: int,
    shuffle: bool,
    _debug_limit_shuffle_execution_to_num_blocks: Optional[int] = None,
) -> AllToAllTransformFn:
    """Generate an all-to-all function that repartitions blocks.

    Args:
        num_outputs: The number of output partitions to produce.
        shuffle: If True, use a shuffle-based repartition; otherwise use a
            split-based repartition.
        _debug_limit_shuffle_execution_to_num_blocks: Debug-only cap on how
            many blocks the shuffle executes over (shuffle path only).
    """

    def shuffle_repartition_fn(
        refs: List[RefBundle],
        ctx: TaskContext,
    ) -> Tuple[List[RefBundle], StatsDict]:
        # When an upstream map operator was fused into this all-to-all op
        # (MapOperator->AllToAllOperator), its transform is applied to each
        # block before shuffling.
        map_transformer: Optional["MapTransformer"] = ctx.upstream_map_transformer
        upstream_map_fn = None
        if map_transformer:
            # Override the target block size with infinity so the upstream
            # map does not slice its output into smaller blocks: the shuffle
            # task would just fuse them back together, and the extra slicing
            # and re-fusing adds high memory overhead. Removable once dynamic
            # block splitting is supported for all-to-all ops.
            # See https://github.com/ray-project/ray/issues/40518.
            map_transformer.set_target_max_block_size(float("inf"))

            def upstream_map_fn(blocks):
                return map_transformer.apply_transform(blocks, ctx)

        shuffle_spec = ShuffleTaskSpec(
            ctx.target_max_block_size,
            random_shuffle=False,
            upstream_map_fn=upstream_map_fn,
        )

        if DataContext.get_current().use_push_based_shuffle:
            scheduler = PushBasedShuffleTaskScheduler(shuffle_spec)
        else:
            scheduler = PullBasedShuffleTaskScheduler(shuffle_spec)

        return scheduler.execute(
            refs,
            num_outputs,
            ctx,
            _debug_limit_execution_to_num_blocks=(
                _debug_limit_shuffle_execution_to_num_blocks
            ),
        )

    def split_repartition_fn(
        refs: List[RefBundle],
        ctx: TaskContext,
    ) -> Tuple[List[RefBundle], StatsDict]:
        # Split-based repartition needs no upstream-map handling.
        spec = ShuffleTaskSpec(ctx.target_max_block_size, random_shuffle=False)
        return SplitRepartitionTaskScheduler(spec).execute(refs, num_outputs, ctx)

    return shuffle_repartition_fn if shuffle else split_repartition_fn
def generate_sort_fn(
    sort_key: SortKey,
    batch_format: str,
    _debug_limit_shuffle_execution_to_num_blocks: Optional[int] = None,
) -> AllToAllTransformFn:
    """Generate an all-to-all function that sorts blocks by a key column or key function.

    Args:
        sort_key: The key (columns/ordering) to sort by.
        batch_format: Batch format forwarded to the sort task spec.
        _debug_limit_shuffle_execution_to_num_blocks: Debug-only cap on how
            many blocks the shuffle executes over.
    """

    def fn(
        sort_key: SortKey,
        refs: List[RefBundle],
        ctx: TaskContext,
    ) -> Tuple[List[RefBundle], StatsDict]:
        blocks = []
        metadata = []
        for bundle in refs:
            blocks.extend(bundle.block_refs)
            metadata.extend(bundle.metadata)
        # Nothing to sort.
        if not blocks:
            return (blocks, {})
        sort_key.validate_schema(unify_block_metadata_schema(metadata))

        num_mappers = len(blocks)
        # Default: same number of output partitions as input blocks.
        num_outputs = num_mappers

        if not sort_key.boundaries:
            # Sample the sort key to pick partition boundaries, reporting
            # progress on the dedicated sub-progress bar.
            sample_bar = ctx.sub_progress_bar_dict[
                SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME
            ]
            boundaries = SortTaskSpec.sample_boundaries(
                blocks, sort_key, num_outputs, sample_bar
            )
        else:
            # User-specified boundaries partition only by the primary sort
            # key; reverse them so the partitions are produced in descending
            # order when that is what the key requests.
            boundaries = [(b,) for b in sort_key.boundaries]
            if sort_key.get_descending()[0]:
                boundaries = boundaries[::-1]
            num_outputs = len(boundaries) + 1

        sort_spec = SortTaskSpec(
            boundaries=boundaries, sort_key=sort_key, batch_format=batch_format
        )

        if DataContext.get_current().use_push_based_shuffle:
            scheduler = PushBasedShuffleTaskScheduler(sort_spec)
        else:
            scheduler = PullBasedShuffleTaskScheduler(sort_spec)

        return scheduler.execute(
            refs,
            num_outputs,
            ctx,
            _debug_limit_execution_to_num_blocks=(
                _debug_limit_shuffle_execution_to_num_blocks
            ),
        )

    # Bind `sort_key` with functools.partial instead of closing over it, to
    # avoid "UnboundLocalError: local variable ... referenced before
    # assignment" — related variables are reassigned inside `fn()`.
    return partial(fn, sort_key)