Skip to content

Commit 5abd97d

Browse files
feat: Batch commit requests (#883)
also merge TrivialProxyService into ProxyService
1 parent b24bcb4 commit 5abd97d

18 files changed

+272
-79
lines changed

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/MultiPartitionSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818

1919
import com.google.api.gax.rpc.ApiException;
2020
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
21-
import com.google.cloud.pubsublite.internal.TrivialProxyService;
21+
import com.google.cloud.pubsublite.internal.ProxyService;
2222
import java.util.List;
2323

2424
// A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be
2525
// interacted with. If any single subscriber fails, all others are stopped.
26-
public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber {
26+
public class MultiPartitionSubscriber extends ProxyService implements Subscriber {
2727
public static Subscriber of(List<Subscriber> subscribers) throws ApiException {
2828
return new MultiPartitionSubscriber(subscribers);
2929
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
import com.google.cloud.pubsublite.MessageTransformer;
2727
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
2828
import com.google.cloud.pubsublite.internal.CheckedApiException;
29-
import com.google.cloud.pubsublite.internal.TrivialProxyService;
29+
import com.google.cloud.pubsublite.internal.ProxyService;
3030
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
3131
import com.google.pubsub.v1.PubsubMessage;
3232

3333
// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
3434
// publisher. It encodes a MessageMetadata object in the response string.
35-
public class WrappingPublisher extends TrivialProxyService implements Publisher {
35+
public class WrappingPublisher extends ProxyService implements Publisher {
3636
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
3737
private final MessageTransformer<PubsubMessage, Message> transformer;
3838

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutureCallback;
21+
import com.google.api.core.ApiFutures;
22+
import com.google.api.core.SettableApiFuture;
23+
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
24+
25+
public final class MoreApiFutures {
26+
private MoreApiFutures() {}
27+
28+
public static <T> void connectFutures(
29+
ApiFuture<T> source, SettableApiFuture<? super T> toConnect) {
30+
ApiFutures.addCallback(
31+
source,
32+
new ApiFutureCallback<T>() {
33+
@Override
34+
public void onFailure(Throwable throwable) {
35+
toConnect.setException(throwable);
36+
}
37+
38+
@Override
39+
public void onSuccess(T t) {
40+
toConnect.set(t);
41+
}
42+
},
43+
SystemExecutors.getFuturesExecutor());
44+
}
45+
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.collect.ImmutableList;
2929
import com.google.common.flogger.GoogleLogger;
3030
import java.util.ArrayList;
31+
import java.util.Arrays;
3132
import java.util.Collection;
3233
import java.util.List;
3334
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,13 @@ public abstract class ProxyService extends AbstractApiService {
4142
private final List<ApiService> services = new ArrayList<>();
4243
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);
4344

44-
protected ProxyService() {}
45+
protected <T extends ApiService> ProxyService(Collection<T> services) {
46+
addServices(services);
47+
}
48+
49+
protected ProxyService(ApiService... services) throws ApiException {
50+
this(Arrays.asList(services));
51+
}
4552

4653
// Add a new ApiServices to this. Requires that all of them are in state NEW and this is in state
4754
// NEW.
@@ -59,13 +66,13 @@ protected final void addServices(ApiService... services) throws ApiException {
5966
}
6067

6168
// Method to be called on service start after dependent services start.
62-
protected abstract void start() throws CheckedApiException;
69+
protected void start() throws CheckedApiException {}
6370
// Method to be called on service stop before dependent services stop.
64-
protected abstract void stop() throws CheckedApiException;
71+
protected void stop() throws CheckedApiException {}
6572

6673
// Method to be called for class-specific permanent error handling after trying to stop all other
6774
// services. May not throw.
68-
protected abstract void handlePermanentError(CheckedApiException error);
75+
protected void handlePermanentError(CheckedApiException error) {}
6976

7077
// Tries to stop all dependent services and sets this service into the FAILED state.
7178
protected final void onPermanentError(CheckedApiException error) {

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TrivialProxyService.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionCommitter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import com.google.api.gax.rpc.ApiException;
2323
import com.google.cloud.pubsublite.Offset;
2424
import com.google.cloud.pubsublite.internal.CheckedApiException;
25-
import com.google.cloud.pubsublite.internal.TrivialProxyService;
25+
import com.google.cloud.pubsublite.internal.ProxyService;
2626

27-
class ApiExceptionCommitter extends TrivialProxyService implements Committer {
27+
class ApiExceptionCommitter extends ProxyService implements Committer {
2828
private final Committer committer;
2929

3030
ApiExceptionCommitter(Committer committer) throws ApiException {

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.gax.rpc.ApiException;
2323
import com.google.cloud.pubsublite.Message;
24+
import com.google.cloud.pubsublite.internal.ProxyService;
2425
import com.google.cloud.pubsublite.internal.Publisher;
25-
import com.google.cloud.pubsublite.internal.TrivialProxyService;
2626
import java.io.IOException;
2727

28-
public class ApiExceptionPublisher<T> extends TrivialProxyService implements Publisher<T> {
28+
public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
2929
private final Publisher<T> publisher;
3030

3131
ApiExceptionPublisher(Publisher<T> publisher) throws ApiException {

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiServiceUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.api.core.AbstractApiService;
2222
import com.google.api.core.ApiService;
23-
import com.google.api.gax.core.BackgroundResource;
2423
import com.google.api.gax.rpc.ApiException;
2524
import com.google.cloud.pubsublite.internal.CheckedApiException;
2625
import com.google.common.collect.ImmutableList;
@@ -31,7 +30,7 @@ public class ApiServiceUtils {
3130

3231
private ApiServiceUtils() {}
3332

34-
public static ApiService backgroundResourceAsApiService(BackgroundResource resource) {
33+
public static ApiService autoCloseableAsApiService(AutoCloseable resource) {
3534
return new AbstractApiService() {
3635
@Override
3736
protected void doStart() {

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
19+
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;
2020

2121
import com.google.api.gax.rpc.ApiException;
2222
import com.google.cloud.pubsublite.Partition;
2323
import com.google.cloud.pubsublite.internal.CheckedApiException;
2424
import com.google.cloud.pubsublite.internal.CloseableMonitor;
25-
import com.google.cloud.pubsublite.internal.TrivialProxyService;
25+
import com.google.cloud.pubsublite.internal.ProxyService;
2626
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
2727
import com.google.cloud.pubsublite.proto.PartitionAssignment;
2828
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
@@ -33,7 +33,7 @@
3333
import java.util.HashSet;
3434
import java.util.Set;
3535

36-
public class AssignerImpl extends TrivialProxyService
36+
public class AssignerImpl extends ProxyService
3737
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
3838
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
3939

@@ -72,7 +72,7 @@ public AssignerImpl(
7272
new ConnectedAssignerImpl.Factory(),
7373
initialRequest,
7474
receiver);
75-
addServices(backgroundResourceAsApiService(client));
75+
addServices(autoCloseableAsApiService(client));
7676
}
7777

7878
@Override
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.SettableApiFuture;
21+
import com.google.cloud.pubsublite.Offset;
22+
import com.google.cloud.pubsublite.internal.AlarmFactory;
23+
import com.google.cloud.pubsublite.internal.CheckedApiException;
24+
import com.google.cloud.pubsublite.internal.MoreApiFutures;
25+
import com.google.cloud.pubsublite.internal.ProxyService;
26+
import com.google.errorprone.annotations.concurrent.GuardedBy;
27+
import java.util.Optional;
28+
import java.util.concurrent.Future;
29+
30+
public class BatchingCommitter extends ProxyService implements Committer {
31+
private final Committer underlying;
32+
33+
@GuardedBy("this")
34+
private SettableApiFuture<Void> currentFuture = SettableApiFuture.create();
35+
36+
@GuardedBy("this")
37+
private Optional<Offset> currentOffset = Optional.empty();
38+
39+
BatchingCommitter(Committer underlying, AlarmFactory alarmFactory) {
40+
super(underlying);
41+
this.underlying = underlying;
42+
Future<?> alarm = alarmFactory.newAlarm(this::flush);
43+
addServices(ApiServiceUtils.autoCloseableAsApiService(() -> alarm.cancel(false)));
44+
}
45+
46+
@Override
47+
public synchronized ApiFuture<Void> commitOffset(Offset offset) {
48+
currentOffset = Optional.of(offset);
49+
return currentFuture;
50+
}
51+
52+
@Override
53+
public void waitUntilEmpty() throws CheckedApiException {
54+
flush();
55+
underlying.waitUntilEmpty();
56+
}
57+
58+
@Override
59+
protected void stop() {
60+
flush();
61+
}
62+
63+
private synchronized void flush() {
64+
if (!currentOffset.isPresent()) {
65+
return;
66+
}
67+
ApiFuture<Void> underlyingFuture = underlying.commitOffset(currentOffset.get());
68+
MoreApiFutures.connectFutures(underlyingFuture, currentFuture);
69+
currentOffset = Optional.empty();
70+
currentFuture = SettableApiFuture.create();
71+
}
72+
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
20-
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
20+
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
@@ -81,7 +81,7 @@ public CommitterImpl(CursorServiceClient client, InitialCommitCursorRequest requ
8181
stream -> client.streamingCommitCursorCallable().splitCall(stream),
8282
new ConnectedCommitterImpl.Factory(),
8383
request);
84-
addServices(backgroundResourceAsApiService(client));
84+
addServices(autoCloseableAsApiService(client));
8585
}
8686

8787
// ProxyService implementation.
@@ -94,9 +94,6 @@ protected void handlePermanentError(CheckedApiException error) {
9494
}
9595
}
9696

97-
@Override
98-
protected void start() {}
99-
10097
@Override
10198
protected void stop() {
10299
try (CloseableMonitor.Hold h = monitor.enter()) {

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import com.google.auto.value.AutoValue;
2020
import com.google.cloud.pubsublite.Partition;
2121
import com.google.cloud.pubsublite.SubscriptionPath;
22+
import com.google.cloud.pubsublite.internal.AlarmFactory;
2223
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
2324
import com.google.cloud.pubsublite.v1.CursorServiceClient;
25+
import java.time.Duration;
2426

2527
@AutoValue
2628
public abstract class CommitterSettings {
@@ -54,6 +56,8 @@ public Committer instantiate() {
5456
.setPartition(partition().value())
5557
.build();
5658
return new ApiExceptionCommitter(
57-
new CommitterImpl(serviceClient(), initialCommitCursorRequest));
59+
new BatchingCommitter(
60+
new CommitterImpl(serviceClient(), initialCommitCursorRequest),
61+
AlarmFactory.create(Duration.ofMillis(50))));
5862
}
5963
}

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,6 @@ private void handleConfig(long partitionCount) {
200200
}
201201
}
202202

203-
@Override
204-
protected void start() {}
205-
206203
@Override
207204
protected void stop() {
208205
Optional<PartitionsWithRouting> current;

‎google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
20-
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
20+
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
@@ -136,7 +136,7 @@ public PublisherImpl(
136136
Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())),
137137
initialRequest,
138138
batchingSettings);
139-
addServices(backgroundResourceAsApiService(client));
139+
addServices(autoCloseableAsApiService(client));
140140
}
141141

142142
@GuardedBy("monitor.monitor")

0 commit comments

Comments
 (0)