42
42
import java .nio .charset .StandardCharsets ;
43
43
import java .security .SecureRandom ;
44
44
import java .time .Clock ;
45
+ import java .time .Duration ;
46
+ import java .time .OffsetDateTime ;
45
47
import java .util .Base64 ;
46
48
import java .util .Base64 .Encoder ;
47
49
import java .util .concurrent .Executor ;
48
50
import java .util .concurrent .Executors ;
49
51
import java .util .concurrent .ThreadFactory ;
50
52
import java .util .concurrent .atomic .AtomicInteger ;
53
+ import java .util .function .UnaryOperator ;
51
54
import javax .annotation .concurrent .Immutable ;
52
55
import org .checkerframework .checker .nullness .qual .NonNull ;
53
56
@@ -125,18 +128,21 @@ public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWri
125
128
private final BufferAllocationStrategy bufferAllocationStrategy ;
126
129
private final PartNamingStrategy partNamingStrategy ;
127
130
private final PartCleanupStrategy partCleanupStrategy ;
131
+ private final PartMetadataFieldDecorator partMetadataFieldDecorator ;
128
132
129
133
private ParallelCompositeUploadBlobWriteSessionConfig (
130
134
int maxPartsPerCompose ,
131
135
ExecutorSupplier executorSupplier ,
132
136
BufferAllocationStrategy bufferAllocationStrategy ,
133
137
PartNamingStrategy partNamingStrategy ,
134
- PartCleanupStrategy partCleanupStrategy ) {
138
+ PartCleanupStrategy partCleanupStrategy ,
139
+ PartMetadataFieldDecorator partMetadataFieldDecorator ) {
135
140
this .maxPartsPerCompose = maxPartsPerCompose ;
136
141
this .executorSupplier = executorSupplier ;
137
142
this .bufferAllocationStrategy = bufferAllocationStrategy ;
138
143
this .partNamingStrategy = partNamingStrategy ;
139
144
this .partCleanupStrategy = partCleanupStrategy ;
145
+ this .partMetadataFieldDecorator = partMetadataFieldDecorator ;
140
146
}
141
147
142
148
@ InternalApi
@@ -150,7 +156,8 @@ ParallelCompositeUploadBlobWriteSessionConfig withMaxPartsPerCompose(int maxPart
150
156
executorSupplier ,
151
157
bufferAllocationStrategy ,
152
158
partNamingStrategy ,
153
- partCleanupStrategy );
159
+ partCleanupStrategy ,
160
+ partMetadataFieldDecorator );
154
161
}
155
162
156
163
/**
@@ -170,7 +177,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withExecutorSupplier(
170
177
executorSupplier ,
171
178
bufferAllocationStrategy ,
172
179
partNamingStrategy ,
173
- partCleanupStrategy );
180
+ partCleanupStrategy ,
181
+ partMetadataFieldDecorator );
174
182
}
175
183
176
184
/**
@@ -191,7 +199,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withBufferAllocationStrateg
191
199
executorSupplier ,
192
200
bufferAllocationStrategy ,
193
201
partNamingStrategy ,
194
- partCleanupStrategy );
202
+ partCleanupStrategy ,
203
+ partMetadataFieldDecorator );
195
204
}
196
205
197
206
/**
@@ -211,7 +220,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartNamingStrategy(
211
220
executorSupplier ,
212
221
bufferAllocationStrategy ,
213
222
partNamingStrategy ,
214
- partCleanupStrategy );
223
+ partCleanupStrategy ,
224
+ partMetadataFieldDecorator );
215
225
}
216
226
217
227
/**
@@ -231,7 +241,29 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartCleanupStrategy(
231
241
executorSupplier ,
232
242
bufferAllocationStrategy ,
233
243
partNamingStrategy ,
234
- partCleanupStrategy );
244
+ partCleanupStrategy ,
245
+ partMetadataFieldDecorator );
246
+ }
247
+
248
+ /**
249
+ * Specify a Part Metadata Field decorator, this will manipulate the metadata associated with part
250
+ * objects, the ultimate object metadata will remain unchanged.
251
+ *
252
+ * <p><i>Default: </i> {@link PartMetadataFieldDecorator#noOp()}
253
+ *
254
+ * @since 2.36.0 This new api is in preview and is subject to breaking changes.
255
+ */
256
+ @ BetaApi
257
+ public ParallelCompositeUploadBlobWriteSessionConfig withPartMetadataFieldDecorator (
258
+ PartMetadataFieldDecorator partMetadataFieldDecorator ) {
259
+ checkNotNull (partMetadataFieldDecorator , "partMetadataFieldDecorator must be non null" );
260
+ return new ParallelCompositeUploadBlobWriteSessionConfig (
261
+ maxPartsPerCompose ,
262
+ executorSupplier ,
263
+ bufferAllocationStrategy ,
264
+ partNamingStrategy ,
265
+ partCleanupStrategy ,
266
+ partMetadataFieldDecorator );
235
267
}
236
268
237
269
@ BetaApi
@@ -241,15 +273,19 @@ static ParallelCompositeUploadBlobWriteSessionConfig withDefaults() {
241
273
ExecutorSupplier .cachedPool (),
242
274
BufferAllocationStrategy .simple (ByteSizeConstants ._16MiB ),
243
275
PartNamingStrategy .noPrefix (),
244
- PartCleanupStrategy .always ());
276
+ PartCleanupStrategy .always (),
277
+ PartMetadataFieldDecorator .noOp ());
245
278
}
246
279
247
280
@ InternalApi
248
281
@ Override
249
282
WriterFactory createFactory (Clock clock ) throws IOException {
250
283
Executor executor = executorSupplier .get ();
251
284
BufferHandlePool bufferHandlePool = bufferAllocationStrategy .get ();
252
- return new ParallelCompositeUploadWriterFactory (clock , executor , bufferHandlePool );
285
+ PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance =
286
+ partMetadataFieldDecorator .newInstance (clock );
287
+ return new ParallelCompositeUploadWriterFactory (
288
+ clock , executor , bufferHandlePool , partMetadataFieldDecoratorInstance );
253
289
}
254
290
255
291
/**
@@ -277,6 +313,7 @@ private BufferAllocationStrategy() {}
277
313
*/
278
314
@ BetaApi
279
315
public static BufferAllocationStrategy simple (int capacity ) {
316
+ checkArgument (capacity > 0 , "bufferCapacity must be > 0" );
280
317
return new SimpleBufferAllocationStrategy (capacity );
281
318
}
282
319
@@ -291,6 +328,8 @@ public static BufferAllocationStrategy simple(int capacity) {
291
328
*/
292
329
@ BetaApi
293
330
public static BufferAllocationStrategy fixedPool (int bufferCount , int bufferCapacity ) {
331
+ checkArgument (bufferCount > 0 , "bufferCount must be > 0" );
332
+ checkArgument (bufferCapacity > 0 , "bufferCapacity must be > 0" );
294
333
return new FixedPoolBufferAllocationStrategy (bufferCount , bufferCapacity );
295
334
}
296
335
@@ -361,6 +400,7 @@ public static ExecutorSupplier cachedPool() {
361
400
*/
362
401
@ BetaApi
363
402
public static ExecutorSupplier fixedPool (int poolSize ) {
403
+ checkArgument (poolSize > 0 , "poolSize must be > 0" );
364
404
return new FixedSupplier (poolSize );
365
405
}
366
406
@@ -631,6 +671,79 @@ protected String fmtFields(String randomKey, String ultimateObjectName, String p
631
671
}
632
672
}
633
673
674
+ /**
675
+ * A Decorator which is used to manipulate metadata fields, specifically on the part objects
676
+ * created in a Parallel Composite Upload
677
+ *
678
+ * @see #withPartMetadataFieldDecorator(PartMetadataFieldDecorator)
679
+ * @since 2.36.0 This new api is in preview and is subject to breaking changes.
680
+ */
681
+ @ BetaApi
682
+ @ Immutable
683
+ public abstract static class PartMetadataFieldDecorator implements Serializable {
684
+
685
+ abstract PartMetadataFieldDecoratorInstance newInstance (Clock clock );
686
+
687
+ /**
688
+ * A decorator that is used to manipulate the Custom Time Metadata field of part files. {@link
689
+ * BlobInfo#getCustomTimeOffsetDateTime()}
690
+ *
691
+ * <p>When provided with a duration, a time in the future will be calculated for each part
692
+ * object upon upload, this new value can be used in OLM rules to cleanup abandoned part files.
693
+ *
694
+ * <p>See [CustomTime OLM
695
+ * documentation](https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime)
696
+ *
697
+ * @see #withPartMetadataFieldDecorator(PartMetadataFieldDecorator)
698
+ * @since 2.36.0 This new api is in preview and is subject to breaking changes.
699
+ */
700
+ @ BetaApi
701
+ public static PartMetadataFieldDecorator setCustomTimeInFuture (Duration timeInFuture ) {
702
+ checkNotNull (timeInFuture , "timeInFuture must not be null" );
703
+ return new CustomTimeInFuture (timeInFuture );
704
+ }
705
+
706
+ @ BetaApi
707
+ public static PartMetadataFieldDecorator noOp () {
708
+ return NoOp .INSTANCE ;
709
+ }
710
+
711
+ @ BetaApi
712
+ private static final class CustomTimeInFuture extends PartMetadataFieldDecorator {
713
+ private static final long serialVersionUID = -6213742554954751892L ;
714
+ private final Duration duration ;
715
+
716
+ CustomTimeInFuture (Duration duration ) {
717
+ this .duration = duration ;
718
+ }
719
+
720
+ @ Override
721
+ PartMetadataFieldDecoratorInstance newInstance (Clock clock ) {
722
+ return builder -> {
723
+ OffsetDateTime futureTime =
724
+ OffsetDateTime .from (
725
+ clock .instant ().plus (duration ).atZone (clock .getZone ()).toOffsetDateTime ());
726
+ return builder .setCustomTimeOffsetDateTime (futureTime );
727
+ };
728
+ }
729
+ }
730
+
731
+ private static final class NoOp extends PartMetadataFieldDecorator {
732
+ private static final long serialVersionUID = -4569486383992999205L ;
733
+ private static final NoOp INSTANCE = new NoOp ();
734
+
735
+ @ Override
736
+ PartMetadataFieldDecoratorInstance newInstance (Clock clock ) {
737
+ return builder -> builder ;
738
+ }
739
+
740
+ /** prevent java serialization from using a new instance */
741
+ private Object readResolve () {
742
+ return INSTANCE ;
743
+ }
744
+ }
745
+ }
746
+
634
747
/**
635
748
* A cleanup strategy which will dictate what cleanup operations are performed automatically when
636
749
* performing a parallel composite upload.
@@ -708,6 +821,8 @@ public static PartCleanupStrategy never() {
708
821
}
709
822
}
710
823
824
+ interface PartMetadataFieldDecoratorInstance extends UnaryOperator <BlobInfo .Builder > {}
825
+
711
826
private abstract static class Factory <T > implements Serializable {
712
827
private static final long serialVersionUID = 271806144227661056L ;
713
828
@@ -721,12 +836,17 @@ private class ParallelCompositeUploadWriterFactory implements WriterFactory {
721
836
private final Clock clock ;
722
837
private final Executor executor ;
723
838
private final BufferHandlePool bufferHandlePool ;
839
+ private final PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance ;
724
840
725
841
private ParallelCompositeUploadWriterFactory (
726
- Clock clock , Executor executor , BufferHandlePool bufferHandlePool ) {
842
+ Clock clock ,
843
+ Executor executor ,
844
+ BufferHandlePool bufferHandlePool ,
845
+ PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance ) {
727
846
this .clock = clock ;
728
847
this .executor = executor ;
729
848
this .bufferHandlePool = bufferHandlePool ;
849
+ this .partMetadataFieldDecoratorInstance = partMetadataFieldDecoratorInstance ;
730
850
}
731
851
732
852
@ Override
@@ -760,6 +880,7 @@ public ApiFuture<BufferedWritableByteChannel> openAsync() {
760
880
partNamingStrategy ,
761
881
partCleanupStrategy ,
762
882
maxPartsPerCompose ,
883
+ partMetadataFieldDecoratorInstance ,
763
884
result ,
764
885
storageInternal ,
765
886
info ,
0 commit comments