koichi12 commited on
Commit
edb79de
·
verified ·
1 Parent(s): 7cace8b

Add files using upload-large-folder tool

Browse files
This view is limited to 50 files because it contains too many changes. See the raw diff.
Files changed (50) hide show
  1. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/block_builder.cpython-311.pyc +0 -0
  2. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/delegating_block_builder.cpython-311.pyc +0 -0
  3. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/equalize.cpython-311.pyc +0 -0
  4. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/logging.cpython-311.pyc +0 -0
  5. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/output_buffer.cpython-311.pyc +0 -0
  6. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/plan.cpython-311.pyc +0 -0
  7. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/remote_fn.cpython-311.pyc +0 -0
  8. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/row.cpython-311.pyc +0 -0
  9. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/stats.cpython-311.pyc +0 -0
  10. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/table_block.cpython-311.pyc +0 -0
  11. .venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/util.cpython-311.pyc +0 -0
  12. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/autoscaling_requester.cpython-311.pyc +0 -0
  13. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/execution_callback.cpython-311.pyc +0 -0
  14. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/legacy_compat.cpython-311.pyc +0 -0
  15. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/resource_manager.cpython-311.pyc +0 -0
  16. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor.cpython-311.pyc +0 -0
  17. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor_state.cpython-311.pyc +0 -0
  18. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/util.cpython-311.pyc +0 -0
  19. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/__init__.cpython-311.pyc +0 -0
  20. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/autoscaler.cpython-311.pyc +0 -0
  21. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/default_autoscaler.cpython-311.pyc +0 -0
  22. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py +129 -0
  23. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/__init__.cpython-311.pyc +0 -0
  24. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/common.cpython-311.pyc +0 -0
  25. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/execution_options.cpython-311.pyc +0 -0
  26. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/executor.cpython-311.pyc +0 -0
  27. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/op_runtime_metrics.cpython-311.pyc +0 -0
  28. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/physical_operator.cpython-311.pyc +0 -0
  29. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/ref_bundle.cpython-311.pyc +0 -0
  30. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/task_context.cpython-311.pyc +0 -0
  31. .venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/transform_fn.cpython-311.pyc +0 -0
  32. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/optimizers.py +94 -0
  33. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_batch_format.py +42 -0
  34. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_target_max_block_size.py +30 -0
  35. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/limit_pushdown.py +133 -0
  36. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/operator_fusion.py +491 -0
  37. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/set_read_parallelism.py +132 -0
  38. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/zero_copy_map_fusion.py +88 -0
  39. .venv/lib/python3.11/site-packages/ray/data/_internal/logical/util.py +112 -0
  40. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__init__.py +0 -0
  41. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_all_to_all_op.cpython-311.pyc +0 -0
  42. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_arrow_op.cpython-311.pyc +0 -0
  43. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_items_op.cpython-311.pyc +0 -0
  44. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_numpy_op.cpython-311.pyc +0 -0
  45. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_udf_map_op.cpython-311.pyc +0 -0
  46. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/planner.cpython-311.pyc +0 -0
  47. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/random_shuffle.cpython-311.pyc +0 -0
  48. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/randomize_blocks.cpython-311.pyc +0 -0
  49. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/repartition.cpython-311.pyc +0 -0
  50. .venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/sort.cpython-311.pyc +0 -0
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/block_builder.cpython-311.pyc ADDED
Binary file (2.56 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/delegating_block_builder.cpython-311.pyc ADDED
Binary file (4.85 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/equalize.cpython-311.pyc ADDED
Binary file (6.88 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/logging.cpython-311.pyc ADDED
Binary file (8.64 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/output_buffer.cpython-311.pyc ADDED
Binary file (5.44 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/plan.cpython-311.pyc ADDED
Binary file (25.4 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/remote_fn.cpython-311.pyc ADDED
Binary file (4.03 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/row.cpython-311.pyc ADDED
Binary file (2.44 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/stats.cpython-311.pyc ADDED
Binary file (78.9 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/table_block.cpython-311.pyc ADDED
Binary file (18.3 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/__pycache__/util.cpython-311.pyc ADDED
Binary file (50.7 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/autoscaling_requester.cpython-311.pyc ADDED
Binary file (6.43 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/execution_callback.cpython-311.pyc ADDED
Binary file (2.58 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/legacy_compat.cpython-311.pyc ADDED
Binary file (8.06 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/resource_manager.cpython-311.pyc ADDED
Binary file (32 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor.cpython-311.pyc ADDED
Binary file (25.8 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/streaming_executor_state.cpython-311.pyc ADDED
Binary file (34 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/__pycache__/util.cpython-311.pyc ADDED
Binary file (4.44 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (699 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/autoscaler.cpython-311.pyc ADDED
Binary file (2.43 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaler/__pycache__/default_autoscaler.cpython-311.pyc ADDED
Binary file (10.2 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py ADDED
@@ -0,0 +1,129 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import defaultdict, deque
2
+ from dataclasses import dataclass
3
+ from typing import TYPE_CHECKING, Dict, List, Optional
4
+
5
+ from .bundle_queue import BundleQueue
6
+
7
+ if TYPE_CHECKING:
8
+ from ray.data._internal.execution.interfaces import RefBundle
9
+
10
+
11
@dataclass
class _Node:
    """One node of the doubly linked list backing ``FIFOBundleQueue``."""

    # The queued bundle itself.
    value: "RefBundle"
    # Neighboring nodes; ``None`` at the tail/head of the list respectively.
    next: Optional["_Node"] = None
    prev: Optional["_Node"] = None
16
+
17
+
18
class FIFOBundleQueue(BundleQueue):
    """A bundle queue that follows a first-in-first-out policy.

    Bundles are appended at the tail and popped from the head. A bundle may be
    enqueued multiple times; each enqueue creates its own linked-list node.
    """

    def __init__(self):
        # We manually implement a linked list because we need to remove elements
        # efficiently, and Python's built-in data structures have O(n) removal time.
        self._head: Optional[_Node] = None
        self._tail: Optional[_Node] = None
        # We use a dictionary to keep track of the nodes corresponding to each bundle.
        # This allows us to remove a bundle from the queue in O(1) time. We need a
        # per-bundle collection because a bundle can be added to the queue multiple
        # times. Nodes in each collection are insertion-ordered; the values are
        # deques (``remove`` pops from the left end).
        self._bundle_to_nodes: Dict["RefBundle", "deque[_Node]"] = defaultdict(deque)

        # Running totals: bytes across all queued bundles, and queued-node count.
        self._nbytes = 0
        self._num_bundles = 0

    def __len__(self) -> int:
        # A bundle added k times counts k times.
        return self._num_bundles

    def __contains__(self, bundle: "RefBundle") -> bool:
        # True iff at least one node for this bundle is still queued.
        return bundle in self._bundle_to_nodes

    def add(self, bundle: "RefBundle") -> None:
        """Add a bundle to the end (right) of the queue."""
        new_node = _Node(value=bundle, next=None, prev=self._tail)
        # Case 1: The queue is empty.
        if self._head is None:
            assert self._tail is None
            self._head = new_node
            self._tail = new_node
        # Case 2: The queue has at least one element.
        else:
            self._tail.next = new_node
            self._tail = new_node

        self._bundle_to_nodes[bundle].append(new_node)

        self._nbytes += bundle.size_bytes()
        self._num_bundles += 1

    def pop(self) -> "RefBundle":
        """Remove and return the first (left) bundle in the queue.

        Raises:
            IndexError: If the queue is empty.
        """
        # Case 1: The queue is empty.
        if not self._head:
            raise IndexError("You can't pop from an empty queue")

        bundle = self._head.value
        # ``remove`` takes the oldest node for this bundle, which is exactly the
        # head node because per-bundle nodes are insertion-ordered.
        self.remove(bundle)

        return bundle

    def peek(self) -> Optional["RefBundle"]:
        """Return the first (left) bundle in the queue without removing it.

        Returns ``None`` if the queue is empty.
        """
        if self._head is None:
            return None

        return self._head.value

    def remove(self, bundle: "RefBundle"):
        """Remove a bundle from the queue.

        If there are multiple instances of the bundle in the queue, this method only
        removes the first (oldest) one.

        Raises:
            ValueError: If the bundle is not in the queue.
        """
        # The bundle was never added, or all of its nodes were already removed.
        if bundle not in self._bundle_to_nodes:
            raise ValueError(f"The bundle {bundle} is not in the queue.")

        node = self._bundle_to_nodes[bundle].popleft()
        if not self._bundle_to_nodes[bundle]:
            # Drop the empty entry so ``bundle in queue`` becomes False.
            del self._bundle_to_nodes[bundle]

        # Unlink ``node`` from the doubly linked list.
        # Case 2: The bundle is the only element in the queue.
        if self._head is self._tail:
            self._head = None
            self._tail = None
        # Case 3: The bundle is the first element in the queue.
        elif node is self._head:
            self._head = node.next
            self._head.prev = None
        # Case 4: The bundle is the last element in the queue.
        elif node is self._tail:
            self._tail = node.prev
            self._tail.next = None
        # Case 5: The bundle is in the middle of the queue.
        else:
            node.prev.next = node.next
            node.next.prev = node.prev

        self._nbytes -= bundle.size_bytes()
        assert self._nbytes >= 0, (
            "Expected the total size of objects in the queue to be non-negative, but "
            f"got {self._nbytes} bytes instead."
        )

        self._num_bundles -= 1

        # Returns the removed bundle (same object as ``bundle`` for the removed node).
        return node.value

    def clear(self):
        """Remove all bundles and reset the size accounting."""
        self._head = None
        self._tail = None
        self._bundle_to_nodes.clear()
        self._nbytes = 0
        self._num_bundles = 0

    def estimate_size_bytes(self) -> int:
        """Total ``size_bytes()`` across all queued bundles."""
        return self._nbytes

    def is_empty(self):
        """Return True if the queue holds no bundles."""
        return not self._bundle_to_nodes and self._head is None and self._tail is None
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (812 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/common.cpython-311.pyc ADDED
Binary file (226 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/execution_options.cpython-311.pyc ADDED
Binary file (15 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/executor.cpython-311.pyc ADDED
Binary file (4.23 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/op_runtime_metrics.cpython-311.pyc ADDED
Binary file (25.3 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/physical_operator.cpython-311.pyc ADDED
Binary file (27.5 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/ref_bundle.cpython-311.pyc ADDED
Binary file (8.14 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/task_context.cpython-311.pyc ADDED
Binary file (1.57 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/__pycache__/transform_fn.cpython-311.pyc ADDED
Binary file (644 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/optimizers.py ADDED
@@ -0,0 +1,94 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Type
2
+
3
+ from ray.data._internal.logical.interfaces import (
4
+ LogicalPlan,
5
+ Optimizer,
6
+ PhysicalPlan,
7
+ Rule,
8
+ )
9
+ from ray.data._internal.logical.rules.inherit_batch_format import InheritBatchFormatRule
10
+ from ray.data._internal.logical.rules.inherit_target_max_block_size import (
11
+ InheritTargetMaxBlockSizeRule,
12
+ )
13
+ from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
14
+ from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
15
+ from ray.data._internal.logical.rules.set_read_parallelism import SetReadParallelismRule
16
+ from ray.data._internal.logical.rules.zero_copy_map_fusion import (
17
+ EliminateBuildOutputBlocks,
18
+ )
19
+ from ray.data._internal.planner.planner import Planner
20
+ from ray.util.annotations import DeveloperAPI
21
+
22
# Rules applied to the logical plan, in list order, before planning.
_LOGICAL_RULES = [
    ReorderRandomizeBlocksRule,
    InheritBatchFormatRule,
]

# Rules applied to the physical plan, in list order, after planning.
_PHYSICAL_RULES = [
    InheritTargetMaxBlockSizeRule,
    SetReadParallelismRule,
    OperatorFusionRule,
    EliminateBuildOutputBlocks,
]
33
+
34
+
35
@DeveloperAPI
def register_logical_rule(cls: Type[Rule], insert_index: Optional[int] = None):
    """Register a rule to run during logical-plan optimization.

    Registration is idempotent: a rule class that is already registered is
    ignored. When ``insert_index`` is None the rule is appended; otherwise it
    is inserted at that position.
    """
    if cls not in _LOGICAL_RULES:
        position = len(_LOGICAL_RULES) if insert_index is None else insert_index
        _LOGICAL_RULES.insert(position, cls)
44
+
45
+
46
@DeveloperAPI
def get_logical_rules() -> List[Type[Rule]]:
    """Return a snapshot copy of the registered logical rules."""
    return _LOGICAL_RULES.copy()
49
+
50
+
51
@DeveloperAPI
def register_physical_rule(cls: Type[Rule], insert_index: Optional[int] = None):
    """Register a rule to run during physical-plan optimization.

    Registration is idempotent: a rule class that is already registered is
    ignored. When ``insert_index`` is None the rule is appended; otherwise it
    is inserted at that position.
    """
    if cls not in _PHYSICAL_RULES:
        position = len(_PHYSICAL_RULES) if insert_index is None else insert_index
        _PHYSICAL_RULES.insert(position, cls)
60
+
61
+
62
@DeveloperAPI
def get_physical_rules() -> List[Type[Rule]]:
    """Return a snapshot copy of the registered physical rules."""
    return _PHYSICAL_RULES.copy()
65
+
66
+
67
class LogicalOptimizer(Optimizer):
    """Optimizer that applies the registered logical rules."""

    @property
    def rules(self) -> List[Rule]:
        # A fresh rule instance is created for every optimization pass.
        return [make_rule() for make_rule in _LOGICAL_RULES]
73
+
74
+
75
class PhysicalOptimizer(Optimizer):
    """Optimizer that applies the registered physical rules."""

    @property
    def rules(self) -> List[Rule]:
        # A fresh rule instance is created for every optimization pass.
        return [make_rule() for make_rule in _PHYSICAL_RULES]
81
+
82
+
83
def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    """Get the physical execution plan for the provided logical plan.

    This process has 3 steps:
    (1) logical optimization: optimize logical operators.
    (2) planning: convert logical to physical operators.
    (3) physical optimization: optimize physical operators.
    """
    optimized = LogicalOptimizer().optimize(logical_plan)
    # Point the caller's plan object at the optimized DAG as well.
    logical_plan._dag = optimized.dag
    physical = Planner().plan(optimized)
    return PhysicalOptimizer().optimize(physical)
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_batch_format.py ADDED
@@ -0,0 +1,42 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import deque
2
+ from typing import Iterable
3
+
4
+ from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule
5
+ from ray.data._internal.logical.operators.all_to_all_operator import AbstractAllToAll
6
+ from ray.data._internal.logical.operators.map_operator import MapBatches
7
+
8
+
9
class InheritBatchFormatRule(Rule):
    """For AbstractAllToAll based operator, apply this rule
    to inherit batch_format from upstream operator by traversing
    the entire DAG."""

    def apply(self, plan: LogicalPlan) -> LogicalPlan:
        """Return a new LogicalPlan over the same DAG after each
        AbstractAllToAll op has (possibly) inherited a batch format.

        Note the operators are mutated in place; only the plan wrapper is new.
        """
        optimized_dag: LogicalOperator = self._apply(plan.dag)
        new_plan = LogicalPlan(dag=optimized_dag, context=plan.context)
        return new_plan

    def _apply(self, op: LogicalOperator):
        # Post-order traversal. ``appendleft`` builds the deque in reverse, and
        # ``pop`` (right end) then yields nodes back in the original
        # post-order sequence.
        nodes: Iterable[LogicalOperator] = deque()
        for node in op.post_order_iter():
            nodes.appendleft(node)

        while len(nodes) > 0:
            current_op = nodes.pop()

            if isinstance(current_op, AbstractAllToAll):
                # Walk up the DAG until we find a MapBatches op that has a
                # batch_format set, or we reach a source op (no input
                # dependencies), in which case nothing is inherited.
                # NOTE(review): only the first input dependency at each level
                # is followed — multi-input upstream ops are not explored.
                upstream_op = current_op.input_dependencies[0]
                while upstream_op.input_dependencies:
                    if (
                        isinstance(upstream_op, MapBatches)
                        and upstream_op._batch_format
                    ):
                        current_op._batch_format = upstream_op._batch_format
                        break
                    upstream_op = upstream_op.input_dependencies[0]

        # The DAG structure is unchanged; return the original root.
        return op
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/inherit_target_max_block_size.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Optional
2
+
3
+ from ray.data._internal.execution.interfaces import PhysicalOperator
4
+ from ray.data._internal.logical.interfaces import PhysicalPlan, Rule
5
+
6
+
7
class InheritTargetMaxBlockSizeRule(Rule):
    """Propagate each op's overridden target max block size upstream.

    Every op that explicitly overrides the default target max block size
    passes that value to its upstream ops, stopping at (and restarting from)
    any upstream op that carries its own override.
    """

    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        """Mutate ``plan``'s DAG in place and return the same plan."""
        self._propagate_target_max_block_size_to_upstream_ops(plan.dag)
        return plan

    def _propagate_target_max_block_size_to_upstream_ops(
        self, dag: PhysicalOperator, target_max_block_size: Optional[int] = None
    ):
        # An explicit override on this op wins and becomes the value that
        # everything upstream of it inherits.
        own_override = dag.target_max_block_size
        if own_override is not None:
            target_max_block_size = own_override
        elif target_max_block_size is not None:
            # No override here: adopt the downstream op's value.
            dag.set_target_max_block_size(target_max_block_size)

        # Recurse into each upstream input with the (possibly updated) value.
        for parent_op in dag.input_dependencies:
            self._propagate_target_max_block_size_to_upstream_ops(
                parent_op, target_max_block_size
            )
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/limit_pushdown.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import copy
2
+ from collections import deque
3
+ from typing import Iterable, List
4
+
5
+ from ray.data._internal.logical.interfaces import LogicalOperator, LogicalPlan, Rule
6
+ from ray.data._internal.logical.operators.one_to_one_operator import (
7
+ AbstractOneToOne,
8
+ Limit,
9
+ )
10
+ from ray.data._internal.logical.operators.read_operator import Read
11
+
12
+
13
class LimitPushdownRule(Rule):
    """Rule for pushing down the limit operator.

    When a limit operator is present, we apply the limit on the
    most upstream operator that supports it. Notably, we move the
    Limit operator downstream from Read op, any other non-OneToOne operator,
    or any operator which could potentially change the number of output rows.

    In addition, we also fuse consecutive Limit operators into a single
    Limit operator, i.e. `Limit[n] -> Limit[m]` becomes `Limit[min(n, m)]`.
    """

    def apply(self, plan: LogicalPlan) -> LogicalPlan:
        """Run limit pushdown, then limit fusion, and wrap the resulting DAG
        in a new LogicalPlan."""
        optimized_dag = self._apply_limit_pushdown(plan.dag)
        optimized_dag = self._apply_limit_fusion(optimized_dag)
        return LogicalPlan(dag=optimized_dag, context=plan.context)

    def _apply_limit_pushdown(self, op: LogicalOperator) -> LogicalOperator:
        """Given a DAG of LogicalOperators, traverse the DAG and push down
        Limit operators, i.e. move Limit operators as far upstream as possible.

        Returns a new LogicalOperator with the Limit operators pushed down."""
        # Post-order traversal. ``appendleft`` + ``pop`` (right end) yields
        # nodes in post_order_iter's original order while still letting us
        # push newly created operator copies back onto the work queue below.
        nodes: Iterable[LogicalOperator] = deque()
        for node in op.post_order_iter():
            nodes.appendleft(node)

        while len(nodes) > 0:
            current_op = nodes.pop()

            # If we encounter a Limit op, move it upstream until it reaches:
            # - Read operator
            # - A non-AbstractOneToOne operator (e.g. AbstractAllToAll)
            # - An AbstractOneToOne operator that could change the number of output rows

            # TODO(scottjlee): in our current abstraction, we have Read extend
            # AbstractMap (with no input dependency), which extends AbstractOneToOne.
            # So we have to explicitly separate the Read op in its own check.
            # We should remove this case once we refactor Read op to no longer
            # be an AbstractOneToOne op.
            if isinstance(current_op, Limit):
                limit_op_copy = copy.copy(current_op)

                # Traverse up the DAG until we reach the first operator that meets
                # one of the conditions above, which will serve as the new input
                # into the Limit operator.
                new_input_into_limit = current_op.input_dependency
                ops_between_new_input_and_limit: List[LogicalOperator] = []
                while (
                    isinstance(new_input_into_limit, AbstractOneToOne)
                    and not isinstance(new_input_into_limit, Read)
                    and not getattr(new_input_into_limit, "can_modify_num_rows", False)
                ):
                    # Copies (not the originals) form the rebuilt chain below.
                    new_input_into_limit_copy = copy.copy(new_input_into_limit)
                    ops_between_new_input_and_limit.append(new_input_into_limit_copy)
                    new_input_into_limit = new_input_into_limit.input_dependency

                # Link the Limit operator and its newly designated input op from above.
                limit_op_copy._input_dependencies = [new_input_into_limit]
                new_input_into_limit._output_dependencies = [limit_op_copy]

                # Build the chain of operator dependencies between the new
                # input and the Limit operator, using copies of traversed operators.
                # The list is ordered closest-to-the-original-Limit first, so
                # after appending limit_op_copy the chain reads
                # element[0] <- element[1] <- ... <- limit_op_copy.
                ops_between_new_input_and_limit.append(limit_op_copy)
                for idx in range(len(ops_between_new_input_and_limit) - 1):
                    curr_op, up_op = (
                        ops_between_new_input_and_limit[idx],
                        ops_between_new_input_and_limit[idx + 1],
                    )
                    curr_op._input_dependencies = [up_op]
                    up_op._output_dependencies = [curr_op]
                    # Add the copied operator to the list of nodes to be traversed.
                    nodes.append(curr_op)

                # Link the Limit operator to its new input operator.
                # The most-downstream copy (element [0]) replaces the original
                # Limit in the eyes of the Limit's former consumers.
                for limit_output_op in current_op.output_dependencies:
                    limit_output_op._input_dependencies = [
                        ops_between_new_input_and_limit[0]
                    ]
                last_op = ops_between_new_input_and_limit[0]
                last_op._output_dependencies = current_op.output_dependencies

        # NOTE(review): returns the last node processed, which is expected to
        # be the (possibly replaced) DAG root — confirm for root-level Limits.
        return current_op

    def _apply_limit_fusion(self, op: LogicalOperator) -> LogicalOperator:
        """Given a DAG of LogicalOperators, traverse the DAG and fuse all
        back-to-back Limit operators, i.e.
        Limit[n] -> Limit[m] becomes Limit[min(n, m)].

        Returns a new LogicalOperator with the Limit operators fusion applied."""

        # Post-order traversal (same reversed-deque scheme as pushdown above).
        nodes: Iterable[LogicalOperator] = deque()
        for node in op.post_order_iter():
            nodes.appendleft(node)

        while len(nodes) > 0:
            current_op = nodes.pop()

            # If we encounter two back-to-back Limit operators, fuse them.
            if isinstance(current_op, Limit):
                upstream_op = current_op.input_dependency
                if isinstance(upstream_op, Limit):
                    # The smaller of the two limits governs the row count.
                    new_limit = min(current_op._limit, upstream_op._limit)
                    fused_limit_op = Limit(upstream_op.input_dependency, new_limit)

                    # Link the fused Limit operator to its input and output ops, i.e.:
                    # `upstream_input -> limit_upstream -> limit_downstream -> downstream_output`  # noqa: E501
                    # becomes `upstream_input -> fused_limit -> downstream_output`
                    fused_limit_op._input_dependencies = upstream_op.input_dependencies
                    fused_limit_op._output_dependencies = current_op.output_dependencies

                    # Replace occurrences of the upstream Limit operator in
                    # output_dependencies with the newly fused Limit operator.
                    upstream_input = upstream_op.input_dependency
                    upstream_input._output_dependencies = [fused_limit_op]

                    for current_output in current_op.output_dependencies:
                        current_output._input_dependencies = [fused_limit_op]
                    # Re-queue the fused op so it can fuse again with a
                    # further-upstream Limit (chains of 3+ Limits).
                    nodes.append(fused_limit_op)
        # NOTE(review): returns the last node processed, which is expected to
        # be the (possibly fused) DAG root — confirm for root-level Limits.
        return current_op
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/operator_fusion.py ADDED
@@ -0,0 +1,491 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import itertools
2
+ from typing import List, Optional, Tuple
3
+
4
+ from ray.data._internal.compute import (
5
+ ActorPoolStrategy,
6
+ ComputeStrategy,
7
+ TaskPoolStrategy,
8
+ )
9
+ from ray.data._internal.execution.interfaces import (
10
+ PhysicalOperator,
11
+ RefBundle,
12
+ TaskContext,
13
+ )
14
+ from ray.data._internal.execution.operators.actor_pool_map_operator import (
15
+ ActorPoolMapOperator,
16
+ )
17
+ from ray.data._internal.execution.operators.base_physical_operator import (
18
+ AllToAllOperator,
19
+ )
20
+ from ray.data._internal.execution.operators.map_operator import MapOperator
21
+ from ray.data._internal.execution.operators.task_pool_map_operator import (
22
+ TaskPoolMapOperator,
23
+ )
24
+ from ray.data._internal.logical.interfaces import PhysicalPlan, Rule
25
+ from ray.data._internal.logical.operators.all_to_all_operator import (
26
+ AbstractAllToAll,
27
+ RandomShuffle,
28
+ Repartition,
29
+ )
30
+ from ray.data._internal.logical.operators.map_operator import (
31
+ AbstractMap,
32
+ AbstractUDFMap,
33
+ )
34
+ from ray.data._internal.stats import StatsDict
35
+ from ray.data.context import DataContext
36
+
37
+ # Scheduling strategy can be inherited from upstream operator if not specified.
38
+ INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
39
+
40
+
41
+ class OperatorFusionRule(Rule):
42
+ """Fuses linear chains of compatible physical operators."""
43
+
44
+ def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
45
+ self._op_map = plan.op_map.copy()
46
+ # Do DFS fusion on compatible pairwise operators in two passes.
47
+ # In the first pass, only fuse back-to-back map operators together.
48
+ fused_dag = self._fuse_map_operators_in_dag(plan.dag)
49
+
50
+ # Now that we have fused together all back-to-back map operators,
51
+ # we fuse together MapOperator -> AllToAllOperator pairs.
52
+ fused_dag = self._fuse_all_to_all_operators_in_dag(fused_dag)
53
+
54
+ # Update output dependencies after fusion.
55
+ # TODO(hchen): Instead of updating the depdencies manually,
56
+ # we need a better abstraction for manipulating the DAG.
57
+ self._remove_output_depes(fused_dag)
58
+ self._update_output_depes(fused_dag)
59
+
60
+ new_plan = PhysicalPlan(fused_dag, self._op_map, plan.context)
61
+ return new_plan
62
+
63
+ def _remove_output_depes(self, op: PhysicalOperator) -> None:
64
+ for input in op._input_dependencies:
65
+ input._output_dependencies = []
66
+ self._remove_output_depes(input)
67
+
68
+ def _update_output_depes(self, op: PhysicalOperator) -> None:
69
+ for input in op._input_dependencies:
70
+ input._output_dependencies.append(op)
71
+ self._update_output_depes(input)
72
+
73
    def _fuse_map_operators_in_dag(self, dag: PhysicalOperator) -> MapOperator:
        """Starting at the given operator, traverses up the DAG of operators
        and recursively fuses compatible MapOperator -> MapOperator pairs.
        Returns the current (root) operator after completing upstream operator fusions.
        """
        upstream_ops = dag.input_dependencies
        # Keep folding the single upstream map op into ``dag`` while the pair
        # is fusable; each fusion yields a new operator whose inputs become
        # the next candidates.
        while (
            len(upstream_ops) == 1
            and isinstance(dag, MapOperator)
            and isinstance(upstream_ops[0], MapOperator)
            and self._can_fuse(dag, upstream_ops[0])
        ):
            # Fuse operator with its upstream op.
            dag = self._get_fused_map_operator(dag, upstream_ops[0])
            upstream_ops = dag.input_dependencies

        # Done fusing back-to-back map operators together here,
        # move up the DAG to find the next map operators to fuse.
        dag._input_dependencies = [
            self._fuse_map_operators_in_dag(upstream_op) for upstream_op in upstream_ops
        ]
        return dag
95
+
96
    def _fuse_all_to_all_operators_in_dag(
        self, dag: AllToAllOperator
    ) -> AllToAllOperator:
        """Starting at the given operator, traverses up the DAG of operators
        and recursively fuses compatible MapOperator -> AllToAllOperator pairs.

        Also, sets the target block size of the immediately upstream map op to
        match the shuffle block size. We use a larger block size for shuffles
        because tiny blocks are bad for I/O performance.

        Returns the current (root) operator after completing upstream operator fusions.
        """
        upstream_ops = dag.input_dependencies
        # Fold the single upstream map op into the all-to-all op while fusable.
        while (
            len(upstream_ops) == 1
            and isinstance(dag, AllToAllOperator)
            and isinstance(upstream_ops[0], MapOperator)
            and self._can_fuse(dag, upstream_ops[0])
        ):
            # Fuse operator with its upstream op.
            dag = self._get_fused_all_to_all_operator(dag, upstream_ops[0])
            upstream_ops = dag.input_dependencies

        # Done fusing MapOperator -> AllToAllOperator together here,
        # move up the DAG to find the next pair of operators to fuse.
        dag._input_dependencies = [
            self._fuse_all_to_all_operators_in_dag(upstream_op)
            for upstream_op in upstream_ops
        ]
        return dag
126
+
127
    def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
        """Returns whether the provided downstream operator can be fused with the given
        upstream operator.

        We currently support fusing two operators if the following are all true:
            * We are fusing either MapOperator -> MapOperator or
              MapOperator -> AllToAllOperator.
            * They either use the same compute configuration, or the upstream operator
              uses a task pool while the downstream operator uses an actor pool.
            * If both operators involve callable classes, the callable classes are
              the same class AND constructor args are the same for both.
            * They have compatible remote arguments.
        """
        # Either operator can opt out of fusion entirely.
        if not up_op.supports_fusion() or not down_op.supports_fusion():
            return False

        # We currently only support fusing for the following cases:
        # - TaskPoolMapOperator -> TaskPoolMapOperator/ActorPoolMapOperator
        # - TaskPoolMapOperator -> AllToAllOperator
        # (only RandomShuffle and Repartition LogicalOperators are currently supported)
        if not (
            (
                isinstance(up_op, TaskPoolMapOperator)
                and isinstance(down_op, (TaskPoolMapOperator, ActorPoolMapOperator))
            )
            or (
                isinstance(up_op, TaskPoolMapOperator)
                and isinstance(down_op, AllToAllOperator)
            )
        ):
            return False

        down_logical_op = self._op_map[down_op]
        up_logical_op = self._op_map[up_op]

        # An upstream op that already splits each output further is not fused.
        if up_op.get_additional_split_factor() > 1:
            return False

        # If the downstream operator takes no input, it cannot be fused with
        # the upstream operator.
        if not down_logical_op._input_dependencies:
            return False

        # We currently only support fusing for the following cases:
        # - AbstractMap -> AbstractMap
        # - AbstractMap -> RandomShuffle
        # - AbstractMap -> Repartition (shuffle=True)
        if not (
            (
                isinstance(up_logical_op, AbstractMap)
                and isinstance(down_logical_op, AbstractMap)
            )
            or (
                isinstance(up_logical_op, AbstractMap)
                and isinstance(down_logical_op, RandomShuffle)
            )
            or (
                isinstance(up_logical_op, AbstractMap)
                and isinstance(down_logical_op, Repartition)
            )
        ):
            return False

        # Do not fuse Repartition operator if shuffle is disabled
        # (i.e. using split shuffle).
        if isinstance(down_logical_op, Repartition) and not down_logical_op._shuffle:
            return False

        if isinstance(down_logical_op, AbstractMap) and isinstance(
            up_logical_op, AbstractMap
        ):
            # Map->Map fusion additionally requires compatible compute strategies.
            if (
                self._fuse_compute_strategy(
                    up_logical_op._compute,
                    down_logical_op._compute,
                )
                is None
            ):
                return False

        # Only fuse if the ops' remote arguments are compatible.
        if not _are_remote_args_compatible(
            getattr(up_logical_op, "_ray_remote_args", {}),
            getattr(down_logical_op, "_ray_remote_args", {}),
        ):
            return False

        # Do not fuse if either op specifies a `_ray_remote_args_fn`,
        # since it is not known whether the generated args will be compatible.
        if getattr(up_logical_op, "_ray_remote_args_fn", None) or getattr(
            down_logical_op, "_ray_remote_args_fn", None
        ):
            return False

        if not self._can_merge_target_max_block_size(
            up_op.target_max_block_size,
            down_op.target_max_block_size,
            up_op.data_context,
        ):
            return False

        # Otherwise, ops are compatible for fusion.
        return True
230
+
231
    def _fuse_compute_strategy(
        self, up_compute: ComputeStrategy, down_compute: ComputeStrategy
    ) -> Optional[ComputeStrategy]:
        """Fuse the compute strategies of the upstream and downstream operators.
        Returns None if they are not compatible.

        Task->Task and Task->Actor are allowed.
        Actor->Actor and Actor->Task are not allowed.
        """
        if isinstance(up_compute, ActorPoolStrategy):
            # Actor->* fusion is never allowed.
            return None
        assert isinstance(up_compute, TaskPoolStrategy)
        if isinstance(down_compute, TaskPoolStrategy):
            # For Task->Task, the sizes must match.
            if up_compute.size != down_compute.size:
                return None
            return down_compute
        else:
            assert isinstance(down_compute, ActorPoolStrategy)
            # For Task->Actor, if Task's size is set, it must match Actor's max_size.
            if up_compute.size is not None and up_compute.size != down_compute.max_size:
                return None
            return down_compute
254
+
255
+ def _can_merge_target_max_block_size(
256
+ self,
257
+ up_target_max_block_size: Optional[int],
258
+ down_target_max_block_size: Optional[int],
259
+ data_context: DataContext,
260
+ ):
261
+ # If the upstream op overrode the target max block size, only fuse if
262
+ # they are equal.
263
+ if up_target_max_block_size is not None:
264
+ if down_target_max_block_size is None:
265
+ down_target_max_block_size = data_context.target_max_block_size
266
+ if up_target_max_block_size != down_target_max_block_size:
267
+ return False
268
+ return True
269
+
270
+ def _get_merged_target_max_block_size(
271
+ self,
272
+ up_target_max_block_size: Optional[int],
273
+ down_target_max_block_size: Optional[int],
274
+ ):
275
+ if up_target_max_block_size is not None:
276
+ # If the upstream op overrode the target max block size, we can
277
+ # only merge if the downstream op matches or uses the default.
278
+ assert (
279
+ down_target_max_block_size is None
280
+ or down_target_max_block_size == up_target_max_block_size
281
+ )
282
+ return up_target_max_block_size
283
+ else:
284
+ # Upstream op inherits the downstream op's target max block size,
285
+ # because the downstream op is the one that outputs the final
286
+ # blocks.
287
+ return down_target_max_block_size
288
+
289
    def _get_fused_map_operator(
        self, down_op: MapOperator, up_op: MapOperator
    ) -> MapOperator:
        """Build the fused physical MapOperator for ``up_op -> down_op``.

        Pops both original entries from ``self._op_map`` and registers a
        merged logical op for the fused operator so it can participate in
        further fusions.
        """
        assert self._can_fuse(down_op, up_op), (
            "Current rule supports fusing MapOperator->MapOperator, but received: "
            f"{type(up_op).__name__} -> {type(down_op).__name__}"
        )

        # Fuse operator names.
        name = up_op.name + "->" + down_op.name

        down_logical_op = self._op_map.pop(down_op)
        up_logical_op = self._op_map.pop(up_op)
        assert isinstance(down_logical_op, AbstractMap)
        assert isinstance(up_logical_op, AbstractMap)

        # Merge minimum block sizes.
        down_min_rows_per_bundled_input = down_logical_op._min_rows_per_bundled_input
        up_min_rows_per_bundled_input = up_logical_op._min_rows_per_bundled_input
        if (
            down_min_rows_per_bundled_input is not None
            and up_min_rows_per_bundled_input is not None
        ):
            # Both are set: the stricter (larger) requirement wins.
            min_rows_per_bundled_input = max(
                down_min_rows_per_bundled_input, up_min_rows_per_bundled_input
            )
        elif up_min_rows_per_bundled_input is not None:
            min_rows_per_bundled_input = up_min_rows_per_bundled_input
        else:
            min_rows_per_bundled_input = down_min_rows_per_bundled_input

        target_max_block_size = self._get_merged_target_max_block_size(
            up_op.target_max_block_size, down_op.target_max_block_size
        )

        compute = self._fuse_compute_strategy(
            up_logical_op._compute, down_logical_op._compute
        )
        # _can_fuse() already verified the strategies are compatible.
        assert compute is not None
        ray_remote_args = up_logical_op._ray_remote_args
        ray_remote_args_fn = (
            up_logical_op._ray_remote_args_fn or down_logical_op._ray_remote_args_fn
        )
        # Make the upstream operator's inputs the new, fused operator's inputs.
        input_deps = up_op.input_dependencies
        assert len(input_deps) == 1
        input_op = input_deps[0]

        # Fused physical map operator.
        assert up_op.data_context is down_op.data_context
        op = MapOperator.create(
            up_op.get_map_transformer().fuse(down_op.get_map_transformer()),
            input_op,
            up_op.data_context,
            target_max_block_size=target_max_block_size,
            name=name,
            compute_strategy=compute,
            min_rows_per_bundle=min_rows_per_bundled_input,
            ray_remote_args=ray_remote_args,
            ray_remote_args_fn=ray_remote_args_fn,
        )
        op.set_logical_operators(*up_op._logical_operators, *down_op._logical_operators)
        # Preserve the per-task kwargs hooks from both original operators.
        for map_task_kwargs_fn in itertools.chain(
            up_op._map_task_kwargs_fns, down_op._map_task_kwargs_fns
        ):
            op.add_map_task_kwargs_fn(map_task_kwargs_fn)

        # Build a map logical operator to be used as a reference for further fusion.
        # TODO(Scott): This is hacky, remove this once we push fusion to be purely based
        # on a lower-level operator spec.
        if isinstance(up_logical_op, AbstractUDFMap):
            input_op = up_logical_op.input_dependency
        else:
            # Bottom out at the source logical op (e.g. Read()).
            input_op = up_logical_op
        if isinstance(down_logical_op, AbstractUDFMap):
            logical_op = AbstractUDFMap(
                name,
                input_op,
                down_logical_op._fn,
                down_logical_op._fn_args,
                down_logical_op._fn_kwargs,
                down_logical_op._fn_constructor_args,
                down_logical_op._fn_constructor_kwargs,
                min_rows_per_bundled_input,
                compute,
                ray_remote_args_fn,
                ray_remote_args,
            )
        else:
            # The downstream op is AbstractMap instead of AbstractUDFMap.
            logical_op = AbstractMap(
                name,
                input_op,
                min_rows_per_bundled_input=min_rows_per_bundled_input,
                ray_remote_args_fn=ray_remote_args_fn,
                ray_remote_args=ray_remote_args,
            )
        self._op_map[op] = logical_op
        # Return the fused physical operator.
        return op
390
+
391
+ def _get_fused_all_to_all_operator(
392
+ self, down_op: AllToAllOperator, up_op: MapOperator
393
+ ) -> AllToAllOperator:
394
+ assert self._can_fuse(down_op, up_op), (
395
+ "Current rule supports fusing MapOperator -> AllToAllOperator"
396
+ f", but received: {type(up_op).__name__} -> {type(down_op).__name__}"
397
+ )
398
+
399
+ # Fuse operator names.
400
+ name = up_op.name + "->" + down_op.name
401
+
402
+ down_logical_op = self._op_map.pop(down_op)
403
+ up_logical_op = self._op_map.pop(up_op)
404
+ assert isinstance(down_logical_op, AbstractAllToAll)
405
+ assert isinstance(up_logical_op, AbstractMap)
406
+
407
+ # Fuse transformation functions.
408
+ ray_remote_args = up_logical_op._ray_remote_args
409
+ down_transform_fn = down_op.get_transformation_fn()
410
+ up_map_transformer = up_op.get_map_transformer()
411
+
412
+ def fused_all_to_all_transform_fn(
413
+ blocks: List[RefBundle], ctx: TaskContext
414
+ ) -> Tuple[List[RefBundle], StatsDict]:
415
+ """To fuse MapOperator->AllToAllOperator, we store the map function
416
+ in the TaskContext so that it may be used by the downstream
417
+ AllToAllOperator's transform function."""
418
+ ctx.upstream_map_transformer = up_map_transformer
419
+ ctx.upstream_map_ray_remote_args = ray_remote_args
420
+ return down_transform_fn(blocks, ctx)
421
+
422
+ # Make the upstream operator's inputs the new, fused operator's inputs.
423
+ input_deps = up_op.input_dependencies
424
+ assert len(input_deps) == 1
425
+ input_op = input_deps[0]
426
+
427
+ target_max_block_size = self._get_merged_target_max_block_size(
428
+ up_op.target_max_block_size, down_op.target_max_block_size
429
+ )
430
+
431
+ assert up_op.data_context is down_op.data_context
432
+ op = AllToAllOperator(
433
+ fused_all_to_all_transform_fn,
434
+ input_op,
435
+ up_op.data_context,
436
+ target_max_block_size=target_max_block_size,
437
+ num_outputs=down_op._num_outputs,
438
+ # Transfer over the existing sub-progress bars from
439
+ # the AllToAllOperator (if any) into the fused operator.
440
+ sub_progress_bar_names=down_op._sub_progress_bar_names,
441
+ name=name,
442
+ )
443
+ # Bottom out at the source logical op (e.g. Read()).
444
+ input_op = up_logical_op
445
+
446
+ if isinstance(down_logical_op, RandomShuffle):
447
+ logical_op = RandomShuffle(
448
+ input_op,
449
+ name=name,
450
+ ray_remote_args=ray_remote_args,
451
+ )
452
+ elif isinstance(down_logical_op, Repartition):
453
+ logical_op = Repartition(
454
+ input_op,
455
+ num_outputs=down_logical_op._num_outputs,
456
+ shuffle=down_logical_op._shuffle,
457
+ )
458
+ self._op_map[op] = logical_op
459
+ # Return the fused physical operator.
460
+ return op
461
+
462
+
463
def _are_remote_args_compatible(prev_args, next_args):
    """Check if Ray remote arguments are compatible for merging.

    The downstream args — after inheriting any missing inheritable keys from
    the upstream args — must be identical to the upstream args.
    """
    canonical_prev = _canonicalize(prev_args)
    merged = _canonicalize(next_args).copy()
    # NOTE: We only carry over an inheritable value when it was not
    # provided in the downstream remote args.
    for key in INHERITABLE_REMOTE_ARGS:
        if key not in merged and key in canonical_prev:
            merged[key] = canonical_prev[key]
    return canonical_prev == merged
477
+
478
+
479
+ def _canonicalize(remote_args: dict) -> dict:
480
+ """Returns canonical form of given remote args."""
481
+ remote_args = remote_args.copy()
482
+ if "num_cpus" not in remote_args or remote_args["num_cpus"] is None:
483
+ remote_args["num_cpus"] = 1
484
+ if "num_gpus" not in remote_args or remote_args["num_gpus"] is None:
485
+ remote_args["num_gpus"] = 0
486
+ resources = remote_args.get("resources", {})
487
+ for k, v in list(resources.items()):
488
+ if v is None or v == 0.0:
489
+ del resources[k]
490
+ remote_args["resources"] = resources
491
+ return remote_args
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/set_read_parallelism.py ADDED
@@ -0,0 +1,132 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import math
3
+ from typing import Optional, Tuple, Union
4
+
5
+ from ray import available_resources as ray_available_resources
6
+ from ray.data._internal.execution.interfaces import PhysicalOperator
7
+ from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
8
+ from ray.data._internal.logical.interfaces import PhysicalPlan, Rule
9
+ from ray.data._internal.logical.operators.read_operator import Read
10
+ from ray.data._internal.util import _autodetect_parallelism
11
+ from ray.data.context import WARN_PREFIX, DataContext
12
+ from ray.data.datasource.datasource import Datasource, Reader
13
+
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
def compute_additional_split_factor(
    datasource_or_legacy_reader: Union[Datasource, Reader],
    parallelism: int,
    mem_size: int,
    target_max_block_size: int,
    cur_additional_split_factor: Optional[int] = None,
) -> Tuple[int, str, int, Optional[int]]:
    """Compute the read parallelism and any extra per-task output splitting.

    Args:
        datasource_or_legacy_reader: Datasource or legacy reader used to
            generate read tasks at the detected parallelism.
        parallelism: The user-requested parallelism, or -1 for auto-detect.
        mem_size: Estimated in-memory size of the data, if known.
        target_max_block_size: The target maximum size for output blocks.
        cur_additional_split_factor: An existing split factor already set on
            the operator, composed multiplicatively if provided.

    Returns:
        A tuple of (detected parallelism, reason string for the detected
        value, estimated number of output blocks, additional split factor —
        or None when no extra splitting is needed).
    """
    ctx = DataContext.get_current()
    detected_parallelism, reason, _ = _autodetect_parallelism(
        parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size
    )
    num_read_tasks = len(
        datasource_or_legacy_reader.get_read_tasks(detected_parallelism)
    )
    expected_block_size = None
    if mem_size:
        # Split each read task's output further when its expected block size
        # exceeds the target, since tiny blocks hurt I/O but oversized blocks
        # hurt memory usage.
        expected_block_size = mem_size / num_read_tasks
        logger.debug(
            f"Expected in-memory size {mem_size}," f" block size {expected_block_size}"
        )
        size_based_splits = round(max(1, expected_block_size / target_max_block_size))
    else:
        size_based_splits = 1
    if cur_additional_split_factor:
        size_based_splits *= cur_additional_split_factor
    logger.debug(f"Size based split factor {size_based_splits}")
    estimated_num_blocks = num_read_tasks * size_based_splits
    logger.debug(f"Blocks after size splits {estimated_num_blocks}")

    available_cpu_slots = ray_available_resources().get("CPU", 1)
    if (
        parallelism != -1
        and num_read_tasks >= available_cpu_slots * 4
        and num_read_tasks >= 5000
    ):
        # Fixed: the original message ran two sentences together
        # ("...blocks.You can ignore...") — a space was missing.
        logger.warning(
            f"{WARN_PREFIX} The requested number of read blocks of {parallelism} "
            "is more than 4x the number of available CPU slots in the cluster of "
            f"{available_cpu_slots}. This can "
            "lead to slowdowns during the data reading phase due to excessive "
            "task creation. Reduce the value to match with the available "
            "CPU slots in the cluster, or set override_num_blocks to -1 for Ray Data "
            "to automatically determine the number of read tasks blocks. "
            "You can ignore this message if the cluster is expected to autoscale."
        )

    # Add more output splitting for each read task if needed.
    # TODO(swang): For parallelism=-1 (user did not explicitly set
    # parallelism), and if the following operator produces much larger blocks,
    # we should scale down the target max block size here instead of using
    # splitting, which can have higher memory usage.
    if estimated_num_blocks < detected_parallelism and estimated_num_blocks > 0:
        k = math.ceil(detected_parallelism / estimated_num_blocks)
        estimated_num_blocks = estimated_num_blocks * k
        return detected_parallelism, reason, estimated_num_blocks, k

    return detected_parallelism, reason, estimated_num_blocks, None
74
+
75
+
76
class SetReadParallelismRule(Rule):
    """
    This rule sets the read op's task parallelism based on the target block
    size, the requested parallelism, the number of read files, and the
    available resources in the cluster.

    If the parallelism is lower than requested, this rule also sets a split
    factor to split the output blocks of the read task, so that the following
    operator will have the desired parallelism.
    """

    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        """Apply the rule to every Read operator found in ``plan`` (BFS)."""
        ops = [plan.dag]

        while len(ops) > 0:
            op = ops.pop(0)
            if isinstance(op, InputDataBuffer):
                # Input buffers have no logical op entry to inspect.
                continue
            logical_op = plan.op_map[op]
            if isinstance(logical_op, Read):
                self._apply(op, logical_op)
            ops += op.input_dependencies

        return plan

    def _apply(self, op: PhysicalOperator, logical_op: Read):
        """Detect and record parallelism (and split factor) for one read op."""
        (
            detected_parallelism,
            reason,
            estimated_num_blocks,
            k,
        ) = compute_additional_split_factor(
            logical_op._datasource_or_legacy_reader,
            logical_op._parallelism,
            logical_op._mem_size,
            op.actual_target_max_block_size,
            op._additional_split_factor,
        )

        if logical_op._parallelism == -1:
            # Auto-detected parallelism must come with a justification.
            assert reason != ""
            logger.debug(
                f"Using autodetected parallelism={detected_parallelism} "
                f"for operator {logical_op.name} to satisfy {reason}."
            )
        logical_op.set_detected_parallelism(detected_parallelism)

        if k is not None:
            logger.debug(
                f"To satisfy the requested parallelism of {detected_parallelism}, "
                f"each read task output is split into {k} smaller blocks."
            )

        if k is not None:
            op.set_additional_split_factor(k)

        logger.debug(f"Estimated num output blocks {estimated_num_blocks}")
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/rules/zero_copy_map_fusion.py ADDED
@@ -0,0 +1,88 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import abstractmethod
2
+ from typing import List
3
+
4
+ from ray.data._internal.execution.operators.map_operator import MapOperator
5
+ from ray.data._internal.execution.operators.map_transformer import (
6
+ BuildOutputBlocksMapTransformFn,
7
+ MapTransformFn,
8
+ MapTransformFnDataType,
9
+ )
10
+ from ray.data._internal.logical.interfaces.optimizer import Rule
11
+ from ray.data._internal.logical.interfaces.physical_plan import PhysicalPlan
12
+
13
+
14
class ZeroCopyMapFusionRule(Rule):
    """Base abstract class for all zero-copy map fusion rules.

    A zero-copy map fusion rule is a rule that optimizes the transform_fn chain of
    a fused MapOperator. The optimization is usually done by removing unnecessary
    data conversions.

    This base abstract class defines the common util functions. And subclasses
    should implement the `_optimize` method for the concrete optimization
    strategy.
    """

    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        """Optimize every MapOperator's transform_fn chain in the plan, in place."""
        self._traverse(plan.dag)
        return plan

    def _traverse(self, op):
        """Traverse the DAG and apply the optimization to each MapOperator."""
        if isinstance(op, MapOperator):
            map_transformer = op.get_map_transformer()
            transform_fns = map_transformer.get_transform_fns()
            new_transform_fns = self._optimize(transform_fns)
            # Physical operators won't be shared,
            # so it's safe to modify the transform_fns in place.
            map_transformer.set_transform_fns(new_transform_fns)

        for input_op in op.input_dependencies:
            self._traverse(input_op)

    @abstractmethod
    def _optimize(self, transform_fns: List[MapTransformFn]) -> List[MapTransformFn]:
        """Optimize the transform_fns chain of a MapOperator.

        Args:
            transform_fns: The old transform_fns chain.
        Returns:
            The optimized transform_fns chain.
        """
        ...
53
+
54
+
55
class EliminateBuildOutputBlocks(ZeroCopyMapFusionRule):
    """This rule eliminates unnecessary BuildOutputBlocksMapTransformFn,
    if the previous fn already outputs blocks.

    This happens for the "Read -> Map/Write" fusion.
    """

    def _optimize(self, transform_fns: List[MapTransformFn]) -> List[MapTransformFn]:
        # Drop any BuildOutputBlocksMapTransformFn that sits between a fn
        # that already emits blocks and a fn that consumes blocks — in that
        # position it is a redundant conversion.
        kept = []
        last_index = len(transform_fns) - 1
        for idx, fn in enumerate(transform_fns):
            is_redundant = (
                0 < idx < last_index
                and isinstance(fn, BuildOutputBlocksMapTransformFn)
                and transform_fns[idx - 1].output_type == MapTransformFnDataType.Block
                and transform_fns[idx + 1].input_type == MapTransformFnDataType.Block
            )
            if not is_redundant:
                kept.append(fn)

        return kept
.venv/lib/python3.11/site-packages/ray/data/_internal/logical/util.py ADDED
@@ -0,0 +1,112 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import re
3
+ import threading
4
+ from typing import Dict
5
+
6
+ from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
7
+ from ray.data._internal.logical.interfaces import LogicalOperator
8
+ from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
9
+ from ray.data._internal.logical.operators.read_operator import Read
10
+ from ray.data._internal.logical.operators.write_operator import Write
11
+
12
+ # The dictionary for the operator name and count.
13
+ _recorded_operators = dict()
14
+ _recorded_operators_lock = threading.Lock()
15
+
16
+ # The white list of operator names allowed to be recorded.
17
+ _op_name_white_list = [
18
+ # Read
19
+ "ReadBigQuery",
20
+ "ReadRange",
21
+ "ReadMongo",
22
+ "ReadParquet",
23
+ "ReadParquetBulk",
24
+ "ReadImage",
25
+ "ReadJSON",
26
+ "ReadCSV",
27
+ "ReadText",
28
+ "ReadNumpy",
29
+ "ReadTFRecord",
30
+ "ReadBinary",
31
+ "ReadTorch",
32
+ "ReadAvro",
33
+ "ReadWebDataset",
34
+ "ReadSQL",
35
+ "ReadDatabricksUC",
36
+ "ReadLance",
37
+ "ReadHuggingFace",
38
+ "ReadCustom",
39
+ # From
40
+ "FromArrow",
41
+ "FromItems",
42
+ "FromNumpy",
43
+ "FromPandas",
44
+ # Write
45
+ "WriteBigQuery",
46
+ "WriteParquet",
47
+ "WriteJSON",
48
+ "WriteCSV",
49
+ "WriteTFRecord",
50
+ "WriteNumpy",
51
+ "WriteMongo",
52
+ "WriteWebDataset",
53
+ "WriteSQL",
54
+ "WriteCustom",
55
+ # Map
56
+ "Map",
57
+ "MapBatches",
58
+ "Filter",
59
+ "FlatMap",
60
+ # All-to-all
61
+ "RandomizeBlockOrder",
62
+ "RandomShuffle",
63
+ "Repartition",
64
+ "Sort",
65
+ "Aggregate",
66
+ # N-ary
67
+ "Zip",
68
+ "Union",
69
+ ]
70
+
71
+
72
+ def record_operators_usage(op: LogicalOperator):
73
+ """Record logical operator usage with Ray telemetry."""
74
+ ops_dict = dict()
75
+ _collect_operators_to_dict(op, ops_dict)
76
+ ops_json_str = ""
77
+ with _recorded_operators_lock:
78
+ for op, count in ops_dict.items():
79
+ _recorded_operators.setdefault(op, 0)
80
+ _recorded_operators[op] += count
81
+ ops_json_str = json.dumps(_recorded_operators)
82
+
83
+ record_extra_usage_tag(TagKey.DATA_LOGICAL_OPS, ops_json_str)
84
+
85
+
86
+ def _collect_operators_to_dict(op: LogicalOperator, ops_dict: Dict[str, int]):
87
+ """Collect the logical operator name and count into `ops_dict`."""
88
+ for child in op.input_dependencies:
89
+ _collect_operators_to_dict(child, ops_dict)
90
+
91
+ op_name = op.name
92
+
93
+ # Check read and write operator, and anonymize user-defined data source.
94
+ if isinstance(op, Read):
95
+ op_name = f"Read{op._datasource.get_name()}"
96
+ if op_name not in _op_name_white_list:
97
+ op_name = "ReadCustom"
98
+ elif isinstance(op, Write):
99
+ op_name = f"Write{op._datasink_or_legacy_datasource.get_name()}"
100
+ if op_name not in _op_name_white_list:
101
+ op_name = "WriteCustom"
102
+ elif isinstance(op, AbstractUDFMap):
103
+ # Remove the function name from the map operator name.
104
+ # E.g., Map(<lambda>) -> Map
105
+ op_name = re.sub("\\(.*\\)$", "", op_name)
106
+
107
+ # Anonymize any operator name if not in white list.
108
+ if op_name not in _op_name_white_list:
109
+ op_name = "Unknown"
110
+
111
+ ops_dict.setdefault(op_name, 0)
112
+ ops_dict[op_name] += 1
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_all_to_all_op.cpython-311.pyc ADDED
Binary file (3.73 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_arrow_op.cpython-311.pyc ADDED
Binary file (209 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_items_op.cpython-311.pyc ADDED
Binary file (209 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_from_numpy_op.cpython-311.pyc ADDED
Binary file (209 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/plan_udf_map_op.cpython-311.pyc ADDED
Binary file (29.5 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/planner.cpython-311.pyc ADDED
Binary file (9.39 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/random_shuffle.cpython-311.pyc ADDED
Binary file (3.83 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/randomize_blocks.cpython-311.pyc ADDED
Binary file (2.43 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/repartition.cpython-311.pyc ADDED
Binary file (3.8 kB). View file
 
.venv/lib/python3.11/site-packages/ray/data/_internal/planner/__pycache__/sort.cpython-311.pyc ADDED
Binary file (3.77 kB). View file