EMMA Coverage Report (generated Wed Oct 26 13:47:17 EDT 2011)
[all classes][org.jclouds.concurrent]

COVERAGE SUMMARY FOR SOURCE FILE [Futures.java]

name | class, % | method, % | block, % | line, %
Futures.java | 80% (4/5) | 44% (12/27) | 47% (149/320) | 50% (37/74)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class Futures$LazyListenableFutureFunctionAdapter | 0% (0/1) | 0% (0/12) | 0% (0/115) | 0% (0/25)
Futures$LazyListenableFutureFunctionAdapter (Future, Function, ExecutorServic... | | 0% (0/1) | 0% (0/7) | 0% (0/2)
Futures$LazyListenableFutureFunctionAdapter (Futures$FutureListener, Function... | | 0% (0/1) | 0% (0/26) | 0% (0/7)
addListener (Runnable, Executor): void | | 0% (0/1) | 0% (0/6) | 0% (0/2)
apply (Object): Object | | 0% (0/1) | 0% (0/27) | 0% (0/6)
cancel (boolean): boolean | | 0% (0/1) | 0% (0/6) | 0% (0/1)
create (Future, Function, ExecutorService): Futures$LazyListenableFutureFunct... | | 0% (0/1) | 0% (0/7) | 0% (0/1)
create (Futures$FutureListener, Function): Futures$LazyListenableFutureFuncti... | | 0% (0/1) | 0% (0/6) | 0% (0/1)
delegate (): Object | | 0% (0/1) | 0% (0/4) | 0% (0/1)
get (): Object | | 0% (0/1) | 0% (0/7) | 0% (0/1)
get (long, TimeUnit): Object | | 0% (0/1) | 0% (0/9) | 0% (0/1)
isCancelled (): boolean | | 0% (0/1) | 0% (0/5) | 0% (0/1)
isDone (): boolean | | 0% (0/1) | 0% (0/5) | 0% (0/1)
     
class Futures | 100% (1/1) | 67% (2/3) | 41% (22/54) | 38% (5/13)
Futures (): void | | 0% (0/1) | 0% (0/3) | 0% (0/2)
compose (Future, Function, ExecutorService): ListenableFuture | | 100% (1/1) | 37% (15/41) | 38% (3/8)
makeListenable (Future, ExecutorService): ListenableFuture | | 100% (1/1) | 70% (7/10) | 67% (2/3)
     
class Futures$CallGetAndRunExecutionList | 100% (1/1) | 67% (2/3) | 71% (42/59) | 92% (12/13)
toString (): String | | 0% (0/1) | 0% (0/17) | 0% (0/1)
Futures$CallGetAndRunExecutionList (Future, ExecutionList): void | | 100% (1/1) | 100% (15/15) | 100% (4/4)
run (): void | | 100% (1/1) | 100% (27/27) | 100% (8/8)
     
class Futures$FutureListener | 100% (1/1) | 80% (4/5) | 90% (61/68) | 81% (13/16)
getExecutor (): ExecutorService | | 0% (0/1) | 0% (0/3) | 0% (0/1)
addListener (Runnable, Executor): void | | 100% (1/1) | 87% (26/30) | 71% (5/7)
Futures$FutureListener (Future, ExecutorService): void | | 100% (1/1) | 100% (26/26) | 100% (6/6)
create (Future, ExecutorService): Futures$FutureListener | | 100% (1/1) | 100% (6/6) | 100% (1/1)
getFuture (): Future | | 100% (1/1) | 100% (3/3) | 100% (1/1)
     
class Futures$ListenableFutureAdapter | 100% (1/1) | 100% (4/4) | 100% (24/24) | 100% (7/7)
Futures$ListenableFutureAdapter (Future, ExecutorService): void | | 100% (1/1) | 100% (8/8) | 100% (3/3)
addListener (Runnable, Executor): void | | 100% (1/1) | 100% (6/6) | 100% (2/2)
create (Future, ExecutorService): Futures$ListenableFutureAdapter | | 100% (1/1) | 100% (6/6) | 100% (1/1)
delegate (): Future | | 100% (1/1) | 100% (4/4) | 100% (1/1)

1/**
2 * Licensed to jclouds, Inc. (jclouds) under one or more
3 * contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  jclouds licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19package org.jclouds.concurrent;
20 
21import static com.google.common.base.Preconditions.checkNotNull;
22 
23import java.util.concurrent.ExecutionException;
24import java.util.concurrent.Executor;
25import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Future;
27import java.util.concurrent.TimeUnit;
28import java.util.concurrent.TimeoutException;
29import java.util.concurrent.atomic.AtomicBoolean;
30 
31import com.google.common.annotations.Beta;
32import com.google.common.annotations.VisibleForTesting;
33import com.google.common.base.Function;
34import com.google.common.collect.ForwardingObject;
35import com.google.common.util.concurrent.ExecutionList;
36import com.google.common.util.concurrent.ForwardingFuture;
37import com.google.common.util.concurrent.ListenableFuture;
38 
39/**
40 * Static helper functions related to or replacing those in {@link com.google.common.util.concurrent.Futures}.
41 * 
42 * @author Adrian Cole
43 */
44@Beta
45public class Futures {
46   @VisibleForTesting
     /**
      * Task that waits for a delegate future to finish and then executes the listeners held in an
      * {@link ExecutionList}.
      *
      * @param <T> result type of the delegate future; the result itself is never used here
      */
47   static class CallGetAndRunExecutionList<T> implements Runnable {
        // Future whose completion is awaited in run().
48      private final Future<T> delegate;
        // Listeners to execute once the wait on the delegate is over.
49      private final ExecutionList executionList;
50 
        /**
         * Stores both collaborators; each one is passed through checkNotNull first.
         *
         * @param delegate future to wait on
         * @param executionList listeners to execute after the wait
         */
51      public CallGetAndRunExecutionList(Future<T> delegate, ExecutionList executionList) {
52         this.delegate = checkNotNull(delegate, "delegate");
53         this.executionList = checkNotNull(executionList, "executionList");
54      }
55 
        /**
         * Blocks on {@code delegate.get()} and afterwards executes the execution list.
         * <p/>
         * Any throwable raised by {@code get()} other than {@link InterruptedException} is ignored
         * on purpose: it is taken to mean that the task has finished, so the listeners still run.
         * If this thread is interrupted while waiting, the interrupt flag is set again and an
         * {@link IllegalStateException} wrapping the interruption is thrown; the listeners are not
         * executed by this call in that case.
         */
56      @Override
57      public void run() {
58         try {
              // The returned value is discarded; only completion matters.
59            delegate.get();
60         } catch (InterruptedException e) {
61            // This thread was interrupted. This should never happen, so we
62            // throw an IllegalStateException.
63            Thread.currentThread().interrupt();
64            // TODO we cannot inspect the executionList at the moment to make a reasonable
65            // toString()
66            throw new IllegalStateException(String.format(
67                     "interrupted calling get() on [%s], so could not run listeners", delegate), e);
68         } catch (Throwable e) {
69            // ExecutionException / CancellationException / RuntimeException
70            // The task is done, run the listeners.
              // NOTE(review): catching Throwable means Error subclasses are swallowed here as well.
71         }
72         executionList.execute();
73      }
74 
        /**
         * Returns the delegate and the execution list, concatenated inside square brackets.
         */
75      @Override
76      public String toString() {
77         return "[delegate=" + delegate + ", executionList=" + executionList + "]";
78      }
79   }
80 
81   // Adapted from Guava
82   //
83   // * to allow us to enforce supply of an adapterExecutor
84   // note that this is done so that we can operate in Google AppEngine which
85   // restricts thread creation
86   // * to allow us to print debug info about what the delegate was doing
     //
     // Pairs a plain Future with an ExecutionList so that listeners can be attached to it. The
     // task that waits for the delegate is only handed to the adapterExecutor when the first
     // listener is added and the delegate is not yet done (see addListener).
87   public static class FutureListener<T> {
88 
        // Executor that receives the waiting task. Package-visible: Futures.compose reads it
        // directly to look for the SingleThreaded annotation.
89      final ExecutorService adapterExecutor;
90 
91      // The execution list to hold our listeners.
92      private final ExecutionList executionList = new ExecutionList();
93 
94      // This allows us to only start up a thread waiting on the delegate future
95      // when the first listener is added.
96      private final AtomicBoolean hasListeners = new AtomicBoolean(false);
97 
98      // The delegate future.
99      private final Future<T> delegate;
100 
         /**
          * Static factory; simply forwards to the private constructor.
          *
          * @param delegate future to observe
          * @param adapterExecutor executor used to wait for the delegate
          * @return a new listener holder for {@code delegate}
          */
101      static <T> FutureListener<T> create(Future<T> delegate, ExecutorService adapterExecutor) {
102         return new FutureListener<T>(delegate, adapterExecutor);
103      }
104 
         // Both arguments are passed through checkNotNull before being stored.
105      private FutureListener(Future<T> delegate, ExecutorService adapterExecutor) {
106         this.delegate = checkNotNull(delegate, "delegate");
107         this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor");
108      }
109 
         /**
          * Adds {@code listener} and {@code exec} to the execution list.
          * <p/>
          * Only the first call does more than that: if the delegate is already done, it executes
          * the execution list from the calling thread; otherwise it submits a
          * {@link CallGetAndRunExecutionList} to the adapter executor, which executes the list once
          * the delegate finishes. Later calls only add to the execution list.
          * <p/>
          * NOTE(review): this class never calls execute() a second time, so listeners added after
          * the list has been executed depend on ExecutionList running them itself - confirm
          * against the Guava version in use.
          *
          * @param listener task to run when the delegate has completed
          * @param exec executor handed to the execution list together with the listener
          */
110      public void addListener(Runnable listener, Executor exec) {
111         executionList.add(listener, exec);
112 
113         // When a listener is first added, we run a task that will wait for
114         // the delegate to finish, and when it is done will run the listeners.
            // compareAndSet guarantees that exactly one caller enters this branch.
115         if (hasListeners.compareAndSet(false, true)) {
116            if (delegate.isDone()) {
117               // If the delegate is already done, run the execution list
118               // immediately on the current thread.
119               executionList.execute();
120               return;
121            }
122            adapterExecutor.execute(new CallGetAndRunExecutionList<T>(delegate, executionList));
123         }
124      }
125 
         // Returns the delegate future passed in at creation.
126      Future<T> getFuture() {
127         return delegate;
128      }
129 
         // Returns the executor passed in at creation.
130      ExecutorService getExecutor() {
131         return adapterExecutor;
132      }
133   }
134 
135   public static class ListenableFutureAdapter<T> extends ForwardingFuture<T> implements ListenableFuture<T> {
         // Turns a plain Future into a ListenableFuture: delegate() exposes the wrapped future to
         // ForwardingFuture, while listener registration is handled by a FutureListener built
         // from the same future and executor.

         // Package-visible: Futures.compose reads it to reuse the listener and to inspect its
         // adapterExecutor.
136      final FutureListener<T> futureListener;
137 
         /**
          * Static factory; simply forwards to the private constructor.
          *
          * @param future future to adapt
          * @param executor executor given to the underlying {@link FutureListener}
          * @return a new adapter around {@code future}
          */
138      static <T> ListenableFutureAdapter<T> create(Future<T> future, ExecutorService executor) {
139         return new ListenableFutureAdapter<T>(future, executor);
140      }
141 
         // Argument checks are left to FutureListener.create.
142      private ListenableFutureAdapter(Future<T> future, ExecutorService executor) {
143         this.futureListener = FutureListener.create(future, executor);
144      }
145 
         /**
          * Returns the wrapped future held by the listener.
          */
146      @Override
147      protected Future<T> delegate() {
148         return futureListener.getFuture();
149      }
150 
         /**
          * Hands the listener and its executor to the underlying {@link FutureListener}.
          */
151      @Override
152      public void addListener(Runnable listener, Executor exec) {
153         futureListener.addListener(listener, exec);
154      }
155 
156   }
157 
158   public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
159            ListenableFuture<O> {
         // ListenableFuture view over a Future<I> whose result is converted with a Function on
         // demand: the function is applied inside get(), by the thread calling get(), instead of
         // being chained onto the input future. cancel, isCancelled, isDone and addListener all go
         // to the input future held by the FutureListener.

         // Holds the input future and the listener bookkeeping.
160      private final FutureListener<I> futureListener;
         // Conversion applied to the input result inside apply(I).
161      private final Function<? super I, ? extends O> function;
162 
         /**
          * Creates an adapter with a new {@link FutureListener} built from {@code future} and
          * {@code executor}.
          */
163      static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
164               Function<? super I, ? extends O> function, ExecutorService executor) {
165         return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
166      }
167 
         /**
          * Creates an adapter that reuses an existing {@link FutureListener}.
          */
168      static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
169               Function<? super I, ? extends O> function) {
170         return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
171      }
172 
         // Wraps the future in a new FutureListener and delegates to the other constructor.
173      private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
174               ExecutorService executor) {
175         this(FutureListener.create(future, executor), function);
176      }
177 
         // Both arguments are passed through checkNotNull before being stored.
178      private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
179               Function<? super I, ? extends O> function) {
180         this.futureListener = checkNotNull(futureListener, "futureListener");
181         this.function = checkNotNull(function, "function");
182      }
183 
184      /*
185       * Concurrency detail:
186       * 
187       * <p>To preserve the idempotency of calls to this.get(*), the function is only applied
188       * once. A lock is required to prevent multiple applications of the function. The
189       * calls to future.get(*) are performed outside the lock, as is required to prevent calls to
190       * get(long, TimeUnit) from persisting beyond their timeout.
191       * 
192       * <p>Calls to future.get(*) on every call to this.get(*) also provide the cancellation
193       * behavior for this.
194       * 
195       * <p>(Consider: in thread A, call get(), in thread B call get(long, TimeUnit). Thread B may
196       * have to wait for Thread A to finish, which would be unacceptable.)
197       * 
198       * <p>Note that each call to Future<O>.get(*) results in a call to Future<I>.get(*), but the
199       * function is only applied once, so Future<I>.get(*) is assumed to be idempotent.
200       */
201 
         // Guards set and value.
