EMMA Coverage Report (generated Wed Oct 26 13:47:17 EDT 2011)
[all classes][org.jclouds.concurrent]

COVERAGE SUMMARY FOR SOURCE FILE [FutureIterables.java]

nameclass, %method, %block, %line, %
FutureIterables.java0%   (0/3)0%   (0/18)0%   (0/456)0%   (0/71)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FutureIterables0%   (0/1)0%   (0/12)0%   (0/344)0%   (0/53)
<static initializer> 0%   (0/1)0%   (0/9)0%   (0/4)
FutureIterables (): void 0%   (0/1)0%   (0/3)0%   (0/1)
access$000 (Logger, String, int, int, int, long, Exception): void 0%   (0/1)0%   (0/9)0%   (0/1)
awaitCompletion (Map, ExecutorService, Long, Logger, String): Map 0%   (0/1)0%   (0/136)0%   (0/27)
logException (Logger, String, int, int, int, long, Exception): void 0%   (0/1)0%   (0/14)0%   (0/3)
message (String, int, int, int, long): String 0%   (0/1)0%   (0/36)0%   (0/1)
timeOut (long, Long): boolean 0%   (0/1)0%   (0/15)0%   (0/1)
transformParallel (Iterable, Function): Iterable 0%   (0/1)0%   (0/6)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long): Iterable 0%   (0/1)0%   (0/8)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long, Logger, String)... 0%   (0/1)0%   (0/10)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long, Logger, String,... 0%   (0/1)0%   (0/92)0%   (0/12)
unwrap (Iterable): Iterable 0%   (0/1)0%   (0/6)0%   (0/1)
     
