Skip to content

Commit f3678b7

Browse files
authored
fix: mitigate gRPC stream connection issues (#1038)
Mitigates hanging gRPC streams by detecting idle streams and reconnecting after a timeout (default 10min for partition assignment streams, 2min for all others). The StreamIdleTimer is restarted when the client receives a response on the stream. The stream is reinitialized when the timeout expires. For publish and commit streams, the timeout will still expire even if there is no user activity.
1 parent a8a969e commit f3678b7

File tree

10 files changed

+346
-26
lines changed

10 files changed

+346
-26
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<difference>
5+
<differenceType>7004</differenceType>
6+
<className>com/google/cloud/pubsublite/internal/**</className>
7+
<method>*</method>
8+
</difference>
9+
</differences>

‎google-cloud-pubsublite/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@
134134
<artifactId>gson</artifactId>
135135
<scope>test</scope>
136136
</dependency>
137+
<dependency>
138+
<groupId>com.google.guava</groupId>
139+
<artifactId>guava-testlib</artifactId>
140+
<scope>test</scope>
141+
</dependency>
137142
<!-- Need testing utility classes for generated gRPC clients tests -->
138143
<dependency>
139144
<groupId>com.google.api</groupId>

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck;
2626
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
2727
import com.google.errorprone.annotations.concurrent.GuardedBy;
28+
import java.time.Duration;
2829

2930
public class ConnectedAssignerImpl
3031
extends SingleConnection<PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment>
3132
implements ConnectedAssigner {
33+
private static final Duration STREAM_IDLE_TIMEOUT = Duration.ofMinutes(10);
34+
3235
private final CloseableMonitor monitor = new CloseableMonitor();
3336

3437
@GuardedBy("monitor.monitor")
@@ -38,7 +41,7 @@ private ConnectedAssignerImpl(
3841
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
3942
ResponseObserver<PartitionAssignment> clientStream,
4043
PartitionAssignmentRequest initialRequest) {
41-
super(streamFactory, clientStream, /*expectInitialResponse=*/ false);
44+
super(streamFactory, clientStream, STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ false);
4245
initialize(initialRequest);
4346
}
4447

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedCommitterImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,22 @@
2626
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
2727
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
2828
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
29+
import com.google.common.annotations.VisibleForTesting;
30+
import java.time.Duration;
2931

