EMMA Coverage Report (generated Wed Oct 26 13:47:17 EDT 2011)
[all classes][org.jclouds.concurrent.config]

COVERAGE SUMMARY FOR SOURCE FILE [ExecutorServiceModule.java]

name  class, %  method, %  block, %  line, %
ExecutorServiceModule.java  80%  (4/5)  65%  (28/43)  65%  (308/471)  69%  (68.7/100)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name  class, %  method, %  block, %  line, %
     
class ExecutorServiceModule$DescribingExecutorService  100% (1/1)  35%  (6/17)  42%  (45/107)  43%  (9/21)
awaitTermination (long, TimeUnit): boolean 0%   (0/1)0%   (0/6)0%   (0/1)
equals (Object): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
hashCode (): int 0%   (0/1)0%   (0/4)0%   (0/1)
invokeAll (Collection): List 0%   (0/1)0%   (0/5)0%   (0/1)
invokeAll (Collection, long, TimeUnit): List 0%   (0/1)0%   (0/7)0%   (0/1)
invokeAny (Collection): Object 0%   (0/1)0%   (0/5)0%   (0/1)
invokeAny (Collection, long, TimeUnit): Object 0%   (0/1)0%   (0/7)0%   (0/1)
isTerminated (): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
shutdown (): void 0%   (0/1)0%   (0/4)0%   (0/2)
submit (Runnable): Future 0%   (0/1)0%   (0/11)0%   (0/1)
toString (): String 0%   (0/1)0%   (0/4)0%   (0/1)
ExecutorServiceModule$DescribingExecutorService (ExecutorService): void 100% (1/1)100% (9/9)100% (3/3)
execute (Runnable): void 100% (1/1)100% (5/5)100% (2/2)
isShutdown (): boolean 100% (1/1)100% (4/4)100% (1/1)
shutdownNow (): List 100% (1/1)100% (4/4)100% (1/1)
submit (Callable): Future 100% (1/1)100% (11/11)100% (1/1)
submit (Runnable, Object): Future 100% (1/1)100% (12/12)100% (1/1)
     
