1 | /** |
2 | * Licensed to jclouds, Inc. (jclouds) under one or more |
3 | * contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. jclouds licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | package org.jclouds.concurrent.config; |
20 | |
21 | import static com.google.common.base.Preconditions.checkNotNull; |
22 | import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool; |
23 | |
24 | import java.io.Closeable; |
25 | import java.io.IOException; |
26 | import java.util.Collection; |
27 | import java.util.List; |
28 | import java.util.concurrent.Callable; |
29 | import java.util.concurrent.ExecutionException; |
30 | import java.util.concurrent.ExecutorService; |
31 | import java.util.concurrent.Executors; |
32 | import java.util.concurrent.Future; |
33 | import java.util.concurrent.TimeUnit; |
34 | import java.util.concurrent.TimeoutException; |
35 | |
36 | import javax.annotation.Resource; |
37 | import javax.inject.Inject; |
38 | import javax.inject.Named; |
39 | import javax.inject.Singleton; |
40 | |
41 | import org.jclouds.Constants; |
42 | import org.jclouds.concurrent.MoreExecutors; |
43 | import org.jclouds.concurrent.SingleThreaded; |
44 | import org.jclouds.lifecycle.Closer; |
45 | import org.jclouds.logging.Logger; |
46 | |
47 | import com.google.common.annotations.VisibleForTesting; |
48 | import com.google.common.util.concurrent.ThreadFactoryBuilder; |
49 | import com.google.inject.AbstractModule; |
50 | import com.google.inject.Provides; |
51 | |
52 | /** |
53 | * Configures {@link ExecutorService}. |
54 | * |
55 | * Note that this uses threads. |
56 | * |
57 | * <p> |
58 | * This extends the underlying Future to expose a description (the task's toString) and the submission context (stack trace). |
59 | * The submission stack trace is appended to relevant stack traces on exceptions that are returned, |
60 | * so the user can see the logical chain of execution (in the executor, and where it was passed to the executor). |
61 | * |
62 | * @author Adrian Cole |
63 | */ |
64 | @ConfiguresExecutorService |
65 | public class ExecutorServiceModule extends AbstractModule { |
66 | |
67 | @VisibleForTesting |
68 | static final class ShutdownExecutorOnClose implements Closeable { |
69 | @Resource |
70 | protected Logger logger = Logger.NULL; |
71 | |
72 | private final ExecutorService service; |
73 | |
74 | private ShutdownExecutorOnClose(ExecutorService service) { |
75 | this.service = service; |
76 | } |
77 | |
78 | @Override |
79 | public void close() throws IOException { |
80 | List<Runnable> runnables = service.shutdownNow(); |
81 | if (runnables.size() > 0) |
82 | logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables); |
83 | } |
84 | } |
85 | |
86 | @VisibleForTesting |
87 | final ExecutorService userExecutorFromConstructor; |
88 | @VisibleForTesting |
89 | final ExecutorService ioExecutorFromConstructor; |
90 | |
91 | @Inject |
92 | public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, |
93 | @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) { |
94 | this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads)); |
95 | this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads)); |
96 | } |
97 | |
98 | static ExecutorService addToStringOnSubmit(ExecutorService executor) { |
99 | if (executor != null) { |
100 | return new DescribingExecutorService(executor); |
101 | } |
102 | return executor; |
103 | } |
104 | |
105 | static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) { |
106 | // we detect behavior based on the class |
107 | if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class)) |
108 | && executor.getClass().getSimpleName().indexOf("SameThread") != -1) { |
109 | Logger.CONSOLE.warn( |
110 | "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor |
111 | .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName()); |
112 | return MoreExecutors.sameThreadExecutor(); |
113 | } |
114 | return executor; |
115 | } |
116 | |
117 | public ExecutorServiceModule() { |
118 | this(null, null); |
119 | } |
120 | |
121 | @Override |
122 | protected void configure() { |
123 | } |
124 | |
125 | static class DescribingExecutorService implements ExecutorService { |
126 | |
127 | private final ExecutorService delegate; |
128 | |
129 | public DescribingExecutorService(ExecutorService delegate) { |
130 | this.delegate = checkNotNull(delegate, "delegate"); |
131 | } |
132 | |
133 | @Override |
134 | public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
135 | return delegate.awaitTermination(timeout, unit); |
136 | } |
137 | |
138 | @Override |
139 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { |
140 | return delegate.invokeAll(tasks); |
141 | } |
142 | |
143 | @Override |
144 | public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
145 | throws InterruptedException { |
146 | return delegate.invokeAll(tasks, timeout, unit); |
147 | } |
148 | |
149 | @Override |
150 | public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { |
151 | return delegate.invokeAny(tasks); |
152 | } |
153 | |
154 | @Override |
155 | public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
156 | throws InterruptedException, ExecutionException, TimeoutException { |
157 | return delegate.invokeAny(tasks, timeout, unit); |
158 | } |
159 | |
160 | @Override |
161 | public boolean isShutdown() { |
162 | return delegate.isShutdown(); |
163 | } |
164 | |
165 | @Override |
166 | public boolean isTerminated() { |
167 | return delegate.isTerminated(); |
168 | } |
169 | |
170 | @Override |
171 | public void shutdown() { |
172 | delegate.shutdown(); |
173 | } |
174 | |
175 | @Override |
176 | public List<Runnable> shutdownNow() { |
177 | return delegate.shutdownNow(); |
178 | } |
179 | |
180 | @Override |
181 | public <T> Future<T> submit(Callable<T> task) { |
182 | return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere()); |
183 | } |
184 | |
185 | @SuppressWarnings({ "unchecked", "rawtypes" }) |
186 | @Override |
187 | public Future<?> submit(Runnable task) { |
188 | return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere()); |
189 | } |
190 | |
191 | @Override |
192 | public <T> Future<T> submit(Runnable task, T result) { |
193 | return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere()); |
194 | } |
195 | |
196 | @Override |
197 | public void execute(Runnable arg0) { |
198 | delegate.execute(arg0); |
199 | } |
200 | |
201 | @Override |
202 | public boolean equals(Object obj) { |
203 | return delegate.equals(obj); |
204 | } |
205 | |
206 | @Override |
207 | public int hashCode() { |
208 | return delegate.hashCode(); |
209 | } |
210 | |
211 | @Override |
212 | public String toString() { |
213 | return delegate.toString(); |
214 | } |
215 | |
216 | } |
217 | |
218 | static class DescribedFuture<T> implements Future<T> { |
219 | private final Future<T> delegate; |
220 | private final String description; |
221 | private StackTraceElement[] submissionTrace; |
222 | |
223 | public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) { |
224 | this.delegate = delegate; |
225 | this.description = description; |
226 | this.submissionTrace = submissionTrace; |
227 | } |
228 | |
229 | @Override |
230 | public boolean cancel(boolean arg0) { |
231 | return delegate.cancel(arg0); |
232 | } |
233 | |
234 | @Override |
235 | public T get() throws InterruptedException, ExecutionException { |
236 | try { |
237 | return delegate.get(); |
238 | } catch (ExecutionException e) { |
239 | throw ensureCauseHasSubmissionTrace(e); |
240 | } catch (InterruptedException e) { |
241 | throw ensureCauseHasSubmissionTrace(e); |
242 | } |
243 | } |
244 | |
245 | @Override |
246 | public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { |
247 | try { |
248 | return delegate.get(arg0, arg1); |
249 | } catch (ExecutionException e) { |
250 | throw ensureCauseHasSubmissionTrace(e); |
251 | } catch (InterruptedException e) { |
252 | throw ensureCauseHasSubmissionTrace(e); |
253 | } catch (TimeoutException e) { |
254 | throw ensureCauseHasSubmissionTrace(e); |
255 | } |
256 | } |
257 | |
258 | /** This method does the work to ensure _if_ a submission stack trace was provided, |
259 | * it is included in the exception. most errors are thrown from the frame of the |
260 | * Future.get call, with a cause that took place in the executor's thread. |
261 | * We extend the stack trace of that cause with the submission stack trace. |
262 | * (An alternative would be to put the stack trace as a root cause, |
263 | * at the bottom of the stack, or appended to all traces, or inserted |
264 | * after the second cause, etc ... but since we can't change the "Caused by:" |
265 | * method in Throwable the compromise made here seems best.) |
266 | */ |
267 | private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) { |
268 | if (submissionTrace==null) return e; |
269 | if (e.getCause()==null) { |
270 | ExecutionException ee = new ExecutionException("task submitted from the following trace", null); |
271 | e.initCause(ee); |
272 | return e; |
273 | } |
274 | Throwable cause = e.getCause(); |
275 | StackTraceElement[] causeTrace = cause.getStackTrace(); |
276 | boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length; |
277 | for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) { |
278 | if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) { |
279 | causeIncludesSubmissionTrace = false; |
280 | } |
281 | } |
282 | |
283 | if (!causeIncludesSubmissionTrace) { |
284 | cause.setStackTrace(merge(causeTrace, submissionTrace)); |
285 | } |
286 | |
287 | return e; |
288 | } |
289 | |
290 | private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) { |
291 | StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length]; |
292 | System.arraycopy(t1, 0, t12, 0, t1.length); |
293 | System.arraycopy(t2, 0, t12, t1.length, t2.length); |
294 | return t12; |
295 | } |
296 | |
297 | @Override |
298 | public boolean isCancelled() { |
299 | return delegate.isCancelled(); |
300 | } |
301 | |
302 | @Override |
303 | public boolean isDone() { |
304 | return delegate.isDone(); |
305 | } |
306 | |
307 | @Override |
308 | public boolean equals(Object obj) { |
309 | return delegate.equals(obj); |
310 | } |
311 | |
312 | @Override |
313 | public int hashCode() { |
314 | return delegate.hashCode(); |
315 | } |
316 | |
317 | @Override |
318 | public String toString() { |
319 | return description; |
320 | } |
321 | |
322 | } |
323 | |
324 | @Provides |
325 | @Singleton |
326 | @Named(Constants.PROPERTY_USER_THREADS) |
327 | ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) { |
328 | if (userExecutorFromConstructor != null) |
329 | return userExecutorFromConstructor; |
330 | return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer); |
331 | } |
332 | |
333 | @Provides |
334 | @Singleton |
335 | @Named(Constants.PROPERTY_IO_WORKER_THREADS) |
336 | ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) { |
337 | if (ioExecutorFromConstructor != null) |
338 | return ioExecutorFromConstructor; |
339 | return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer); |
340 | } |
341 | |
342 | @VisibleForTesting |
343 | static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) { |
344 | closer.addToClose(new ShutdownExecutorOnClose(service)); |
345 | return service; |
346 | } |
347 | |
348 | @VisibleForTesting |
349 | static ExecutorService newCachedThreadPoolNamed(String name) { |
350 | return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory( |
351 | Executors.defaultThreadFactory()).build()); |
352 | } |
353 | |
354 | @VisibleForTesting |
355 | static ExecutorService newThreadPoolNamed(String name, int maxCount) { |
356 | return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount); |
357 | } |
358 | |
359 | @VisibleForTesting |
360 | static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) { |
361 | return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name) |
362 | .setThreadFactory(Executors.defaultThreadFactory()).build()); |
363 | } |
364 | |
365 | /** returns the stack trace at the caller */ |
366 | static StackTraceElement[] getStackTraceHere() { |
367 | // remove the first two items in the stack trace (because the first one refers to the call to |
368 | // Thread.getStackTrace, and the second one is us) |
369 | StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace(); |
370 | StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length-2]; |
371 | System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length); |
372 | return cleanedSubmissionTrace; |
373 | } |
374 | |
375 | } |