View Javadoc

1   /**
2    * Licensed to jclouds, Inc. (jclouds) under one or more
3    * contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  jclouds licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.jclouds.concurrent.config;
20  
21  import static com.google.common.base.Preconditions.checkNotNull;
22  import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.Collection;
27  import java.util.List;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  
36  import javax.annotation.Resource;
37  import javax.inject.Inject;
38  import javax.inject.Named;
39  import javax.inject.Singleton;
40  
41  import org.jclouds.Constants;
42  import org.jclouds.concurrent.MoreExecutors;
43  import org.jclouds.concurrent.SingleThreaded;
44  import org.jclouds.lifecycle.Closer;
45  import org.jclouds.logging.Logger;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  import com.google.common.util.concurrent.ThreadFactoryBuilder;
49  import com.google.inject.AbstractModule;
50  import com.google.inject.Provides;
51  
52  /**
53   * Configures {@link ExecutorService}.
54   * 
55   * Note that this uses threads.
56   *  
57   * <p>
58   * This extends the underlying Future to expose a description (the task's toString) and the submission context (stack trace).
59   * The submission stack trace is appended to relevant stack traces on exceptions that are returned,
60   * so the user can see the logical chain of execution (in the executor, and where it was passed to the executor).
61   * 
62   * @author Adrian Cole
63   */
64  @ConfiguresExecutorService
65  public class ExecutorServiceModule extends AbstractModule {
66  
67     @VisibleForTesting
68     static final class ShutdownExecutorOnClose implements Closeable {
69        @Resource
70        protected Logger logger = Logger.NULL;
71  
72        private final ExecutorService service;
73  
74        private ShutdownExecutorOnClose(ExecutorService service) {
75           this.service = service;
76        }
77  
78        @Override
79        public void close() throws IOException {
80           List<Runnable> runnables = service.shutdownNow();
81           if (runnables.size() > 0)
82              logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables);
83        }
84     }
85  
86     @VisibleForTesting
87     final ExecutorService userExecutorFromConstructor;
88     @VisibleForTesting
89     final ExecutorService ioExecutorFromConstructor;
90  
91     @Inject
92     public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
93              @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
94        this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
95        this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
96     }
97  
98     static ExecutorService addToStringOnSubmit(ExecutorService executor) {
99        if (executor != null) {
100          return new DescribingExecutorService(executor);
101       }
102       return executor;
103    }
104 
105    static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
106       // we detect behavior based on the class
107       if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
108                && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
109          Logger.CONSOLE.warn(
110                   "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
111                            .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
112          return MoreExecutors.sameThreadExecutor();
113       }
114       return executor;
115    }
116 
117    public ExecutorServiceModule() {
118       this(null, null);
119    }
120 
121    @Override
122    protected void configure() {
123    }
124 
125    static class DescribingExecutorService implements ExecutorService {
126 
127       private final ExecutorService delegate;
128 
129       public DescribingExecutorService(ExecutorService delegate) {
130          this.delegate = checkNotNull(delegate, "delegate");
131       }
132 
133       @Override
134       public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
135          return delegate.awaitTermination(timeout, unit);
136       }
137 
138       @Override
139       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
140          return delegate.invokeAll(tasks);
141       }
142 
143       @Override
144       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
145                throws InterruptedException {
146          return delegate.invokeAll(tasks, timeout, unit);
147       }
148 
149       @Override
150       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
151          return delegate.invokeAny(tasks);
152       }
153 
154       @Override
155       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
156                throws InterruptedException, ExecutionException, TimeoutException {
157          return delegate.invokeAny(tasks, timeout, unit);
158       }
159 
160       @Override
161       public boolean isShutdown() {
162          return delegate.isShutdown();
163       }
164 
165       @Override
166       public boolean isTerminated() {
167          return delegate.isTerminated();
168       }
169 
170       @Override
171       public void shutdown() {
172          delegate.shutdown();
173       }
174 
175       @Override
176       public List<Runnable> shutdownNow() {
177          return delegate.shutdownNow();
178       }
179 
180       @Override
181       public <T> Future<T> submit(Callable<T> task) {
182          return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
183       }
184 
185       @SuppressWarnings({ "unchecked", "rawtypes" })
186       @Override
187       public Future<?> submit(Runnable task) {
188          return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
189       }
190 
191       @Override
192       public <T> Future<T> submit(Runnable task, T result) {
193          return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
194       }
195 
196       @Override
197       public void execute(Runnable arg0) {
198          delegate.execute(arg0);
199       }
200 
201       @Override
202       public boolean equals(Object obj) {
203          return delegate.equals(obj);
204       }
205 
206       @Override
207       public int hashCode() {
208          return delegate.hashCode();
209       }
210 
211       @Override
212       public String toString() {
213          return delegate.toString();
214       }
215 
216    }
217 
218    static class DescribedFuture<T> implements Future<T> {
219       private final Future<T> delegate;
220       private final String description;
221       private StackTraceElement[] submissionTrace;
222 
223       public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
224          this.delegate = delegate;
225          this.description = description;
226          this.submissionTrace = submissionTrace;
227       }
228 
229       @Override
230       public boolean cancel(boolean arg0) {
231          return delegate.cancel(arg0);
232       }
233 
234       @Override
235       public T get() throws InterruptedException, ExecutionException {
236          try {
237             return delegate.get();
238          } catch (ExecutionException e) {
239             throw ensureCauseHasSubmissionTrace(e);
240          } catch (InterruptedException e) {
241             throw ensureCauseHasSubmissionTrace(e);
242          }
243       }
244 
245       @Override
246       public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
247          try {
248             return delegate.get(arg0, arg1);
249          } catch (ExecutionException e) {
250             throw ensureCauseHasSubmissionTrace(e);
251          } catch (InterruptedException e) {
252             throw ensureCauseHasSubmissionTrace(e);
253          } catch (TimeoutException e) {
254             throw ensureCauseHasSubmissionTrace(e);
255          }
256       }
257 
258       /** This method does the work to ensure _if_ a submission stack trace was provided,
259        * it is included in the exception.  most errors are thrown from the frame of the
260        * Future.get call, with a cause that took place in the executor's thread.
261        * We extend the stack trace of that cause with the submission stack trace.
262        * (An alternative would be to put the stack trace as a root cause,
263        * at the bottom of the stack, or appended to all traces, or inserted
264        * after the second cause, etc ... but since we can't change the "Caused by:"
265        * method in Throwable the compromise made here seems best.)
266        */
267       private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
268          if (submissionTrace==null) return e;
269          if (e.getCause()==null) {
270             ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
271             e.initCause(ee);
272             return e;
273          }
274          Throwable cause = e.getCause();
275          StackTraceElement[] causeTrace = cause.getStackTrace();
276          boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
277          for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
278             if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
279                causeIncludesSubmissionTrace = false;
280             }
281          }
282          
283          if (!causeIncludesSubmissionTrace) {
284             cause.setStackTrace(merge(causeTrace, submissionTrace));
285          }
286          
287          return e;
288       }
289 
290       private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
291          StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
292          System.arraycopy(t1, 0, t12, 0, t1.length);
293          System.arraycopy(t2, 0, t12, t1.length, t2.length);
294          return t12;
295       }
296 
297       @Override
298       public boolean isCancelled() {
299          return delegate.isCancelled();
300       }
301 
302       @Override
303       public boolean isDone() {
304          return delegate.isDone();
305       }
306 
307       @Override
308       public boolean equals(Object obj) {
309          return delegate.equals(obj);
310       }
311 
312       @Override
313       public int hashCode() {
314          return delegate.hashCode();
315       }
316 
317       @Override
318       public String toString() {
319          return description;
320       }
321 
322    }
323 
324    @Provides
325    @Singleton
326    @Named(Constants.PROPERTY_USER_THREADS)
327    ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
328       if (userExecutorFromConstructor != null)
329          return userExecutorFromConstructor;
330       return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
331    }
332 
333    @Provides
334    @Singleton
335    @Named(Constants.PROPERTY_IO_WORKER_THREADS)
336    ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
337       if (ioExecutorFromConstructor != null)
338          return ioExecutorFromConstructor;
339       return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
340    }
341 
342    @VisibleForTesting
343    static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
344       closer.addToClose(new ShutdownExecutorOnClose(service));
345       return service;
346    }
347 
348    @VisibleForTesting
349    static ExecutorService newCachedThreadPoolNamed(String name) {
350       return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
351                Executors.defaultThreadFactory()).build());
352    }
353 
354    @VisibleForTesting
355    static ExecutorService newThreadPoolNamed(String name, int maxCount) {
356       return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
357    }
358 
359    @VisibleForTesting
360    static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
361       return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
362                .setThreadFactory(Executors.defaultThreadFactory()).build());
363    }
364 
365    /** returns the stack trace at the caller */
366    static StackTraceElement[] getStackTraceHere() {
367       // remove the first two items in the stack trace (because the first one refers to the call to 
368       // Thread.getStackTrace, and the second one is us)
369       StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace();
370       StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length-2];
371       System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length);
372       return cleanedSubmissionTrace;
373    }
374    
375 }