[Data][Iceberg] - Add retry policy for Storage + Catalog Writes #60620
base: master
Conversation
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces a retry policy for Iceberg storage and catalog write operations to improve resiliency. The changes involve adding new retry configurations to DataContext and wrapping relevant I/O and catalog calls in iceberg_datasink.py with call_with_retry. The implementation is sound and includes a test for storage write retries.
My review focuses on improving code maintainability and test coverage. I've suggested refactoring the duplicated retry logic for catalog operations and adding tests for the newly introduced catalog retry mechanisms.
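To make the maintainability suggestion concrete, below is a sketch of what a shared catalog-retry helper could look like. It assumes call_with_retry's existing (f, description, match, max_attempts, max_backoff_s) interface from ray.data._internal.util; the helper name _call_catalog_with_retry is illustrative and not part of the PR.

from ray.data._internal.util import call_with_retry
from ray.data.context import DataContext


def _call_catalog_with_retry(fn, description):
    """Run a PyIceberg catalog operation under the Iceberg catalog retry policy."""
    ctx = DataContext.get_current()
    return call_with_retry(
        fn,
        description=description,
        match=ctx.iceberg_catalog_retried_errors,
        max_attempts=ctx.iceberg_catalog_max_attempts,
        max_backoff_s=ctx.iceberg_catalog_retry_max_backoff_s,
    )


# Each call site then collapses to a one-liner, e.g.:
# table = _call_catalog_with_retry(
#     lambda: catalog.load_table(table_identifier), "load Iceberg table"
# )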
def test_write_retry_on_transient_error(pyiceberg_table, fast_retry_config):
    """Test that transient errors during file writes trigger retries."""
    from unittest.mock import patch

    from ray.data._internal.datasource.iceberg_datasink import IcebergDatasink
    from ray.data._internal.execution.interfaces import TaskContext

    # Create datasink and initialize it
    datasink = IcebergDatasink(
        table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
        catalog_kwargs=_CATALOG_KWARGS.copy(),
    )
    datasink.on_write_start()

    # Track call count to simulate transient failures
    call_count = {"count": 0}

    # Import original function before patching
    from pyiceberg.io.pyarrow import _dataframe_to_data_files

    original_func = _dataframe_to_data_files

    def flaky_dataframe_to_data_files(*args, **kwargs):
        call_count["count"] += 1
        if call_count["count"] <= 2:
            # Fail first 2 attempts with a retryable error
            raise IOError("TestTransientError: simulated transient failure")
        # Succeed on 3rd attempt
        return original_func(*args, **kwargs)

    # Create test data
    data = pa.Table.from_pydict(
        {"col_a": [200, 201], "col_b": ["x", "y"], "col_c": [5, 5]},
        schema=_SCHEMA,
    )

    # Patch at pyiceberg module level and call write directly
    with patch(
        "pyiceberg.io.pyarrow._dataframe_to_data_files",
        side_effect=flaky_dataframe_to_data_files,
    ):
        # Call write directly (bypassing Ray workers)
        task_ctx = TaskContext(task_idx=0, op_name="Write")
        result = datasink.write([data], task_ctx)

    # Verify retries occurred (called 3 times: 2 failures + 1 success)
    assert (
        call_count["count"] == 3
    ), f"Expected 3 calls (2 retries + 1 success), got {call_count['count']}"

    # Verify write result has data files
    assert len(result.data_files) > 0, "Expected data files in result"
This test effectively covers the retry logic for storage writes (_dataframe_to_data_files). However, the PR also adds retry logic for several catalog operations (load_catalog, load_table, commit_transaction, update_schema) which are not covered by tests.
Please add tests to verify the retry mechanism for these catalog operations as well. You could create a new fixture similar to fast_retry_config for catalog retries and then write a test that patches a catalog method to simulate transient failures.
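For example, a catalog-retry test could follow the same pattern as the storage test above. The sketch below assumes the datasink resolves the catalog through pyiceberg.catalog.load_catalog and reuses the fast_retry_config fixture; the actual patch target and a catalog-specific fixture would need to match the datasink's implementation.

def test_catalog_load_retry_on_transient_error(fast_retry_config):
    """Sketch: transient errors while loading the catalog should be retried."""
    from unittest.mock import patch

    import pyiceberg.catalog

    from ray.data._internal.datasource.iceberg_datasink import IcebergDatasink

    call_count = {"count": 0}
    original_load_catalog = pyiceberg.catalog.load_catalog

    def flaky_load_catalog(*args, **kwargs):
        call_count["count"] += 1
        if call_count["count"] <= 2:
            # Fail the first 2 attempts with a retryable error
            raise IOError("TestTransientError: simulated catalog outage")
        return original_load_catalog(*args, **kwargs)

    datasink = IcebergDatasink(
        table_identifier=f"{_DB_NAME}.{_TABLE_NAME}",
        catalog_kwargs=_CATALOG_KWARGS.copy(),
    )

    # Patch target assumes the datasink calls pyiceberg.catalog.load_catalog at
    # runtime; adjust to the datasink's actual import path if it binds the name
    # at import time.
    with patch("pyiceberg.catalog.load_catalog", side_effect=flaky_load_catalog):
        datasink.on_write_start()

    assert (
        call_count["count"] == 3
    ), f"Expected 3 calls (2 retries + 1 success), got {call_count['count']}"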
/gemini summary
Summary of Changes
This pull request significantly improves the fault tolerance of Iceberg data writes within Ray Data. By implementing comprehensive retry policies for both storage and catalog operations, it ensures that transient network issues or service unavailability do not lead to data loss or failed write operations, thereby making data pipelines more reliable and resilient.
    iceberg_write_file_max_attempts: int = DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS
    iceberg_write_file_retry_max_backoff_s: int = (
        DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S
    )
    iceberg_catalog_max_attempts: int = DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS
    iceberg_catalog_retry_max_backoff_s: int = (
        DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S
    )
    iceberg_catalog_retried_errors: List[str] = field(
        default_factory=lambda: list(DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS)
    )
I think adding 5 file-format-specific settings to the top-level DataContext might clutter it. Should we encapsulate this in a single dataclass?
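One possible shape for that encapsulation is sketched below; the class and field names are illustrative, and the DEFAULT_* constants are the ones this PR adds to ray/data/context.py.

from dataclasses import dataclass, field
from typing import List


@dataclass
class IcebergRetryConfig:
    """Groups the Iceberg write and catalog retry settings into one object."""

    write_file_max_attempts: int = DEFAULT_ICEBERG_WRITE_FILE_MAX_ATTEMPTS
    write_file_retry_max_backoff_s: int = DEFAULT_ICEBERG_WRITE_FILE_RETRY_MAX_BACKOFF_S
    catalog_max_attempts: int = DEFAULT_ICEBERG_CATALOG_MAX_ATTEMPTS
    catalog_retry_max_backoff_s: int = DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S
    catalog_retried_errors: List[str] = field(
        default_factory=lambda: list(DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS)
    )


# DataContext would then expose a single field:
#     iceberg_retry_config: IcebergRetryConfig = field(default_factory=IcebergRetryConfig)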
DEFAULT_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S = env_integer(
    "RAY_DATA_ICEBERG_CATALOG_RETRY_MAX_BACKOFF_S", 16
)
DEFAULT_ICEBERG_CATALOG_RETRIED_ERRORS = (
Possibly out of scope for this PR, but I think it might confuse users that there are both write_file_retry_on_errors and iceberg_catalog_retried_errors settings.
Description
Ensure that high-volume writes are retried for better resiliency.
Tested this with the following script on GCP:
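(The GCP script itself is not reproduced in this excerpt. As an illustrative stand-in, a minimal write that exercises the new retry settings might look like the following, assuming Ray Data's Dataset.write_iceberg API, a pre-existing table db.retry_test_table whose schema matches a single id column, and placeholder catalog_kwargs for a local SQL catalog.)

import ray
from ray.data.context import DataContext

# Tune the retry knobs added by this PR.
ctx = DataContext.get_current()
ctx.iceberg_write_file_max_attempts = 5
ctx.iceberg_write_file_retry_max_backoff_s = 8
ctx.iceberg_catalog_max_attempts = 5
ctx.iceberg_catalog_retry_max_backoff_s = 8

# High-volume write; transient storage/catalog errors should now be retried
# instead of failing the job.
ds = ray.data.range(10_000_000)
ds.write_iceberg(
    table_identifier="db.retry_test_table",
    catalog_kwargs={
        "name": "default",
        "type": "sql",
        "uri": "sqlite:///iceberg_catalog.db",
        "warehouse": "file:///tmp/iceberg_warehouse",
    },
)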
Related issues
Additional information