3032
public class ConnectedCommitterImpl
3133
extends SingleConnection<
3234
StreamingCommitCursorRequest, StreamingCommitCursorResponse, SequencedCommitCursorResponse>
3335
implements ConnectedCommitter {
3436
private final StreamingCommitCursorRequest initialRequest;
3537

36-
private ConnectedCommitterImpl(
38+
@VisibleForTesting
39+
ConnectedCommitterImpl(
3740
StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory,
3841
ResponseObserver<SequencedCommitCursorResponse> clientStream,
39-
StreamingCommitCursorRequest initialRequest) {
40-
super(streamFactory, clientStream);
42+
StreamingCommitCursorRequest initialRequest,
43+
Duration streamIdleTimeout) {
44+
super(streamFactory, clientStream, streamIdleTimeout, /*expectInitialResponse=*/ true);
4145
this.initialRequest = initialRequest;
4246
initialize(initialRequest);
4347
}
@@ -48,7 +52,8 @@ public ConnectedCommitter New(
4852
StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory,
4953
ResponseObserver<SequencedCommitCursorResponse> clientStream,
5054
StreamingCommitCursorRequest initialRequest) {
51-
return new ConnectedCommitterImpl(streamFactory, clientStream, initialRequest);
55+
return new ConnectedCommitterImpl(
56+
streamFactory, clientStream, initialRequest, DEFAULT_STREAM_IDLE_TIMEOUT);
5257
}
5358
}
5459

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ protected void doStop() {
114114
if (completed) return;
115115
completed = true;
116116
logger.atFine().log("Terminating connection for %s", streamDescription());
117-
currentConnection.close();
117+
if (currentConnection != null) {
118+
currentConnection.close();
119+
}
118120
} catch (Throwable t) {
119121
logger.atWarning().withCause(t).log(
120122
"Failed while terminating connection for %s", streamDescription());
@@ -180,7 +182,9 @@ public final void onError(Throwable t) {
180182
Optional<Throwable> throwable = Optional.empty();
181183
long backoffTime = 0;
182184
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
183-
currentConnection.close();
185+
if (currentConnection != null) {
186+
currentConnection.close();
187+
}
184188
backoffTime = nextRetryBackoffDuration;
185189
nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
186190
} catch (Throwable t2) {
@@ -197,7 +201,7 @@ public final void onError(Throwable t) {
197201
logger.atFine().withCause(t).log(
198202
"Stream disconnected attempting retry, after %s milliseconds for %s",
199203
backoffTime, streamDescription());
200-
ScheduledFuture<?> retry =
204+
ScheduledFuture<?> unusedFuture =
201205
SystemExecutors.getAlarmExecutor()
202206
.schedule(() -> triggerReinitialize(statusOr.get()), backoffTime, MILLISECONDS);
203207
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import com.google.api.gax.rpc.ClientStream;
2020
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.StatusCode.Code;
2122
import com.google.api.gax.rpc.StreamController;
2223
import com.google.cloud.pubsublite.internal.CheckedApiException;
2324
import com.google.cloud.pubsublite.internal.CloseableMonitor;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.flogger.GoogleLogger;
2627
import com.google.common.util.concurrent.Monitor.Guard;
2728
import com.google.errorprone.annotations.concurrent.GuardedBy;
29+
import java.time.Duration;
2830

2931
/**
3032
* A SingleConnection handles the state for a stream with an initial connection request that may
@@ -38,9 +40,12 @@ public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientRe
3840
implements ResponseObserver<StreamResponseT>, AutoCloseable {
3941
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
4042

43+
protected static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofMinutes(2);
44+
4145
private final ClientStream<StreamRequestT> requestStream;
4246
private final ResponseObserver<ClientResponseT> clientStream;
4347
private final boolean expectInitial;
48+
private final StreamIdleTimer streamIdleTimer;
4449

4550
private final CloseableMonitor connectionMonitor = new CloseableMonitor();
4651

@@ -58,16 +63,18 @@ protected abstract void handleInitialResponse(StreamResponseT response)
5863
protected SingleConnection(
5964
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
6065
ResponseObserver<ClientResponseT> clientStream,
66+
Duration streamIdleTimeout,
6167
boolean expectInitialResponse) {
6268
this.clientStream = clientStream;
63-
this.requestStream = streamFactory.New(this);
6469
this.expectInitial = expectInitialResponse;
70+
this.streamIdleTimer = new StreamIdleTimer(streamIdleTimeout, this::onStreamIdle);
71+
this.requestStream = streamFactory.New(this);
6572
}
6673

6774
protected SingleConnection(
6875
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
6976
ResponseObserver<ClientResponseT> clientStream) {
70-
this(streamFactory, clientStream, /*expectInitialResponse=*/ true);
77+
this(streamFactory, clientStream, DEFAULT_STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ true);
7178
}
7279

7380
protected void initialize(StreamRequestT initialRequest) {
@@ -122,20 +129,33 @@ protected boolean isCompleted() {
122129
}
123130
}
124131

125-
@Override
126-
public void close() {
132+
// Records the connection as completed and performs tear down, if not already completed. Returns
133+
// whether the connection was already complete.
134+
private boolean completeStream() {
127135
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
128-
if (completed) return;
136+
if (completed) {
137+
return true;
138+
}
129139
completed = true;
140+
streamIdleTimer.close();
141+
} catch (Exception e) {
142+
log.atSevere().withCause(e).log("Error occurred while shutting down connection.");
143+
}
144+
return false;
145+
}
146+
147+
@Override
148+
public void close() {
149+
if (completeStream()) {
150+
return;
130151
}
131152
requestStream.closeSend();
132153
clientStream.onComplete();
133154
}
134155

135156
private void abort(CheckedApiException error) {
136-
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
137-
if (completed) return;
138-
completed = true;
157+
if (completeStream()) {
158+
return;
139159
}
140160
requestStream.closeSendWithError(error.underlying);
141161
clientStream.onError(error);
@@ -149,6 +169,7 @@ public void onStart(StreamController streamController) {}
149169
public void onResponse(StreamResponseT response) {
150170
boolean isFirst;
151171
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
172+
streamIdleTimer.restart();
152173
if (completed) {
153174
log.atFine().log("Received response on stream after completion: %s", response);
154175
return;
@@ -169,21 +190,23 @@ public void onResponse(StreamResponseT response) {
169190

170191
@Override
171192
public void onError(Throwable t) {
172-
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
173-
if (completed) return;
174-
completed = true;
193+
if (completeStream()) {
194+
return;
175195
}
176196
clientStream.onError(t);
177197
requestStream.closeSendWithError(t);
178198
}
179199

180200
@Override
181201
public void onComplete() {
182-
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
183-
if (completed) return;
184-
completed = true;
202+
if (completeStream()) {
203+
return;
185204
}
186205
clientStream.onComplete();
187206
requestStream.closeSend();
188207
}
208+
209+
private void onStreamIdle() {
210+
onError(new CheckedApiException("Detected idle stream.", Code.ABORTED));
211+
}
189212
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import static com.google.common.collect.Comparators.min;
20+
21+
import com.google.cloud.pubsublite.internal.AlarmFactory;
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Stopwatch;
24+
import com.google.common.base.Ticker;
25+
import java.time.Duration;
26+
import java.util.concurrent.Future;
27+
import javax.annotation.concurrent.GuardedBy;
28+
29+
/** An approximate timer used to detect idle streams. */
30+
class StreamIdleTimer implements AutoCloseable {
31+
/** Handles a timeout. */
32+
interface Handler {
33+
void onTimeout();
34+
}
35+
36+
private static final long POLL_DIVISOR = 4;
37+
private static final Duration MAX_POLL_INTERVAL = Duration.ofMinutes(1);
38+
39+
@VisibleForTesting
40+
static Duration getDelay(Duration timeout) {
41+
return min(MAX_POLL_INTERVAL, timeout.dividedBy(POLL_DIVISOR));
42+
}
43+
44+
private final Duration timeout;
45+
private final Handler handler;
46+
private final Future<?> task;
47+
48+
@GuardedBy("this")
49+
private final Stopwatch stopwatch;
50+
51+
/**
52+
* Creates a started timer.
53+
*
54+
* @param timeout Call the handler after this duration has elapsed. The call may be delayed up to
55+
* (timeout / POLL_DIVISOR) after the timeout duration.
56+
* @param handler Called after the timeout has expired and the timer is running.
57+
*/
58+
StreamIdleTimer(Duration timeout, Handler handler) {
59+
this(timeout, handler, Ticker.systemTicker(), AlarmFactory.create(getDelay(timeout)));
60+
}
61+
62+
@VisibleForTesting
63+
StreamIdleTimer(Duration timeout, Handler handler, Ticker ticker, AlarmFactory alarmFactory) {
64+
this.timeout = timeout;
65+
this.handler = handler;
66+
this.stopwatch = Stopwatch.createStarted(ticker);
67+
this.task = alarmFactory.newAlarm(this::onPoll);
68+
}
69+
70+
@Override
71+
public void close() throws Exception {
72+
task.cancel(false);
73+
}
74+
75+
/** Restart the timer from zero. */
76+
public synchronized void restart() {
77+
stopwatch.reset().start();
78+
}
79+
80+
private synchronized void onPoll() {
81+
if (stopwatch.elapsed().compareTo(timeout) > 0) {
82+
SystemExecutors.getFuturesExecutor().execute(handler::onTimeout);
83+
}
84+
}
85+
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SystemExecutors.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.Executor;
2222
import java.util.concurrent.Executors;
2323
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.ScheduledThreadPoolExecutor;
2425
import java.util.concurrent.ThreadFactory;
2526

2627
public class SystemExecutors {
@@ -31,8 +32,13 @@ private static ThreadFactory newDaemonThreadFactory(String prefix) {
3132
}
3233

3334
public static ScheduledExecutorService newDaemonExecutor(String prefix) {
34-
return Executors.newScheduledThreadPool(
35-
Math.max(4, Runtime.getRuntime().availableProcessors()), newDaemonThreadFactory(prefix));
35+
ScheduledThreadPoolExecutor executor =
36+
new ScheduledThreadPoolExecutor(
37+
Math.max(4, Runtime.getRuntime().availableProcessors()),
38+
newDaemonThreadFactory(prefix));
39+
// Remove scheduled tasks from the executor as soon as they are cancelled.
40+
executor.setRemoveOnCancelPolicy(true);
41+
return executor;
3642
}
3743

3844
private static final Lazy<ScheduledExecutorService> ALARM_EXECUTOR =

0 commit comments

Comments
 (0)