[Data] Disable throttling for UnionOperator #60631
UnionOperator only manipulates bundle metadata and doesn't launch any tasks, so it should not be allocated resources by the resource manager. This change overrides throttling_disabled() to return True, following the same pattern as other metadata-only operators like LimitOperator, OutputSplitter, and AggregateNumRows.

Changes:
- Add throttling_disabled() method to UnionOperator
- Add unit tests to verify throttling is disabled
- Add test to ensure consistency with similar operators

Signed-off-by: slfan1989 <slfan1989@apache.org>
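To illustrate why a metadata-only operator should opt out, here is a minimal sketch of the pattern. It does not use Ray: `PhysicalOperator`, `MapOperator`, `UnionOperator`, and `split_cpu_budget` below are simplified, hypothetical stand-ins written for this example, and the even-split allocation is an assumption, not how Ray's resource manager actually divides resources.

```python
class PhysicalOperator:
    """Hypothetical stand-in for an operator base class (not Ray's real class)."""

    def __init__(self, name: str):
        self.name = name

    def throttling_disabled(self) -> bool:
        # Default: the operator takes part in resource throttling.
        return False


class MapOperator(PhysicalOperator):
    """Stand-in for an operator that launches tasks and needs resources."""


class UnionOperator(PhysicalOperator):
    """Stand-in for an operator that only routes bundle metadata."""

    def throttling_disabled(self) -> bool:
        # No tasks are launched, so the operator opts out of throttling.
        return True


def split_cpu_budget(ops, total_cpus: float) -> dict:
    """Toy allocator: split the budget evenly among operators that are throttled."""
    eligible = [op for op in ops if not op.throttling_disabled()]
    share = total_cpus / len(eligible) if eligible else 0.0
    return {op.name: share for op in eligible}


ops = [MapOperator("map_a"), MapOperator("map_b"), UnionOperator("union")]
budget = split_cpu_budget(ops, total_cpus=8.0)
print(budget)  # {'map_a': 4.0, 'map_b': 4.0}
```

In this toy model, leaving the override out would give the union stand-in a third of the budget that it never uses, which is the kind of waste the change is meant to avoid.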
Code Review
This pull request correctly disables resource-based throttling for the UnionOperator by overriding the throttling_disabled() method. This is a good optimization, as the operator only manipulates metadata and does not consume computational resources. The change is accompanied by two new unit tests that effectively verify the new behavior and ensure consistency with other similar operators. The implementation is clean and the reasoning is sound. I have one minor suggestion to simplify one of the new tests to improve its clarity and focus.
    ctx = DataContext.get_current()

    # Create input buffers with some test data
    input_bundles_1 = make_ref_bundles([[1, 2, 3]])
    input_bundles_2 = make_ref_bundles([[4, 5, 6]])

    input_op1 = InputDataBuffer(ctx, input_bundles_1)
    input_op2 = InputDataBuffer(ctx, input_bundles_2)

    # Create union operator
    union_op = UnionOperator(ctx, input_op1, input_op2)
The setup for this test can be simplified. Since throttling_disabled() returns a constant for the operator and doesn't depend on its inputs, we can instantiate UnionOperator without any input operators. This makes the test more focused and easier to read.
    - ctx = DataContext.get_current()
    - # Create input buffers with some test data
    - input_bundles_1 = make_ref_bundles([[1, 2, 3]])
    - input_bundles_2 = make_ref_bundles([[4, 5, 6]])
    - input_op1 = InputDataBuffer(ctx, input_bundles_1)
    - input_op2 = InputDataBuffer(ctx, input_bundles_2)
    - # Create union operator
    - union_op = UnionOperator(ctx, input_op1, input_op2)
    + ctx = DataContext.get_current()
    + union_op = UnionOperator(ctx)
Description

This PR disables resource-based throttling for `UnionOperator` by overriding the `throttling_disabled()` method to return `True`.

Why it's needed:
- `UnionOperator` only manipulates bundle metadata (routes `RefBundle` objects between input/output buffers)
- Brings `UnionOperator` in line with other metadata-only operators (`LimitOperator`, `OutputSplitter`, `AggregateNumRows`) that already have throttling disabled

Related issues

Additional information

Implementation details:
Added a `throttling_disabled()` method in `python/ray/data/_internal/execution/operators/union_operator.py`.

Testing:
Added two unit tests in `python/ray/data/tests/test_operators.py`:
- `test_union_operator_throttling_disabled()` - Verifies that `throttling_disabled()` returns `True`
- `test_union_operator_throttling_matches_similar_operators()` - Ensures consistency with `LimitOperator` and `OutputSplitter`
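
The shape of the two tests can be sketched roughly as follows. This is not the code from `test_operators.py`: the `Stub*` classes and both `check_*` helpers are hypothetical names invented for this example so that it runs without Ray, and the real tests construct actual operators from a `DataContext`.

```python
class StubOperator:
    """Hypothetical stand-in for an operator base class; not Ray's real API."""

    def throttling_disabled(self) -> bool:
        # Assumed default for this sketch: throttling stays enabled.
        return False


class StubUnion(StubOperator):
    def throttling_disabled(self) -> bool:
        return True


class StubLimit(StubOperator):
    def throttling_disabled(self) -> bool:
        return True


class StubOutputSplitter(StubOperator):
    def throttling_disabled(self) -> bool:
        return True


def check_throttling_disabled(op) -> None:
    """First check: the operator reports that throttling is disabled."""
    assert op.throttling_disabled() is True


def check_throttling_matches(reference, similar_ops) -> dict:
    """Second check: similar operators report the same flag as the reference."""
    expected = reference.throttling_disabled()
    flags = {type(op).__name__: op.throttling_disabled() for op in similar_ops}
    mismatched = [name for name, flag in flags.items() if flag != expected]
    assert not mismatched, f"throttling flag differs for: {mismatched}"
    return flags


union = StubUnion()
check_throttling_disabled(union)
flags = check_throttling_matches(union, [StubLimit(), StubOutputSplitter()])
print(flags)  # {'StubLimit': True, 'StubOutputSplitter': True}
```

Comparing against a reference operator, rather than hard-coding `True` for each one, means the consistency check would flag any single operator whose override drifts from the rest.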