View Javadoc

1   /**
2    * Licensed to jclouds, Inc. (jclouds) under one or more
3    * contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  jclouds licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.jclouds.concurrent;
20  
21  import static com.google.common.base.Preconditions.checkNotNull;
22  
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.TimeoutException;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  
31  import com.google.common.annotations.Beta;
32  import com.google.common.annotations.VisibleForTesting;
33  import com.google.common.base.Function;
34  import com.google.common.collect.ForwardingObject;
35  import com.google.common.util.concurrent.ExecutionList;
36  import com.google.common.util.concurrent.ForwardingFuture;
37  import com.google.common.util.concurrent.ListenableFuture;
38  
39  /**
40   * functions related to or replacing those in {@link com.google.common.util.concurrent.Futures}
41   * 
42   * @author Adrian Cole
43   */
44  @Beta
45  public class Futures {
46     @VisibleForTesting
47     static class CallGetAndRunExecutionList<T> implements Runnable {
48        private final Future<T> delegate;
49        private final ExecutionList executionList;
50  
51        public CallGetAndRunExecutionList(Future<T> delegate, ExecutionList executionList) {
52           this.delegate = checkNotNull(delegate, "delegate");
53           this.executionList = checkNotNull(executionList, "executionList");
54        }
55  
56        @Override
57        public void run() {
58           try {
59              delegate.get();
60           } catch (InterruptedException e) {
61              // This thread was interrupted. This should never happen, so we
62              // throw an IllegalStateException.
63              Thread.currentThread().interrupt();
64              // TODO we cannot inspect the executionList at the moment to make a reasonable
65              // toString()
66              throw new IllegalStateException(String.format(
67                       "interrupted calling get() on [%s], so could not run listeners", delegate), e);
68           } catch (Throwable e) {
69              // ExecutionException / CancellationException / RuntimeException
70              // The task is done, run the listeners.
71           }
72           executionList.execute();
73        }
74  
75        @Override
76        public String toString() {
77           return "[delegate=" + delegate + ", executionList=" + executionList + "]";
78        }
79     }
80  
81     // Adapted from Guava
82     //
83     // * to allow us to enforce supply of an adapterExecutor
84     // note that this is done so that we can operate in Google AppEngine which
85     // restricts thread creation
86     // * to allow us to print debug info about what the delegate was doing
87     public static class FutureListener<T> {
88  
89        final ExecutorService adapterExecutor;
90  
91        // The execution list to hold our listeners.
92        private final ExecutionList executionList = new ExecutionList();
93  
94        // This allows us to only start up a thread waiting on the delegate future
95        // when the first listener is added.
96        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
97  
98        // The delegate future.
99        private final Future<T> delegate;
100 
101       static <T> FutureListener<T> create(Future<T> delegate, ExecutorService adapterExecutor) {
102          return new FutureListener<T>(delegate, adapterExecutor);
103       }
104 
105       private FutureListener(Future<T> delegate, ExecutorService adapterExecutor) {
106          this.delegate = checkNotNull(delegate, "delegate");
107          this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor");
108       }
109 
110       public void addListener(Runnable listener, Executor exec) {
111          executionList.add(listener, exec);
112 
113          // When a listener is first added, we run a task that will wait for
114          // the delegate to finish, and when it is done will run the listeners.
115          if (hasListeners.compareAndSet(false, true)) {
116             if (delegate.isDone()) {
117                // If the delegate is already done, run the execution list
118                // immediately on the current thread.
119                executionList.execute();
120                return;
121             }
122             adapterExecutor.execute(new CallGetAndRunExecutionList<T>(delegate, executionList));
123          }
124       }
125 
126       Future<T> getFuture() {
127          return delegate;
128       }
129 
130       ExecutorService getExecutor() {
131          return adapterExecutor;
132       }
133    }
134 
135    public static class ListenableFutureAdapter<T> extends ForwardingFuture<T> implements ListenableFuture<T> {
136       final FutureListener<T> futureListener;
137 
138       static <T> ListenableFutureAdapter<T> create(Future<T> future, ExecutorService executor) {
139          return new ListenableFutureAdapter<T>(future, executor);
140       }
141 
142       private ListenableFutureAdapter(Future<T> future, ExecutorService executor) {
143          this.futureListener = FutureListener.create(future, executor);
144       }
145 
146       @Override
147       protected Future<T> delegate() {
148          return futureListener.getFuture();
149       }
150 
151       @Override
152       public void addListener(Runnable listener, Executor exec) {
153          futureListener.addListener(listener, exec);
154       }
155 
156    }
157 
158    public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
159             ListenableFuture<O> {
160       private final FutureListener<I> futureListener;
161       private final Function<? super I, ? extends O> function;
162 
163       static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
164                Function<? super I, ? extends O> function, ExecutorService executor) {
165          return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
166       }
167 
168       static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
169                Function<? super I, ? extends O> function) {
170          return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
171       }
172 
173       private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
174                ExecutorService executor) {
175          this(FutureListener.create(future, executor), function);
176       }
177 
178       private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
179                Function<? super I, ? extends O> function) {
180          this.futureListener = checkNotNull(futureListener, "futureListener");
181          this.function = checkNotNull(function, "function");
182       }
183 
184       /*
185        * Concurrency detail:
186        * 
187        * <p>To preserve the idempotency of calls to this.get(*) calls to the function are only
188        * applied once. A lock is required to prevent multiple applications of the function. The
189        * calls to future.get(*) are performed outside the lock, as is required to prevent calls to
190        * get(long, TimeUnit) to persist beyond their timeout.
191        * 
192        * <p>Calls to future.get(*) on every call to this.get(*) also provide the cancellation
193        * behavior for this.
194        * 
195        * <p>(Consider: in thread A, call get(), in thread B call get(long, TimeUnit). Thread B may
196        * have to wait for Thread A to finish, which would be unacceptable.)
197        * 
198        * <p>Note that each call to Future<O>.get(*) results in a call to Future<I>.get(*), but the
199        * function is only applied once, so Future<I>.get(*) is assumed to be idempotent.
200        */
201 
202       private final Object lock = new Object();
203       private boolean set = false;
204       private O value = null;
205 
206       @Override
207       public O get() throws InterruptedException, ExecutionException {
208          return apply(futureListener.getFuture().get());
209       }
210 
211       @Override
212       public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
213          return apply(futureListener.getFuture().get(timeout, unit));
214       }
215 
216       private O apply(I raw) {
217          synchronized (lock) {
218             if (!set) {
219                value = function.apply(raw);
220                set = true;
221             }
222             return value;
223          }
224       }
225 
226       @Override
227       public boolean cancel(boolean mayInterruptIfRunning) {
228          return futureListener.getFuture().cancel(mayInterruptIfRunning);
229       }
230 
231       @Override
232       public boolean isCancelled() {
233          return futureListener.getFuture().isCancelled();
234       }
235 
236       @Override
237       public boolean isDone() {
238          return futureListener.getFuture().isDone();
239       }
240 
241       @Override
242       public void addListener(Runnable listener, Executor exec) {
243          futureListener.addListener(listener, exec);
244       }
245 
246       @Override
247       protected Object delegate() {
248          return futureListener.getFuture();
249       }
250 
251    }
252 
253    /**
254     * Just like {@code Futures#compose} except that we check the type of the executorService before
255     * creating the Future. If we are single threaded, invoke the function lazy as opposed to
256     * chaining, so that we don't invoke get() early.
257     */
258    public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
259             ExecutorService executorService) {
260       if (future instanceof Futures.ListenableFutureAdapter<?>) {
261          Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
262          if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
263             return Futures.LazyListenableFutureFunctionAdapter.create(
264                      ((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
265          else
266             return com.google.common.util.concurrent.Futures.transform(lf, function, executorService);
267       } else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
268          return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
269       } else {
270          return com.google.common.util.concurrent.Futures.transform(Futures.makeListenable(future, executorService),
271                   function, executorService);
272       }
273    }
274 
275    /**
276     * Just like {@code Futures#makeListenable} except that we pass in an executorService.
277     * <p/>
278     * Temporary hack until http://code.google.com/p/guava-libraries/issues/detail?id=317 is fixed.
279     */
280    public static <T> ListenableFuture<T> makeListenable(Future<T> future, ExecutorService executorService) {
281       if (future instanceof ListenableFuture<?>) {
282          return (ListenableFuture<T>) future;
283       }
284       return ListenableFutureAdapter.create(future, executorService);
285    }
286 
287 }