Skip to content

Conversation

@slfan1989
Copy link
Contributor

Description

This PR disables resource-based throttling for UnionOperator by overriding the throttling_disabled() method to return True.

Why it's needed:

  • UnionOperator only manipulates bundle metadata (routes RefBundle objects between input/output buffers)
  • It doesn't launch any Ray tasks or consume computational resources (CPU/memory)
  • Currently it gets allocated resources unnecessarily, which is wasteful and impacts scheduling efficiency
  • This aligns UnionOperator with other metadata-only operators (LimitOperator, OutputSplitter, AggregateNumRows) that already have throttling disabled

Related issues

Link related issues: "Fixes #60627", "Closes #60627", or "Related to #60627".

Additional information

Implementation details:

Added throttling_disabled() method in python/ray/data/_internal/execution/operators/union_operator.py:

def throttling_disabled(self) -> bool:
    """Disables resource-based throttling.

    Union operator only manipulates bundle metadata and doesn't launch
    any tasks, so it doesn't need resource allocation.
    """
    return True

Testing:
Added two unit tests in python/ray/data/tests/test_operators.py:

  1. test_union_operator_throttling_disabled() - Verifies that throttling_disabled() returns True
  2. test_union_operator_throttling_matches_similar_operators() - Ensures consistency with LimitOperator and OutputSplitter
UnionOperator only manipulates bundle metadata and doesn't launch
any tasks, so it should not be allocated resources by the resource
manager.

This change overrides throttling_disabled() to return True, following
the same pattern as other metadata-only operators like LimitOperator,
OutputSplitter, and AggregateNumRows.

Changes:
- Add throttling_disabled() method to UnionOperator
- Add unit tests to verify throttling is disabled
- Add test to ensure consistency with similar operators

Signed-off-by: slfan1989 <slfan1989@apache.org>
@slfan1989 slfan1989 requested a review from a team as a code owner January 31, 2026 07:44
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly disables resource-based throttling for the UnionOperator by overriding the throttling_disabled() method. This is a good optimization, as the operator only manipulates metadata and does not consume computational resources. The change is accompanied by two new unit tests that effectively verify the new behavior and ensure consistency with other similar operators. The implementation is clean and the reasoning is sound. I have one minor suggestion to simplify one of the new tests to improve its clarity and focus.

Comment on lines +227 to +237
ctx = DataContext.get_current()

# Create input buffers with some test data
input_bundles_1 = make_ref_bundles([[1, 2, 3]])
input_bundles_2 = make_ref_bundles([[4, 5, 6]])

input_op1 = InputDataBuffer(ctx, input_bundles_1)
input_op2 = InputDataBuffer(ctx, input_bundles_2)

# Create union operator
union_op = UnionOperator(ctx, input_op1, input_op2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The setup for this test can be simplified. Since throttling_disabled() is a static property of the operator and doesn't depend on its inputs, we can instantiate UnionOperator without any input operators. This makes the test more focused and easier to read.

Suggested change
ctx = DataContext.get_current()
# Create input buffers with some test data
input_bundles_1 = make_ref_bundles([[1, 2, 3]])
input_bundles_2 = make_ref_bundles([[4, 5, 6]])
input_op1 = InputDataBuffer(ctx, input_bundles_1)
input_op2 = InputDataBuffer(ctx, input_bundles_2)
# Create union operator
union_op = UnionOperator(ctx, input_op1, input_op2)
ctx = DataContext.get_current()
union_op = UnionOperator(ctx)
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Jan 31, 2026
UnionOperator only manipulates bundle metadata and doesn't launch
any tasks, so it should not be allocated resources by the resource
manager.

This change overrides throttling_disabled() to return True, following
the same pattern as other metadata-only operators like LimitOperator,
OutputSplitter, and AggregateNumRows.

Changes:
- Add throttling_disabled() method to UnionOperator
- Add unit tests to verify throttling is disabled
- Add test to ensure consistency with similar operators

Signed-off-by: slfan1989 <slfan1989@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues

1 participant