1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.concurrent;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import com.google.common.annotations.Beta;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Function;
34 import com.google.common.collect.ForwardingObject;
35 import com.google.common.util.concurrent.ExecutionList;
36 import com.google.common.util.concurrent.ForwardingFuture;
37 import com.google.common.util.concurrent.ListenableFuture;
38
39
40
41
42
43
44 @Beta
45 public class Futures {
46 @VisibleForTesting
47 static class CallGetAndRunExecutionList<T> implements Runnable {
48 private final Future<T> delegate;
49 private final ExecutionList executionList;
50
51 public CallGetAndRunExecutionList(Future<T> delegate, ExecutionList executionList) {
52 this.delegate = checkNotNull(delegate, "delegate");
53 this.executionList = checkNotNull(executionList, "executionList");
54 }
55
56 @Override
57 public void run() {
58 try {
59 delegate.get();
60 } catch (InterruptedException e) {
61
62
63 Thread.currentThread().interrupt();
64
65
66 throw new IllegalStateException(String.format(
67 "interrupted calling get() on [%s], so could not run listeners", delegate), e);
68 } catch (Throwable e) {
69
70
71 }
72 executionList.execute();
73 }
74
75 @Override
76 public String toString() {
77 return "[delegate=" + delegate + ", executionList=" + executionList + "]";
78 }
79 }
80
81
82
83
84
85
86
87 public static class FutureListener<T> {
88
89 final ExecutorService adapterExecutor;
90
91
92 private final ExecutionList executionList = new ExecutionList();
93
94
95
96 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
97
98
99 private final Future<T> delegate;
100
101 static <T> FutureListener<T> create(Future<T> delegate, ExecutorService adapterExecutor) {
102 return new FutureListener<T>(delegate, adapterExecutor);
103 }
104
105 private FutureListener(Future<T> delegate, ExecutorService adapterExecutor) {
106 this.delegate = checkNotNull(delegate, "delegate");
107 this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor");
108 }
109
110 public void addListener(Runnable listener, Executor exec) {
111 executionList.add(listener, exec);
112
113
114
115 if (hasListeners.compareAndSet(false, true)) {
116 if (delegate.isDone()) {
117
118
119 executionList.execute();
120 return;
121 }
122 adapterExecutor.execute(new CallGetAndRunExecutionList<T>(delegate, executionList));
123 }
124 }
125
126 Future<T> getFuture() {
127 return delegate;
128 }
129
130 ExecutorService getExecutor() {
131 return adapterExecutor;
132 }
133 }
134
135 public static class ListenableFutureAdapter<T> extends ForwardingFuture<T> implements ListenableFuture<T> {
136 final FutureListener<T> futureListener;
137
138 static <T> ListenableFutureAdapter<T> create(Future<T> future, ExecutorService executor) {
139 return new ListenableFutureAdapter<T>(future, executor);
140 }
141
142 private ListenableFutureAdapter(Future<T> future, ExecutorService executor) {
143 this.futureListener = FutureListener.create(future, executor);
144 }
145
146 @Override
147 protected Future<T> delegate() {
148 return futureListener.getFuture();
149 }
150
151 @Override
152 public void addListener(Runnable listener, Executor exec) {
153 futureListener.addListener(listener, exec);
154 }
155
156 }
157
158 public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
159 ListenableFuture<O> {
160 private final FutureListener<I> futureListener;
161 private final Function<? super I, ? extends O> function;
162
163 static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
164 Function<? super I, ? extends O> function, ExecutorService executor) {
165 return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
166 }
167
168 static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
169 Function<? super I, ? extends O> function) {
170 return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
171 }
172
173 private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
174 ExecutorService executor) {
175 this(FutureListener.create(future, executor), function);
176 }
177
178 private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
179 Function<? super I, ? extends O> function) {
180 this.futureListener = checkNotNull(futureListener, "futureListener");
181 this.function = checkNotNull(function, "function");
182 }
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 private final Object lock = new Object();
203 private boolean set = false;
204 private O value = null;
205
206 @Override
207 public O get() throws InterruptedException, ExecutionException {
208 return apply(futureListener.getFuture().get());
209 }
210
211 @Override
212 public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
213 return apply(futureListener.getFuture().get(timeout, unit));
214 }
215
216 private O apply(I raw) {
217 synchronized (lock) {
218 if (!set) {
219 value = function.apply(raw);
220 set = true;
221 }
222 return value;
223 }
224 }
225
226 @Override
227 public boolean cancel(boolean mayInterruptIfRunning) {
228 return futureListener.getFuture().cancel(mayInterruptIfRunning);
229 }
230
231 @Override
232 public boolean isCancelled() {
233 return futureListener.getFuture().isCancelled();
234 }
235
236 @Override
237 public boolean isDone() {
238 return futureListener.getFuture().isDone();
239 }
240
241 @Override
242 public void addListener(Runnable listener, Executor exec) {
243 futureListener.addListener(listener, exec);
244 }
245
246 @Override
247 protected Object delegate() {
248 return futureListener.getFuture();
249 }
250
251 }
252
253
254
255
256
257
258 public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
259 ExecutorService executorService) {
260 if (future instanceof Futures.ListenableFutureAdapter<?>) {
261 Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
262 if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
263 return Futures.LazyListenableFutureFunctionAdapter.create(
264 ((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
265 else
266 return com.google.common.util.concurrent.Futures.transform(lf, function, executorService);
267 } else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
268 return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
269 } else {
270 return com.google.common.util.concurrent.Futures.transform(Futures.makeListenable(future, executorService),
271 function, executorService);
272 }
273 }
274
275
276
277
278
279
280 public static <T> ListenableFuture<T> makeListenable(Future<T> future, ExecutorService executorService) {
281 if (future instanceof ListenableFuture<?>) {
282 return (ListenableFuture<T>) future;
283 }
284 return ListenableFutureAdapter.create(future, executorService);
285 }
286
287 }