Skip to content

feat: specify isolation level per transaction #3704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 28, 2025
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,18 @@
<method>com.google.spanner.v1.TransactionOptions$IsolationLevel getDefaultIsolationLevel()</method>
</difference>

<!-- Isolation level per transaction -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void beginTransaction(com.google.spanner.v1.TransactionOptions$IsolationLevel)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync(com.google.spanner.v1.TransactionOptions$IsolationLevel)</method>
</difference>

<!-- Removed ConnectionOptions$ConnectionProperty in favor of the more generic ConnectionProperty class. -->
<difference>
<differenceType>8001</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
}
}

static final Set<String> ddlStatements =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These statements have been moved up in this file to make sure that they are executed before the static block on line 151.

ImmutableSet.of("CREATE", "DROP", "ALTER", "ANALYZE", "GRANT", "REVOKE", "RENAME");
static final Set<String> selectStatements =
ImmutableSet.of("SELECT", "WITH", "SHOW", "FROM", "GRAPH");
static final Set<String> SELECT_STATEMENTS_ALLOWING_PRECEDING_BRACKETS =
ImmutableSet.of("SELECT", "FROM");
static final Set<String> dmlStatements = ImmutableSet.of("INSERT", "UPDATE", "DELETE");

/*
* The following fixed pre-parsed statements are used internally by the Connection API. These do
* not need to be parsed using a specific dialect, as they are equal for all dialects, and
Expand Down Expand Up @@ -416,13 +424,6 @@ ClientSideStatement getClientSideStatement() {
}
}

static final Set<String> ddlStatements =
ImmutableSet.of("CREATE", "DROP", "ALTER", "ANALYZE", "GRANT", "REVOKE", "RENAME");
static final Set<String> selectStatements =
ImmutableSet.of("SELECT", "WITH", "SHOW", "FROM", "GRAPH");
static final Set<String> SELECT_STATEMENTS_ALLOWING_PRECEDING_BRACKETS =
ImmutableSet.of("SELECT", "FROM");
static final Set<String> dmlStatements = ImmutableSet.of("INSERT", "UPDATE", "DELETE");
private final Set<ClientSideStatementImpl> statements;

/** The default maximum size of the statement cache in Mb. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import com.google.cloud.spanner.connection.ClientSideStatementValueConverters.IsolationLevelConverter;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for BEGIN TRANSACTION [ISOLATION LEVEL SERIALIZABLE|REPEATABLE READ] statements. */
class ClientSideStatementBeginExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;
private final IsolationLevelConverter converter;

ClientSideStatementBeginExecutor(ClientSideStatementImpl statement) throws CompileException {
try {
this.statement = statement;
this.converter = new IsolationLevelConverter();
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), converter.getParameterClass());
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(ConnectionStatementExecutor connection, ParsedStatement statement)
throws Exception {
return (StatementResult)
method.invoke(connection, getParameterValue(statement.getSqlWithoutComments()));
}

