// View Javadoc
1   package com.github.davidmoten.aws.lw.client;
2   
3   import static org.junit.Assert.assertArrayEquals;
4   import static org.junit.Assert.assertEquals;
5   import static org.junit.Assert.assertTrue;
6   
7   import java.io.ByteArrayInputStream;
8   import java.io.ByteArrayOutputStream;
9   import java.io.Closeable;
10  import java.io.File;
11  import java.io.IOException;
12  import java.io.InputStream;
13  import java.io.UncheckedIOException;
14  import java.nio.charset.StandardCharsets;
15  import java.nio.file.Files;
16  import java.util.Arrays;
17  import java.util.HashMap;
18  import java.util.List;
19  import java.util.Map;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.davidmoten.kool.function.Consumer;
24  import org.junit.Test;
25  
26  import com.github.davidmoten.aws.lw.client.internal.Retries;
27  import com.github.davidmoten.aws.lw.client.xml.builder.Xml;
28  import com.github.davidmoten.junit.Asserts;
29  
30  public class MultipartTest {
31  
32      private static byte[] createBytes() throws IOException {
33          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
34          for (int i = 0; i < 600000; i++) {
35              bytes.write("0123456789".getBytes(StandardCharsets.UTF_8));
36          }
37          return bytes.toByteArray();
38      }
39  
40      @Test
41      public void testMultipart() throws Exception {
42          testMultipart(out -> {
43              for (int i = 0; i < 600000; i++) {
44                  out.write("0123456789".getBytes(StandardCharsets.UTF_8));
45              }
46          });
47      }
48  
49      @Test
50      public void testMultipartSingleWrite() throws Exception {
51          testMultipart(out -> {
52              out.write(createBytes());
53          });
54      }
55  
56      @Test
57      public void testMultipartSameChunkAsPartSize() throws Exception {
58          testMultipart(out -> {
59              byte[] bytes = createBytes();
60              int partSize = 5 * 1024 * 1024;
61              out.write(bytes, 0, partSize);
62              out.write(bytes, partSize, bytes.length - partSize);
63          });
64      }
65  
66      @Test
67      public void testMultipartSingleWriteOfExactlyPartsSize() throws Exception {
68          HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
69          Client s3 = Client //
70                  .s3() //
71                  .region("ap-southeast-2") //
72                  .accessKey("123") //
73                  .secretKey("456") //
74                  .httpClient(h) //
75                  .build();
76          h.add(startMultipartUpload());
77          h.add(submitPart1());
78          h.add(completeMultipartUpload());
79  
80          try (MultipartOutputStream out = Multipart.s3(s3) //
81                  .bucket("mybucket") //
82                  .key("mykey") //
83                  .executor(Executors.newFixedThreadPool(1)) //
84                  .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
85                  .transformCreateRequest(x -> x) //
86                  .partSizeMb(5) // s
87                  .partTimeout(5, TimeUnit.MINUTES) //
88                  .outputStream()) {
89              out.write(0);
90              out.write(new byte[5 * 1024 * 1024 - 1]);
91          }
92          assertEquals(Arrays.asList( //
93                  "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
94                  "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
95                  "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
96                  h.urls());
97      }
98  
99      public void testMultipart(Consumer<MultipartOutputStream> consumer) throws Exception {
100         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
101         Client s3 = Client //
102                 .s3() //
103                 .region("ap-southeast-2") //
104                 .accessKey("123") //
105                 .secretKey("456") //
106                 .httpClient(h) //
107                 .baseUrlFactory((serviceName, region) //
108                         -> "https://"+ serviceName + "." + region.map(x -> x + ".").orElse("") + "mine.com/") //
109                 .build();
110         h.add(startMultipartUpload());
111         h.add(submitPart1());
112         h.add(submitPart2Fails());
113         h.add(submitPart2());
114         h.add(completeMultipartUpload());
115 
116         try (MultipartOutputStream out = Multipart.s3(s3) //
117                 .bucket("mybucket") //
118                 .key("mykey") //
119                 .executor(Executors.newFixedThreadPool(1)) //
120                 .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
121                 .partSizeMb(5) //
122                 .partTimeout(5, TimeUnit.MINUTES) //
123                 .outputStream()) {
124             consumer.accept(out);
125         }
126         assertEquals(Arrays.asList( //
127                 "POST:https://s3.ap-southeast-2.mine.com/mybucket/mykey?uploads",
128                 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=1&uploadId=abcde",
129                 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=2&uploadId=abcde",
130                 "PUT:https://s3.ap-southeast-2.mine.com/mybucket/mykey?partNumber=2&uploadId=abcde",
131                 "POST:https://s3.ap-southeast-2.mine.com/mybucket/mykey?uploadId=abcde"), //
132                 h.urls());
133         assertArrayEquals(createBytes(), h.bytes());
134     }
135 
136     @Test(expected = UncheckedIOException.class)
137     public void testMultipartUploadFileDoesNotExist() throws IOException {
138         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
139         Client s3 = Client //
140                 .s3() //
141                 .region("ap-southeast-2") //
142                 .accessKey("123") //
143                 .secretKey("456") //
144                 .httpClient(h) //
145                 .build();
146         Multipart.s3(s3) //
147                 .bucket("mybucket") //
148                 .key("mykey") //
149                 .executor(Executors.newFixedThreadPool(1)) //
150                 .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
151                 .partSizeMb(5) //
152                 .partTimeout(5, TimeUnit.MINUTES) //
153                 .upload(new File("target/doesnotexist"));
154 
155     }
156 
157     @Test(expected = RuntimeException.class)
158     public void testMultipartUploadInputStreamFactoryThrows() throws IOException {
159         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
160         Client s3 = Client //
161                 .s3() //
162                 .region("ap-southeast-2") //
163                 .accessKey("123") //
164                 .secretKey("456") //
165                 .httpClient(h) //
166                 .build();
167         Multipart.s3(s3) //
168                 .bucket("mybucket") //
169                 .key("mykey") //
170                 .executor(Executors.newFixedThreadPool(1)) //
171                 .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
172                 .partSizeMb(5) //
173                 .partTimeout(5, TimeUnit.MINUTES) //
174                 .upload(() -> {
175                     throw new Exception();
176                 });
177     }
178 
179     @Test
180     public void testMultipartUploadFile() throws IOException {
181         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
182         Client s3 = Client //
183                 .s3() //
184                 .region("ap-southeast-2") //
185                 .accessKey("123") //
186                 .secretKey("456") //
187                 .httpClient(h) //
188                 .build();
189         h.add(startMultipartUpload());
190         h.add(submitPart1());
191         h.add(submitPart2Fails());
192         h.add(submitPart2());
193         h.add(completeMultipartUpload());
194 
195         File file = new File("target/temp.txt");
196         byte[] bytes = createBytes();
197         Files.write(file.toPath(), bytes);
198         Multipart.s3(s3) //
199                 .bucket("mybucket") //
200                 .key("mykey") //
201                 .executor(Executors.newFixedThreadPool(1)) //
202                 .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
203                 .partSizeMb(5) //
204                 .partTimeout(5, TimeUnit.MINUTES) //
205                 .upload(file);
206 
207         assertEquals(Arrays.asList( //
208                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
209                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
210                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
211                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
212                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
213                 h.urls());
214         assertArrayEquals(bytes, h.bytes());
215     }
216 
217     @Test
218     public void testMultipartUploadByteArray() throws IOException {
219         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
220         Client s3 = Client //
221                 .s3() //
222                 .region("ap-southeast-2") //
223                 .accessKey("123") //
224                 .secretKey("456") //
225                 .httpClient(h) //
226                 .build();
227         h.add(startMultipartUpload());
228         h.add(submitPart1());
229         h.add(submitPart2Fails());
230         h.add(submitPart2());
231         h.add(completeMultipartUpload());
232 
233         byte[] bytes = createBytes();
234         Multipart.s3(s3) //
235                 .bucket("mybucket") //
236                 .key("mykey") //
237                 .executor(Executors.newFixedThreadPool(1)) //
238                 .retryInitialInterval(1, TimeUnit.MILLISECONDS) //
239                 .partSizeMb(5) //
240                 .partTimeout(5, TimeUnit.MINUTES) //
241                 .upload(bytes);
242 
243         assertEquals(Arrays.asList( //
244                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
245                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
246                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
247                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
248                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
249                 h.urls());
250         assertArrayEquals(bytes, h.bytes());
251     }
252 
253     @Test
254     public void testMultipartAbort() throws IOException {
255         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
256         Client s3 = Client //
257                 .s3() //
258                 .region("ap-southeast-2") //
259                 .accessKey("123") //
260                 .secretKey("456") //
261                 .httpClient(h) //
262                 .retryMaxAttempts(1) //
263                 .build();
264 
265         h.add(startMultipartUpload());
266         h.add(submitPart1());
267         h.add(submitPart2Fails());
268         h.add(abortMultipartUpload());
269 
270         try (MultipartOutputStream out = Multipart.s3(s3) //
271                 .bucket("mybucket") //
272                 .key("mykey") //
273                 .executor(Executors.newFixedThreadPool(1)) //
274                 .maxAttemptsPerAction(1) //
275                 .retryInitialInterval(1, TimeUnit.SECONDS) //
276                 .retryBackoffFactor(1.0) //
277                 .retryMaxInterval(10, TimeUnit.SECONDS) //
278                 .retryJitter(0) //
279                 .outputStream()) {
280             for (int i = 0; i < 600000; i++) {
281                 out.write("0123456789".getBytes(StandardCharsets.UTF_8));
282             }
283         } catch (RuntimeException e) {
284             assertTrue(e.getCause().getCause() instanceof ServiceException);
285         }
286 
287         assertEquals(Arrays.asList( //
288                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads",
289                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=1&uploadId=abcde",
290                 "PUT:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?partNumber=2&uploadId=abcde",
291                 "DELETE:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploadId=abcde"), //
292                 h.urls());
293     }
294 
295     private static Client s3() {
296         return Client //
297                 .s3() //
298                 .region("ap-southeast-2") //
299                 .accessKey("123") //
300                 .secretKey("456") //
301                 .build();
302     }
303 
304     @SuppressWarnings("resource")
305     @Test(expected = IllegalArgumentException.class)
306     public void testMultipartOutputStreamBadArgumentPartTimeoutMs() {
307         new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1),
308                 -1, Retries.create(x -> false, x -> true), 0);
309     }
310 
311     @SuppressWarnings("resource")
312     @Test(expected = IllegalArgumentException.class)
313     public void testMultipartOutputStreamBadArgumentMaxAttempts() {
314         new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1), 1,
315                 Retries.create(x -> false, x -> true), 0);
316     }
317 
318     @SuppressWarnings("resource")
319     @Test(expected = IllegalArgumentException.class)
320     public void testMultipartOutputStreamBadArgumentPartSize() {
321         new MultipartOutputStream(s3(), "bucket", "key", x -> x, Executors.newFixedThreadPool(1), 1,
322                 Retries.create(x -> false, x -> true), 1000);
323     }
324 
325     @Test
326     public void testMultipartDefaultExecutor() {
327         HttpClientTestingWithQueue h = new HttpClientTestingWithQueue();
328         Client s3 = Client //
329                 .s3() //
330                 .region("ap-southeast-2") //
331                 .accessKey("123") //
332                 .secretKey("456") //
333                 .httpClient(h) //
334                 .build();
335         h.add(startMultipartUpload());
336 
337         Multipart.s3(s3) //
338                 .bucket("mybucket") //
339                 .key("mykey") //
340                 .outputStream();
341 
342         assertEquals(Arrays.asList( //
343                 "POST:https://s3.ap-southeast-2.amazonaws.com/mybucket/mykey?uploads"), h.urls());
344     }
345 
346     @Test(expected = IllegalArgumentException.class)
347     public void testMultipartBadPartTimeout() {
348         Multipart.s3(s3()) //
349                 .bucket("mybucket") //
350                 .key("mykey") //
351                 .partTimeout(-1, TimeUnit.MINUTES);
352     }
353     
354     @Test(expected = IllegalArgumentException.class)
355     public void testMultipartBadRetryInitialInterval() {
356         Multipart.s3(s3()) //
357                 .bucket("mybucket") //
358                 .key("mykey") //
359                 .retryInitialInterval(-1, TimeUnit.MINUTES);
360     }
361     
362     @Test(expected = IllegalArgumentException.class)
363     public void testMultipartBadRetryBackoffFactor() {
364         Multipart.s3(s3()) //
365                 .bucket("mybucket") //
366                 .key("mykey") //
367                 .retryBackoffFactor(-1);
368     }
369     
370     @Test(expected = IllegalArgumentException.class)
371     public void testMultipartBadRetryMaxInterval() {
372         Multipart.s3(s3()) //
373                 .bucket("mybucket") //
374                 .key("mykey") //
375                 .retryMaxInterval(-1, TimeUnit.SECONDS);
376     }
377     
378     @Test(expected = IllegalArgumentException.class)
379     public void testMultipartBadRetryJitter() {
380         Multipart.s3(s3()) //
381                 .bucket("mybucket") //
382                 .key("mykey") //
383                 .retryJitter(-1);
384     }
385     
386     @Test(expected = IllegalArgumentException.class)
387     public void testMultipartBadRetryJitter2() {
388         Multipart.s3(s3()) //
389                 .bucket("mybucket") //
390                 .key("mykey") //
391                 .retryJitter(2);
392     }
393 
394     @Test
395     public void isUtilityClass() {
396         Asserts.assertIsUtilityClass(Multipart.class);
397     }
398 
399     @Test(expected = IllegalArgumentException.class)
400     public void testMultipartBadArgument() {
401         Multipart.s3(s3()).bucket("bucket").key("key").maxAttemptsPerAction(0);
402     }
403 
404     @Test(expected = IllegalArgumentException.class)
405     public void testMultipartBadArgument2() {
406         Multipart.s3(s3()).bucket("bucket").key("key").partSize(1000);
407     }
408 
409     @Test(expected = IllegalArgumentException.class)
410     public void testMultipartBadArgument3() {
411         Multipart.s3(s3()).bucket("bucket").key("key").retryInitialInterval(-1, TimeUnit.MILLISECONDS);
412     }
413 
414     private static final Closeable DO_NOTHING = () -> {
415     };
416 
417     private static InputStream emptyInputStream() {
418         return new ByteArrayInputStream(new byte[0]);
419     }
420 
421     private static ResponseInputStream completeMultipartUpload() {
422         // response for completion
423         // actually includes xml response but we don't read it
424         // so we don't simulate it
425         Map<String, List<String>> responseHeaders = new HashMap<>();
426         responseHeaders.put("Content-Length", Arrays.asList("0"));
427         return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
428     }
429 
430     private static ResponseInputStream abortMultipartUpload() {
431         // response for completion
432         // actually includes xml response but we don't read it
433         // so we don't simulate it
434         Map<String, List<String>> responseHeaders = new HashMap<>();
435         responseHeaders.put("Content-Length", Arrays.asList("0"));
436         return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
437     }
438 
439     private static ResponseInputStream submitPart2() {
440         // response for submit part 2
441         Map<String, List<String>> responseHeaders = new HashMap<>();
442         responseHeaders.put("Content-Length", Arrays.asList("0"));
443         responseHeaders.put("ETag", Arrays.asList("\"etag2\""));
444         return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
445     }
446 
447     private static ResponseInputStream submitPart2Fails() {
448         // response for submit part 2 - fails
449         Map<String, List<String>> responseHeaders = new HashMap<>();
450         responseHeaders.put("Content-Length", Arrays.asList("0"));
451         responseHeaders.put("ETag", Arrays.asList("\"etag2\""));
452         return new ResponseInputStream(DO_NOTHING, 500, responseHeaders, emptyInputStream());
453     }
454 
455     private static ResponseInputStream submitPart1() {
456         // response for submit part 1
457         Map<String, List<String>> responseHeaders = new HashMap<>();
458         responseHeaders.put("Content-Length", Arrays.asList("0"));
459         responseHeaders.put("ETag", Arrays.asList("\"etag1\""));
460         return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, emptyInputStream());
461     }
462 
463     private static ResponseInputStream startMultipartUpload() {
464         String responseXml = Xml.create("InitiateMultipartUploadResult") //
465                 .a("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/") //
466                 .e("Bucket").content("mybucket") //
467                 .up() //
468                 .e("Key").content("mykey") //
469                 .up() //
470                 .e("UploadId").content("abcde") //
471                 .toString();
472         byte[] bytes = responseXml.getBytes(StandardCharsets.UTF_8);
473         Map<String, List<String>> responseHeaders = new HashMap<>();
474         responseHeaders.put("Content-Length", Arrays.asList("" + bytes.length));
475         InputStream result = new ByteArrayInputStream(responseXml.getBytes(StandardCharsets.UTF_8));
476         return new ResponseInputStream(DO_NOTHING, 200, responseHeaders, result);
477     }
478 }