-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Data] Get rid of generators to avoid intermediate state pinning #60598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a solid piece of engineering that refactors the data processing pipeline to replace generator functions with iterator classes. This is a crucial change to prevent potential memory leaks caused by chained generators holding references to intermediate data. The changes are applied consistently across block_batching and map_transformer components. New iterator classes like _BatchingIterator, ShapeBlocksIterator, and _TransformingBatchIterator are introduced to encapsulate the iteration logic previously found in generators. A new test, test_chained_transforms_release_intermediates_between_batches, is added to verify that intermediate object references are correctly released, which is an excellent addition. The overall change is well-executed and improves memory management in Ray Data's critical path.
| res = [batch] | ||
| out_batch = next(self._cur_output_iter) | ||
| except StopIteration: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For improved clarity and robustness, it's better to explicitly reset self._cur_output_iter to None and continue the loop when the iterator is exhausted. This makes the state transition explicit and avoids relying on the iterator being overwritten later in the loop.
| pass | |
| self._cur_output_iter = None | |
| continue |
e3e8439 to
44470b4
Compare
|
The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save? |
Not yet. But we can math it out: Currently we're using per single Map task With this change it will be just the block-size |
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…potentially pinning these objects in memory until next iteration Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
… of generators Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com> # Conflicts: # python/ray/data/_internal/execution/operators/map_operator.py # python/ray/data/_internal/execution/operators/map_transformer.py Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Tidying up Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
47015a3 to
a817c4c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
| try: | ||
| return next(self._input) | ||
| finally: | ||
| self._transformer._report_udf_time(time.perf_counter() - start) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UDF timing records time even when exceptions occur
Medium Severity
The new _UDFTimingIterator.__next__ uses a finally block to record UDF time, which means timing is recorded even when next(self._input) raises an exception. The old _udf_timed_iter only recorded timing after a successful next() call (the timing increment came after the next() returned). This changes timing metrics behavior: if UDFs fail repeatedly, the new code accumulates time for each failure while the old code wouldn't record time for failures.


Description
I’ve realized that for fused Map transforms we’re holding a whole stack of intermediate results (batches) simply due to how yield works in Python:
When method yields all of its frame state (local vars) is preserved, pinning all of its intermediate state till the next iteration and not releasing it.
This is in contrast with the pure
Iterator.__next__method, returning from which, stack frame with all of its intermediate state is destroyed.While this is not an issue most of the time, it's a big problem in cases when multiple Maps are fused:
batch_sizeis None by default meaning that the whole block is an input and an output substantially increasing memory requirements.Consider following example:
Related issues
Additional information