1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.concurrent.config;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import javax.annotation.Resource;
37 import javax.inject.Inject;
38 import javax.inject.Named;
39 import javax.inject.Singleton;
40
41 import org.jclouds.Constants;
42 import org.jclouds.concurrent.MoreExecutors;
43 import org.jclouds.concurrent.SingleThreaded;
44 import org.jclouds.lifecycle.Closer;
45 import org.jclouds.logging.Logger;
46
47 import com.google.common.annotations.VisibleForTesting;
48 import com.google.common.util.concurrent.ThreadFactoryBuilder;
49 import com.google.inject.AbstractModule;
50 import com.google.inject.Provides;
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @ConfiguresExecutorService
65 public class ExecutorServiceModule extends AbstractModule {
66
67 @VisibleForTesting
68 static final class ShutdownExecutorOnClose implements Closeable {
69 @Resource
70 protected Logger logger = Logger.NULL;
71
72 private final ExecutorService service;
73
74 private ShutdownExecutorOnClose(ExecutorService service) {
75 this.service = service;
76 }
77
78 @Override
79 public void close() throws IOException {
80 List<Runnable> runnables = service.shutdownNow();
81 if (runnables.size() > 0)
82 logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables);
83 }
84 }
85
/** User-thread executor passed to the constructor (after vetting and wrapping), or {@code null} if none was given. */
@VisibleForTesting
final ExecutorService userExecutorFromConstructor;
/** I/O-worker executor passed to the constructor (after vetting and wrapping), or {@code null} if none was given. */
@VisibleForTesting
final ExecutorService ioExecutorFromConstructor;

/**
 * Creates a module that uses the supplied executors instead of building its own.
 *
 * <p>
 * Each argument is first passed through {@link #checkNotGuavaSameThreadExecutor} and the result
 * is then wrapped by {@link #addToStringOnSubmit}. A {@code null} argument stays {@code null},
 * in which case the matching provider method creates a thread pool on demand.
 *
 * @param userThreads
 *           executor for user threads, or {@code null}
 * @param ioThreads
 *           executor for i/o worker threads, or {@code null}
 */
@Inject
public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
      @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
   ExecutorService vettedUserThreads = checkNotGuavaSameThreadExecutor(userThreads);
   this.userExecutorFromConstructor = addToStringOnSubmit(vettedUserThreads);

   ExecutorService vettedIoThreads = checkNotGuavaSameThreadExecutor(ioThreads);
   this.ioExecutorFromConstructor = addToStringOnSubmit(vettedIoThreads);
}
97
98 static ExecutorService addToStringOnSubmit(ExecutorService executor) {
99 if (executor != null) {
100 return new DescribingExecutorService(executor);
101 }
102 return executor;
103 }
104
105 static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
106
107 if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
108 && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
109 Logger.CONSOLE.warn(
110 "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
111 .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
112 return MoreExecutors.sameThreadExecutor();
113 }
114 return executor;
115 }
116
/**
 * Creates a module without pre-built executors; both executors are passed as {@code null} to
 * the two-argument constructor.
 */
