1 | /** |
2 | * Licensed to jclouds, Inc. (jclouds) under one or more |
3 | * contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. jclouds licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, |
13 | * software distributed under the License is distributed on an |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | * KIND, either express or implied. See the License for the |
16 | * specific language governing permissions and limitations |
17 | * under the License. |
18 | */ |
19 | package org.jclouds.compute.callables; |
20 | |
21 | import static com.google.common.base.Preconditions.checkNotNull; |
22 | |
23 | import java.util.Date; |
24 | import java.util.concurrent.ExecutionException; |
25 | import java.util.concurrent.ExecutorService; |
26 | import java.util.concurrent.TimeUnit; |
27 | import java.util.concurrent.TimeoutException; |
28 | |
29 | import javax.annotation.PostConstruct; |
30 | import javax.annotation.Resource; |
31 | import javax.inject.Inject; |
32 | import javax.inject.Named; |
33 | |
34 | import org.jclouds.Constants; |
35 | import org.jclouds.compute.domain.ExecResponse; |
36 | import org.jclouds.compute.predicates.ScriptStatusReturnsZero; |
37 | import org.jclouds.compute.reference.ComputeServiceConstants; |
38 | import org.jclouds.logging.Logger; |
39 | import org.jclouds.predicates.RetryablePredicate; |
40 | import org.jclouds.scriptbuilder.InitBuilder; |
41 | import org.jclouds.ssh.SshClient; |
42 | |
43 | import com.google.common.base.Predicate; |
44 | import com.google.common.base.Throwables; |
45 | import com.google.common.util.concurrent.AbstractFuture; |
46 | import com.google.inject.assistedinject.Assisted; |
47 | |
48 | /** |
49 | * A future that works in tandem with a task that was invoked by {@link InitBuilder} |
50 | * |
51 | * @author Adrian Cole |
52 | */ |
53 | public class BlockUntilInitScriptStatusIsZeroThenReturnOutput extends AbstractFuture<ExecResponse> { |
54 | |
55 | public static interface Factory { |
56 | BlockUntilInitScriptStatusIsZeroThenReturnOutput create(SudoAwareInitManager commandRunner); |
57 | } |
58 | |
59 | @Resource |
60 | @Named(ComputeServiceConstants.COMPUTE_LOGGER) |
61 | protected Logger logger = Logger.NULL; |
62 | |
63 | private final ExecutorService userThreads; |
64 | private final SudoAwareInitManager commandRunner; |
65 | private final RetryablePredicate<String> notRunningAnymore; |
66 | |
67 | private boolean shouldCancel; |
68 | |
69 | @Inject |
70 | public BlockUntilInitScriptStatusIsZeroThenReturnOutput( |
71 | @Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, |
72 | final ScriptStatusReturnsZero stateRunning, @Assisted final SudoAwareInitManager commandRunner) { |
73 | this.commandRunner = checkNotNull(commandRunner, "commandRunner"); |
74 | this.userThreads = checkNotNull(userThreads, "userThreads"); |
75 | this.notRunningAnymore = new RetryablePredicate<String>(new Predicate<String>() { |
76 | |
77 | @Override |
78 | public boolean apply(String arg0) { |
79 | return commandRunner.runAction(arg0).getOutput().trim().equals(""); |
80 | } |
81 | // arbitrarily high value, but Long.MAX_VALUE doesn't work! |
82 | }, TimeUnit.DAYS.toMillis(365)) { |
83 | /** |
84 | * make sure we stop the retry loop if someone cancelled the future, this keeps threads |
85 | * from being consumed on dead tasks |
86 | */ |
87 | @Override |
88 | protected boolean atOrAfter(Date end) { |
89 | if (shouldCancel) |
90 | Throwables.propagate(new TimeoutException("cancelled")); |
91 | return super.atOrAfter(end); |
92 | } |
93 | }; |
94 | } |
95 | |
96 | /** |
97 | * in case login credentials or user changes at runtime. |
98 | */ |
99 | public void setSshClient(SshClient client) { |
100 | |
101 | } |
102 | |
103 | /** |
104 | * Submits a thread that will either set the result of the future or the exception that took |
105 | * place |
106 | */ |
107 | @PostConstruct |
108 | BlockUntilInitScriptStatusIsZeroThenReturnOutput init() { |
109 | userThreads.submit(new Runnable() { |
110 | @Override |
111 | public void run() { |
112 | try { |
113 | boolean complete = notRunningAnymore.apply("status"); |
114 | String stdout = commandRunner.runAction("tail").getOutput(); |
115 | String stderr = commandRunner.runAction("tailerr").getOutput(); |
116 | // TODO make ScriptBuilder save exit status on nuhup |
117 | logger.debug("<< complete(%s) status(%s)", commandRunner.getStatement().getInstanceName(), complete); |
118 | set(new ExecResponse(stdout, stderr, complete && !shouldCancel ? 0 : -1)); |
119 | } catch (Exception e) { |
120 | setException(e); |
121 | } |
122 | } |
123 | }); |
124 | return this; |
125 | } |
126 | |
127 | @Override |
128 | protected void interruptTask() { |
129 | logger.debug("<< cancelled(%s)", commandRunner.getStatement().getInstanceName()); |
130 | commandRunner.refreshAndRunAction("stop"); |
131 | shouldCancel = true; |
132 | super.interruptTask(); |
133 | } |
134 | |
135 | @Override |
136 | public String toString() { |
137 | return String.format("running task[%s]", commandRunner); |
138 | } |
139 | |
140 | @Override |
141 | public int hashCode() { |
142 | final int prime = 31; |
143 | int result = 1; |
144 | result = prime * result + ((commandRunner == null) ? 0 : commandRunner.hashCode()); |
145 | result = prime * result + ((logger == null) ? 0 : logger.hashCode()); |
146 | result = prime * result + ((notRunningAnymore == null) ? 0 : notRunningAnymore.hashCode()); |
147 | result = prime * result + (shouldCancel ? 1231 : 1237); |
148 | result = prime * result + ((userThreads == null) ? 0 : userThreads.hashCode()); |
149 | return result; |
150 | } |
151 | |
152 | @Override |
153 | public boolean equals(Object obj) { |
154 | if (this == obj) |
155 | return true; |
156 | if (obj == null) |
157 | return false; |
158 | if (getClass() != obj.getClass()) |
159 | return false; |
160 | BlockUntilInitScriptStatusIsZeroThenReturnOutput other = (BlockUntilInitScriptStatusIsZeroThenReturnOutput) obj; |
161 | if (commandRunner == null) { |
162 | if (other.commandRunner != null) |
163 | return false; |
164 | } else if (!commandRunner.equals(other.commandRunner)) |
165 | return false; |
166 | if (logger == null) { |
167 | if (other.logger != null) |
168 | return false; |
169 | } else if (!logger.equals(other.logger)) |
170 | return false; |
171 | if (notRunningAnymore == null) { |
172 | if (other.notRunningAnymore != null) |
173 | return false; |
174 | } else if (!notRunningAnymore.equals(other.notRunningAnymore)) |
175 | return false; |
176 | if (shouldCancel != other.shouldCancel) |
177 | return false; |
178 | if (userThreads == null) { |
179 | if (other.userThreads != null) |
180 | return false; |
181 | } else if (!userThreads.equals(other.userThreads)) |
182 | return false; |
183 | return true; |
184 | } |
185 | |
186 | @Override |
187 | public ExecResponse get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, |
188 | ExecutionException { |
189 | try { |
190 | return super.get(timeout, unit); |
191 | } catch (TimeoutException e) { |
192 | throw new ScriptStillRunningException(timeout, unit, this); |
193 | } |
194 | } |
195 | |
196 | } |