EMMA Coverage Report (generated Wed Oct 26 13:47:17 EDT 2011)
[all classes][org.jclouds.concurrent]

COVERAGE SUMMARY FOR SOURCE FILE [DynamicThreadPoolExecutor.java]

nameclass, %method, %block, %line, %
DynamicThreadPoolExecutor.java75%  (3/4)75%  (9/12)54%  (64/118)56%  (18/32)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class DynamicThreadPoolExecutor$TimedBlockingPolicy0%   (0/1)0%   (0/2)0%   (0/44)0%   (0/10)
DynamicThreadPoolExecutor$TimedBlockingPolicy (long): void 0%   (0/1)0%   (0/6)0%   (0/3)
rejectedExecution (Runnable, ThreadPoolExecutor): void 0%   (0/1)0%   (0/38)0%   (0/7)
     
class DynamicThreadPoolExecutor$ForceQueuePolicy100% (1/1)100% (2/2)60%  (9/15)67%  (4/6)
rejectedExecution (Runnable, ThreadPoolExecutor): void 100% (1/1)50%  (6/12)60%  (3/5)
DynamicThreadPoolExecutor$ForceQueuePolicy (): void 100% (1/1)100% (3/3)100% (1/1)
     
class DynamicThreadPoolExecutor$DynamicQueue100% (1/1)75%  (3/4)87%  (27/31)75%  (6/8)
DynamicThreadPoolExecutor$DynamicQueue (int): void 0%   (0/1)0%   (0/4)0%   (0/2)
DynamicThreadPoolExecutor$DynamicQueue (): void 100% (1/1)100% (3/3)100% (2/2)
offer (Object): boolean 100% (1/1)100% (20/20)100% (2/2)
setThreadPoolExecutor (ThreadPoolExecutor): void 100% (1/1)100% (4/4)100% (2/2)
     
class DynamicThreadPoolExecutor100% (1/1)100% (4/4)100% (28/28)100% (8/8)
DynamicThreadPoolExecutor (int, int, long, TimeUnit, BlockingQueue, ThreadFac... 100% (1/1)100% (14/14)100% (3/3)
afterExecute (Runnable, Throwable): void 100% (1/1)100% (5/5)100% (2/2)
beforeExecute (Thread, Runnable): void 100% (1/1)100% (5/5)100% (2/2)
getActiveCount (): int 100% (1/1)100% (4/4)100% (1/1)

1/**
2 * Licensed to jclouds, Inc. (jclouds) under one or more
3 * contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  jclouds licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19/*
20* Licensed to Elastic Search and Shay Banon under one
21* or more contributor license agreements. See the NOTICE file
22* distributed with this work for additional information
23* regarding copyright ownership. Elastic Search licenses this
24* file to you under the Apache License, Version 2.0 (the
25* "License"); you may not use this file except in compliance
26* with the License. You may obtain a copy of the License at
27*
28* http://www.apache.org/licenses/LICENSE-2.0
29*
30* Unless required by applicable law or agreed to in writing,
31* software distributed under the License is distributed on an
32* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
33* KIND, either express or implied. See the License for the
34* specific language governing permissions and limitations
35* under the License.
36*/
37 
38package org.jclouds.concurrent;
39 
40 
41import java.util.concurrent.*;
42import java.util.concurrent.atomic.AtomicInteger;
43 
44/**
45* An {@link ExecutorService} that executes each submitted task using one of
46* possibly several pooled threads, normally configured using
47* {@link DynamicExecutors} factory methods.
48*
49* @author kimchy (shay.banon)
50*/
51public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
52    /**
53* number of threads that are actively executing tasks
54*/
55    private final AtomicInteger activeCount = new AtomicInteger();
56 
57    public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
58                                     long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
59                                     ThreadFactory threadFactory) {
60        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
61    }
62 
63    @Override public int getActiveCount() {
64        return activeCount.get();
65    }
66 
67    @Override protected void beforeExecute(Thread t, Runnable r) {
68        activeCount.incrementAndGet();
69    }
70 
71    @Override protected void afterExecute(Runnable r, Throwable t) {
72        activeCount.decrementAndGet();
73    }
74 
75    /**
76* Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It
77* is well suited for handoff designs, in which a tasks is only queued if there
78* is an available thread to pick it up.
79* <p/>
80* This queue is correlated with a thread-pool, and allows insertions to the
81* queue only if there is a free thread that can poll this task. Otherwise, the
82* task is rejected and the decision is left up to one of the
83* {@link RejectedExecutionHandler} policies:
84* <ol>
85* <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task. </li>
86* <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be
87* executed.</li>
88* </ol>
89*
90* @author kimchy (Shay Banon)
91*/
92    public static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
93        private static final long serialVersionUID = 1L;
94 
95        /**
96* The executor this Queue belongs to
97*/
98        private transient ThreadPoolExecutor executor;
99 
100        /**
101* Creates a <tt>DynamicQueue</tt> with a capacity of
102* {@link Integer#MAX_VALUE}.
103*/
104        public DynamicQueue() {
105            super();
106        }
107 
108        /**
109* Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity.
110*
111* @param capacity the capacity of this queue.
112*/
113        public DynamicQueue(int capacity) {
114            super(capacity);
115        }
116 
117        /**
118* Sets the executor this queue belongs to.
119*/
120        public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
121            this.executor = executor;
122        }
123 
124        /**
125* Inserts the specified element at the tail of this queue if there is at
126* least one available thread to run the current task. If all pool threads
127* are actively busy, it rejects the offer.
128*
129* @param o the element to add.
130* @return <tt>true</tt> if it was possible to add the element to this
131* queue, else <tt>false</tt>
132* @see ThreadPoolExecutor#execute(Runnable)
133*/
134        @Override
135        public boolean offer(E o) {
136            int allWorkingThreads = executor.getActiveCount() + super.size();
137            return allWorkingThreads < executor.getPoolSize() && super.offer(o);
138        }
139    }
140 
141    /**
142* A handler for rejected tasks that adds the specified element to this queue,
143* waiting if necessary for space to become available.
144*/
145    public static class ForceQueuePolicy implements RejectedExecutionHandler {
146        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
147            try {
148                executor.getQueue().put(r);
149            } catch (InterruptedException e) {
150                //should never happen since we never wait
151                throw new RejectedExecutionException(e);
152            }
153        }
154    }
155 
156    /**
157* A handler for rejected tasks that inserts the specified element into this
158* queue, waiting if necessary up to the specified wait time for space to become
159* available.
160*/
161    public static class TimedBlockingPolicy implements RejectedExecutionHandler {
162        private final long waitTime;
163 
164        /**
165* @param waitTime wait time in milliseconds for space to become available.
166*/
167        public TimedBlockingPolicy(long waitTime) {
168            this.waitTime = waitTime;
169        }
170 
171        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
172            try {
173                boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
174                if (!successful)
175                    throw new RejectedExecutionException("Rejected execution after waiting "
176                            + waitTime + " ms for task [" + r.getClass() + "] to be executed.");
177            } catch (InterruptedException e) {
178                throw new RejectedExecutionException(e);
179            }
180        }
181    }
182}

[all classes][org.jclouds.concurrent]
EMMA 2.0.5312 (C) Vladimir Roubtsov