1 package com.github.davidmoten.aws.lw.client;
2
3 import static org.junit.Assert.assertArrayEquals;
4 import static org.junit.Assert.assertEquals;
5 import static org.junit.Assert.assertTrue;
6
7 import java.io.ByteArrayInputStream;
8 import java.io.ByteArrayOutputStream;
9 import java.io.Closeable;
10 import java.io.File;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.io.UncheckedIOException;
14 import java.nio.charset.StandardCharsets;
15 import java.nio.file.Files;
16 import java.util.Arrays;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.TimeUnit;
22
23 import org.davidmoten.kool.function.Consumer;
24 import org.junit.Test;
25
26 import com.github.davidmoten.aws.lw.client.internal.Retries;
27 import com.github.davidmoten.aws.lw.client.xml.builder.Xml;
28 import com.github.davidmoten.junit.Asserts;
29
30 public class MultipartTest {
31
32 private static byte[] createBytes() throws IOException {
33 ByteArrayOutputStream bytes = new ByteArrayOutputStream();
34 for (int i = 0; i < 600000; i++) {
35 bytes.write("0123456789".getBytes(StandardCharsets.UTF_8));
36 }
37 return bytes.toByteArray();
38 }
39
40 @Test
41 public void testMultipart() throws Exception {
42 testMultipart(out -> {
43 for (int i = 0; i < 600000; i++) {
44 out.write("0123456789".getBytes(StandardCharsets.UTF_8));
45 }
46 });
47 }
48
49 @Test
50 public void testMultipartSingleWrite() throws Exception {
51 testMultipart(out -> {
52 out.write(createBytes());
53 });
54 }
55
56 @Test
57 public void testMultipartSameChunkAsPartSize() throws Exception {
58 testMultipart(out -> {
59 byte[] bytes = createBytes();
60 int partSize = 5 * 1024 * 1024;
61 out.write(bytes, 0, partSize);
62 out.write(bytes, partSize, bytes.length - partSize);
63 });
64 }
65
66 @Test
67 public void testMultipartSingleWriteOfExactlyPartsSize() throws Exception {
68 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
69 Client s3 = Client
70 .s3()
71 .region("ap-southeast-2")
72 .accessKey("123")
73 .secretKey("456")
74 .httpClient(h)
75 .build();
76 h.add(startMultipartUpload());
77 h.add(submitPart1());
78 h.add(completeMultipartUpload());
79
80 try (MultipartOutputStream out = Multipart.s3(s3)
81 .bucket("mybucket")
82 .key("mykey")
83 .executor(Executors.newFixedThreadPool(1))
84 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
85 .transformCreateRequest(x -> x)
86 .partSizeMb(5)
87 .partTimeout(5, TimeUnit.MINUTES)
88 .outputStream()) {
89 out.write(0);
90 out.write(new byte[5 * 1024 * 1024 - 1]);
91 }
92 assertEquals(Arrays.asList(
93 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
94 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
95 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
96 h.urls());
97 }
98
99 public void testMultipart(Consumer<MultipartOutputStream> consumer) throws Exception {
100 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
101 Client s3 = Client
102 .s3()
103 .region("ap-southeast-2")
104 .accessKey("123")
105 .secretKey("456")
106 .httpClient(h)
107 .baseUrlFactory((serviceName, region)
108 -> "https://"+ serviceName + "." + region.map(x -> x + ".").orElse("") + "mine.com/") //
109 .build();
110 h.add(startMultipartUpload());
111 h.add(submitPart1());
112 h.add(submitPart2Fails());
113 h.add(submitPart2());
114 h.add(completeMultipartUpload());
115
116 try (MultipartOutputStream out = Multipart.s3(s3)
117 .bucket("mybucket")
118 .key("mykey")
119 .executor(Executors.newFixedThreadPool(1))
120 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
121 .partSizeMb(5)
122 .partTimeout(5, TimeUnit.MINUTES)
123 .outputStream()) {
124 consumer.accept(out);
125 }
126 assertEquals(Arrays.asList(
127 "POST:https://s3.ap-southeast-2.mine.com/mybucket/mykey?uploads",
128 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=1&uploadId=abcde",
129 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=2&uploadId=abcde",
130 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=2&uploadId=abcde",
131 "POST:https://s3.ap-southeast-2.mine.com/mybucket/mykey?uploadId=abcde"), //
132 h.urls());
133 assertArrayEquals(createBytes(), h.bytes());
134 }
135
136 @Test(expected = UncheckedIOException.class)
137 public void testMultipartUploadFileDoesNotExist() throws IOException {
138 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
139 Client s3 = Client
140 .s3()
141 .region("ap-southeast-2")
142 .accessKey("123")
143 .secretKey("456")
144 .httpClient(h)
145 .build();
146 Multipart.s3(s3)
147 .bucket("mybucket")
148 .key("mykey")
149 .executor(Executors.newFixedThreadPool(1))
150 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
151 .partSizeMb(5)
152 .partTimeout(5, TimeUnit.MINUTES)
153 .upload(new File("target/doesnotexist"));
154
155 }
156
157 @Test(expected = RuntimeException.class)
158 public void testMultipartUploadInputStreamFactoryThrows() throws IOException {
159 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
160 Client s3 = Client
161 .s3()
162 .region("ap-southeast-2")
163 .accessKey("123")
164 .secretKey("456")
165 .httpClient(h)
166 .build();
167 Multipart.s3(s3)
168 .bucket("mybucket")
169 .key("mykey")
170 .executor(Executors.newFixedThreadPool(1))
171 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
172 .partSizeMb(5)
173 .partTimeout(5, TimeUnit.MINUTES)
174 .upload(() -> {
175 throw new Exception();
176 });
177 }
178
179 @Test
180 public void testMultipartUploadFile() throws IOException {
181 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
182 Client s3 = Client
183 .s3()
184 .region("ap-southeast-2")
185 .accessKey("123")
186 .secretKey("456")
187 .httpClient(h)
188 .build();
189 h.add(startMultipartUpload());
190 h.add(submitPart1());
191 h.add(submitPart2Fails());
192 h.add(submitPart2());
193 h.add(completeMultipartUpload());
194
195 File file = new File("target/temp.txt");
196 byte[] bytes = createBytes();
197 Files.write(file.toPath(), bytes);
198 Multipart.s3(s3)
199 .bucket("mybucket")
200 .key("mykey")
201 .executor(Executors.newFixedThreadPool(1))
202 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
203 .partSizeMb(5)
204 .partTimeout(5, TimeUnit.MINUTES)
205 .upload(file);
206
207 assertEquals(Arrays.asList(
208 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
209 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
210 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
211 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
212 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
213 h.urls());
214 assertArrayEquals(bytes, h.bytes());
215 }
216
217 @Test
218 public void testMultipartUploadByteArray() throws IOException {
219 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
220 Client s3 = Client
221 .s3()
222 .region("ap-southeast-2")
223 .accessKey("123")
224 .secretKey("456")
225 .httpClient(h)
226 .build();
227 h.add(startMultipartUpload());
228 h.add(submitPart1());
229 h.add(submitPart2Fails());
230 h.add(submitPart2());
231 h.add(completeMultipartUpload());
232
233 byte[] bytes = createBytes();
234 Multipart.s3(s3)
235 .bucket("mybucket")
236 .key("mykey")
237 .executor(Executors.newFixedThreadPool(1))
238 .retryInitialInterval(1, TimeUnit.MILLISECONDS)
239 .partSizeMb(5)
240 .partTimeout(5, TimeUnit.MINUTES)
241 .upload(bytes);
242
243 assertEquals(Arrays.asList(
244 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
245 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
246 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
247 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
248 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
249 h.urls());
250 assertArrayEquals(bytes, h.bytes());
251 }
252
253 @Test
254 public void testMultipartAbort() throws IOException {
255 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
256 Client s3 = Client
257 .s3()
258 .region("ap-southeast-2")
259 .accessKey("123")
260 .secretKey("456")
261 .httpClient(h)
262 .retryMaxAttempts(1)
263 .build();
264
265 h.add(startMultipartUpload());
266 h.add(submitPart1());
267 h.add(submitPart2Fails());
268 h.add(abortMultipartUpload());
269
270 try (MultipartOutputStream out = Multipart.s3(s3)
271 .bucket("mybucket")
272 .key("mykey")
273 .executor(Executors.newFixedThreadPool(1))
274 .maxAttemptsPerAction(1)
275 .retryInitialInterval(1, TimeUnit.SECONDS)
276 .retryBackoffFactor(1.0)
277 .retryMaxInterval(10, TimeUnit.SECONDS)
278 .retryJitter(0)
279 .outputStream()) {
280 for (int i = 0; i < 600000; i++) {
281 out.write("0123456789".getBytes(StandardCharsets.UTF_8));
282 }
283 } catch (RuntimeException e) {
284 assertTrue(e.getCause().getCause() instanceof ServiceException);
285 }
286
287 assertEquals(Arrays.asList(
288 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
289 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
290 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
291 "DELETE:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
292 h.urls());
293 }
294
295 private static Client s3() {
296 return Client
297 .s3()
298 .region("ap-southeast-2")
299 .accessKey("123")
300 .secretKey("456")
301 .build();
302 }
303
304 @SuppressWarnings("resource")
305 @Test(expected = IllegalArgumentException.class)
306 public void testMultipartOutputStreamBadArgumentPartTimeoutMs() {
307 new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1),
308 -1, Retries.create(x -> false, x -> true), 0);
309 }
310
311 @SuppressWarnings("resource")
312 @Test(expected = IllegalArgumentException.class)
313 public void testMultipartOutputStreamBadArgumentMaxAttempts() {
314 new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1), 1,
315 Retries.create(x -> false, x -> true), 0);
316 }
317
318 @SuppressWarnings("resource")
319 @Test(expected = IllegalArgumentException.class)
320 public void testMultipartOutputStreamBadArgumentPartSize() {
321 new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1), 1,
322 Retries.create(x -> false, x -> true), 1000);
323 }
324
325 @Test
326 public void testMultipartDefaultExecutor() {
327 HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
328 Client s3 = Client
329 .s3()
330 .region("ap-southeast-2")
331 .accessKey("123")
332 .secretKey("456")
333 .httpClient(h)
334 .build();
335 h.add(startMultipartUpload());
336
337 Multipart.s3(s3)
338 .bucket("mybucket")
339 .key("mykey")
340 .outputStream();
341
342 assertEquals(Arrays.asList(
343 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads"), h.urls());
344 }
345
346 @Test(expected = IllegalArgumentException.class)
347 public void testMultipartBadPartTimeout() {
348 Multipart.s3(s3())
349 .bucket("mybucket")
350 .key("mykey")
351 .partTimeout(-1, TimeUnit.MINUTES);
352 }
353
354 @Test(expected = IllegalArgumentException.class)
355 public void testMultipartBadRetryInitialInterval() {
356 Multipart.s3(s3())
357 .bucket("mybucket")
358 .key("mykey")
359 .retryInitialInterval(-1, TimeUnit.MINUTES);
360 }
361
362 @Test(expected = IllegalArgumentException.class)
363 public void testMultipartBadRetryBackoffFactor() {
364 Multipart.s3(s3())
365 .bucket("mybucket")
366 .key("mykey")
367 .retryBackoffFactor(-1);
368 }
369
370 @Test(expected = IllegalArgumentException.class)
371 public void testMultipartBadRetryMaxInterval() {
372 Multipart.s3(s3())
373 .bucket("mybucket")
374 .key("mykey")
375 .retryMaxInterval(-1, TimeUnit.SECONDS);
376 }
377
378 @Test(expected = IllegalArgumentException.class)
379 public void testMultipartBadRetryJitter() {
380 Multipart.s3(s3())
381 .bucket("mybucket")
382 .key("mykey")
383 .retryJitter(-1);
384 }
385
386 @Test(expected = IllegalArgumentException.class)
387 public void testMultipartBadRetryJitter2() {
388 Multipart.s3(s3())
389 .bucket("mybucket")
390 .key("mykey")
391 .retryJitter(2);
392 }
393
394 @Test
395 public void isUtilityClass() {
396 Asserts.assertIsUtilityClass(Multipart.class);
397 }
398
399 @Test(expected = IllegalArgumentException.class)
400 public void testMultipartBadArgument() {
401 Multipart.s3(s3()).bucket("bucket").key("key").maxAttemptsPerAction(0);
402 }
403
404 @Test(expected = IllegalArgumentException.class)
405 public void testMultipartBadArgument2() {
406 Multipart.s3(s3()).bucket("bucket").key("key").partSize(1000);
407 }
408
409 @Test(expected = IllegalArgumentException.class)
410 public void testMultipartBadArgument3() {
411 Multipart.s3(s3()).bucket("bucket").key("key").retryInitialInterval(-1, TimeUnit.MILLISECONDS);
412 }
413
414 private static final Closeable DO_NOTHING = () -> {
415 };
416
417 private static InputStream emptyInputStream() {
418 return new ByteArrayInputStream(new byte[0]);
419 }
420
421 private static ResponseInputStream completeMultipartUpload() {
422
423
424
425 Map<String, List<String>> responseHeaders = new HashMap<>();
426 responseHeaders.put("Content-Length", Arrays.asList("0"));
427 return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
428 }
429
430 private static ResponseInputStream abortMultipartUpload() {
431
432
433
434 Map<String, List<String>> responseHeaders = new HashMap<>();
435 responseHeaders.put("Content-Length", Arrays.asList("0"));
436 return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
437 }
438
439 private static ResponseInputStream submitPart2() {
440
441 Map<String, List<String>> responseHeaders = new HashMap<>();
442 responseHeaders.put("Content-Length", Arrays.asList("0"));
443 responseHeaders.put("ETag", Arrays.asList("\"etag2\""));
444 return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
445 }
446
447 private static ResponseInputStream submitPart2Fails() {
448
449 Map<String, List<String>> responseHeaders = new HashMap<>();
450 responseHeaders.put("Content-Length", Arrays.asList("0"));
451 responseHeaders.put("ETag", Arrays.asList("\"etag2\""));
452 return new ResponseInputStream(DO_NOTHING, 500, responseHeaders, emptyInputStream());
453 }
454
455 private static ResponseInputStream submitPart1() {
456
457 Map<String, List<String>> responseHeaders = new HashMap<>();
458 responseHeaders.put("Content-Length", Arrays.asList("0"));
459 responseHeaders.put("ETag", Arrays.asList("\"etag1\""));
460 return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
461 }
462
463 private static ResponseInputStream startMultipartUpload() {
464 String responseXml = Xml.create("InitiateMultipartUploadResult")
465 .a("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/") //
466 .e("Bucket").content("mybucket")
467 .up()
468 .e("Key").content("mykey")
469 .up()
470 .e("UploadId").content("abcde")
471 .toString();
472 byte[] bytes = responseXml.getBytes(StandardCharsets.UTF_8);
473 Map<String, List<String>> responseHeaders = new HashMap<>();
474 responseHeaders.put("Content-Length", Arrays.asList("" + bytes.length));
475 InputStream result = new ByteArrayInputStream(responseXml.getBytes(StandardCharsets.UTF_8));
476 return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, result);
477 }
478 }