View Javadoc

1   /**
2    *
3    * Copyright (C) 2011 Cloud Conscious, LLC. <info@cloudconscious.com>
4    *
5    * ====================================================================
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   * ====================================================================
18   */
19  package org.jclouds.concurrent.config;
20  
21  import static com.google.common.base.Preconditions.checkNotNull;
22  import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.Collection;
27  import java.util.List;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  
36  import javax.annotation.Resource;
37  import javax.inject.Inject;
38  import javax.inject.Named;
39  import javax.inject.Singleton;
40  
41  import org.jclouds.Constants;
42  import org.jclouds.concurrent.MoreExecutors;
43  import org.jclouds.concurrent.SingleThreaded;
44  import org.jclouds.lifecycle.Closer;
45  import org.jclouds.logging.Logger;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  import com.google.common.util.concurrent.ThreadFactoryBuilder;
49  import com.google.inject.AbstractModule;
50  import com.google.inject.Provides;
51  
52  /**
53   * Configures {@link ExecutorService}.
54   * 
55   * Note that, unless executors are passed to the constructor, this module creates thread pools to back them.
56   * 
57   * @author Adrian Cole
58   */
59  @ConfiguresExecutorService
60  public class ExecutorServiceModule extends AbstractModule {
61  
62     @VisibleForTesting
63     static final class ShutdownExecutorOnClose implements Closeable {
64        @Resource
65        protected Logger logger = Logger.NULL;
66  
67        private final ExecutorService service;
68  
69        private ShutdownExecutorOnClose(ExecutorService service) {
70           this.service = service;
71        }
72  
73        @Override
74        public void close() throws IOException {
75           List<Runnable> runnables = service.shutdownNow();
76           if (runnables.size() > 0)
77              logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables);
78        }
79     }
80  
81     @VisibleForTesting
82     final ExecutorService userExecutorFromConstructor;
83     @VisibleForTesting
84     final ExecutorService ioExecutorFromConstructor;
85  
86     @Inject
87     public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
88              @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
89        this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
90        this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
91     }
92  
93     static ExecutorService addToStringOnSubmit(ExecutorService executor) {
94        if (executor != null) {
95           return new AddToStringOnSubmitExecutorService(executor);
96        }
97        return executor;
98     }
99  
100    static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
101       // we detect behavior based on the class
102       if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
103                && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
104          Logger.CONSOLE.warn(
105                   "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
106                            .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
107          return MoreExecutors.sameThreadExecutor();
108       }
109       return executor;
110    }
111 
112    public ExecutorServiceModule() {
113       this(null, null);
114    }
115 
116    @Override
117    protected void configure() {
118    }
119 
120    static class AddToStringOnSubmitExecutorService implements ExecutorService {
121 
122       private final ExecutorService delegate;
123 
124       public AddToStringOnSubmitExecutorService(ExecutorService delegate) {
125          this.delegate = checkNotNull(delegate, "delegate");
126       }
127 
128       @Override
129       public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
130          return delegate.awaitTermination(timeout, unit);
131       }
132 
133       @Override
134       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
135          return delegate.invokeAll(tasks);
136       }
137 
138       @Override
139       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
140                throws InterruptedException {
141          return delegate.invokeAll(tasks, timeout, unit);
142       }
143 
144       @Override
145       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
146          return delegate.invokeAny(tasks);
147       }
148 
149       @Override
150       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
151                throws InterruptedException, ExecutionException, TimeoutException {
152          return delegate.invokeAny(tasks, timeout, unit);
153       }
154 
155       @Override
156       public boolean isShutdown() {
157          return delegate.isShutdown();
158       }
159 
160       @Override
161       public boolean isTerminated() {
162          return delegate.isTerminated();
163       }
164 
165       @Override
166       public void shutdown() {
167          delegate.shutdown();
168       }
169 
170       @Override
171       public List<Runnable> shutdownNow() {
172          return delegate.shutdownNow();
173       }
174 
175       @Override
176       public <T> Future<T> submit(Callable<T> task) {
177          return new AddToStringFuture<T>(delegate.submit(task), task.toString());
178       }
179 
180       @SuppressWarnings({ "unchecked", "rawtypes" })
181       @Override
182       public Future<?> submit(Runnable task) {
183          return new AddToStringFuture(delegate.submit(task), task.toString());
184       }
185 
186       @Override
187       public <T> Future<T> submit(Runnable task, T result) {
188          return new AddToStringFuture<T>(delegate.submit(task, result), task.toString());
189       }
190 
191       @Override
192       public void execute(Runnable arg0) {
193          delegate.execute(arg0);
194       }
195 
196       @Override
197       public boolean equals(Object obj) {
198          return delegate.equals(obj);
199       }
200 
201       @Override
202       public int hashCode() {
203          return delegate.hashCode();
204       }
205 
206       @Override
207       public String toString() {
208          return delegate.toString();
209       }
210 
211    }
212 
213    static class AddToStringFuture<T> implements Future<T> {
214       private final Future<T> delegate;
215       private final String toString;
216 
217       public AddToStringFuture(Future<T> delegate, String toString) {
218          this.delegate = delegate;
219          this.toString = toString;
220       }
221 
222       @Override
223       public boolean cancel(boolean arg0) {
224          return delegate.cancel(arg0);
225       }
226 
227       @Override
228       public T get() throws InterruptedException, ExecutionException {
229          return delegate.get();
230       }
231 
232       @Override
233       public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
234          return delegate.get(arg0, arg1);
235       }
236 
237       @Override
238       public boolean isCancelled() {
239          return delegate.isCancelled();
240       }
241 
242       @Override
243       public boolean isDone() {
244          return delegate.isDone();
245       }
246 
247       @Override
248       public boolean equals(Object obj) {
249          return delegate.equals(obj);
250       }
251 
252       @Override
253       public int hashCode() {
254          return delegate.hashCode();
255       }
256 
257       @Override
258       public String toString() {
259          return toString;
260       }
261 
262    }
263 
264    @Provides
265    @Singleton
266    @Named(Constants.PROPERTY_USER_THREADS)
267    ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
268       if (userExecutorFromConstructor != null)
269          return userExecutorFromConstructor;
270       return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
271    }
272 
273    @Provides
274    @Singleton
275    @Named(Constants.PROPERTY_IO_WORKER_THREADS)
276    ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
277       if (ioExecutorFromConstructor != null)
278          return ioExecutorFromConstructor;
279       return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
280    }
281 
282    @VisibleForTesting
283    static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
284       closer.addToClose(new ShutdownExecutorOnClose(service));
285       return service;
286    }
287 
288    @VisibleForTesting
289    static ExecutorService newCachedThreadPoolNamed(String name) {
290       return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
291                Executors.defaultThreadFactory()).build());
292    }
293 
294    @VisibleForTesting
295    static ExecutorService newThreadPoolNamed(String name, int maxCount) {
296       return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
297    }
298 
299    @VisibleForTesting
300    static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
301       return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
302                .setThreadFactory(Executors.defaultThreadFactory()).build());
303    }
304 
305 }