EMMA Coverage Report (generated Wed Jun 22 19:47:49 EDT 2011)
[all classes][org.jclouds.concurrent]

COVERAGE SUMMARY FOR SOURCE FILE [FutureIterables.java]

nameclass, %method, %block, %line, %
FutureIterables.java0%   (0/3)0%   (0/16)0%   (0/430)0%   (0/68)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class FutureIterables0%   (0/1)0%   (0/12)0%   (0/339)0%   (0/52)
<static initializer> 0%   (0/1)0%   (0/7)0%   (0/3)
FutureIterables (): void 0%   (0/1)0%   (0/3)0%   (0/1)
access$000 (Logger, String, int, int, int, long, Exception): void 0%   (0/1)0%   (0/9)0%   (0/1)
awaitCompletion (Map, ExecutorService, Long, Logger, String): Map 0%   (0/1)0%   (0/136)0%   (0/27)
logException (Logger, String, int, int, int, long, Exception): void 0%   (0/1)0%   (0/14)0%   (0/3)
message (String, int, int, int, long): String 0%   (0/1)0%   (0/36)0%   (0/1)
timeOut (long, Long): boolean 0%   (0/1)0%   (0/15)0%   (0/1)
transformParallel (Iterable, Function): Iterable 0%   (0/1)0%   (0/6)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long): Iterable 0%   (0/1)0%   (0/8)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long, Logger, String)... 0%   (0/1)0%   (0/10)0%   (0/1)
transformParallel (Iterable, Function, ExecutorService, Long, Logger, String,... 0%   (0/1)0%   (0/89)0%   (0/12)
unwrap (Iterable): Iterable 0%   (0/1)0%   (0/6)0%   (0/1)
     
class FutureIterables$10%   (0/1)0%   (0/2)0%   (0/74)0%   (0/10)
FutureIterables$1 (Map$Entry, AtomicInteger, AtomicInteger, Logger, String, i... 0%   (0/1)0%   (0/30)0%   (0/1)
run (): void 0%   (0/1)0%   (0/44)0%   (0/9)
     
class FutureIterables$20%   (0/1)0%   (0/2)0%   (0/17)0%   (0/8)
FutureIterables$2 (): void 0%   (0/1)0%   (0/3)0%   (0/1)
apply (Future): Object 0%   (0/1)0%   (0/14)0%   (0/7)

1/**
2 *
3 * Copyright (C) 2011 Cloud Conscious, LLC. <info@cloudconscious.com>
4 *
5 * ====================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ====================================================================
18 */
19package org.jclouds.concurrent;
20 
21import static com.google.common.collect.Maps.newHashMap;
22 
23import java.util.Map;
24import java.util.concurrent.CountDownLatch;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Future;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
30import java.util.concurrent.atomic.AtomicInteger;
31 
32import javax.annotation.Nullable;
33import javax.annotation.Resource;
34import javax.inject.Named;
35 
36import org.jclouds.Constants;
37import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
38import org.jclouds.logging.Logger;
39 
40import com.google.common.annotations.Beta;
41import com.google.common.base.Function;
42import com.google.common.base.Throwables;
43import com.google.common.collect.ImmutableMap;
44import com.google.common.collect.Iterables;
45import com.google.common.collect.Maps;
46import com.google.inject.Inject;
47 
48/**
49 * functions related to or replacing those in {@link com.google.common.collect.Iterables} dealing with Futures
50 * 
51 * @author Adrian Cole
52 */
53@Beta
54public class FutureIterables {
55   @Resource
56   private static Logger logger = Logger.CONSOLE;
57 
58   @Inject(optional = true)
59   @Named(Constants.PROPERTY_MAX_RETRIES)
60   private static int maxRetries = 5;
61 
62   @Inject(optional = true)
63   private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE;
64 
65   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
66            final Function<? super F, Future<T>> function) {
67      return transformParallel(fromIterable, function, org.jclouds.concurrent.MoreExecutors.sameThreadExecutor(), null);
68   }
69 
70   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
71            final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime) {
72      return transformParallel(fromIterable, function, exec, maxTime, logger, "transforming");
73   }
74 
75   public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
76            final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
77            String logPrefix) {
78      return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries);
79   }
80 
81   public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable,
82            Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
83            String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) {
84      Map<F, Exception> exceptions = newHashMap();
85      Map<F, Future<T>> responses = newHashMap();
86      for (int i = 0; i < maxRetries; i++) {
87 
88         for (F from : fromIterable) {
89            responses.put(from, function.apply(from));
90         }
91         exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
92         if (exceptions.size() > 0) {
93            fromIterable = exceptions.keySet();
94            retryHandler.imposeBackoffExponentialDelay(i + 1, String.format("error %s: %s: %s", logPrefix,
95                     fromIterable, exceptions));
96         } else {
97            break;
98         }
99      }
100      if (exceptions.size() > 0)
101         throw new RuntimeException(String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions));
102 
103      return unwrap(responses.values());
104   }
105 
106   public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
107            @Nullable Long maxTime, final Logger logger, final String logPrefix) {
108      if (responses.size() == 0)
109         return ImmutableMap.of();
110      final int total = responses.size();
111      final CountDownLatch doneSignal = new CountDownLatch(total);
112      final AtomicInteger complete = new AtomicInteger(0);
113      final AtomicInteger errors = new AtomicInteger(0);
114      final long start = System.currentTimeMillis();
115      final Map<T, Exception> errorMap = Maps.newHashMap();
116      for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) {
117         Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() {
118            public void run() {
119               try {
120                  future.getValue().get();
121                  complete.incrementAndGet();
122               } catch (Exception e) {
123                  errors.incrementAndGet();
124                  logException(logger, logPrefix, total, complete.get(), errors.get(), start, e);
125                  errorMap.put(future.getKey(), e);
126               }
127               doneSignal.countDown();
128            }
129         }, exec);
130      }
131      try {
132         if (maxTime != null)
133            doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
134         else
135            doneSignal.await();
136         if (errors.get() > 0) {
137            String message = message(logPrefix, total, complete.get(), errors.get(), start);
138            RuntimeException exception = new RuntimeException(message);
139            logger.error(exception, message);
140         }
141         if (logger.isTraceEnabled()) {
142            String message = message(logPrefix, total, complete.get(), errors.get(), start);
143            logger.trace(message);
144         }
145      } catch (InterruptedException e) {
146         String message = message(logPrefix, total, complete.get(), errors.get(), start);
147         TimeoutException exception = new TimeoutException(message);
148         logger.error(exception, message);
149         Throwables.propagate(exception);
150      }
151      return errorMap;
152   }
153 
154   public static <T> Iterable<T> unwrap(Iterable<Future<T>> values) {
155      return Iterables.transform(values, new Function<Future<T>, T>() {
156         @Override
157         public T apply(Future<T> from) {
158            try {
159               return from.get();
160            } catch (InterruptedException e) {
161               Throwables.propagate(e);
162            } catch (ExecutionException e) {
163               Throwables.propagate(e);
164            }
165            return null;
166         }
167      });
168   }
169 
170   private static void logException(Logger logger, String logPrefix, int total, int complete, int errors, long start,
171            Exception e) {
172      String message = message(logPrefix, total, complete, errors, start);
173      logger.error(e, message);
174   }
175 
176   private static String message(String prefix, int size, int complete, int errors, long start) {
177      return String.format("%s, completed: %d/%d, errors: %d, rate: %dms/op", prefix, complete, size, errors,
178               (long) ((System.currentTimeMillis() - start) / ((double) size)));
179   }
180 
181   protected static boolean timeOut(long start, Long maxTime) {
182      return maxTime != null ? System.currentTimeMillis() < start + maxTime : false;
183   }
184 
185}

[all classes][org.jclouds.concurrent]
EMMA 2.0.5312 (C) Vladimir Roubtsov