class ExecutorServiceModule$ShutdownExecutorOnClose  100% (1/1)  100% (3/3)  58%  (21/36)  89%  (8/9)
close (): void 100% (1/1)35%  (8/23)75%  (3/4)
ExecutorServiceModule$ShutdownExecutorOnClose (ExecutorService): void 100% (1/1)100% (9/9)100% (4/4)
ExecutorServiceModule$ShutdownExecutorOnClose (ExecutorService, ExecutorServi... 100% (1/1)100% (4/4)100% (1/1)
     
class ExecutorServiceModule$DescribedFuture  100% (1/1)  64%  (7/11)  72%  (131/181)  67%  (27.6/41)
cancel (boolean): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
equals (Object): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
hashCode (): int 0%   (0/1)0%   (0/4)0%   (0/1)
isCancelled (): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
get (long, TimeUnit): Object 100% (1/1)50%  (12/24)43%  (3/7)
get (): Object 100% (1/1)62%  (10/16)60%  (3/5)
ensureCauseHasSubmissionTrace (Exception): Exception 100% (1/1)82%  (66/80)76%  (10.6/14)
ExecutorServiceModule$DescribedFuture (Future, String, StackTraceElement []):... 100% (1/1)100% (12/12)100% (5/5)
isDone (): boolean 100% (1/1)100% (4/4)100% (1/1)
merge (StackTraceElement [], StackTraceElement []): StackTraceElement [] 100% (1/1)100% (24/24)100% (4/4)
toString (): String 100% (1/1)100% (3/3)100% (1/1)
     
class ExecutorServiceModule  100% (1/1)  100% (12/12)  76%  (111/147)  83%  (24.1/29)
checkNotGuavaSameThreadExecutor (ExecutorService): ExecutorService 100% (1/1)12%  (4/34)28%  (1.1/4)
provideExecutorService (int, Closer): ExecutorService 100% (1/1)77%  (10/13)67%  (2/3)
provideIOExecutor (int, Closer): ExecutorService 100% (1/1)77%  (10/13)67%  (2/3)
ExecutorServiceModule (): void 100% (1/1)100% (5/5)100% (2/2)
ExecutorServiceModule (ExecutorService, ExecutorService): void 100% (1/1)100% (13/13)100% (4/4)
addToStringOnSubmit (ExecutorService): ExecutorService 100% (1/1)100% (9/9)100% (3/3)
configure (): void 100% (1/1)100% (1/1)100% (1/1)
getStackTraceHere (): StackTraceElement [] 100% (1/1)100% (18/18)100% (4/4)
newCachedThreadPoolNamed (String): ExecutorService 100% (1/1)100% (10/10)100% (1/1)
newScalingThreadPoolNamed (String, int): ExecutorService 100% (1/1)100% (13/13)100% (1/1)
newThreadPoolNamed (String, int): ExecutorService 100% (1/1)100% (9/9)100% (1/1)
shutdownOnClose (ExecutorService, Closer): ExecutorService 100% (1/1)100% (9/9)100% (2/2)
     
class ExecutorServiceModule$1  0%   (0/1)  100% (0/0)  100% (0/0)  100% (0/0)

1/**
2 * Licensed to jclouds, Inc. (jclouds) under one or more
3 * contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  jclouds licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19package org.jclouds.concurrent.config;
20 
21import static com.google.common.base.Preconditions.checkNotNull;
22import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
23 
24import java.io.Closeable;
25import java.io.IOException;
26import java.util.Collection;
27import java.util.List;
28import java.util.concurrent.Callable;
29import java.util.concurrent.ExecutionException;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
32import java.util.concurrent.Future;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.TimeoutException;
35 
36import javax.annotation.Resource;
37import javax.inject.Inject;
38import javax.inject.Named;
39import javax.inject.Singleton;
40 
41import org.jclouds.Constants;
42import org.jclouds.concurrent.MoreExecutors;
43import org.jclouds.concurrent.SingleThreaded;
44import org.jclouds.lifecycle.Closer;
45import org.jclouds.logging.Logger;
46 
47import com.google.common.annotations.VisibleForTesting;
48import com.google.common.util.concurrent.ThreadFactoryBuilder;
49import com.google.inject.AbstractModule;
50import com.google.inject.Provides;
51 
52/**
53 * Configures {@link ExecutorService}.
54 * 
55 * Note that this uses threads.
56 *  
57 * <p>
58 * This extends the underlying Future to expose a description (the task's toString) and the submission context (stack trace).
59 * The submission stack trace is appended to relevant stack traces on exceptions that are returned,
60 * so the user can see the logical chain of execution (in the executor, and where it was passed to the executor).
61 * 
62 * @author Adrian Cole
63 */
64@ConfiguresExecutorService
65public class ExecutorServiceModule extends AbstractModule {
66 
67   @VisibleForTesting
68   static final class ShutdownExecutorOnClose implements Closeable {
69      @Resource
70      protected Logger logger = Logger.NULL;
71 
72      private final ExecutorService service;
73 
74      private ShutdownExecutorOnClose(ExecutorService service) {
75         this.service = service;
76      }
77 
78      @Override
79      public void close() throws IOException {
80         List<Runnable> runnables = service.shutdownNow();
81         if (runnables.size() > 0)
82            logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables);
83      }
84   }
85 
86   @VisibleForTesting
87   final ExecutorService userExecutorFromConstructor;
88   @VisibleForTesting
89   final ExecutorService ioExecutorFromConstructor;
90 
91   @Inject
92   public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
93            @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
94      this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
95      this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
96   }
97 
98   static ExecutorService addToStringOnSubmit(ExecutorService executor) {
99      if (executor != null) {
100         return new DescribingExecutorService(executor);
101      }
102      return executor;
103   }
104 
105   static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
106      // we detect behavior based on the class
107      if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
108               && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
109         Logger.CONSOLE.warn(
110                  "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
111                           .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
112         return MoreExecutors.sameThreadExecutor();
113      }
114      return executor;
115   }
116 
117   public ExecutorServiceModule() {
118      this(null, null);
119   }
120 
121   @Override
122   protected void configure() {
123   }
124 
125   static class DescribingExecutorService implements ExecutorService {
126 
127      private final ExecutorService delegate;
128 
129      public DescribingExecutorService(ExecutorService delegate) {
130         this.delegate = checkNotNull(delegate, "delegate");
131      }
132 
133      @Override
134      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
135         return delegate.awaitTermination(timeout, unit);
136      }
137 
138      @Override
139      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
140         return delegate.invokeAll(tasks);
141      }
142 
143      @Override
144      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
145               throws InterruptedException {
146         return delegate.invokeAll(tasks, timeout, unit);
147      }
148 
149      @Override
150      public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
151         return delegate.invokeAny(tasks);
152      }
153 
154      @Override
155      public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
156               throws InterruptedException, ExecutionException, TimeoutException {
157         return delegate.invokeAny(tasks, timeout, unit);
158      }
159 
160      @Override
161      public boolean isShutdown() {
162         return delegate.isShutdown();
163      }
164 
165      @Override
166      public boolean isTerminated() {
167         return delegate.isTerminated();
168      }
169 
170      @Override
171      public void shutdown() {
172         delegate.shutdown();
173      }
174 
175      @Override
176      public List<Runnable> shutdownNow() {
177         return delegate.shutdownNow();
178      }
179 
180      @Override
181      public <T> Future<T> submit(Callable<T> task) {
182         return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
183      }
184 
185      @SuppressWarnings({ "unchecked", "rawtypes" })
186      @Override
187      public Future<?> submit(Runnable task) {
188         return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
189      }
190 
191      @Override
192      public <T> Future<T> submit(Runnable task, T result) {
193         return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
194      }
195 
196      @Override
197      public void execute(Runnable arg0) {
198         delegate.execute(arg0);
199      }
200 
201      @Override
202      public boolean equals(Object obj) {
203         return delegate.equals(obj);
204      }
205 
206      @Override
207      public int hashCode() {
208         return delegate.hashCode();
209      }
210 
211      @Override
212      public String toString() {
213         return delegate.toString();
214      }
215 
216   }
217 
218   static class DescribedFuture<T> implements Future<T> {
219      private final Future<T> delegate;
220      private final String description;
221      private StackTraceElement[] submissionTrace;
222 
223      public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
224         this.delegate = delegate;
225         this.description = description;
226         this.submissionTrace = submissionTrace;
227      }
228 
229      @Override
230      public boolean cancel(boolean arg0) {
231         return delegate.cancel(arg0);
232      }
233 
234      @Override
235      public T get() throws InterruptedException, ExecutionException {
236         try {
237            return delegate.get();
238         } catch (ExecutionException e) {
239            throw ensureCauseHasSubmissionTrace(e);
240         } catch (InterruptedException e) {
241            throw ensureCauseHasSubmissionTrace(e);
242         }
243      }
244 
245      @Override
246      public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
247         try {
248            return delegate.get(arg0, arg1);
249         } catch (ExecutionException e) {
250            throw ensureCauseHasSubmissionTrace(e);
251         } catch (InterruptedException e) {
252            throw ensureCauseHasSubmissionTrace(e);
253         } catch (TimeoutException e) {
254            throw ensureCauseHasSubmissionTrace(e);
255         }
256      }
257 
258      /** This method does the work to ensure _if_ a submission stack trace was provided,
259       * it is included in the exception.  most errors are thrown from the frame of the
260       * Future.get call, with a cause that took place in the executor's thread.
261       * We extend the stack trace of that cause with the submission stack trace.
262       * (An alternative would be to put the stack trace as a root cause,
263       * at the bottom of the stack, or appended to all traces, or inserted
264       * after the second cause, etc ... but since we can't change the "Caused by:"
265       * method in Throwable the compromise made here seems best.)
266       */
267      private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
268         if (submissionTrace==null) return e;
269         if (e.getCause()==null) {
270            ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
271            e.initCause(ee);
272            return e;
273         }
274         Throwable cause = e.getCause();
275         StackTraceElement[] causeTrace = cause.getStackTrace();
276         boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
277         for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
278            if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
279               causeIncludesSubmissionTrace = false;
280            }
281         }
282         
283         if (!causeIncludesSubmissionTrace) {
284            cause.setStackTrace(merge(causeTrace, submissionTrace));
285         }
286         
287         return e;
288      }
289 
290      private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
291         StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
292         System.arraycopy(t1, 0, t12, 0, t1.length);
293         System.arraycopy(t2, 0, t12, t1.length, t2.length);
294         return t12;
295      }
296 
297      @Override
298      public boolean isCancelled() {
299         return delegate.isCancelled();
300      }
301 
302      @Override
303      public boolean isDone() {
304         return delegate.isDone();
305      }
306 
307      @Override
308      public boolean equals(Object obj) {
309         return delegate.equals(obj);
310      }
311 
312      @Override
313      public int hashCode() {
314         return delegate.hashCode();
315      }
316 
317      @Override
318      public String toString() {
319         return description;
320      }
321 
322   }
323 
324   @Provides
325   @Singleton
326   @Named(Constants.PROPERTY_USER_THREADS)
327   ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
328      if (userExecutorFromConstructor != null)
329         return userExecutorFromConstructor;
330      return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
331   }
332 
333   @Provides
334   @Singleton
335   @Named(Constants.PROPERTY_IO_WORKER_THREADS)
336   ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
337      if (ioExecutorFromConstructor != null)
338         return ioExecutorFromConstructor;
339      return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
340   }
341 
342   @VisibleForTesting
343   static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
344      closer.addToClose(new ShutdownExecutorOnClose(service));
345      return service;
346   }
347 
348   @VisibleForTesting
349   static ExecutorService newCachedThreadPoolNamed(String name) {
350      return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
351               Executors.defaultThreadFactory()).build());
352   }
353 
354   @VisibleForTesting
355   static ExecutorService newThreadPoolNamed(String name, int maxCount) {
356      return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
357   }
358 
359   @VisibleForTesting
360   static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
361      return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
362               .setThreadFactory(Executors.defaultThreadFactory()).build());
363   }
364 
365   /** returns the stack trace at the caller */
366   static StackTraceElement[] getStackTraceHere() {
367      // remove the first two items in the stack trace (because the first one refers to the call to 
368      // Thread.getStackTrace, and the second one is us)
369      StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace();
370      StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length-2];
371      System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length);
372      return cleanedSubmissionTrace;
373   }
374   
375}

[all classes][org.jclouds.concurrent.config]
EMMA 2.0.5312 (C) Vladimir Roubtsov