1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.concurrent.config;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35
36 import javax.annotation.Resource;
37 import javax.inject.Inject;
38 import javax.inject.Named;
39 import javax.inject.Singleton;
40
41 import org.jclouds.Constants;
42 import org.jclouds.concurrent.MoreExecutors;
43 import org.jclouds.concurrent.SingleThreaded;
44 import org.jclouds.lifecycle.Closer;
45 import org.jclouds.logging.Logger;
46
47 import com.google.common.annotations.VisibleForTesting;
48 import com.google.common.util.concurrent.ThreadFactoryBuilder;
49 import com.google.inject.AbstractModule;
50 import com.google.inject.Provides;
51
52
53
54
55
56
57
58
59 @ConfiguresExecutorService
60 public class ExecutorServiceModule extends AbstractModule {
61
62 @VisibleForTesting
63 static final class ShutdownExecutorOnClose implements Closeable {
64 @Resource
65 protected Logger logger = Logger.NULL;
66
67 private final ExecutorService service;
68
69 private ShutdownExecutorOnClose(ExecutorService service) {
70 this.service = service;
71 }
72
73 @Override
74 public void close() throws IOException {
75 List<Runnable> runnables = service.shutdownNow();
76 if (runnables.size() > 0)
77 logger.warn("when shutting down executor %s, runnables outstanding: %s", service, runnables);
78 }
79 }
80
81 @VisibleForTesting
82 final ExecutorService userExecutorFromConstructor;
83 @VisibleForTesting
84 final ExecutorService ioExecutorFromConstructor;
85
86 @Inject
87 public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
88 @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
89 this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
90 this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
91 }
92
93 static ExecutorService addToStringOnSubmit(ExecutorService executor) {
94 if (executor != null) {
95 return new AddToStringOnSubmitExecutorService(executor);
96 }
97 return executor;
98 }
99
100 static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
101
102 if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
103 && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
104 Logger.CONSOLE.warn(
105 "please switch from %s to %s or annotate your same threaded executor with @SingleThreaded", executor
106 .getClass().getName(), MoreExecutors.SameThreadExecutorService.class.getName());
107 return MoreExecutors.sameThreadExecutor();
108 }
109 return executor;
110 }
111
112 public ExecutorServiceModule() {
113 this(null, null);
114 }
115
116 @Override
117 protected void configure() {
118 }
119
120 static class AddToStringOnSubmitExecutorService implements ExecutorService {
121
122 private final ExecutorService delegate;
123
124 public AddToStringOnSubmitExecutorService(ExecutorService delegate) {
125 this.delegate = checkNotNull(delegate, "delegate");
126 }
127
128 @Override
129 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
130 return delegate.awaitTermination(timeout, unit);
131 }
132
133 @Override
134 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
135 return delegate.invokeAll(tasks);
136 }
137
138 @Override
139 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
140 throws InterruptedException {
141 return delegate.invokeAll(tasks, timeout, unit);
142 }
143
144 @Override
145 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
146 return delegate.invokeAny(tasks);
147 }
148
149 @Override
150 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
151 throws InterruptedException, ExecutionException, TimeoutException {
152 return delegate.invokeAny(tasks, timeout, unit);
153 }
154
155 @Override
156 public boolean isShutdown() {
157 return delegate.isShutdown();
158 }
159
160 @Override
161 public boolean isTerminated() {
162 return delegate.isTerminated();
163 }
164
165 @Override
166 public void shutdown() {
167 delegate.shutdown();
168 }
169
170 @Override
171 public List<Runnable> shutdownNow() {
172 return delegate.shutdownNow();
173 }
174
175 @Override
176 public <T> Future<T> submit(Callable<T> task) {
177 return new AddToStringFuture<T>(delegate.submit(task), task.toString());
178 }
179
180 @SuppressWarnings({ "unchecked", "rawtypes" })
181 @Override
182 public Future<?> submit(Runnable task) {
183 return new AddToStringFuture(delegate.submit(task), task.toString());
184 }
185
186 @Override
187 public <T> Future<T> submit(Runnable task, T result) {
188 return new AddToStringFuture<T>(delegate.submit(task, result), task.toString());
189 }
190
191 @Override
192 public void execute(Runnable arg0) {
193 delegate.execute(arg0);
194 }
195
196 @Override
197 public boolean equals(Object obj) {
198 return delegate.equals(obj);
199 }
200
201 @Override
202 public int hashCode() {
203 return delegate.hashCode();
204 }
205
206 @Override
207 public String toString() {
208 return delegate.toString();
209 }
210
211 }
212
213 static class AddToStringFuture<T> implements Future<T> {
214 private final Future<T> delegate;
215 private final String toString;
216
217 public AddToStringFuture(Future<T> delegate, String toString) {
218 this.delegate = delegate;
219 this.toString = toString;
220 }
221
222 @Override
223 public boolean cancel(boolean arg0) {
224 return delegate.cancel(arg0);
225 }
226
227 @Override
228 public T get() throws InterruptedException, ExecutionException {
229 return delegate.get();
230 }
231
232 @Override
233 public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
234 return delegate.get(arg0, arg1);
235 }
236
237 @Override
238 public boolean isCancelled() {
239 return delegate.isCancelled();
240 }
241
242 @Override
243 public boolean isDone() {
244 return delegate.isDone();
245 }
246
247 @Override
248 public boolean equals(Object obj) {
249 return delegate.equals(obj);
250 }
251
252 @Override
253 public int hashCode() {
254 return delegate.hashCode();
255 }
256
257 @Override
258 public String toString() {
259 return toString;
260 }
261
262 }
263
264 @Provides
265 @Singleton
266 @Named(Constants.PROPERTY_USER_THREADS)
267 ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
268 if (userExecutorFromConstructor != null)
269 return userExecutorFromConstructor;
270 return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
271 }
272
273 @Provides
274 @Singleton
275 @Named(Constants.PROPERTY_IO_WORKER_THREADS)
276 ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
277 if (ioExecutorFromConstructor != null)
278 return ioExecutorFromConstructor;
279 return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
280 }
281
282 @VisibleForTesting
283 static ExecutorService shutdownOnClose(final ExecutorService service, Closer closer) {
284 closer.addToClose(new ShutdownExecutorOnClose(service));
285 return service;
286 }
287
288 @VisibleForTesting
289 static ExecutorService newCachedThreadPoolNamed(String name) {
290 return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(
291 Executors.defaultThreadFactory()).build());
292 }
293
294 @VisibleForTesting
295 static ExecutorService newThreadPoolNamed(String name, int maxCount) {
296 return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
297 }
298
299 @VisibleForTesting
300 static ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
301 return newScalingThreadPool(1, maxCount, 60L * 1000, new ThreadFactoryBuilder().setNameFormat(name)
302 .setThreadFactory(Executors.defaultThreadFactory()).build());
303 }
304
305 }