Skip to content

feat(transfer-manager): add ParallelUploadConfig.Builder#setUploadBlobInfoFactory #2936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

public final class BucketNameMismatchException extends RuntimeException {

public BucketNameMismatchException(String actual, String expected) {
super(
String.format(
"Bucket name in produced BlobInfo did not match bucket name from config. (%s != %s)",
actual, expected));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
Expand All @@ -33,19 +35,19 @@
public final class ParallelUploadConfig {

private final boolean skipIfExists;
@NonNull private final String prefix;
@NonNull private final String bucketName;
@NonNull private final UploadBlobInfoFactory uploadBlobInfoFactory;

@NonNull private final List<BlobWriteOption> writeOptsPerRequest;

private ParallelUploadConfig(
boolean skipIfExists,
@NonNull String prefix,
@NonNull String bucketName,
@NonNull UploadBlobInfoFactory uploadBlobInfoFactory,
@NonNull List<BlobWriteOption> writeOptsPerRequest) {
this.skipIfExists = skipIfExists;
this.prefix = prefix;
this.bucketName = bucketName;
this.uploadBlobInfoFactory = uploadBlobInfoFactory;
this.writeOptsPerRequest = applySkipIfExists(skipIfExists, writeOptsPerRequest);
}

Expand All @@ -63,9 +65,26 @@ public boolean isSkipIfExists() {
* A common prefix that will be applied to all object paths in the destination bucket
*
* @see Builder#setPrefix(String)
* @see Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @see UploadBlobInfoFactory#prefixObjectNames(String)
*/
public @NonNull String getPrefix() {
return prefix;
if (uploadBlobInfoFactory instanceof PrefixObjectNames) {
PrefixObjectNames prefixObjectNames = (PrefixObjectNames) uploadBlobInfoFactory;
return prefixObjectNames.prefix;
}
return "";
}

/**
* The {@link UploadBlobInfoFactory} which will be used to produce a {@link BlobInfo}s based on a
* provided bucket name and file name.
*
* @see Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @since 2.49.0
*/
public @NonNull UploadBlobInfoFactory getUploadBlobInfoFactory() {
return uploadBlobInfoFactory;
}

/**
Expand Down Expand Up @@ -96,22 +115,22 @@ public boolean equals(Object o) {
}
ParallelUploadConfig that = (ParallelUploadConfig) o;
return skipIfExists == that.skipIfExists
&& prefix.equals(that.prefix)
&& bucketName.equals(that.bucketName)
&& uploadBlobInfoFactory.equals(that.uploadBlobInfoFactory)
&& writeOptsPerRequest.equals(that.writeOptsPerRequest);
}

@Override
public int hashCode() {
return Objects.hash(skipIfExists, prefix, bucketName, writeOptsPerRequest);
return Objects.hash(skipIfExists, bucketName, uploadBlobInfoFactory, writeOptsPerRequest);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("skipIfExists", skipIfExists)
.add("prefix", prefix)
.add("bucketName", bucketName)
.add("uploadBlobInfoFactory", uploadBlobInfoFactory)
.add("writeOptsPerRequest", writeOptsPerRequest)
.toString();
}
Expand All @@ -137,13 +156,13 @@ private static List<BlobWriteOption> applySkipIfExists(
public static final class Builder {

private boolean skipIfExists;
private @NonNull String prefix;
private @NonNull String bucketName;
private @NonNull UploadBlobInfoFactory uploadBlobInfoFactory;
private @NonNull List<BlobWriteOption> writeOptsPerRequest;

private Builder() {
this.prefix = "";
this.bucketName = "";
this.uploadBlobInfoFactory = UploadBlobInfoFactory.defaultInstance();
this.writeOptsPerRequest = ImmutableList.of();
}

Expand All @@ -162,11 +181,37 @@ public Builder setSkipIfExists(boolean skipIfExists) {
/**
* Sets a common prefix that will be applied to all object paths in the destination bucket.
*
* <p><i>NOTE</i>: this method and {@link #setUploadBlobInfoFactory(UploadBlobInfoFactory)} are
* mutually exclusive, and last invocation "wins".
*
* @return the builder instance with the value for prefix modified.
* @see ParallelUploadConfig#getPrefix()
* @see ParallelUploadConfig.Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @see UploadBlobInfoFactory#prefixObjectNames(String)
*/
public Builder setPrefix(@NonNull String prefix) {
this.prefix = prefix;
this.uploadBlobInfoFactory = UploadBlobInfoFactory.prefixObjectNames(prefix);
return this;
}

/**
* Sets a {@link UploadBlobInfoFactory} which can be used to produce a custom BlobInfo based on
* a provided bucket name and file name.
*
* <p>The bucket name in the returned BlobInfo MUST be equal to the value provided to {@link
* #setBucketName(String)}, if not that upload will fail with a {@link
* TransferStatus#FAILED_TO_START} and a {@link BucketNameMismatchException}.
*
* <p><i>NOTE</i>: this method and {@link #setPrefix(String)} are mutually exclusive, and last
* invocation "wins".
*
* @return the builder instance with the value for uploadBlobInfoFactory modified.
* @see ParallelUploadConfig#getPrefix()
* @see ParallelUploadConfig#getUploadBlobInfoFactory()
* @since 2.49.0
*/
public Builder setUploadBlobInfoFactory(@NonNull UploadBlobInfoFactory uploadBlobInfoFactory) {
this.uploadBlobInfoFactory = uploadBlobInfoFactory;
return this;
}

Expand Down Expand Up @@ -199,10 +244,99 @@ public Builder setWriteOptsPerRequest(@NonNull List<BlobWriteOption> writeOptsPe
* @return {@link ParallelUploadConfig}
*/
public ParallelUploadConfig build() {
checkNotNull(prefix);
checkNotNull(bucketName);
checkNotNull(uploadBlobInfoFactory);
checkNotNull(writeOptsPerRequest);
return new ParallelUploadConfig(skipIfExists, prefix, bucketName, writeOptsPerRequest);
return new ParallelUploadConfig(
skipIfExists, bucketName, uploadBlobInfoFactory, writeOptsPerRequest);
}
}

public interface UploadBlobInfoFactory {

/**
* Method to produce a {@link BlobInfo} to be used for the upload to Cloud Storage.
*
* <p>The bucket name in the returned BlobInfo MUST be equal to the value provided to the {@link
* ParallelUploadConfig.Builder#setBucketName(String)}, if not that upload will fail with a
* {@link TransferStatus#FAILED_TO_START} and a {@link BucketNameMismatchException}.
*
* @param bucketName The name of the bucket to be uploaded to. The value provided here will be
* the value from {@link ParallelUploadConfig#getBucketName()}.
* @param fileName The String representation of the absolute path of the file to be uploaded
* @return The instance of {@link BlobInfo} that should be used to upload the file to Cloud
* Storage.
*/
BlobInfo apply(String bucketName, String fileName);

/**
* Adapter factory to provide the same semantics as if using {@link Builder#setPrefix(String)}
*/
static UploadBlobInfoFactory prefixObjectNames(String prefix) {
return new PrefixObjectNames(prefix);
}

/** The default instance which applies not modification to the provided {@code fileName} */
static UploadBlobInfoFactory defaultInstance() {
return DefaultUploadBlobInfoFactory.INSTANCE;
}

/**
* Convenience method to "lift" a {@link Function} that transforms the file name to an {@link
* UploadBlobInfoFactory}
*/
static UploadBlobInfoFactory transformFileName(Function<String, String> fileNameTransformer) {
return (b, f) -> BlobInfo.newBuilder(b, fileNameTransformer.apply(f)).build();
}
}

private static final class DefaultUploadBlobInfoFactory implements UploadBlobInfoFactory {
private static final DefaultUploadBlobInfoFactory INSTANCE = new DefaultUploadBlobInfoFactory();

private DefaultUploadBlobInfoFactory() {}

@Override
public BlobInfo apply(String bucketName, String fileName) {
return BlobInfo.newBuilder(bucketName, fileName).build();
}
}

private static final class PrefixObjectNames implements UploadBlobInfoFactory {
private final String prefix;

private PrefixObjectNames(String prefix) {
this.prefix = prefix;
}

@Override
public BlobInfo apply(String bucketName, String fileName) {
String separator = "";
if (!fileName.startsWith("/")) {
separator = "/";
}
return BlobInfo.newBuilder(bucketName, prefix + separator + fileName).build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PrefixObjectNames)) {
return false;
}
PrefixObjectNames that = (PrefixObjectNames) o;
return Objects.equals(prefix, that.prefix);
}

@Override
public int hashCode() {
return Objects.hashCode(prefix);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("prefix", prefix).toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,17 @@ public void close() throws Exception {
List<ApiFuture<UploadResult>> uploadTasks = new ArrayList<>();
for (Path file : files) {
if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported");
String blobName = TransferManagerUtils.createBlobName(config, file);
BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build();
String bucketName = config.getBucketName();
BlobInfo blobInfo =
config.getUploadBlobInfoFactory().apply(bucketName, file.toAbsolutePath().toString());
if (!blobInfo.getBucket().equals(bucketName)) {
uploadTasks.add(
ApiFutures.immediateFuture(
UploadResult.newBuilder(blobInfo, TransferStatus.FAILED_TO_START)
.setException(new BucketNameMismatchException(blobInfo.getBucket(), bucketName))
.build()));
continue;
}
if (transferManagerConfig.isAllowParallelCompositeUpload()
&& qos.parallelCompositeUpload(Files.size(file))) {
ParallelCompositeUploadCallable callable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ final class TransferManagerUtils {

private TransferManagerUtils() {}

static String createBlobName(ParallelUploadConfig config, Path file) {
if (config.getPrefix().isEmpty()) {
return file.toString();
} else {
return config.getPrefix().concat(file.toString());
}
}

static Path createDestPath(ParallelDownloadConfig config, BlobInfo originalBlob) {
Path newPath =
config
Expand Down
Loading