Skip to content

feat: Add ability to specify RetryOptions and BigQueryRetryConfig when create job and waitFor #3398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) {
/** Class for specifying table get and create options. */
class JobOption extends Option {

private static final long serialVersionUID = -3111736712316353665L;
private static final long serialVersionUID = -3111736712316353664L;

private JobOption(BigQueryRpc.Option option, Object value) {
super(option, value);
Expand All @@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) {
return new JobOption(
BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields));
}

/** Returns an option to specify the job's BigQuery retry configuration. */
public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) {
return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig);
}

/** Returns an option to specify the job's retry options. */
public static JobOption retryOptions(RetryOption... options) {
return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options);
}
}

/** Class for specifying query results options. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.Policy;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.RetryOption;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
Expand Down Expand Up @@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() {
}
}
},
getOptions().getRetrySettings(),
getRetryOptions(optionsMap) != null
? RetryOption.mergeToSettings(
getOptions().getRetrySettings(), getRetryOptions(optionsMap))
: getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock(),
DEFAULT_RETRY_CONFIG));
getBigQueryRetryConfig(optionsMap) != null
? getBigQueryRetryConfig(optionsMap)
: DEFAULT_RETRY_CONFIG));
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -1628,4 +1634,13 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call()
}
return optionMap;
}

static BigQueryRetryConfig getBigQueryRetryConfig(Map<BigQueryRpc.Option, ?> options) {
return (BigQueryRetryConfig)
options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null);
}

static RetryOption[] getRetryOptions(Map<BigQueryRpc.Option, ?> options) {
return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,21 @@ public boolean isDone() {
Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS));
return job == null || JobStatus.State.DONE.equals(job.getStatus().getState());
}

/** See {@link #waitFor(BigQueryRetryConfig, RetryOption...)} */
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions);
}

/**
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
* {@code null}. By default, the job status is checked using jittered exponential backoff with 1
* second as an initial delay, 2.0 as a backoff factor, 1 minute as maximum delay between polls,
* 12 hours as a total timeout and unlimited number of attempts.
* 12 hours as a total timeout and unlimited number of attempts. For query jobs, the job status
* check can be configured to retry on specific BigQuery error messages using {@link
* BigQueryRetryConfig}. This {@link BigQueryRetryConfig} configuration is not available for
* non-query jobs.
*
* <p>Example usage of {@code waitFor()}.
*
Expand Down Expand Up @@ -232,18 +241,46 @@ public boolean isDone() {
* }
* }</pre>
*
* <p>Example usage of {@code waitFor()} with BigQuery retry configuration to retry on rate limit
* exceeded error messages for query jobs.
*
* <pre>{@code
* Job completedJob =
* job.waitFor(
* BigQueryRetryConfig.newBuilder()
* .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
* .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
* .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
* .build());
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.getStatus().getError() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }
* }</pre>
*
* @param bigQueryRetryConfig configures retries for query jobs for BigQuery failures
* @param waitOptions options to configure checking period and timeout
* @throws BigQueryException upon failure, check {@link BigQueryException#getCause()} for details
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
return waitForInternal(bigQueryRetryConfig, waitOptions);
}

private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
checkNotDryRun("waitFor");
Object completedJobResponse;
if (getConfiguration().getType() == Type.QUERY) {
completedJobResponse =
waitForQueryResults(
RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions),
bigQueryRetryConfig,
DEFAULT_QUERY_WAIT_OPTIONS);
} else {
completedJobResponse =
Expand Down Expand Up @@ -294,7 +331,9 @@ public TableResult getQueryResults(QueryResultsOption... options)

QueryResponse response =
waitForQueryResults(
DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0]));
DEFAULT_JOB_WAIT_SETTINGS,
DEFAULT_RETRY_CONFIG,
waitOptions.toArray(new QueryResultsOption[0]));

// Get the job resource to determine if it has errored.
Job job = this;
Expand Down Expand Up @@ -334,7 +373,9 @@ public TableResult getQueryResults(QueryResultsOption... options)
}

private QueryResponse waitForQueryResults(
RetrySettings retrySettings, final QueryResultsOption... resultsOptions)
RetrySettings retrySettings,
BigQueryRetryConfig bigQueryRetryConfig,
final QueryResultsOption... resultsOptions)
throws InterruptedException {
if (getConfiguration().getType() != Type.QUERY) {
throw new UnsupportedOperationException(
Expand All @@ -360,7 +401,7 @@ public boolean shouldRetry(
}
},
options.getClock(),
DEFAULT_RETRY_CONFIG);
bigQueryRetryConfig);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ enum Option {
STATE_FILTER("stateFilter"),
TIMEOUT("timeoutMs"),
REQUESTED_POLICY_VERSION("requestedPolicyVersion"),
TABLE_METADATA_VIEW("view");
TABLE_METADATA_VIEW("view"),
RETRY_OPTIONS("retryOptions"),
BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.bigquery.model.*;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.cloud.Policy;
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery.JobOption;
Expand Down Expand Up @@ -1594,6 +1595,119 @@ public void testCreateJobFailureShouldRetry() {
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() {
// Validate create job with BigQueryRetryConfig that retries on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
.retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
.retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
.build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(
new BigQueryException(
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() {
// Validate create job with BigQueryRetryConfig that does not retry on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG));

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
// Verify that getQueryResults is attempted only once and not retried since the error message
// does not match.
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldRetry() {
// Validate create job with RetryOptions.
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldNotRetry() {
// Validate create job with RetryOptions that only attempts once (no retry).
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithSelectedFields() {
when(bigqueryRpcMock.create(
Expand Down
Loading
Loading