Skip to content

Commit a8a969e

Browse files
authored
fix: Ensure messages are delivered in order (#1031)
The messages are ordered when they are processed by the wire subscriber. However, if the user message receiver is slow, different message batches may be concurrently processed by the receiver. The wire subscriber now serializes delivery of the message batches.
1 parent d82b932 commit a8a969e

File tree

4 files changed

+184
-1
lines changed

4 files changed

+184
-1
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import java.util.ArrayDeque;
20+
import java.util.Queue;
21+
import java.util.concurrent.Executor;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import javax.annotation.concurrent.GuardedBy;
24+
25+
/** An executor that runs tasks sequentially. */
26+
public final class SerialExecutor implements AutoCloseable, Executor {
27+
private final Executor executor;
28+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
29+
30+
@GuardedBy("this")
31+
private final Queue<Runnable> tasks;
32+
33+
@GuardedBy("this")
34+
private boolean isTaskActive;
35+
36+
public SerialExecutor(Executor executor) {
37+
this.executor = executor;
38+
this.tasks = new ArrayDeque<>();
39+
this.isTaskActive = false;
40+
}
41+
42+
/**
43+
* Shuts down the executor. No subsequent tasks will run, but any running task will be left to
44+
* complete.
45+
*/
46+
@Override
47+
public void close() {
48+
isShutdown.set(true);
49+
}
50+
51+
@Override
52+
public synchronized void execute(Runnable r) {
53+
if (isShutdown.get()) {
54+
return;
55+
}
56+
tasks.add(
57+
() -> {
58+
if (isShutdown.get()) {
59+
return;
60+
}
61+
try {
62+
r.run();
63+
} finally {
64+
scheduleNextTask();
65+
}
66+
});
67+
if (!isTaskActive) {
68+
scheduleNextTask();
69+
}
70+
}
71+
72+
private synchronized void scheduleNextTask() {
73+
isTaskActive = !tasks.isEmpty();
74+
if (isTaskActive) {
75+
executor.execute(tasks.poll());
76+
}
77+
}
78+
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.pubsublite.internal.CheckedApiException;
2525
import com.google.cloud.pubsublite.internal.CloseableMonitor;
2626
import com.google.cloud.pubsublite.internal.ProxyService;
27+
import com.google.cloud.pubsublite.internal.SerialExecutor;
2728
import com.google.cloud.pubsublite.internal.wire.StreamFactories.SubscribeStreamFactory;
2829
import com.google.cloud.pubsublite.proto.FlowControlRequest;
2930
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
@@ -50,6 +51,9 @@ public class SubscriberImpl extends ProxyService
5051

5152
private final InitialSubscribeRequest baseInitialRequest;
5253

54+
// Used to ensure messages are delivered to consumers in order.
55+
private final SerialExecutor messageDeliveryExecutor;
56+
5357
private final CloseableMonitor monitor = new CloseableMonitor();
5458

5559
@GuardedBy("monitor.monitor")
@@ -84,6 +88,7 @@ public class SubscriberImpl extends ProxyService
8488
this.messageConsumer = messageConsumer;
8589
this.resetHandler = resetHandler;
8690
this.baseInitialRequest = baseInitialRequest;
91+
this.messageDeliveryExecutor = new SerialExecutor(SystemExecutors.getFuturesExecutor());
8792
this.initialLocation = initialLocation;
8893
this.connection =
8994
new RetryingConnectionImpl<>(streamFactory, factory, this, getInitialRequest());
@@ -121,6 +126,7 @@ protected void stop() {
121126
shutdown = true;
122127
this.alarmFuture.ifPresent(future -> future.cancel(false));
123128
this.alarmFuture = Optional.empty();
129+
messageDeliveryExecutor.close();
124130
}
125131
}
126132

@@ -199,8 +205,8 @@ public void onClientResponse(List<SequencedMessage> messages) throws CheckedApiE
199205
if (shutdown) return;
200206
nextOffsetTracker.onMessages(messages);
201207
flowControlBatcher.onMessages(messages);
208+
messageDeliveryExecutor.execute(() -> messageConsumer.accept(messages));
202209
}
203-
messageConsumer.accept(messages);
204210
}
205211

