1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.ssh.jsch;
20
21 import static com.google.common.base.Preconditions.checkArgument;
22 import static com.google.common.base.Preconditions.checkNotNull;
23 import static com.google.common.base.Preconditions.checkState;
24 import static com.google.common.base.Predicates.instanceOf;
25 import static com.google.common.base.Predicates.or;
26 import static com.google.common.base.Throwables.getCausalChain;
27 import static com.google.common.base.Throwables.getRootCause;
28 import static com.google.common.collect.Iterables.any;
29
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.net.ConnectException;
33 import java.util.Arrays;
34
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import javax.annotation.Resource;
38 import javax.inject.Named;
39
40 import org.apache.commons.io.input.ProxyInputStream;
41 import org.apache.commons.io.output.ByteArrayOutputStream;
42 import org.jclouds.compute.domain.ExecResponse;
43 import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
44 import org.jclouds.io.Payload;
45 import org.jclouds.io.Payloads;
46 import org.jclouds.logging.Logger;
47 import org.jclouds.net.IPSocket;
48 import org.jclouds.ssh.SshClient;
49 import org.jclouds.ssh.SshException;
50 import org.jclouds.util.Strings2;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Predicate;
54 import com.google.common.base.Splitter;
55 import com.google.common.collect.Iterables;
56 import com.google.common.io.Closeables;
57 import com.google.inject.Inject;
58 import com.jcraft.jsch.ChannelExec;
59 import com.jcraft.jsch.ChannelSftp;
60 import com.jcraft.jsch.JSch;
61 import com.jcraft.jsch.Session;
62
63
64
65
66
67
68 public class JschSshClient implements SshClient {
69
70 private final class CloseFtpChannelOnCloseInputStream extends ProxyInputStream {
71
72 private final ChannelSftp sftp;
73
74 private CloseFtpChannelOnCloseInputStream(InputStream proxy, ChannelSftp sftp) {
75 super(proxy);
76 this.sftp = sftp;
77 }
78
79 @Override
80 public void close() throws IOException {
81 super.close();
82 if (sftp != null)
83 sftp.disconnect();
84 }
85 }
86
87 private final String host;
88 private final int port;
89 private final String username;
90 private final String password;
91
92 @Inject(optional = true)
93 @Named("jclouds.ssh.max_retries")
94 @VisibleForTesting
95 int sshRetries = 5;
96
97 @Inject(optional = true)
98 @Named("jclouds.ssh.retryable_messages")
99 @VisibleForTesting
100 String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established";
101
102 @Inject(optional = true)
103 @Named("jclouds.ssh.retry_predicate")
104 private Predicate<Throwable> retryPredicate = or(instanceOf(ConnectException.class), instanceOf(IOException.class));
105
106 @Resource
107 @Named("jclouds.ssh")
108 protected Logger logger = Logger.NULL;
109
110 private Session session;
111 private final byte[] privateKey;
112 final byte[] emptyPassPhrase = new byte[0];
113 private final int timeout;
114 private final BackoffLimitedRetryHandler backoffLimitedRetryHandler;
115
116 public JschSshClient(BackoffLimitedRetryHandler backoffLimitedRetryHandler, IPSocket socket, int timeout,
117 String username, String password, byte[] privateKey) {
118 this.host = checkNotNull(socket, "socket").getAddress();
119 checkArgument(socket.getPort() > 0, "ssh port must be greater then zero" + socket.getPort());
120 checkArgument(password != null || privateKey != null, "you must specify a password or a key");
121 this.port = socket.getPort();
122 this.username = checkNotNull(username, "username");
123 this.backoffLimitedRetryHandler = checkNotNull(backoffLimitedRetryHandler, "backoffLimitedRetryHandler");
124 this.timeout = timeout;
125 this.password = password;
126 this.privateKey = privateKey;
127 }
128
129 @Override
130 public void put(String path, String contents) {
131 put(path, Payloads.newStringPayload(checkNotNull(contents, "contents")));
132 }
133
134 private void checkConnected() {
135 checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString()));
136 }
137
138 public static interface Connection<T> {
139 void clear();
140
141 T create() throws Exception;
142 }
143
144 Connection<Session> sessionConnection = new Connection<Session>() {
145
146 @Override
147 public void clear() {
148 if (session != null && session.isConnected()) {
149 session.disconnect();
150 session = null;
151 }
152 }
153
154 @Override
155 public Session create() throws Exception {
156 JSch jsch = new JSch();
157 session = jsch.getSession(username, host, port);
158 if (timeout != 0)
159 session.setTimeout(timeout);
160 if (password != null) {
161 session.setPassword(password);
162 } else {
163
164 jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
165 }
166 java.util.Properties config = new java.util.Properties();
167 config.put("StrictHostKeyChecking", "no");
168 session.setConfig(config);
169 session.connect();
170 return session;
171 }
172
173 @Override
174 public String toString() {
175 return String.format("Session(%s)", JschSshClient.this.toString());
176 }
177 };
178
179 protected <T, C extends Connection<T>> T acquire(C connection) {
180 connection.clear();
181 Exception e = null;
182 String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
183 for (int i = 0; i < sshRetries; i++) {
184 try {
185 logger.debug(">> (%s) acquiring %s", toString(), connection);
186 T returnVal = connection.create();
187 logger.debug("<< (%s) acquired %s", toString(), returnVal);
188 return returnVal;
189 } catch (Exception from) {
190 e = from;
191 connection.clear();
192
193 if (i == sshRetries)
194 throw propagate(from, errorMessage);
195
196 if (shouldRetry(from)) {
197 logger.warn("<< " + errorMessage + ": " + from.getMessage());
198 backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage());
199 continue;
200 }
201 throw propagate(from, errorMessage);
202 }
203 }
204 if (e != null)
205 throw propagate(e, errorMessage);
206 return null;
207 }
208
209 @PostConstruct
210 public void connect() {
211 acquire(sessionConnection);
212 }
213
214 Connection<ChannelSftp> sftpConnection = new Connection<ChannelSftp>() {
215
216 private ChannelSftp sftp;
217
218 @Override
219 public void clear() {
220 if (sftp != null)
221 sftp.disconnect();
222 }
223
224 @Override
225 public ChannelSftp create() throws Exception {
226 checkConnected();
227 sftp = (ChannelSftp) session.openChannel("sftp");
228 sftp.connect();
229 return sftp;
230 }
231
232 @Override
233 public String toString() {
234 return "ChannelSftp(" + JschSshClient.this.toString() + ")";
235 }
236 };
237
238 class GetConnection implements Connection<Payload> {
239 private final String path;
240 private ChannelSftp sftp;
241
242 GetConnection(String path) {
243 this.path = checkNotNull(path, "path");
244 }
245
246 @Override
247 public void clear() {
248 if (sftp != null)
249 sftp.disconnect();
250 }
251
252 @Override
253 public Payload create() throws Exception {
254 sftp = acquire(sftpConnection);
255 return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
256 }
257
258 @Override
259 public String toString() {
260 return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]";
261 }
262 };
263
264 public Payload get(String path) {
265 return acquire(new GetConnection(path));
266 }
267
268 class PutConnection implements Connection<Void> {
269 private final String path;
270 private final Payload contents;
271 private ChannelSftp sftp;
272
273 PutConnection(String path, Payload contents) {
274 this.path = checkNotNull(path, "path");
275 this.contents = checkNotNull(contents, "contents");
276 }
277
278 @Override
279 public void clear() {
280 if (sftp != null)
281 sftp.disconnect();
282 }
283
284 @Override
285 public Void create() throws Exception {
286 sftp = acquire(sftpConnection);
287 try {
288 sftp.put(contents.getInput(), path);
289 } finally {
290 Closeables.closeQuietly(contents);
291 clear();
292 }
293 return null;
294 }
295
296 @Override
297 public String toString() {
298 return "Put(" + JschSshClient.this.toString() + ")[" + path + "]";
299 }
300 };
301
302 @Override
303 public void put(String path, Payload contents) {
304 acquire(new PutConnection(path, contents));
305 }
306
307 @VisibleForTesting
308 boolean shouldRetry(Exception from) {
309 final String rootMessage = getRootCause(from).getMessage();
310 return any(getCausalChain(from), retryPredicate)
311 || Iterables.any(Splitter.on(",").split(retryableMessages), new Predicate<String>() {
312
313 @Override
314 public boolean apply(String input) {
315 return rootMessage.indexOf(input) != -1;
316 }
317
318 });
319 }
320
321 private void backoffForAttempt(int retryAttempt, String message) {
322 backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message);
323 }
324
325 private SshException propagate(Exception e, String message) {
326 message += ": " + e.getMessage();
327 logger.error(e, "<< " + message);
328 throw new SshException(message, e);
329 }
330
331 @Override
332 public String toString() {
333 return String.format("%s@%s:%d", username, host, port);
334 }
335
336 @PreDestroy
337 public void disconnect() {
338 sessionConnection.clear();
339 }
340
341 Connection<ChannelExec> execConnection = new Connection<ChannelExec>() {
342
343 private ChannelExec executor = null;
344
345 @Override
346 public void clear() {
347 if (executor != null)
348 executor.disconnect();
349 }
350
351 @Override
352 public ChannelExec create() throws Exception {
353 checkConnected();
354 executor = (ChannelExec) session.openChannel("exec");
355 executor.setPty(true);
356 return executor;
357 }
358
359 @Override
360 public String toString() {
361 return "ChannelExec(" + JschSshClient.this.toString() + ")";
362 }
363
364 };
365
366 class ExecConnection implements Connection<ExecResponse> {
367 private final String command;
368 private ChannelExec executor;
369
370 ExecConnection(String command) {
371 this.command = checkNotNull(command, "command");
372 }
373
374 @Override
375 public void clear() {
376 if (executor != null)
377 executor.disconnect();
378 }
379
380 @Override
381 public ExecResponse create() throws Exception {
382 executor = acquire(execConnection);
383 executor.setCommand(command);
384 ByteArrayOutputStream error = new ByteArrayOutputStream();
385 executor.setErrStream(error);
386 try {
387 executor.connect();
388 String outputString = Strings2.toStringAndClose(executor.getInputStream());
389 String errorString = error.toString();
390 int errorStatus = executor.getExitStatus();
391 int i = 0;
392 String message = String.format("bad status -1 %s", toString());
393 while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) {
394 logger.warn("<< " + message);
395 backoffForAttempt(++i, message);
396 }
397 if (errorStatus == -1)
398 throw new SshException(message);
399 return new ExecResponse(outputString, errorString, errorStatus);
400 } finally {
401 if (executor != null)
402 executor.disconnect();
403 }
404 }
405
406 @Override
407 public String toString() {
408 return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]";
409 }
410
411 };
412
413 public ExecResponse exec(String command) {
414 return acquire(new ExecConnection(command));
415 }
416
417 @Override
418 public String getHostAddress() {
419 return this.host;
420 }
421
422 @Override
423 public String getUsername() {
424 return this.username;
425 }
426
427 }