202      private final Object lock = new Object();
         // True once function.apply has returned and value holds its result.
203      private boolean set = false;
         // Cached function result; only meaningful when set is true.
204      private O value = null;
205 
         /**
          * Waits for the input future without a timeout, then returns the converted result.
          *
          * @return the function result, computed on the first successful call and cached afterwards
          */
206      @Override
207      public O get() throws InterruptedException, ExecutionException {
208         return apply(futureListener.getFuture().get());
209      }
210 
         /**
          * Waits for the input future for at most the given timeout, then returns the converted
          * result. The timeout is passed to the input future only; the function application that
          * follows is not bounded by it.
          *
          * @return the function result, computed on the first successful call and cached afterwards
          */
211      @Override
212      public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
213         return apply(futureListener.getFuture().get(timeout, unit));
214      }
215 
         /**
          * Returns the cached value, applying the function to {@code raw} first if nothing has been
          * cached yet. Once a value is cached, {@code raw} is ignored.
          * <p/>
          * The function runs while {@code lock} is held. {@code set} is only raised after
          * {@code function.apply} has returned, so a call in which the function throws caches
          * nothing and the next call applies the function again.
          */
216      private O apply(I raw) {
217         synchronized (lock) {
218            if (!set) {
219               value = function.apply(raw);
220               set = true;
221            }
222            return value;
223         }
224      }
225 
         /**
          * Forwards the cancellation request to the input future and returns its answer.
          */
