Skip to content

Conversation

@slfan1989
Copy link
Contributor

@slfan1989 slfan1989 commented Jan 31, 2026

Description

Fix a flaky test in test_operator_fusion.py where test_map_fusion_with_concurrency_arg fails intermittently due to non-deterministic block ordering when using concurrency=2.
The test fails when the two data blocks (containing values 0-4 and 5-9) are processed concurrently. Due to race conditions, block 2 may complete before block 1, resulting in output order [5, 6, 7, 8, 9, 0, 1, 2, 3, 4] instead of the expected [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
Since this test validates operator fusion behavior rather than output ordering, this PR sorts the results before comparison to make the test robust against concurrent execution order variations.

Related issues

Related to flaky test failures in CI when running test_map_fusion_with_concurrency_arg with parameterized concurrency configurations.

https://buildkite.com/ray-project/microcheck/builds/37063/steps/canvas?sid=019c130a-0406-436a-b12f-1f4cb5842e19


[2026-01-31T09:44:06Z]     def test_map_fusion_with_concurrency_arg(
--
[2026-01-31T09:44:06Z]         ray_start_regular_shared_2_cpus,
[2026-01-31T09:44:06Z]         up_use_actor,
[2026-01-31T09:44:06Z]         up_concurrency,
[2026-01-31T09:44:06Z]         down_use_actor,
[2026-01-31T09:44:06Z]         down_concurrency,
[2026-01-31T09:44:06Z]         should_fuse,
[2026-01-31T09:44:06Z]     ):
[2026-01-31T09:44:06Z]         """Test map operator fusion with different concurrency settings."""
[2026-01-31T09:44:06Z]
[2026-01-31T09:44:06Z]         class Map:
[2026-01-31T09:44:06Z]             def __call__(self, row):
[2026-01-31T09:44:06Z]                 return row
[2026-01-31T09:44:06Z]
[2026-01-31T09:44:06Z]         def map(row):
[2026-01-31T09:44:06Z]             return row
[2026-01-31T09:44:06Z]
[2026-01-31T09:44:06Z]         ds = ray.data.range(10, override_num_blocks=2)
[2026-01-31T09:44:06Z]         if not up_use_actor:
[2026-01-31T09:44:06Z]             ds = ds.map(map, num_cpus=0, concurrency=up_concurrency)
[2026-01-31T09:44:06Z]             up_name = "Map(map)"
[2026-01-31T09:44:06Z]         else:
[2026-01-31T09:44:06Z]             ds = ds.map(Map, num_cpus=0, concurrency=up_concurrency)
[2026-01-31T09:44:06Z]             up_name = "Map(Map)"
[2026-01-31T09:44:06Z]
[2026-01-31T09:44:06Z]         if not down_use_actor:
[2026-01-31T09:44:06Z]             ds = ds.map(map, num_cpus=0, concurrency=down_concurrency)
[2026-01-31T09:44:06Z]             down_name = "Map(map)"
[2026-01-31T09:44:06Z]         else:
[2026-01-31T09:44:06Z]             ds = ds.map(Map, num_cpus=0, concurrency=down_concurrency)
[2026-01-31T09:44:06Z]             down_name = "Map(Map)"
[2026-01-31T09:44:06Z]
[2026-01-31T09:44:06Z] >       assert extract_values("id", ds.take_all()) == list(range(10))
[2026-01-31T09:44:06Z] E       assert [5, 6, 7, 8, 9, 0, ...] == [0, 1, 2, 3, 4, 5, ...]
[2026-01-31T09:44:06Z] E         At index 0 diff: 5 != 0
[2026-01-31T09:44:06Z] E         Full diff:
[2026-01-31T09:44:06Z] E         - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[2026-01-31T09:44:06Z] E         + [5, 6, 7, 8, 9, 0, 1, 2, 3, 4]
[2026-01-31T09:44:06Z]

Additional information

Test failure example:

assert extract_values("id", ds.take_all()) == list(range(10))
AssertionError:
assert [5, 6, 7, 8, 9, 0, 1, 2, 3, 4] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Change:

# Before
assert extract_values("id", ds.take_all()) == list(range(10))

# After  
assert sorted(extract_values("id", ds.take_all())) == list(range(10))

The test still validates the core functionality (operator fusion logic with different concurrency settings) while being resilient to non-deterministic execution ordering.

…n-deterministic ordering

The test `test_map_fusion_with_concurrency_arg` fails intermittently when
concurrency=2 is used with 2 blocks. The two blocks are processed concurrently,
and there's no guarantee about the order they complete. Block 2 (values 5-9)
can complete before block 1 (values 0-4), resulting in [5,6,7,8,9,0,1,2,3,4]
instead of [0,1,2,3,4,5,6,7,8,9].

Since this test validates operator fusion behavior rather than output ordering,
sort the results before comparison to make it robust against concurrent
execution order variations.

Signed-off-by: slfan1989 <slfan1989@apache.org>
@slfan1989 slfan1989 requested a review from a team as a code owner January 31, 2026 14:41
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request fixes a flaky test test_map_fusion_with_concurrency_arg by sorting the output before making an assertion. This correctly handles the non-deterministic block ordering from concurrent execution. My review includes a suggestion to use a set comparison instead of sorted, which would align with the pattern used in other tests in this file for order-independent comparisons.

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

down_name = "Map(Map)"

assert extract_values("id", ds.take_all()) == list(range(10))
assert set(extract_values("id", ds.take_all())) == set(range(10))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using set instead of sorted loses duplicate detection

Low Severity

The test uses set() for comparison instead of sorted() as stated in the PR description. Using set() cannot detect duplicate values in the output - if concurrency bugs caused the same item to be processed twice (e.g., returning 11 values with one duplicate), the set() comparison would still pass while sorted() would correctly fail. This weakens the test's ability to catch real concurrency-related bugs in the map fusion logic.

Fix in Cursor Fix in Web

@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Jan 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues

1 participant