EMMA Coverage Report (generated Mon Oct 17 05:41:20 EDT 2011)
[all classes][org.jclouds.aws.s3.blobstore.strategy.internal]

COVERAGE SUMMARY FOR SOURCE FILE [ParallelMultipartUploadStrategy.java]

nameclass, %method, %block, %line, %
ParallelMultipartUploadStrategy.java0%   (0/4)0%   (0/14)0%   (0/909)0%   (0/111)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ParallelMultipartUploadStrategy0%   (0/1)0%   (0/3)0%   (0/137)0%   (0/22)
ParallelMultipartUploadStrategy (AWSS3AsyncBlobStore, PayloadSlicer, Executor... 0%   (0/1)0%   (0/33)0%   (0/9)
execute (String, Blob): ListenableFuture 0%   (0/1)0%   (0/13)0%   (0/1)
prepareUploadPart (String, String, String, Integer, Payload, long, long, Sort... 0%   (0/1)0%   (0/91)0%   (0/12)
     
class ParallelMultipartUploadStrategy$10%   (0/1)0%   (0/2)0%   (0/314)0%   (0/18)
ParallelMultipartUploadStrategy$1 (ParallelMultipartUploadStrategy, SortedMap... 0%   (0/1)0%   (0/54)0%   (0/1)
run (): void 0%   (0/1)0%   (0/260)0%   (0/17)
     
class ParallelMultipartUploadStrategy$20%   (0/1)0%   (0/2)0%   (0/422)0%   (0/58)
ParallelMultipartUploadStrategy$2 (ParallelMultipartUploadStrategy, Blob, Str... 0%   (0/1)0%   (0/12)0%   (0/1)
call (): String 0%   (0/1)0%   (0/410)0%   (0/57)
     
class ParallelMultipartUploadStrategy$Part0%   (0/1)0%   (0/7)0%   (0/36)0%   (0/14)
ParallelMultipartUploadStrategy$Part (ParallelMultipartUploadStrategy, int, l... 0%   (0/1)0%   (0/15)0%   (0/5)
getOffset (): long 0%   (0/1)0%   (0/3)0%   (0/1)
getPart (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getSize (): long 0%   (0/1)0%   (0/3)0%   (0/1)
setOffset (long): void 0%   (0/1)0%   (0/4)0%   (0/2)
setPart (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
setSize (long): void 0%   (0/1)0%   (0/4)0%   (0/2)

1/**
2 * Licensed to jclouds, Inc. (jclouds) under one or more
3 * contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  jclouds licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19package org.jclouds.aws.s3.blobstore.strategy.internal;
20 
21import static com.google.common.base.Preconditions.checkNotNull;
22 
23import java.util.Map;
24import java.util.Queue;
25import java.util.SortedMap;
26import java.util.concurrent.ArrayBlockingQueue;
27import java.util.concurrent.BlockingQueue;
28import java.util.concurrent.Callable;
29import java.util.concurrent.CancellationException;
30import java.util.concurrent.ConcurrentHashMap;
31import java.util.concurrent.ConcurrentLinkedQueue;
32import java.util.concurrent.ConcurrentSkipListMap;
33import java.util.concurrent.CountDownLatch;
34import java.util.concurrent.ExecutorService;
35import java.util.concurrent.TimeUnit;
36import java.util.concurrent.atomic.AtomicInteger;
37 
38import javax.annotation.Resource;
39import javax.inject.Named;
40 
41import org.jclouds.Constants;
42import org.jclouds.aws.s3.AWSS3AsyncClient;
43import org.jclouds.aws.s3.AWSS3Client;
44import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
45import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy;
46import org.jclouds.blobstore.domain.Blob;
47import org.jclouds.blobstore.internal.BlobRuntimeException;
48import org.jclouds.blobstore.reference.BlobStoreConstants;
49import org.jclouds.concurrent.Futures;
50import org.jclouds.io.Payload;
51import org.jclouds.io.PayloadSlicer;
52import org.jclouds.logging.Logger;
53import org.jclouds.s3.domain.ObjectMetadataBuilder;
54import org.jclouds.util.Throwables2;
55 
56import com.google.common.annotations.VisibleForTesting;
57import com.google.common.collect.Maps;
58import com.google.common.util.concurrent.ListenableFuture;
59import com.google.inject.Inject;
60 
61public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy {
62   @Resource
63   @Named(BlobStoreConstants.BLOBSTORE_LOGGER)
64   protected Logger logger = Logger.NULL;
65   
66   @VisibleForTesting
67   static final int DEFAULT_PARALLEL_DEGREE = 4;
68   @VisibleForTesting
69   static final int DEFAULT_MIN_RETRIES = 5;
70   @VisibleForTesting
71   static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
72   
73   private final ExecutorService ioWorkerExecutor;
74  
75   @Inject(optional = true)
76   @Named("jclouds.mpu.parallel.degree")
77   @VisibleForTesting
78   int parallelDegree = DEFAULT_PARALLEL_DEGREE;
79 
80   @Inject(optional = true)
81   @Named("jclouds.mpu.parallel.retries.min")
82   @VisibleForTesting
83   int minRetries = DEFAULT_MIN_RETRIES;
84 
85   @Inject(optional = true)
86   @Named("jclouds.mpu.parallel.retries.maxpercent")
87   @VisibleForTesting
88   int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES;
89 
90   /**
91    * maximum duration of an blob Request
92    */
93   @Inject(optional = true)
94   @Named(Constants.PROPERTY_REQUEST_TIMEOUT)
95   protected Long maxTime;
96   
97   protected final AWSS3AsyncBlobStore ablobstore;
98   protected final PayloadSlicer slicer;
99 
100   @Inject
101   public ParallelMultipartUploadStrategy(AWSS3AsyncBlobStore ablobstore, PayloadSlicer slicer,
102         @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) {
103      this.ablobstore = checkNotNull(ablobstore, "ablobstore");
104      this.slicer = checkNotNull(slicer, "slicer");
105      this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
106   }
107   
108   protected void prepareUploadPart(final String container, final String key, 
109         final String uploadId, final Integer part, final Payload payload, 
110         final long offset, final long size, final SortedMap<Integer, String> etags, 
111         final BlockingQueue<Integer> activeParts, 
112         final Map<Integer, ListenableFuture<String>> futureParts, 
113         final AtomicInteger errors, final int maxRetries, final Map<Integer, Exception> errorMap, 
114         final Queue<Part> toRetry, final CountDownLatch latch) {
115      if (errors.get() > maxRetries) {
116         activeParts.remove(part); // remove part from the bounded-queue without blocking
117         latch.countDown();
118         return;
119      }
120      final AWSS3AsyncClient client = (AWSS3AsyncClient) ablobstore.getContext()
121         .getProviderSpecificContext().getAsyncApi();
122      Payload chunkedPart = slicer.slice(payload, offset, size);
123      logger.debug(String.format("async uploading part %s of %s to container %s with uploadId %s", part, key, container, uploadId));
124      final long start = System.currentTimeMillis();
125      final ListenableFuture<String> futureETag = client.uploadPart(container, key, part, uploadId, chunkedPart);
126      futureETag.addListener(new Runnable() {
127         @Override
128         public void run() {
129            try {
130               etags.put(part, futureETag.get());
131               logger.debug(String.format("async uploaded part %s of %s to container %s in %sms with uploadId %s", 
132                     part, key, container, (System.currentTimeMillis()-start), uploadId));
133            } catch (CancellationException e) {
134               errorMap.put(part, e);
135               String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", 
136                     e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start));
137               logger.debug(message);
138            } catch (Exception e) {
139               errorMap.put(part, e);
140               String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", 
141                     e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start));
142               logger.error(message, e);
143               if (errors.incrementAndGet() <= maxRetries)
144                  toRetry.add(new Part(part, offset, size));
145            } finally {
146               activeParts.remove(part); // remove part from the bounded-queue without blocking
147               futureParts.remove(part);
148               latch.countDown();
149            }
150         }
151      }, ioWorkerExecutor);
152      futureParts.put(part, futureETag);
153   }   
154   
155   @Override
156   public ListenableFuture<String> execute(final String container, final Blob blob) {
157      return Futures.makeListenable(
158            ioWorkerExecutor.submit(new Callable<String>() {
159               @Override
160               public String call() throws Exception {
161                  String key = blob.getMetadata().getName();
162                  Payload payload = blob.getPayload();
163                  MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
164                  algorithm.calculateChunkSize(payload.getContentMetadata()
165                        .getContentLength());
166                  int parts = algorithm.getParts();
167                  long chunkSize = algorithm.getChunkSize();
168                  long remaining = algorithm.getRemaining();
169                  if (parts > 0) {
170                     AWSS3Client client = (AWSS3Client) ablobstore
171                           .getContext().getProviderSpecificContext().getApi();
172                     String uploadId = null;
173                     final Map<Integer, ListenableFuture<String>> futureParts = 
174                        new ConcurrentHashMap<Integer, ListenableFuture<String>>();
175                     final Map<Integer, Exception> errorMap = Maps.newHashMap();
176                     AtomicInteger errors = new AtomicInteger(0);
177                     int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100);
178                     int effectiveParts = remaining > 0 ? parts + 1 : parts;
179                     try {
180                        uploadId = client.initiateMultipartUpload(container,
181                                 ObjectMetadataBuilder.create().key(key).build()); // TODO md5
182                        logger.debug(String.format("initiated multipart upload of %s to container %s" + 
183                              " with uploadId %s consisting from %s part (possible max. retries: %d)", 
184                              key, container, uploadId, effectiveParts, maxRetries));
185                        // we need a bounded-blocking queue to control the amount of parallel jobs 
186                        ArrayBlockingQueue<Integer> activeParts = new ArrayBlockingQueue<Integer>(parallelDegree);
187                        Queue<Part> toRetry = new ConcurrentLinkedQueue<Part>();
188                        SortedMap<Integer, String> etags = new ConcurrentSkipListMap<Integer, String>();
189                        CountDownLatch latch = new CountDownLatch(effectiveParts);
190                        int part;
191                        while ((part = algorithm.getNextPart()) <= parts) {
192                           Integer partKey = new Integer(part);
193                           activeParts.put(partKey);
194                           prepareUploadPart(container, key, uploadId, partKey, payload, 
195                                 algorithm.getNextChunkOffset(), chunkSize, etags, 
196                                 activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
197                        }
198                        if (remaining > 0) {
199                           Integer partKey = new Integer(part);
200                           activeParts.put(partKey);
201                           prepareUploadPart(container, key, uploadId, partKey, payload, 
202                                 algorithm.getNextChunkOffset(), remaining, etags, 
203                                 activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
204                        }
205                        latch.await();
206                        // handling retries
207                        while (errors.get() <= maxRetries && toRetry.size() > 0) {
208                           int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree);
209                           CountDownLatch retryLatch = new CountDownLatch(atOnce);
210                           for (int i = 0; i < atOnce; i++) {
211                              Part failedPart = toRetry.poll();
212                              Integer partKey = new Integer(failedPart.getPart());
213                              activeParts.put(partKey);
214                              prepareUploadPart(container, key, uploadId, partKey, payload, 
215                                    failedPart.getOffset(), failedPart.getSize(), etags, 
216                                    activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch);
217                           }
218                           retryLatch.await();
219                        }
220                        if (errors.get() > maxRetries) {
221                           throw new BlobRuntimeException(String.format(
222                                 "Too many failed parts: %s while multipart upload of %s to container %s with uploadId %s", 
223                                 errors.get(), key, container, uploadId));
224                        }
225                        String eTag = client.completeMultipartUpload(container, key, uploadId, etags);
226                        logger.debug(String.format("multipart upload of %s to container %s with uploadId %s" +
227                                        " succeffully finished with %s retries", key, container, uploadId, errors.get()));
228                        return eTag;
229                     } catch (Exception ex) {
230                        RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
231                        if (rtex == null) {
232                           rtex = new RuntimeException(ex);
233                        }
234                        for (Map.Entry<Integer, ListenableFuture<String>> entry : futureParts.entrySet()) {
235                           entry.getValue().cancel(false);
236                        }
237                        if (uploadId != null) {
238                           client.abortMultipartUpload(container, key, uploadId);
239                        }
240                        throw rtex;
241                     }
242                  } else {
243                     ListenableFuture<String> futureETag = ablobstore.putBlob(container, blob);
244                     return maxTime != null ? 
245                           futureETag.get(maxTime,TimeUnit.SECONDS) : futureETag.get();
246                  }
247               }
248            }), ioWorkerExecutor);
249   }
250   
251   class Part {
252      private int part;
253      private long offset;
254      private long size;
255      
256      Part(int part, long offset, long size) {
257         this.part = part;
258         this.offset = offset;
259         this.size = size;
260      }
261 
262      public int getPart() {
263         return part;
264      }
265 
266      public void setPart(int part) {
267         this.part = part;
268      }
269 
270      public long getOffset() {
271         return offset;
272      }
273 
274      public void setOffset(long offset) {
275         this.offset = offset;
276      }
277 
278      public long getSize() {
279         return size;
280      }
281 
282      public void setSize(long size) {
283         this.size = size;
284      }     
285   }
286}

[all classes][org.jclouds.aws.s3.blobstore.strategy.internal]
EMMA 2.0.5312 (C) Vladimir Roubtsov