226      @Override
227      public boolean cancel(boolean mayInterruptIfRunning) {
228         return futureListener.getFuture().cancel(mayInterruptIfRunning);
229      }
230 
         /**
          * Reports whether the input future was cancelled.
          */
231      @Override
232      public boolean isCancelled() {
233         return futureListener.getFuture().isCancelled();
234      }
235 
         /**
          * Reports whether the input future is done; this says nothing about whether the function
          * has been applied yet.
          */
236      @Override
237      public boolean isDone() {
238         return futureListener.getFuture().isDone();
239      }
240 
         /**
          * Registers the listener on the underlying {@link FutureListener}. Listeners are therefore
          * tied to the input future, not to the function having been applied.
          */
241      @Override
242      public void addListener(Runnable listener, Executor exec) {
243         futureListener.addListener(listener, exec);
244      }
245 
         /**
          * Returns the input future as the object this {@link ForwardingObject} forwards to.
          */
246      @Override
247      protected Object delegate() {
248         return futureListener.getFuture();
249      }
250 
251   }
252 
253   /**
254    * Just like {@code Futures#compose} except that we check the type of the executorService before
255    * creating the Future. If we are single threaded, invoke the function lazily as opposed to
256    * chaining, so that we do not invoke get() early.
       * <p/>
       * Dispatch, in order:
       * <ul>
       * <li>{@code future} is a {@link ListenableFutureAdapter}: the {@link SingleThreaded} check is
       * made on the class of the executor stored inside the adapter, not on {@code executorService}.
       * If the annotation is present, a {@link LazyListenableFutureFunctionAdapter} that reuses the
       * adapter listener is returned and {@code executorService} is not used; otherwise the adapter
       * is passed to Guava {@code Futures.transform} together with {@code executorService}.</li>
       * <li>otherwise, if the class of {@code executorService} carries {@link SingleThreaded}: a new
       * {@link LazyListenableFutureFunctionAdapter} is created from the three arguments.</li>
       * <li>otherwise: the future is converted with {@code makeListenable} and passed to Guava
       * {@code Futures.transform}.</li>
       * </ul>
       *
       * @param future input future
       * @param function conversion from the input result to the output result
       * @param executorService executor inspected for {@link SingleThreaded} and handed on to the
       *           adapters or to Guava, depending on the branch taken
       * @return a listenable future yielding the converted result
257    */
258   public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
259            ExecutorService executorService) {
260      if (future instanceof Futures.ListenableFutureAdapter<?>) {
            // Unchecked: instanceof can only confirm the raw adapter type, not its type argument.
261         Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
262         if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
               // The second cast of future yields the same object as lf.
263            return Futures.LazyListenableFutureFunctionAdapter.create(
264                     ((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
265         else
266            return com.google.common.util.concurrent.Futures.transform(lf, function, executorService);
267      } else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
268         return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
269      } else {
270         return com.google.common.util.concurrent.Futures.transform(Futures.makeListenable(future, executorService),
271                  function, executorService);
272      }
273   }
274 
275   /**
276    * Just like {@code Futures#makeListenable} except that we pass in an executorService.
277    * <p/>
278    * Temporary hack until http://code.google.com/p/guava-libraries/issues/detail?id=317 is fixed.
       * <p/>
       * A future that already implements {@link ListenableFuture} is returned as is, and
       * {@code executorService} is not used. Any other future is wrapped in a
       * {@link ListenableFutureAdapter} created with {@code executorService}.
       *
       * @param future future to expose as a {@link ListenableFuture}
       * @param executorService executor given to the adapter when wrapping is needed
       * @return {@code future} itself or a new adapter around it
279    */
280   public static <T> ListenableFuture<T> makeListenable(Future<T> future, ExecutorService executorService) {
281      if (future instanceof ListenableFuture<?>) {
            // Unchecked: instanceof can only confirm the raw ListenableFuture type.
282         return (ListenableFuture<T>) future;
283      }
284      return ListenableFutureAdapter.create(future, executorService);
285   }
286 
287}

[all classes][org.jclouds.concurrent]
EMMA 2.0.5312 (C) Vladimir Roubtsov