| 1 | /** | 
| 2 |  * Licensed to jclouds, Inc. (jclouds) under one or more | 
| 3 |  * contributor license agreements.  See the NOTICE file | 
| 4 |  * distributed with this work for additional information | 
| 5 |  * regarding copyright ownership.  jclouds licenses this file | 
| 6 |  * to you under the Apache License, Version 2.0 (the | 
| 7 |  * "License"); you may not use this file except in compliance | 
| 8 |  * with the License.  You may obtain a copy of the License at | 
| 9 |  * | 
| 10 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
| 11 |  * | 
| 12 |  * Unless required by applicable law or agreed to in writing, | 
| 13 |  * software distributed under the License is distributed on an | 
| 14 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 15 |  * KIND, either express or implied.  See the License for the | 
| 16 |  * specific language governing permissions and limitations | 
| 17 |  * under the License. | 
| 18 |  */ | 
| 19 | package org.jclouds.concurrent; | 
| 20 |   | 
| 21 | import static com.google.common.collect.Maps.newHashMap; | 
| 22 |   | 
| 23 | import java.util.Map; | 
| 24 | import java.util.concurrent.CountDownLatch; | 
| 25 | import java.util.concurrent.ExecutionException; | 
| 26 | import java.util.concurrent.ExecutorService; | 
| 27 | import java.util.concurrent.Future; | 
| 28 | import java.util.concurrent.TimeUnit; | 
| 29 | import java.util.concurrent.TimeoutException; | 
| 30 | import java.util.concurrent.atomic.AtomicInteger; | 
| 31 |   | 
| 32 | import org.jclouds.javax.annotation.Nullable; | 
| 33 | import javax.annotation.Resource; | 
| 34 | import javax.inject.Named; | 
| 35 |   | 
| 36 | import org.jclouds.Constants; | 
| 37 | import org.jclouds.http.handlers.BackoffLimitedRetryHandler; | 
| 38 | import org.jclouds.logging.Logger; | 
| 39 |   | 
| 40 | import com.google.common.annotations.Beta; | 
| 41 | import com.google.common.base.Function; | 
| 42 | import com.google.common.base.Throwables; | 
| 43 | import com.google.common.collect.ImmutableMap; | 
| 44 | import com.google.common.collect.Iterables; | 
| 45 | import com.google.common.collect.Maps; | 
| 46 | import com.google.inject.Inject; | 
| 47 |   | 
| 48 | /** | 
| 49 |  * functions related to or replacing those in {@link com.google.common.collect.Iterables} dealing with Futures | 
| 50 |  *  | 
| 51 |  * @author Adrian Cole | 
| 52 |  */ | 
| 53 | @Beta | 
| 54 | public class FutureIterables { | 
| 55 |    @Resource | 
| 56 |    private static Logger logger = Logger.CONSOLE; | 
| 57 |   | 
| 58 |    @Inject(optional = true) | 
| 59 |    @Named(Constants.PROPERTY_MAX_RETRIES) | 
| 60 |    private static int maxRetries = 5; | 
| 61 |   | 
| 62 |    @Inject(optional = true) | 
| 63 |    @Named(Constants.PROPERTY_RETRY_DELAY_START) | 
| 64 |    private static long delayStart = 50L; | 
| 65 |   | 
| 66 |    @Inject(optional = true) | 
| 67 |    private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE; | 
| 68 |   | 
| 69 |    public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, | 
| 70 |             final Function<? super F, Future<T>> function) { | 
| 71 |       return transformParallel(fromIterable, function, org.jclouds.concurrent.MoreExecutors.sameThreadExecutor(), null); | 
| 72 |    } | 
| 73 |   | 
| 74 |    public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, | 
| 75 |             final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime) { | 
| 76 |       return transformParallel(fromIterable, function, exec, maxTime, logger, "transforming"); | 
| 77 |    } | 
| 78 |   | 
| 79 |    public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, | 
| 80 |             final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger, | 
| 81 |             String logPrefix) { | 
| 82 |       return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries); | 
| 83 |    } | 
| 84 |   | 
| 85 |    public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable, | 
| 86 |             Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger, | 
| 87 |             String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) { | 
| 88 |       Map<F, Exception> exceptions = newHashMap(); | 
| 89 |       Map<F, Future<T>> responses = newHashMap(); | 
| 90 |       for (int i = 0; i < maxRetries; i++) { | 
| 91 |   | 
| 92 |          for (F from : fromIterable) { | 
| 93 |             responses.put(from, function.apply(from)); | 
| 94 |          } | 
| 95 |          exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix); | 
| 96 |          if (exceptions.size() > 0) { | 
| 97 |             fromIterable = exceptions.keySet(); | 
| 98 |             retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries, | 
| 99 |                      String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions)); | 
| 100 |          } else { | 
| 101 |             break; | 
| 102 |          } | 
| 103 |       } | 
| 104 |       if (exceptions.size() > 0) | 
| 105 |          throw new RuntimeException(String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions)); | 
| 106 |   | 
| 107 |       return unwrap(responses.values()); | 
| 108 |    } | 
| 109 |   | 
| 110 |    public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec, | 
| 111 |             @Nullable Long maxTime, final Logger logger, final String logPrefix) { | 
| 112 |       if (responses.size() == 0) | 
| 113 |          return ImmutableMap.of(); | 
| 114 |       final int total = responses.size(); | 
| 115 |       final CountDownLatch doneSignal = new CountDownLatch(total); | 
| 116 |       final AtomicInteger complete = new AtomicInteger(0); | 
| 117 |       final AtomicInteger errors = new AtomicInteger(0); | 
| 118 |       final long start = System.currentTimeMillis(); | 
| 119 |       final Map<T, Exception> errorMap = Maps.newHashMap(); | 
| 120 |       for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) { | 
| 121 |          Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() { | 
| 122 |   | 
| 123 |             @Override | 
| 124 |             public void run() { | 
| 125 |                try { | 
| 126 |                   future.getValue().get(); | 
| 127 |                   complete.incrementAndGet(); | 
| 128 |                } catch (Exception e) { | 
| 129 |                   errors.incrementAndGet(); | 
| 130 |                   logException(logger, logPrefix, total, complete.get(), errors.get(), start, e); | 
| 131 |                   errorMap.put(future.getKey(), e); | 
| 132 |                } | 
| 133 |                doneSignal.countDown(); | 
| 134 |             } | 
| 135 |              | 
| 136 |             @Override | 
| 137 |             public String toString() { | 
| 138 |                return "callGetOnFuture(" + future.getKey() + "," + future.getValue() + ")"; | 
| 139 |             } | 
| 140 |          }, exec); | 
| 141 |       } | 
| 142 |       try { | 
| 143 |          if (maxTime != null) | 
| 144 |             doneSignal.await(maxTime, TimeUnit.MILLISECONDS); | 
| 145 |          else | 
| 146 |             doneSignal.await(); | 
| 147 |          if (errors.get() > 0) { | 
| 148 |             String message = message(logPrefix, total, complete.get(), errors.get(), start); | 
| 149 |             RuntimeException exception = new RuntimeException(message); | 
| 150 |             logger.error(exception, message); | 
| 151 |          } | 
| 152 |          if (logger.isTraceEnabled()) { | 
| 153 |             String message = message(logPrefix, total, complete.get(), errors.get(), start); | 
| 154 |             logger.trace(message); | 
| 155 |          } | 
| 156 |       } catch (InterruptedException e) { | 
| 157 |          String message = message(logPrefix, total, complete.get(), errors.get(), start); | 
| 158 |          TimeoutException exception = new TimeoutException(message); | 
| 159 |          logger.error(exception, message); | 
| 160 |          Throwables.propagate(exception); | 
| 161 |       } | 
| 162 |       return errorMap; | 
| 163 |    } | 
| 164 |   | 
| 165 |    public static <T> Iterable<T> unwrap(Iterable<Future<T>> values) { | 
| 166 |       return Iterables.transform(values, new Function<Future<T>, T>() { | 
| 167 |          @Override | 
| 168 |          public T apply(Future<T> from) { | 
| 169 |             try { | 
| 170 |                return from.get(); | 
| 171 |             } catch (InterruptedException e) { | 
| 172 |                Throwables.propagate(e); | 
| 173 |             } catch (ExecutionException e) { | 
| 174 |                Throwables.propagate(e); | 
| 175 |             } | 
| 176 |             return null; | 
| 177 |          } | 
| 178 |           | 
| 179 |          @Override | 
| 180 |          public String toString() { | 
| 181 |             return "callGetOnFuture()"; | 
| 182 |          } | 
| 183 |       }); | 
| 184 |    } | 
| 185 |   | 
| 186 |    private static void logException(Logger logger, String logPrefix, int total, int complete, int errors, long start, | 
| 187 |             Exception e) { | 
| 188 |       String message = message(logPrefix, total, complete, errors, start); | 
| 189 |       logger.error(e, message); | 
| 190 |    } | 
| 191 |   | 
| 192 |    private static String message(String prefix, int size, int complete, int errors, long start) { | 
| 193 |       return String.format("%s, completed: %d/%d, errors: %d, rate: %dms/op", prefix, complete, size, errors, | 
| 194 |                (long) ((System.currentTimeMillis() - start) / ((double) size))); | 
| 195 |    } | 
| 196 |   | 
| 197 |    protected static boolean timeOut(long start, Long maxTime) { | 
| 198 |       return maxTime != null ? System.currentTimeMillis() < start + maxTime : false; | 
| 199 |    } | 
| 200 |   | 
| 201 | } |