class FutureIterables$10%   (0/1)0%   (0/3)0%   (0/93)0%   (0/11)
FutureIterables$1 (Map$Entry, AtomicInteger, AtomicInteger, Logger, String, i... 0%   (0/1)0%   (0/30)0%   (0/1)
run (): void 0%   (0/1)0%   (0/44)0%   (0/9)
toString (): String 0%   (0/1)0%   (0/19)0%   (0/1)
     
class FutureIterables$20%   (0/1)0%   (0/3)0%   (0/19)0%   (0/9)
FutureIterables$2 (): void 0%   (0/1)0%   (0/3)0%   (0/1)
apply (Future): Object 0%   (0/1)0%   (0/14)0%   (0/7)
toString (): String 0%   (0/1)0%   (0/2)0%   (0/1)

1/**
2 * Licensed to jclouds, Inc. (jclouds) under one or more
3 * contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  jclouds licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19package org.jclouds.concurrent;
20 
21import static com.google.common.collect.Maps.newHashMap;
22 
23import java.util.Map;
24import java.util.concurrent.CountDownLatch;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Future;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31 
32import org.jclouds.javax.annotation.Nullable;
33import javax.annotation.Resource;
34import javax.inject.Named;
35 
36import org.jclouds.Constants;
37import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
38import org.jclouds.logging.Logger;
39 
40import com.google.common.annotations.Beta;
41import com.google.common.base.Function;
42import com.google.common.base.Throwables;
43import com.google.common.collect.ImmutableMap;
44import com.google.common.collect.Iterables;
45import com.google.common.collect.Maps;
46import com.google.inject.Inject;
47 
48/**
49 * functions related to or replacing those in {@link com.google.common.collect.Iterables} dealing with Futures
50 * 
51 * @author Adrian Cole
52 */
53@Beta
54public class FutureIterables {
55   @Resource
56   private static Logger logger = Logger.CONSOLE;
57 
58   @Inject(optional = true)
59   @Named(Constants.PROPERTY_MAX_RETRIES)
60   private static int maxRetries = 5;
61 
62   @Inject(optional = true)
63   @Named(Constants.PROPERTY_RETRY_DELAY_START)
64   private static long delayStart = 50L;
65 
66   @Inject(optional = true)
67   private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE;
68 
69   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
70            final Function<? super F, Future<T>> function) {
71      return transformParallel(fromIterable, function, org.jclouds.concurrent.MoreExecutors.sameThreadExecutor(), null);
72   }
73 
74   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
75            final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime) {
76      return transformParallel(fromIterable, function, exec, maxTime, logger, "transforming");
77   }
78 
79   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
80            final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
81            String logPrefix) {
82      return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries);
83   }
84 
85   public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable,
86            Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
87            String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) {
88      Map<F, Exception> exceptions = newHashMap();
89      Map<F, Future<T>> responses = newHashMap();
90      for (int i = 0; i < maxRetries; i++) {
91 
92         for (F from : fromIterable) {
93            responses.put(from, function.apply(from));
94         }
95         exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
96         if (exceptions.size() > 0) {
97            fromIterable = exceptions.keySet();
98            retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
99                     String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions));
100         } else {
101            break;
102         }
103      }
104      if (exceptions.size() > 0)
105         throw new RuntimeException(String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions));
106 
107      return unwrap(responses.values());
108   }
109 
110   public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
111            @Nullable Long maxTime, final Logger logger, final String logPrefix) {
112      if (responses.size() == 0)
113         return ImmutableMap.of();
114      final int total = responses.size();
115      final CountDownLatch doneSignal = new CountDownLatch(total);
116      final AtomicInteger complete = new AtomicInteger(0);
117      final AtomicInteger errors = new AtomicInteger(0);
118      final long start = System.currentTimeMillis();
119      final Map<T, Exception> errorMap = Maps.newHashMap();
120      for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) {
121         Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() {
122 
123            @Override
124            public void run() {
125               try {
126                  future.getValue().get();
127                  complete.incrementAndGet();
128               } catch (Exception e) {
129                  errors.incrementAndGet();
130                  logException(logger, logPrefix, total, complete.get(), errors.get(), start, e);
131                  errorMap.put(future.getKey(), e);
132               }
133               doneSignal.countDown();
134            }
135            
136            @Override
137            public String toString() {
138               return "callGetOnFuture(" + future.getKey() + "," + future.getValue() + ")";
139            }
140         }, exec);
141      }
142      try {
143         if (maxTime != null)
144            doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
145         else
146            doneSignal.await();
147         if (errors.get() > 0) {
148            String message = message(logPrefix, total, complete.get(), errors.get(), start);
149            RuntimeException exception = new RuntimeException(message);
150            logger.error(exception, message);
151         }
152         if (logger.isTraceEnabled()) {
153            String message = message(logPrefix, total, complete.get(), errors.get(), start);
154            logger.trace(message);
155         }
156      } catch (InterruptedException e) {
157         String message = message(logPrefix, total, complete.get(), errors.get(), start);
158         TimeoutException exception = new TimeoutException(message);
159         logger.error(exception, message);
160         Throwables.propagate(exception);
161      }
162      return errorMap;
163   }
164 
165   public static <T> Iterable<T> unwrap(Iterable<Future<T>> values) {
166      return Iterables.transform(values, new Function<Future<T>, T>() {
167         @Override
168         public T apply(Future<T> from) {
169            try {
170               return from.get();
171            } catch (InterruptedException e) {
172               Throwables.propagate(e);
173            } catch (ExecutionException e) {
174               Throwables.propagate(e);
175            }
176            return null;
177         }
178         
179         @Override
180         public String toString() {
181            return "callGetOnFuture()";
182         }
183      });
184   }
185 
186   private static void logException(Logger logger, String logPrefix, int total, int complete, int errors, long start,
187            Exception e) {
188      String message = message(logPrefix, total, complete, errors, start);
189      logger.error(e, message);
190   }
191 
192   private static String message(String prefix, int size, int complete, int errors, long start) {
193      return String.format("%s, completed: %d/%d, errors: %d, rate: %dms/op", prefix, complete, size, errors,
194               (long) ((System.currentTimeMillis() - start) / ((double) size)));
195   }
196 
197   protected static boolean timeOut(long start, Long maxTime) {
198      return maxTime != null ? System.currentTimeMillis() < start + maxTime : false;
199   }
200 
201}

[all classes][org.jclouds.concurrent]
EMMA 2.0.5312 (C) Vladimir Roubtsov