IsolationLevel getParameterValue(String sql) {
Matcher matcher = statement.getPattern().matcher(sql);
// Match the 'isolation level (serializable|repeatable read)' part.
// Group 1 is the isolation level.
if (matcher.find() && matcher.groupCount() >= 1) {
String value = matcher.group(1);
if (value != null) {
// Convert the text to an isolation level enum.
// This returns null if the string is not a valid isolation level value.
IsolationLevel res = converter.convert(value.trim());
if (res != null) {
return res;
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, String.format("Unknown isolation level: %s", value));
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ static class IsolationLevelConverter
private final CaseInsensitiveEnumMap<TransactionOptions.IsolationLevel> values =
new CaseInsensitiveEnumMap<>(TransactionOptions.IsolationLevel.class);

private IsolationLevelConverter() {}
IsolationLevelConverter() {}

/** Constructor needed for reflection. */
public IsolationLevelConverter(String allowedValues) {}
Expand All @@ -406,6 +406,11 @@ public Class<TransactionOptions.IsolationLevel> getParameterClass() {

@Override
public TransactionOptions.IsolationLevel convert(String value) {
if (value != null) {
// This ensures that 'repeatable read' is translated to 'repeatable_read'. The text between
// 'repeatable' and 'read' can be any number of valid whitespace characters.
value = value.trim().replaceFirst("\\s+", "_");
}
return values.get(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ public interface Connection extends AutoCloseable {
void cancel();

/**
* Begins a new transaction for this connection.
* Begins a new transaction for this connection. The transaction will use the default isolation
* level of this connection.
*
* <ul>
* <li>Calling this method on a connection that has no transaction and that is
Expand All @@ -313,9 +314,16 @@ public interface Connection extends AutoCloseable {
*/
void beginTransaction();

/**
* Same as {@link #beginTransaction()}, but this transaction will use the given isolation level,
* instead of the default isolation level of this connection.
*/
void beginTransaction(IsolationLevel isolationLevel);

/**
* Begins a new transaction for this connection. This method is guaranteed to be non-blocking. The
* returned {@link ApiFuture} will be done when the transaction has been initialized.
* returned {@link ApiFuture} will be done when the transaction has been initialized. The
* transaction will use the default isolation level of this connection.
*
* <ul>
* <li>Calling this method on a connection that has no transaction and that is
Expand All @@ -332,6 +340,12 @@ public interface Connection extends AutoCloseable {
*/
ApiFuture<Void> beginTransactionAsync();

/**
* Same as {@link #beginTransactionAsync()}, but this transaction will use the given isolation
* level, instead of the default isolation level of this connection.
*/
ApiFuture<Void> beginTransactionAsync(IsolationLevel isolationLevel);

/**
* Sets the transaction mode to use for current transaction. This method may only be called when
* in a transaction, and before the transaction is actually started, i.e. before any statements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {

// The following properties are not 'normal' connection properties, but transient properties that
// are automatically reset after executing a transaction or statement.
private IsolationLevel transactionIsolationLevel;
private String transactionTag;
private String statementTag;
private boolean excludeTxnFromChangeStreams;
Expand Down Expand Up @@ -336,7 +337,7 @@ && getDialect() == Dialect.POSTGRESQL
: Type.NON_TRANSACTIONAL));

// (Re)set the state of the connection to the default.
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}

/** Constructor only for test purposes. */
Expand Down Expand Up @@ -370,7 +371,7 @@ && getDialect() == Dialect.POSTGRESQL
setReadOnly(options.isReadOnly());
setAutocommit(options.isAutocommit());
setReturnCommitStats(options.isReturnCommitStats());
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}

@Override
Expand Down Expand Up @@ -505,7 +506,7 @@ private void reset(Context context, boolean inTransaction) {
this.protoDescriptorsFilePath = null;

if (!isTransactionStarted()) {
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}
}

Expand Down Expand Up @@ -595,7 +596,7 @@ public void setAutocommit(boolean autocommit) {
// middle of a transaction.
this.connectionState.commit();
}
clearLastTransactionAndSetDefaultTransactionOptions();
clearLastTransactionAndSetDefaultTransactionOptions(getDefaultIsolationLevel());
// Reset the readOnlyStaleness value if it is no longer compatible with the new autocommit
// value.
if (!autocommit) {
Expand Down Expand Up @@ -629,7 +630,7 @@ public void setReadOnly(boolean readOnly) {
ConnectionPreconditions.checkState(
!transactionBeginMarked, "Cannot set read-only when a transaction has begun");
setConnectionPropertyValue(READONLY, readOnly);
clearLastTransactionAndSetDefaultTransactionOptions();
clearLastTransactionAndSetDefaultTransactionOptions(getDefaultIsolationLevel());
}

@Override
Expand All @@ -647,7 +648,7 @@ public void setDefaultIsolationLevel(IsolationLevel isolationLevel) {
!isTransactionStarted(),
"Cannot set default isolation level while a transaction is active");
setConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL, isolationLevel);
clearLastTransactionAndSetDefaultTransactionOptions();
clearLastTransactionAndSetDefaultTransactionOptions(isolationLevel);
}

@Override
Expand All @@ -656,8 +657,8 @@ public IsolationLevel getDefaultIsolationLevel() {
return getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL);
}

private void clearLastTransactionAndSetDefaultTransactionOptions() {
setDefaultTransactionOptions();
private void clearLastTransactionAndSetDefaultTransactionOptions(IsolationLevel isolationLevel) {
setDefaultTransactionOptions(isolationLevel);
this.currentUnitOfWork = null;
}

Expand Down Expand Up @@ -1139,13 +1140,14 @@ public boolean isKeepTransactionAlive() {
}

/** Resets this connection to its default transaction options. */
private void setDefaultTransactionOptions() {
private void setDefaultTransactionOptions(IsolationLevel isolationLevel) {
if (transactionStack.isEmpty()) {
unitOfWorkType =
isReadOnly()
? UnitOfWorkType.READ_ONLY_TRANSACTION
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionIsolationLevel = isolationLevel;
transactionTag = null;
excludeTxnFromChangeStreams = false;
} else {
Expand All @@ -1155,11 +1157,21 @@ private void setDefaultTransactionOptions() {

@Override
public void beginTransaction() {
get(beginTransactionAsync());
get(beginTransactionAsync(getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL)));
}

@Override
public void beginTransaction(IsolationLevel isolationLevel) {
get(beginTransactionAsync(isolationLevel));
}

@Override
public ApiFuture<Void> beginTransactionAsync() {
return beginTransactionAsync(getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL));
}

@Override
public ApiFuture<Void> beginTransactionAsync(IsolationLevel isolationLevel) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "This connection has an active batch and cannot begin a transaction");
Expand All @@ -1169,7 +1181,7 @@ public ApiFuture<Void> beginTransactionAsync() {
ConnectionPreconditions.checkState(!transactionBeginMarked, "A transaction has already begun");

transactionBeginMarked = true;
clearLastTransactionAndSetDefaultTransactionOptions();
clearLastTransactionAndSetDefaultTransactionOptions(isolationLevel);
if (isAutocommit()) {
inTransaction = true;
}
Expand Down Expand Up @@ -1284,7 +1296,7 @@ private ApiFuture<Void> endCurrentTransactionAsync(
if (isAutocommit()) {
inTransaction = false;
}
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}
return res;
}
Expand Down Expand Up @@ -2196,7 +2208,7 @@ UnitOfWork createNewUnitOfWork(
.build();
if (!isInternalMetadataQuery && !forceSingleUse) {
// Reset the transaction options after starting a single-use transaction.
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}
return singleUseTransaction;
} else {
Expand All @@ -2217,7 +2229,7 @@ UnitOfWork createNewUnitOfWork(
.setUsesEmulator(options.usesEmulator())
.setUseAutoSavepointsForEmulator(options.useAutoSavepointsForEmulator())
.setDatabaseClient(dbClient)
.setIsolationLevel(getConnectionPropertyValue(DEFAULT_ISOLATION_LEVEL))
.setIsolationLevel(transactionIsolationLevel)
.setDelayTransactionStartUntilFirstWrite(
getConnectionPropertyValue(DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE))
.setKeepTransactionAlive(getConnectionPropertyValue(KEEP_TRANSACTION_ALIVE))
Expand Down Expand Up @@ -2401,7 +2413,7 @@ public ApiFuture<long[]> runBatchAsync() {
this.protoDescriptorsFilePath = null;
}
this.batchMode = BatchMode.NONE;
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}
}

Expand All @@ -2415,7 +2427,7 @@ public void abortBatch() {
}
} finally {
this.batchMode = BatchMode.NONE;
setDefaultTransactionOptions();
setDefaultTransactionOptions(getDefaultIsolationLevel());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.PgTransactionMode.IsolationLevel;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.TransactionOptions;
import java.time.Duration;

/**
Expand Down Expand Up @@ -107,7 +108,7 @@ StatementResult statementSetDelayTransactionStartUntilFirstWrite(

StatementResult statementShowExcludeTxnFromChangeStreams();

StatementResult statementBeginTransaction();
StatementResult statementBeginTransaction(TransactionOptions.IsolationLevel isolationLevel);

StatementResult statementBeginPgTransaction(PgTransactionMode transactionMode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import com.google.spanner.v1.PlanNode;
import com.google.spanner.v1.QueryPlan;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.TransactionOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -443,8 +444,13 @@ public StatementResult statementShowExcludeTxnFromChangeStreams() {
}

@Override
public StatementResult statementBeginTransaction() {
getConnection().beginTransaction();
public StatementResult statementBeginTransaction(
TransactionOptions.IsolationLevel isolationLevel) {
if (isolationLevel != null) {
getConnection().beginTransaction(isolationLevel);
} else {
getConnection().beginTransaction();
}
return noResult(BEGIN);
}

Expand Down
Loading
Loading