1 | /** |
2 | * Licensed to jclouds, Inc. (jclouds) under one or more |
3 | * contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. jclouds licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | package org.jclouds.concurrent; |
20 | |
21 | import static com.google.common.collect.Maps.newHashMap; |
22 | |
23 | import java.util.Map; |
24 | import java.util.concurrent.CountDownLatch; |
25 | import java.util.concurrent.ExecutionException; |
26 | import java.util.concurrent.ExecutorService; |
27 | import java.util.concurrent.Future; |
28 | import java.util.concurrent.TimeUnit; |
29 | import java.util.concurrent.TimeoutException; |
30 | import java.util.concurrent.atomic.AtomicInteger; |
31 | |
32 | import org.jclouds.javax.annotation.Nullable; |
33 | import javax.annotation.Resource; |
34 | import javax.inject.Named; |
35 | |
36 | import org.jclouds.Constants; |
37 | import org.jclouds.http.handlers.BackoffLimitedRetryHandler; |
38 | import org.jclouds.logging.Logger; |
39 | |
40 | import com.google.common.annotations.Beta; |
41 | import com.google.common.base.Function; |
42 | import com.google.common.base.Throwables; |
43 | import com.google.common.collect.ImmutableMap; |
44 | import com.google.common.collect.Iterables; |
45 | import com.google.common.collect.Maps; |
46 | import com.google.inject.Inject; |
47 | |
48 | /** |
49 | * functions related to or replacing those in {@link com.google.common.collect.Iterables} dealing with Futures |
50 | * |
51 | * @author Adrian Cole |
52 | */ |
53 | @Beta |
54 | public class FutureIterables { |
55 | @Resource |
56 | private static Logger logger = Logger.CONSOLE; |
57 | |
58 | @Inject(optional = true) |
59 | @Named(Constants.PROPERTY_MAX_RETRIES) |
60 | private static int maxRetries = 5; |
61 | |
62 | @Inject(optional = true) |
63 | @Named(Constants.PROPERTY_RETRY_DELAY_START) |
64 | private static long delayStart = 50L; |
65 | |
66 | @Inject(optional = true) |
67 | private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE; |
68 | |
69 | public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, |
70 | final Function<? super F, Future<T>> function) { |
71 | return transformParallel(fromIterable, function, org.jclouds.concurrent.MoreExecutors.sameThreadExecutor(), null); |
72 | } |
73 | |
74 | public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, |
75 | final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime) { |
76 | return transformParallel(fromIterable, function, exec, maxTime, logger, "transforming"); |
77 | } |
78 | |
79 | public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable, |
80 | final Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger, |
81 | String logPrefix) { |
82 | return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries); |
83 | } |
84 | |
85 | public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable, |
86 | Function<? super F, Future<T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger, |
87 | String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) { |
88 | Map<F, Exception> exceptions = newHashMap(); |
89 | Map<F, Future<T>> responses = newHashMap(); |
90 | for (int i = 0; i < maxRetries; i++) { |
91 | |
92 | for (F from : fromIterable) { |
93 | responses.put(from, function.apply(from)); |
94 | } |
95 | exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix); |
96 | if (exceptions.size() > 0) { |
97 | fromIterable = exceptions.keySet(); |
98 | retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries, |
99 | String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions)); |
100 | } else { |
101 | break; |
102 | } |
103 | } |
104 | if (exceptions.size() > 0) |
105 | throw new RuntimeException(String.format("error %s: %s: %s", logPrefix, fromIterable, exceptions)); |
106 | |
107 | return unwrap(responses.values()); |
108 | } |
109 | |
110 | public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec, |
111 | @Nullable Long maxTime, final Logger logger, final String logPrefix) { |
112 | if (responses.size() == 0) |
113 | return ImmutableMap.of(); |
114 | final int total = responses.size(); |
115 | final CountDownLatch doneSignal = new CountDownLatch(total); |
116 | final AtomicInteger complete = new AtomicInteger(0); |
117 | final AtomicInteger errors = new AtomicInteger(0); |
118 | final long start = System.currentTimeMillis(); |
119 | final Map<T, Exception> errorMap = Maps.newHashMap(); |
120 | for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) { |
121 | Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() { |
122 | |
123 | @Override |
124 | public void run() { |
125 | try { |
126 | future.getValue().get(); |
127 | complete.incrementAndGet(); |
128 | } catch (Exception e) { |
129 | errors.incrementAndGet(); |
130 | logException(logger, logPrefix, total, complete.get(), errors.get(), start, e); |
131 | errorMap.put(future.getKey(), e); |
132 | } |
133 | doneSignal.countDown(); |
134 | } |
135 | |
136 | @Override |
137 | public String toString() { |
138 | return "callGetOnFuture(" + future.getKey() + "," + future.getValue() + ")"; |
139 | } |
140 | }, exec); |
141 | } |
142 | try { |
143 | if (maxTime != null) |
144 | doneSignal.await(maxTime, TimeUnit.MILLISECONDS); |
145 | else |
146 | doneSignal.await(); |
147 | if (errors.get() > 0) { |
148 | String message = message(logPrefix, total, complete.get(), errors.get(), start); |
149 | RuntimeException exception = new RuntimeException(message); |
150 | logger.error(exception, message); |
151 | } |
152 | if (logger.isTraceEnabled()) { |
153 | String message = message(logPrefix, total, complete.get(), errors.get(), start); |
154 | logger.trace(message); |
155 | } |
156 | } catch (InterruptedException e) { |
157 | String message = message(logPrefix, total, complete.get(), errors.get(), start); |
158 | TimeoutException exception = new TimeoutException(message); |
159 | logger.error(exception, message); |
160 | Throwables.propagate(exception); |
161 | } |
162 | return errorMap; |
163 | } |
164 | |
165 | public static <T> Iterable<T> unwrap(Iterable<Future<T>> values) { |
166 | return Iterables.transform(values, new Function<Future<T>, T>() { |
167 | @Override |
168 | public T apply(Future<T> from) { |
169 | try { |
170 | return from.get(); |
171 | } catch (InterruptedException e) { |
172 | Throwables.propagate(e); |
173 | } catch (ExecutionException e) { |
174 | Throwables.propagate(e); |
175 | } |
176 | return null; |
177 | } |
178 | |
179 | @Override |
180 | public String toString() { |
181 | return "callGetOnFuture()"; |
182 | } |
183 | }); |
184 | } |
185 | |
186 | private static void logException(Logger logger, String logPrefix, int total, int complete, int errors, long start, |
187 | Exception e) { |
188 | String message = message(logPrefix, total, complete, errors, start); |
189 | logger.error(e, message); |
190 | } |
191 | |
192 | private static String message(String prefix, int size, int complete, int errors, long start) { |
193 | return String.format("%s, completed: %d/%d, errors: %d, rate: %dms/op", prefix, complete, size, errors, |
194 | (long) ((System.currentTimeMillis() - start) / ((double) size))); |
195 | } |
196 | |
197 | protected static boolean timeOut(long start, Long maxTime) { |
198 | return maxTime != null ? System.currentTimeMillis() < start + maxTime : false; |
199 | } |
200 | |
201 | } |