| 1 | /** | 
| 2 |  * Licensed to jclouds, Inc. (jclouds) under one or more | 
| 3 |  * contributor license agreements.  See the NOTICE file | 
| 4 |  * distributed with this work for additional information | 
| 5 |  * regarding copyright ownership.  jclouds licenses this file | 
| 6 |  * to you under the Apache License, Version 2.0 (the | 
| 7 |  * "License"); you may not use this file except in compliance | 
| 8 |  * with the License.  You may obtain a copy of the License at | 
| 9 |  * | 
| 10 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
| 11 |  * | 
| 12 |  * Unless required by applicable law or agreed to in writing, | 
| 13 |  * software distributed under the License is distributed on an | 
| 14 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 15 |  * KIND, either express or implied.  See the License for the | 
| 16 |  * specific language governing permissions and limitations | 
| 17 |  * under the License. | 
| 18 |  */ | 
| 19 | /* | 
| 20 | * Licensed to Elastic Search and Shay Banon under one | 
| 21 | * or more contributor license agreements. See the NOTICE file | 
| 22 | * distributed with this work for additional information | 
| 23 | * regarding copyright ownership. Elastic Search licenses this | 
| 24 | * file to you under the Apache License, Version 2.0 (the | 
| 25 | * "License"); you may not use this file except in compliance | 
| 26 | * with the License. You may obtain a copy of the License at | 
| 27 | * | 
| 28 | * http://www.apache.org/licenses/LICENSE-2.0 | 
| 29 | * | 
| 30 | * Unless required by applicable law or agreed to in writing, | 
| 31 | * software distributed under the License is distributed on an | 
| 32 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 33 | * KIND, either express or implied. See the License for the | 
| 34 | * specific language governing permissions and limitations | 
| 35 | * under the License. | 
| 36 | */ | 
| 37 |   | 
| 38 | package org.jclouds.concurrent; | 
| 39 |   | 
| 40 |   | 
| 41 | import java.util.concurrent.*; | 
| 42 | import java.util.concurrent.atomic.AtomicInteger; | 
| 43 |   | 
| 44 | /** | 
| 45 | * An {@link ExecutorService} that executes each submitted task using one of | 
| 46 | * possibly several pooled threads, normally configured using | 
| 47 | * {@link DynamicExecutors} factory methods. | 
| 48 | * | 
| 49 | * @author kimchy (shay.banon) | 
| 50 | */ | 
| 51 | public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { | 
| 52 |     /** | 
| 53 | * number of threads that are actively executing tasks | 
| 54 | */ | 
| 55 |     private final AtomicInteger activeCount = new AtomicInteger(); | 
| 56 |   | 
| 57 |     public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, | 
| 58 |                                      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, | 
| 59 |                                      ThreadFactory threadFactory) { | 
| 60 |         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); | 
| 61 |     } | 
| 62 |   | 
| 63 |     @Override public int getActiveCount() { | 
| 64 |         return activeCount.get(); | 
| 65 |     } | 
| 66 |   | 
| 67 |     @Override protected void beforeExecute(Thread t, Runnable r) { | 
| 68 |         activeCount.incrementAndGet(); | 
| 69 |     } | 
| 70 |   | 
| 71 |     @Override protected void afterExecute(Runnable r, Throwable t) { | 
| 72 |         activeCount.decrementAndGet(); | 
| 73 |     } | 
| 74 |   | 
| 75 |     /** | 
| 76 | * Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It | 
| 77 | * is well suited for handoff designs, in which a tasks is only queued if there | 
| 78 | * is an available thread to pick it up. | 
| 79 | * <p/> | 
| 80 | * This queue is correlated with a thread-pool, and allows insertions to the | 
| 81 | * queue only if there is a free thread that can poll this task. Otherwise, the | 
| 82 | * task is rejected and the decision is left up to one of the | 
| 83 | * {@link RejectedExecutionHandler} policies: | 
| 84 | * <ol> | 
| 85 | * <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task. </li> | 
| 86 | * <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be | 
| 87 | * executed.</li> | 
| 88 | * </ol> | 
| 89 | * | 
| 90 | * @author kimchy (Shay Banon) | 
| 91 | */ | 
| 92 |     public static class DynamicQueue<E> extends LinkedBlockingQueue<E> { | 
| 93 |         private static final long serialVersionUID = 1L; | 
| 94 |   | 
| 95 |         /** | 
| 96 | * The executor this Queue belongs to | 
| 97 | */ | 
| 98 |         private transient ThreadPoolExecutor executor; | 
| 99 |   | 
| 100 |         /** | 
| 101 | * Creates a <tt>DynamicQueue</tt> with a capacity of | 
| 102 | * {@link Integer#MAX_VALUE}. | 
| 103 | */ | 
| 104 |         public DynamicQueue() { | 
| 105 |             super(); | 
| 106 |         } | 
| 107 |   | 
| 108 |         /** | 
| 109 | * Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity. | 
| 110 | * | 
| 111 | * @param capacity the capacity of this queue. | 
| 112 | */ | 
| 113 |         public DynamicQueue(int capacity) { | 
| 114 |             super(capacity); | 
| 115 |         } | 
| 116 |   | 
| 117 |         /** | 
| 118 | * Sets the executor this queue belongs to. | 
| 119 | */ | 
| 120 |         public void setThreadPoolExecutor(ThreadPoolExecutor executor) { | 
| 121 |             this.executor = executor; | 
| 122 |         } | 
| 123 |   | 
| 124 |         /** | 
| 125 | * Inserts the specified element at the tail of this queue if there is at | 
| 126 | * least one available thread to run the current task. If all pool threads | 
| 127 | * are actively busy, it rejects the offer. | 
| 128 | * | 
| 129 | * @param o the element to add. | 
| 130 | * @return <tt>true</tt> if it was possible to add the element to this | 
| 131 | * queue, else <tt>false</tt> | 
| 132 | * @see ThreadPoolExecutor#execute(Runnable) | 
| 133 | */ | 
| 134 |         @Override | 
| 135 |         public boolean offer(E o) { | 
| 136 |             int allWorkingThreads = executor.getActiveCount() + super.size(); | 
| 137 |             return allWorkingThreads < executor.getPoolSize() && super.offer(o); | 
| 138 |         } | 
| 139 |     } | 
| 140 |   | 
| 141 |     /** | 
| 142 | * A handler for rejected tasks that adds the specified element to this queue, | 
| 143 | * waiting if necessary for space to become available. | 
| 144 | */ | 
| 145 |     public static class ForceQueuePolicy implements RejectedExecutionHandler { | 
| 146 |         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | 
| 147 |             try { | 
| 148 |                 executor.getQueue().put(r); | 
| 149 |             } catch (InterruptedException e) { | 
| 150 |                 //should never happen since we never wait | 
| 151 |                 throw new RejectedExecutionException(e); | 
| 152 |             } | 
| 153 |         } | 
| 154 |     } | 
| 155 |   | 
| 156 |     /** | 
| 157 | * A handler for rejected tasks that inserts the specified element into this | 
| 158 | * queue, waiting if necessary up to the specified wait time for space to become | 
| 159 | * available. | 
| 160 | */ | 
| 161 |     public static class TimedBlockingPolicy implements RejectedExecutionHandler { | 
| 162 |         private final long waitTime; | 
| 163 |   | 
| 164 |         /** | 
| 165 | * @param waitTime wait time in milliseconds for space to become available. | 
| 166 | */ | 
| 167 |         public TimedBlockingPolicy(long waitTime) { | 
| 168 |             this.waitTime = waitTime; | 
| 169 |         } | 
| 170 |   | 
| 171 |         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | 
| 172 |             try { | 
| 173 |                 boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS); | 
| 174 |                 if (!successful) | 
| 175 |                     throw new RejectedExecutionException("Rejected execution after waiting " | 
| 176 |                             + waitTime + " ms for task [" + r.getClass() + "] to be executed."); | 
| 177 |             } catch (InterruptedException e) { | 
| 178 |                 throw new RejectedExecutionException(e); | 
| 179 |             } | 
| 180 |         } | 
| 181 |     } | 
| 182 | } |