Skip to content

Conversation

@alexeykudinkin
Copy link
Contributor

@alexeykudinkin alexeykudinkin commented Jan 30, 2026

Description

I’ve realized that for fused Map transforms we’re holding a whole stack of intermediate results (batches) simply due to how yield works in Python:

  • When method yields all of its frame state (local vars) is preserved, pinning all of its intermediate state till the next iteration and not releasing it.

  • This is in contrast with the pure Iterator.__next__ method, returning from which, stack frame with all of its intermediate state is destroyed.

While this is not an issue most of the time, it's a big problem in cases when multiple Maps are fused:

  • With multiple operators & corresponding transformations being fused
  • Intermediate state along with inputs and outputs of each one are pinned until the next iteration
  • Total size of required heap memory scales up proportionally to the # of operators fused (ie more operators more heap)
  • This is exacerbated by the fact that now batch_size is None by default meaning that the whole block is an input and an output substantially increasing memory requirements.

Consider following example:

Generator Chain (Problem)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Generator A                                                                │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_a(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED in frame       │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_b(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of A)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_c(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of B)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory at yield point:                                                                                                                                                        
  ┌─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐                                                                                                                 
  │ input   │ A.batch │ A.result│ B.batch │ B.result│ C.batch │ ... ALL PINNED                                                                                                  
  └─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘                                                                                                                 
             ═══════════════════════════════════════════════                                                                                                                    
                      Cannot be GC'd until next iteration                                                                                                                       
                                                                                                                                                                                
  Iterator Chain (Solution)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Iterator A                                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory after return:                                                                                                                                                          
  ┌─────────┬─────────┐                                                                                                                                                         
  │ input   │ output  │  ... ONLY 2 objects pinned                                                                                                                              
  └─────────┴─────────┘                                                                                                                                                         
                                                                                                                                                                                
  All intermediates eligible for GC immediately after each __next__ returns                                                                                                     
                                                                                                                                                                                
  Key Difference                                                                                                                                                                
                                                                                                                                                                                
  GENERATOR                              ITERATOR                                                                                                                               
  ─────────────────────────────────────────────────────────────────                                                                                                             
  yield suspends execution         vs    return completes execution                                                                                                             
  frame stays alive                vs    frame is destroyed                                                                                                                     
  locals pinned until resume       vs    locals released immediately                                                                                                            
                                                                                                                                                                                
             ┌──────────┐                        ┌──────────┐                                                                                                                   
    yield ──►│ SUSPENDED│               return ──►│ COMPLETE │                                                                                                                  
             │  frame   │                        │  frame   │                                                                                                                   
             │  alive   │                        │destroyed │                                                                                                                   
             └──────────┘                        └──────────┘                                                                                                                   
                 │                                    │                                                                                                                         
                 ▼                                    ▼                                                                                                                         
           refs HELD                            refs RELEASED    

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner January 30, 2026 04:17
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Jan 30, 2026
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request is a solid piece of engineering that refactors the data processing pipeline to replace generator functions with iterator classes. This is a crucial change to prevent potential memory leaks caused by chained generators holding references to intermediate data. The changes are applied consistently across block_batching and map_transformer components. New iterator classes like _BatchingIterator, ShapeBlocksIterator, and _TransformingBatchIterator are introduced to encapsulate the iteration logic previously found in generators. A new test, test_chained_transforms_release_intermediates_between_batches, is added to verify that intermediate object references are correctly released, which is an excellent addition. The overall change is well-executed and improves memory management in Ray Data's critical path.

res = [batch]
out_batch = next(self._cur_output_iter)
except StopIteration:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For improved clarity and robustness, it's better to explicitly reset self._cur_output_iter to None and continue the loop when the iterator is exhausted. This makes the state transition explicit and avoids relying on the iterator being overwritten later in the loop.

Suggested change
pass
self._cur_output_iter = None
continue
@alexeykudinkin alexeykudinkin changed the title [WIP][Data] Get rid of generators on the critical path Jan 30, 2026
@alexeykudinkin alexeykudinkin changed the title [Data] Get rid of generators on the critical path Jan 30, 2026
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Jan 30, 2026
@raulchen
Copy link
Contributor

The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save?

@alexeykudinkin
Copy link
Contributor Author

The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save?

Not yet. But we can math it out:

Currently we're using per single Map task

block-size (128Mb default) x N (number of fused transformations) 

With this change it will be just the block-size

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…potentially pinning these objects in memory until next iteration

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
… of generators

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

# Conflicts:
#	python/ray/data/_internal/execution/operators/map_operator.py
#	python/ray/data/_internal/execution/operators/map_transformer.py

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Tidying up

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

try:
return next(self._input)
finally:
self._transformer._report_udf_time(time.perf_counter() - start)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDF timing records time even when exceptions occur

Medium Severity

The new _UDFTimingIterator.__next__ uses a finally block to record UDF time, which means timing is recorded even when next(self._input) raises an exception. The old _udf_timed_iter only recorded timing after a successful next() call (the timing increment came after the next() returned). This changes timing metrics behavior: if UDFs fail repeatedly, the new code accumulates time for each failure while the old code wouldn't record time for failures.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants