Skip to content

Commit b6c1f33

Browse files
rozzanhachicha
andauthored
Implemented new prose test for Transactions (#1824)
JAVA-5684 --------- Co-authored-by: Nabil Hachicha <nabil.hachicha@gmail.com>
1 parent cefd9f1 commit b6c1f33

File tree

3 files changed

+238
-102
lines changed

3 files changed

+238
-102
lines changed

‎driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java‎

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -143,69 +143,74 @@ private WriteConcern getWriteConcern(@Nullable final TimeoutContext timeoutConte
143143

144144
@Override
145145
public Publisher<Void> commitTransaction() {
146-
if (transactionState == TransactionState.ABORTED) {
147-
throw new IllegalStateException("Cannot call commitTransaction after calling abortTransaction");
148-
}
149-
if (transactionState == TransactionState.NONE) {
150-
throw new IllegalStateException("There is no transaction started");
151-
}
152-
if (!messageSentInCurrentTransaction) {
153-
cleanupTransaction(TransactionState.COMMITTED);
154-
return Mono.create(MonoSink::success);
155-
} else {
156-
ReadConcern readConcern = transactionOptions.getReadConcern();
157-
if (readConcern == null) {
158-
throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null");
146+
return Mono.defer(() -> {
147+
if (transactionState == TransactionState.ABORTED) {
148+
return Mono.error(new IllegalStateException("Cannot call commitTransaction after calling abortTransaction"));
159149
}
160-
boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED;
161-
commitInProgress = true;
162-
resetTimeout();
163-
TimeoutContext timeoutContext = getTimeoutContext();
164-
WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext));
165-
return executor
166-
.execute(
167-
new CommitTransactionOperation(writeConcern, alreadyCommitted)
168-
.recoveryToken(getRecoveryToken()), readConcern, this)
169-
.doOnTerminate(() -> {
170-
commitInProgress = false;
171-
transactionState = TransactionState.COMMITTED;
172-
})
173-
.doOnError(MongoException.class, this::clearTransactionContextOnError);
174-
}
150+
if (transactionState == TransactionState.NONE) {
151+
return Mono.error(new IllegalStateException("There is no transaction started"));
152+
}
153+
if (!messageSentInCurrentTransaction) {
154+
cleanupTransaction(TransactionState.COMMITTED);
155+
return Mono.create(MonoSink::success);
156+
} else {
157+
ReadConcern readConcern = transactionOptions.getReadConcern();
158+
if (readConcern == null) {
159+
return Mono.error(new MongoInternalException("Invariant violated. Transaction options read concern can not be null"));
160+
}
161+
boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED;
162+
commitInProgress = true;
163+
resetTimeout();
164+
TimeoutContext timeoutContext = getTimeoutContext();
165+
WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext));
166+
return executor
167+
.execute(
168+
new CommitTransactionOperation(writeConcern, alreadyCommitted)
169+
.recoveryToken(getRecoveryToken()), readConcern, this)
170+
.doOnTerminate(() -> {
171+
commitInProgress = false;
172+
transactionState = TransactionState.COMMITTED;
173+
})
174+
.doOnError(MongoException.class, this::clearTransactionContextOnError);
175+
}
176+
});
175177
}
176178

179+
177180
@Override
178181
public Publisher<Void> abortTransaction() {
179-
if (transactionState == TransactionState.ABORTED) {
180-
throw new IllegalStateException("Cannot call abortTransaction twice");
181-
}
182-
if (transactionState == TransactionState.COMMITTED) {
183-
throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction");
184-
}
185-
if (transactionState == TransactionState.NONE) {
186-
throw new IllegalStateException("There is no transaction started");
187-
}
188-
if (!messageSentInCurrentTransaction) {
189-
cleanupTransaction(TransactionState.ABORTED);
190-
return Mono.create(MonoSink::success);
191-
} else {
192-
ReadConcern readConcern = transactionOptions.getReadConcern();
193-
if (readConcern == null) {
194-
throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null");
182+
return Mono.defer(() -> {
183+
if (transactionState == TransactionState.ABORTED) {
184+
throw new IllegalStateException("Cannot call abortTransaction twice");
195185
}
196-
197-
resetTimeout();
198-
TimeoutContext timeoutContext = getTimeoutContext();
199-
WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext));
200-
return executor
201-
.execute(new AbortTransactionOperation(writeConcern)
202-
.recoveryToken(getRecoveryToken()), readConcern, this)
203-
.onErrorResume(Throwable.class, (e) -> Mono.empty())
204-
.doOnTerminate(() -> {
205-
clearTransactionContext();
206-
cleanupTransaction(TransactionState.ABORTED);
207-
});
208-
}
186+
if (transactionState == TransactionState.COMMITTED) {
187+
throw new IllegalStateException("Cannot call abortTransaction after calling commitTransaction");
188+
}
189+
if (transactionState == TransactionState.NONE) {
190+
throw new IllegalStateException("There is no transaction started");
191+
}
192+
if (!messageSentInCurrentTransaction) {
193+
cleanupTransaction(TransactionState.ABORTED);
194+
return Mono.create(MonoSink::success);
195+
} else {
196+
ReadConcern readConcern = transactionOptions.getReadConcern();
197+
if (readConcern == null) {
198+
throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null");
199+
}
200+
201+
resetTimeout();
202+
TimeoutContext timeoutContext = getTimeoutContext();
203+
WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext));
204+
return executor
205+
.execute(new AbortTransactionOperation(writeConcern)
206+
.recoveryToken(getRecoveryToken()), readConcern, this)
207+
.onErrorResume(Throwable.class, (e) -> Mono.empty())
208+
.doOnTerminate(() -> {
209+
clearTransactionContext();
210+
cleanupTransaction(TransactionState.ABORTED);
211+
});
212+
}
213+
});
209214
}
210215

211216
private void clearTransactionContextOnError(final MongoException e) {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.reactivestreams.client;
17+
18+
import com.mongodb.ConnectionString;
19+
import com.mongodb.MongoClientSettings;
20+
import com.mongodb.WriteConcern;
21+
import org.bson.Document;
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.DisplayName;
26+
import org.junit.jupiter.api.Test;
27+
import reactor.core.publisher.Mono;
28+
import reactor.test.StepVerifier;
29+
30+
import static com.mongodb.ClusterFixture.getConnectionString;
31+
import static com.mongodb.ClusterFixture.getDefaultDatabaseName;
32+
import static com.mongodb.ClusterFixture.getMultiMongosConnectionString;
33+
import static com.mongodb.ClusterFixture.isSharded;
34+
import static com.mongodb.ClusterFixture.isStandalone;
35+
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
36+
import static org.junit.jupiter.api.Assumptions.abort;
37+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
38+
39+
public class TransactionProseTest {
40+
private MongoClient client;
41+
private MongoCollection<Document> collection;
42+
43+
@BeforeEach
44+
public void setUp() {
45+
assumeTrue(!isStandalone());
46+
ConnectionString connectionString = getConnectionString();
47+
if (isSharded()) {
48+
assumeTrue(serverVersionAtLeast(4, 2));
49+
connectionString = getMultiMongosConnectionString();
50+
assumeTrue(connectionString != null);
51+
}
52+
53+
client = MongoClients.create(MongoClientSettings.builder()
54+
.applyConnectionString(connectionString)
55+
.build());
56+
57+
collection = client.getDatabase(getDefaultDatabaseName()).getCollection(getClass().getName());
58+
StepVerifier.create(Mono.from(collection.drop())).verifyComplete();
59+
}
60+
61+
@AfterEach
62+
public void tearDown() {
63+
if (collection != null) {
64+
StepVerifier.create(Mono.from(collection.drop())).verifyComplete();
65+
}
66+
if (client != null) {
67+
client.close();
68+
}
69+
}
70+
71+
@DisplayName("Mongos Pinning Prose Tests: 1. Test that starting a new transaction on a pinned ClientSession unpins the session "
72+
+ "and normal server selection is performed for the next operation.")
73+
@Test
74+
void testNewTransactionUnpinsSession() {
75+
abort("There is no ability to get the server address with the reactive api");
76+
}
77+
78+
@DisplayName("Mongos Pinning Prose Tests: 2. Test non-transaction operations using a pinned ClientSession unpins the session"
79+
+ " and normal server selection is performed")
80+
@Test
81+
void testNonTransactionOpsUnpinsSession() {
82+
abort("There is no ability to get the server address with the reactive api");
83+
}
84+
85+
@DisplayName("Options Inside Transaction Prose Tests. 1. Write concern not inherited from collection object inside transaction")
86+
@Test
87+
void testWriteConcernInheritance() {
88+
// Create a MongoClient running against a configured sharded/replica set/load balanced cluster.
89+
// transactions require a 4.0+ server when non-sharded and 4.2+ when sharded
90+
if (isSharded()) {
91+
assumeTrue(serverVersionAtLeast(4, 2));
92+
} else {
93+
assumeTrue(serverVersionAtLeast(4, 0));
94+
}
95+
96+
Mono.from(collection.insertOne(Document.parse("{}"))).block();
97+
98+
Mono<Document> testWriteConcern = Mono.from(client.startSession())
99+
.flatMap(session ->
100+
Mono.fromRunnable(session::startTransaction)
101+
.then(Mono.from(collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED).insertOne(session, new Document("n", 1))))
102+
.then(Mono.from(session.commitTransaction()))
103+
.then(Mono.from(collection.find(new Document("n", 1)).first()))
104+
.doFinally(signalType -> session.close())
105+
);
106+
107+
StepVerifier.create(testWriteConcern).assertNext(Assertions::assertNotNull).verifyComplete();
108+
}
109+
}

0 commit comments

Comments
 (0)