
Conversation

@goutamvenkat-anyscale
Contributor

Description

Ensure that high-volume writes are retried for better resiliency.

Tested this with the following script on GCP:

import logging

import ray

logging.getLogger("ray._common.retry").setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)

ds = ray.data.range(10000).repartition(5000)
ds.write_iceberg(
    table_identifier="my_namespace.my_table",
    catalog_kwargs={
        "uri": "https://biglake.googleapis.com/iceberg/v1/restcatalog",
        "warehouse": "gs://my-bucket",
        "auth": {
            "type": "google",
            "google": {
                "scopes": ["https://www.googleapis.com/auth/cloud-platform"],
            },
        },
        "header.x-goog-user-project": "...",
        "header.X-Iceberg-Access-Delegation": "",
    },
)


Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale requested a review from a team as a code owner on January 30, 2026 at 21:06
Contributor

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a retry policy for Iceberg storage and catalog write operations to improve resiliency. The changes involve adding new retry configurations to DataContext and wrapping relevant I/O and catalog calls in iceberg_datasink.py with call_with_retry. The implementation is sound and includes a test for storage write retries.

My review focuses on improving code maintainability and test coverage. I've suggested refactoring the duplicated retry logic for catalog operations and adding tests for the newly introduced catalog retry mechanisms.
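For context, a minimal sketch of what wrapping a catalog call in a retry might look like. This is not the PR's exact code: the import path for call_with_retry (suggested by the ray._common.retry logger name used in the test script above) and its keyword signature (description, match, max_attempts, max_backoff_s) are assumptions, and the DataContext field names come from the diff later in this thread.

from pyiceberg.catalog import load_catalog

from ray._common.retry import call_with_retry  # assumed import path
from ray.data.context import DataContext

# Illustrative catalog properties; real values come from the user's catalog_kwargs.
catalog_kwargs = {"uri": "https://example.com/iceberg/v1/restcatalog"}

ctx = DataContext.get_current()
iceberg_catalog = call_with_retry(
    lambda: load_catalog(name="default", **catalog_kwargs),
    description="load Iceberg catalog",  # keyword names are assumed
    match=ctx.iceberg_catalog_retried_errors,
    max_attempts=ctx.iceberg_catalog_max_attempts,
    max_backoff_s=ctx.iceberg_catalog_retry_max_backoff_s,
)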

Comment on lines +1596 to +1648
def test_write_retry_on_transient_error(pyiceberg_table, fast_retry_config):
    """Test that transient errors during file writes trigger retries."""
    from unittest.mock import patch

    from ray.data._internal.datasource.iceberg_datasink import IcebergDatasink
    from ray.data._internal.execution.interfaces import TaskContext

    # Create datasink and initialize it
    datasink = IcebergDatasink(
        table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
        catalog_kwargs=_CATALOG_KWARGS.copy(),
    )
    datasink.on_write_start()

    # Track call count to simulate transient failures
    call_count = {"count": 0}

    # Import original function before patching
    from pyiceberg.io.pyarrow import _dataframe_to_data_files

    original_func = _dataframe_to_data_files

    def flaky_dataframe_to_data_files(*args, **kwargs):
        call_count["count"] += 1
        if call_count["count"] <= 2:
            # Fail first 2 attempts with a retryable error
            raise IOError("TestTransientError: simulated transient failure")
        # Succeed on 3rd attempt
        return original_func(*args, **kwargs)

    # Create test data
    data = pa.Table.from_pydict(
        {"col_a": [200, 201], "col_b": ["x", "y"], "col_c": [5, 5]},
        schema=_SCHEMA,
    )

    # Patch at pyiceberg module level and call write directly
    with patch(
        "pyiceberg.io.pyarrow._dataframe_to_data_files",
        side_effect=flaky_dataframe_to_data_files,
    ):
        # Call write directly (bypassing Ray workers)
        task_ctx = TaskContext(task_idx=0, op_name="Write")
        result = datasink.write([data], task_ctx)

    # Verify retries occurred (called 3 times: 2 failures + 1 success)
    assert (
        call_count["count"] == 3
    ), f"Expected 3 calls (2 retries + 1 success), got {call_count['count']}"

    # Verify write result has data files
    assert len(result.data_files) > 0, "Expected data files in result"

Contributor (gemini-code-assist bot), severity: medium

This test effectively covers the retry logic for storage writes (_dataframe_to_data_files). However, the PR also adds retry logic for several catalog operations (load_catalog, load_table, commit_transaction, update_schema) which are not covered by tests.

Please add tests to verify the retry mechanism for these catalog operations as well. You could create a new fixture similar to fast_retry_config for catalog retries and then write a test that patches a catalog method to simulate transient failures.
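One possible shape for such a test, as a hedged sketch rather than a prescription: the fast_catalog_retry_config fixture is hypothetical, the patch target must match however iceberg_datasink actually imports the catalog loader, and the expectation that on_write_start performs the catalog load is an assumption based on the existing test above.

from unittest.mock import patch

from pyiceberg.catalog import load_catalog

from ray.data._internal.datasource.iceberg_datasink import IcebergDatasink


def test_catalog_retry_on_transient_error(pyiceberg_table, fast_catalog_retry_config):
    call_count = {"count": 0}
    original_load_catalog = load_catalog

    def flaky_load_catalog(*args, **kwargs):
        call_count["count"] += 1
        if call_count["count"] <= 2:
            # Fail the first two attempts with a message that matches the
            # retriable errors configured by the (hypothetical) fixture.
            raise IOError("TestTransientError: simulated catalog outage")
        return original_load_catalog(*args, **kwargs)

    datasink = IcebergDatasink(
        table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
        catalog_kwargs=_CATALOG_KWARGS.copy(),
    )

    # The patch target below is an assumption; it has to point at the name the
    # datasink resolves when it loads the catalog.
    with patch(
        "ray.data._internal.datasource.iceberg_datasink.load_catalog",
        side_effect=flaky_load_catalog,
    ):
        datasink.on_write_start()

    assert call_count["count"] == 3, f"Expected 3 attempts, got {call_count['count']}"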

@goutamvenkat-anyscale
Contributor Author

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request improves the fault tolerance of Iceberg data writes within Ray Data. By adding retry policies for both storage and catalog operations, it makes writes far less likely to fail on transient network issues or temporary service unavailability, making data pipelines more reliable and resilient.

Highlights

  • Retry Policy Implementation: Introduced robust retry policies for both Iceberg storage writes and catalog operations to enhance the resiliency of high-volume data ingestion.
  • Configurable Retry Parameters: Added new configurable parameters to DataContext for Iceberg-specific retry behavior, including maximum attempts, maximum backoff time, and a list of retriable error messages for both file writes and catalog interactions.
  • Application of Retry Logic: Integrated the call_with_retry utility into key IcebergDatasink methods, specifically for loading catalogs, loading tables, committing transactions, updating schemas, and writing data files.
  • Test Coverage for File Writes: Added a new test case (test_write_retry_on_transient_error) to verify the retry mechanism for transient errors encountered during Iceberg data file writes.


Changelog
  • python/ray/data/_internal/datasource/iceberg_datasink.py
    • Imported call_with_retry and DataContext.
    • Initialized _data_context in the IcebergDatasink constructor.
    • Wrapped calls to catalog.load_catalog, catalog.load_table, txn.commit_transaction, and _table.update_schema with call_with_retry.
    • Applied call_with_retry to the _dataframe_to_data_files function during data file writing.
  • python/ray/data/context.py
    • Defined new default constants for Iceberg write file and catalog retry parameters (e.g., DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS, DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS).
    • Added corresponding fields (iceberg_write_file_max_attempts, iceberg_catalog_retried_errors, etc.) to the DataContext class to allow configuration of these retry behaviors (a usage sketch follows this changelog).
  • python/ray/data/tests/datasource/test_iceberg.py
    • Introduced a fast_retry_config pytest fixture to set up DataContext for accelerated retry testing.
    • Added test_write_retry_on_transient_error to validate the retry logic for transient errors during Iceberg data file writes.
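As a usage note, here is a minimal sketch of how these settings could be tuned at runtime. The field names follow the diff shown further down in this thread; the specific values and error substrings are illustrative only.

import ray
from ray.data.context import DataContext

ctx = DataContext.get_current()

# Retry knobs for Iceberg data-file writes.
ctx.iceberg_write_file_max_attempts = 5
ctx.iceberg_write_file_retry_max_backoff_s = 8

# Retry knobs for Iceberg catalog operations (load_catalog, load_table,
# commit_transaction, update_schema).
ctx.iceberg_catalog_max_attempts = 5
ctx.iceberg_catalog_retry_max_backoff_s = 8
ctx.iceberg_catalog_retried_errors = ["503", "Connection reset by peer"]  # illustrative

ds = ray.data.range(1_000)
ds.write_iceberg(
    table_identifier="my_namespace.my_table",
    catalog_kwargs={"uri": "https://example.com/iceberg/v1/restcatalog"},  # placeholder
)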
Activity
  • The author goutamvenkat-anyscale requested a summary of the pull request.
  • A bot (gemini-code-assist[bot]) suggested refactoring the repeated retry logic for catalog operations into a single helper method to improve maintainability.
  • The bot also recommended adding tests to cover the retry mechanism for Iceberg catalog operations, noting that current tests only cover storage write retries.
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels on Jan 30, 2026
Comment on lines +647 to +657
iceberg_write_file_max_attempts: int = DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS
iceberg_write_file_retry_max_backoff_s: int = (
    DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S
)
iceberg_catalog_max_attempts: int = DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS
iceberg_catalog_retry_max_backoff_s: int = (
    DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S
)
iceberg_catalog_retried_errors: List[str] = field(
    default_factory=lambda: list(DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS)
)
Member


I think adding 5 file-format-specific settings to the top-level DataContext might clutter it. Should we encapsulate this in a single dataclass?
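A hedged sketch of that suggestion (not part of the PR): group the Iceberg retry knobs into one dataclass and expose a single DataContext field for it. The name IcebergRetryConfig and the write-side defaults are illustrative; the catalog backoff default of 16 comes from the snippet below.

from dataclasses import dataclass, field
from typing import List


@dataclass
class IcebergRetryConfig:
    # Data-file write retries (defaults illustrative).
    write_file_max_attempts: int = 10
    write_file_retry_max_backoff_s: int = 32
    # Catalog operation retries.
    catalog_max_attempts: int = 10
    catalog_retry_max_backoff_s: int = 16
    catalog_retried_errors: List[str] = field(default_factory=list)


# The five top-level DataContext fields could then collapse to a single one:
# iceberg_retry_config: IcebergRetryConfig = field(default_factory=IcebergRetryConfig)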

DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S = env_integer(
    "RAY_DATA_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S", 16
)
DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS = (
Member


Out of scope for this PR maybe, but I think it might confuse users that there are both write_file_retry_on_errors and iceberg_catalog_retried_errors.