206212
private void processBatchFlowRequest() {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static java.util.concurrent.TimeUnit.SECONDS;
21+
22+
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.JUnit4;
29+
30+
@RunWith(JUnit4.class)
31+
public final class SerialExecutorTest {
32+
private final SerialExecutor executor = new SerialExecutor(SystemExecutors.getFuturesExecutor());
33+
34+
@Test
35+
public void serializesTasks() throws Exception {
36+
final int numTasks = 100;
37+
List<Integer> receivedSequences = new ArrayList<>();
38+
CountDownLatch tasksDone = new CountDownLatch(numTasks);
39+
for (int i = 0; i < numTasks; i++) {
40+
int sequence = i;
41+
executor.execute(
42+
() -> {
43+
synchronized (receivedSequences) {
44+
receivedSequences.add(sequence);
45+
}
46+
tasksDone.countDown();
47+
});
48+
}
49+
assertThat(tasksDone.await(30, SECONDS)).isTrue();
50+
51+
for (int i = 0; i < receivedSequences.size(); i++) {
52+
assertThat(receivedSequences.get(i)).isEqualTo(i);
53+
}
54+
}
55+
56+
@Test
57+
public void closeDiscardsTasks() throws Exception {
58+
List<Integer> receivedSequences = new ArrayList<>();
59+
CountDownLatch tasksDone = new CountDownLatch(1);
60+
for (int i = 0; i < 10; i++) {
61+
int sequence = i;
62+
executor.execute(
63+
() -> {
64+
synchronized (receivedSequences) {
65+
receivedSequences.add(sequence);
66+
}
67+
tasksDone.countDown();
68+
executor.close();
69+
});
70+
}
71+
assertThat(tasksDone.await(10, SECONDS)).isTrue();
72+
73+
assertThat(receivedSequences).containsExactly(0);
74+
}
75+
}

‎google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertThrowableMatches;
2020
import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed;
2121
import static com.google.common.truth.Truth.assertThat;
22+
import static java.util.concurrent.TimeUnit.SECONDS;
2223
import static org.junit.Assert.assertThrows;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.eq;
@@ -54,6 +55,7 @@
5455
import com.google.common.collect.ImmutableList;
5556
import com.google.protobuf.util.Timestamps;
5657
import java.util.List;
58+
import java.util.concurrent.CountDownLatch;
5759
import java.util.concurrent.Future;
5860
import java.util.function.Consumer;
5961
import org.junit.Before;
@@ -97,6 +99,18 @@ private static SubscribeRequest initialRequest() {
9799
private ResponseObserver<List<SequencedMessage>> leakedResponseObserver;
98100
private Runnable leakedFlowControlAlarm;
99101

102+
private CountDownLatch countdownMessageBatches(int count) {
103+
CountDownLatch received = new CountDownLatch(count);
104+
doAnswer(
105+
args -> {
106+
received.countDown();
107+
return null;
108+
})
109+
.when(mockMessageConsumer)
110+
.accept(any());
111+
return received;
112+
}
113+
100114
@Before
101115
public void setUp() throws CheckedApiException {
102116
initMocks(this);
@@ -216,7 +230,9 @@ public void messagesOrdered_Ok() throws Exception {
216230
ImmutableList.of(
217231
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
218232
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
233+
CountDownLatch messagesReceived = countdownMessageBatches(1);
219234
leakedResponseObserver.onResponse(messages);
235+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
220236

221237
verify(mockMessageConsumer).accept(messages);
222238
assertThat(subscriber.isRunning()).isTrue();
@@ -237,7 +253,9 @@ public void messageResponseSubtracts() throws Exception {
237253
ImmutableList<SequencedMessage> messages2 =
238254
ImmutableList.of(
239255
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(3), 2));
256+
CountDownLatch messagesReceived = countdownMessageBatches(1);
240257
leakedResponseObserver.onResponse(messages1);
258+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
241259
verify(mockMessageConsumer).accept(messages1);
242260
assertThat(subscriber.isRunning()).isTrue();
243261
leakedResponseObserver.onResponse(messages2);
@@ -254,7 +272,9 @@ public void reinitialize_reconnectsToNextOffset() throws Exception {
254272
ImmutableList.of(
255273
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
256274
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
275+
CountDownLatch messagesReceived = countdownMessageBatches(1);
257276
leakedResponseObserver.onResponse(messages);
277+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
258278
verify(mockMessageConsumer).accept(messages);
259279

260280
final SubscribeRequest nextOffsetRequest =
@@ -288,7 +308,9 @@ public void reinitialize_handlesSuccessfulReset() throws Exception {
288308
ImmutableList.of(
289309
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
290310
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
311+
CountDownLatch messagesReceived = countdownMessageBatches(1);
291312
leakedResponseObserver.onResponse(messages);
313+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
292314
verify(mockMessageConsumer).accept(messages);
293315

294316
doAnswer(
@@ -317,7 +339,9 @@ public void reinitialize_handlesIgnoredReset() throws Exception {
317339
ImmutableList.of(
318340
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
319341
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
342+
CountDownLatch messagesReceived = countdownMessageBatches(1);
320343
leakedResponseObserver.onResponse(messages);
344+
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
321345
verify(mockMessageConsumer).accept(messages);
322346

323347
final SubscribeRequest nextOffsetRequest =

0 commit comments

Comments
 (0)