public ExecutorServiceModule() {
   this((ExecutorService) null, (ExecutorService) null);
}
120
121 @Override
122 protected void configure() {
123 }
124
125 static class DescribingExecutorService implements ExecutorService {
126
127 private final ExecutorService delegate;
128
129 public DescribingExecutorService(ExecutorService delegate) {
130 this.delegate = checkNotNull(delegate, "delegate");
131 }
132
133 @Override
134 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
135 return delegate.awaitTermination(timeout, unit);
136 }
137
138 @Override
139 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
140 return delegate.invokeAll(tasks);
141 }
142
143 @Override
144 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
145 throws InterruptedException {
146 return delegate.invokeAll(tasks, timeout, unit);
147 }
148
149 @Override
150 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
151 return delegate.invokeAny(tasks);
152 }
153
154 @Override
155 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
156 throws InterruptedException, ExecutionException, TimeoutException {
157 return delegate.invokeAny(tasks, timeout, unit);
158 }
159
160 @Override
161 public boolean isShutdown() {
162 return delegate.isShutdown();
163 }
164
165 @Override
166 public boolean isTerminated() {
167 return delegate.isTerminated();
168 }
169
170 @Override
171 public void shutdown() {
172 delegate.shutdown();
173 }
174
175 @Override
176 public List<Runnable> shutdownNow() {
177 return delegate.shutdownNow();
178 }
179
180 @Override
181 public <T> Future<T> submit(Callable<T> task) {
182 return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
183 }
184
185 @SuppressWarnings({ "unchecked", "rawtypes" })
186 @Override
187 public Future<?> submit(Runnable task) {
188 return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
189 }
190
191 @Override
192 public <T> Future<T> submit(Runnable task, T result) {
193 return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
194 }
195
196 @Override
197 public void execute(Runnable arg0) {
198 delegate.execute(arg0);
199 }
200
201 @Override
202 public boolean equals(Object obj) {
203 return delegate.equals(obj);
204 }
205
206 @Override
207 public int hashCode() {
208 return delegate.hashCode();
209 }
210
211 @Override
212 public String toString() {
213 return delegate.toString();
214 }
215
216 }
217
218 static class DescribedFuture<T> implements Future<T> {
219 private final Future<T> delegate;
220 private final String description;
221 private StackTraceElement[] submissionTrace;
222
223 public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
224 this.delegate = delegate;
225 this.description = description;
226 this.submissionTrace = submissionTrace;
227 }
228
229 @Override
230 public boolean cancel(boolean arg0) {
231 return delegate.cancel(arg0);
232 }
233
234 @Override
235 public T get() throws InterruptedException, ExecutionException {
236 try {
237 return delegate.get();
238 } catch (ExecutionException e) {
239 throw ensureCauseHasSubmissionTrace(e);
240 } catch (InterruptedException e) {
241 throw ensureCauseHasSubmissionTrace(e);
242 }
243 }
244
245 @Override
246 public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
247 try {
248 return delegate.get(arg0, arg1);
249 } catch (ExecutionException e) {
250 throw ensureCauseHasSubmissionTrace(e);
251 } catch (InterruptedException e) {
252 throw ensureCauseHasSubmissionTrace(e);
253 } catch (TimeoutException e) {
254 throw ensureCauseHasSubmissionTrace(e);
255 }
256 }
257
258
259
260
261
262
263
264
265
266
267 private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
268 if (submissionTrace==null) return e;
269 if (e.getCause()==null) {
270 ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
271 e.initCause(ee);
272 return e;
273 }
274 Throwable cause = e.getCause();
275 StackTraceElement[] causeTrace = cause.getStackTrace();
276 boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
277 for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
278 if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
279 causeIncludesSubmissionTrace = false;
280 }
281 }
282
283 if (!causeIncludesSubmissionTrace) {
284 cause.setStackTrace(merge(causeTrace, submissionTrace));
285 }
286
287 return e;
288 }
289
290 private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
291 StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
292 System.arraycopy(t1, 0, t12, 0, t1.length);
293 System.arraycopy(t2, 0, t12, t1.length, t2.length);
294 return t12;
295 }
296
297 @Override
298 public boolean isCancelled() {
299 return delegate.isCancelled();
300 }
301
302 @Override
303 public boolean isDone() {
304 return delegate.isDone();
305 }
306
307 @Override
308 public boolean equals(Object obj) {
309 return delegate.equals(obj);
310 }
311
312 @Override
313 public int hashCode() {
314 return delegate.hashCode();
315 }
316
317 @Override
318 public String toString() {
319 return description;
320 }
321
322 }
323
324 @Provides
325 @Singleton
326 @Named(Constants.PROPERTY_USER_THREADS)
327 ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
328 if (userExecutorFromConstructor != null)
329 return userExecutorFromConstructor;
330 return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
331 }
332
333 @Provides
334 @Singleton
335 @Named(Constants.PROPERTY_IO_WORKER_THREADS)
336 ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
337 if (ioExecutorFromConstructor != null)
338 return ioExecutorFromConstructor;
339 return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
340 }
341
342 @VisibleForTesting
343 static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
344 closer.addToClose(new ShutdownExecutorOnClose(service));
345 return service;
346 }
347
348 @VisibleForTesting
349 static ExecutorService newCachedThreadPoolNamed(String name) {
350 return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
351 Executors.defaultThreadFactory()).build());
352 }
353
354 @VisibleForTesting
355 static ExecutorService newThreadPoolNamed(String name, int maxCount) {
356 return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
357 }
358
359 @VisibleForTesting
360 static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
361 return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
362 .setThreadFactory(Executors.defaultThreadFactory()).build());
363 }
364
365
366 static StackTraceElement[] getStackTraceHere() {
367
368
369 StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace();
370 StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length-2];
371 System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length);
372 return cleanedSubmissionTrace;
373 }
374
375 }