1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.ssh.jsch;
20
21 import static com.google.common.base.Preconditions.checkArgument;
22 import static com.google.common.base.Preconditions.checkNotNull;
23 import static com.google.common.base.Preconditions.checkState;
24 import static com.google.common.base.Predicates.instanceOf;
25 import static com.google.common.base.Predicates.or;
26 import static com.google.common.base.Throwables.getCausalChain;
27 import static com.google.common.collect.Iterables.any;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.net.ConnectException;
32 import java.util.Arrays;
33
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import javax.annotation.Resource;
37 import javax.inject.Named;
38
39 import org.apache.commons.io.input.ProxyInputStream;
40 import org.apache.commons.io.output.ByteArrayOutputStream;
41 import org.jclouds.compute.domain.ExecResponse;
42 import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
43 import org.jclouds.io.Payload;
44 import org.jclouds.io.Payloads;
45 import org.jclouds.logging.Logger;
46 import org.jclouds.net.IPSocket;
47 import org.jclouds.rest.AuthorizationException;
48 import org.jclouds.ssh.SshClient;
49 import org.jclouds.ssh.SshException;
50 import org.jclouds.util.Strings2;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Predicate;
54 import com.google.common.base.Predicates;
55 import com.google.common.base.Splitter;
56 import com.google.common.io.Closeables;
57 import com.google.inject.Inject;
58 import com.jcraft.jsch.ChannelExec;
59 import com.jcraft.jsch.ChannelSftp;
60 import com.jcraft.jsch.JSch;
61 import com.jcraft.jsch.JSchException;
62 import com.jcraft.jsch.Session;
63
64
65
66
67
68
69 public class JschSshClient implements SshClient {
70
71 private final class CloseFtpChannelOnCloseInputStream extends ProxyInputStream {
72
73 private final ChannelSftp sftp;
74
75 private CloseFtpChannelOnCloseInputStream(InputStream proxy, ChannelSftp sftp) {
76 super(proxy);
77 this.sftp = sftp;
78 }
79
80 @Override
81 public void close() throws IOException {
82 super.close();
83 if (sftp != null)
84 sftp.disconnect();
85 }
86 }
87
// Connection target and credentials; all four are assigned once in the constructor.
private final String host;
private final int port;
private final String username;
// may be null, in which case the private key is used to authenticate
private final String password;

// Upper bound on attempts made by acquire() and on exit-status polls in ExecConnection.
@Inject(optional = true)
@Named("jclouds.ssh.max-retries")
@VisibleForTesting
int sshRetries = 5;

// When true, shouldRetry() also treats AuthorizationException as retryable.
@Inject(optional = true)
@Named("jclouds.ssh.retry-auth")
@VisibleForTesting
boolean retryAuth;

// Comma-separated substrings; shouldRetry() returns true when any exception message in the
// causal chain contains one of them.
@Inject(optional = true)
@Named("jclouds.ssh.retryable-messages")
@VisibleForTesting
String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established";

// Matched by shouldRetry() against every throwable in the causal chain of a failure.
@Inject(optional = true)
@Named("jclouds.ssh.retry-predicate")
Predicate<Throwable> retryPredicate = or(instanceOf(ConnectException.class), instanceOf(IOException.class));

@Resource
@Named("jclouds.ssh")
protected Logger logger = Logger.NULL;

// Current session; assigned by sessionConnection.create() and reset to null by its clear().
private Session session;
// may be null when a password is supplied
private final byte[] privateKey;
// zero-length passphrase handed to JSch.addIdentity together with the private key
final byte[] emptyPassPhrase = new byte[0];
// Passed to Session.setTimeout (only when non-zero) and to Session.connect.
// NOTE(review): presumably milliseconds, as JSch expects -- confirm against callers.
private final int timeout;
private final BackoffLimitedRetryHandler backoffLimitedRetryHandler;
121
/**
 * Captures the connection parameters; no network activity happens until {@link #connect()}.
 * 
 * @param backoffLimitedRetryHandler
 *           used to delay between retries
 * @param socket
 *           address and port of the ssh server; the port must be greater than zero
 * @param timeout
 *           value handed to the JSch session for its timeout and connect calls
 * @param username
 *           login name
 * @param password
 *           login password, or null to authenticate with {@code privateKey}
 * @param privateKey
 *           private key bytes, or null to authenticate with {@code password}
 * @throws NullPointerException
 *            if {@code socket}, {@code username} or {@code backoffLimitedRetryHandler} is null
 * @throws IllegalArgumentException
 *            if the port is not positive, or both {@code password} and {@code privateKey} are null
 */
public JschSshClient(BackoffLimitedRetryHandler backoffLimitedRetryHandler, IPSocket socket, int timeout,
         String username, String password, byte[] privateKey) {
   this.host = checkNotNull(socket, "socket").getAddress();
   // template form: the offending port is separated from the text and only formatted on failure
   checkArgument(socket.getPort() > 0, "ssh port must be greater than zero: %s", socket.getPort());
   checkArgument(password != null || privateKey != null, "you must specify a password or a key");
   this.port = socket.getPort();
   this.username = checkNotNull(username, "username");
   this.backoffLimitedRetryHandler = checkNotNull(backoffLimitedRetryHandler, "backoffLimitedRetryHandler");
   this.timeout = timeout;
   this.password = password;
   this.privateKey = privateKey;
}
134
135 @Override
136 public void put(String path, String contents) {
137 put(path, Payloads.newStringPayload(checkNotNull(contents, "contents")));
138 }
139
140 private void checkConnected() {
141 checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString()));
142 }
143
/**
 * A resource that {@code acquire} knows how to establish. {@code acquire} calls {@link #clear()}
 * once before the first attempt and again after every failed {@link #create()}.
 * 
 * @param <T>
 *           type of the value produced by {@link #create()}
 */
public static interface Connection<T> {
   /**
    * Releases whatever a previous {@link #create()} left behind; called even when nothing was
    * created yet.
    */
   void clear();

   /**
    * Makes one attempt at producing the resource.
    * 
    * @throws Exception
    *            on failure; {@code acquire} catches it and decides how to proceed
    */
   T create() throws Exception;
}
149
150 Connection<Session> sessionConnection = new Connection<Session>() {
151
152 @Override
153 public void clear() {
154 if (session != null && session.isConnected()) {
155 session.disconnect();
156 session = null;
157 }
158 }
159
160 @Override
161 public Session create() throws Exception {
162 JSch jsch = new JSch();
163 session = jsch.getSession(username, host, port);
164 if (timeout != 0)
165 session.setTimeout(timeout);
166 if (password != null) {
167 session.setPassword(password);
168 } else {
169
170 jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
171 }
172 java.util.Properties config = new java.util.Properties();
173 config.put("StrictHostKeyChecking", "no");
174 session.setConfig(config);
175 session.connect(timeout);
176 return session;
177 }
178
179 @Override
180 public String toString() {
181 return String.format("Session(%s)", JschSshClient.this.toString());
182 }
183 };
184
185 protected <T, C extends Connection<T>> T acquire(C connection) {
186 connection.clear();
187 String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
188 for (int i = 0; i < sshRetries; i++) {
189 try {
190 logger.debug(">> (%s) acquiring %s", toString(), connection);
191 T returnVal = connection.create();
192 logger.debug("<< (%s) acquired %s", toString(), returnVal);
193 return returnVal;
194 } catch (Exception from) {
195 connection.clear();
196
197 if (i + 1 == sshRetries) {
198 throw propagate(from, errorMessage);
199 } else if (shouldRetry(from)) {
200 logger.warn(from, "<< " + errorMessage + ": " + from.getMessage());
201 backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage());
202 continue;
203 }
204 }
205 }
206 assert false : "should not reach here";
207 return null;
208 }
209
/**
 * Opens the ssh session by acquiring {@code sessionConnection}; any session that is currently
 * connected is disconnected first, because {@code acquire} clears the connection before
 * creating it.
 */
@PostConstruct
public void connect() {
   acquire(sessionConnection);
}
214
215 Connection<ChannelSftp> sftpConnection = new Connection<ChannelSftp>() {
216
217 private ChannelSftp sftp;
218
219 @Override
220 public void clear() {
221 if (sftp != null)
222 sftp.disconnect();
223 }
224
225 @Override
226 public ChannelSftp create() throws JSchException {
227 checkConnected();
228 String channel = "sftp";
229 sftp = (ChannelSftp) session.openChannel(channel);
230 sftp.connect();
231 return sftp;
232 }
233
234 @Override
235 public String toString() {
236 return "ChannelSftp(" + JschSshClient.this.toString() + ")";
237 }
238 };
239
240 class GetConnection implements Connection<Payload> {
241 private final String path;
242 private ChannelSftp sftp;
243
244 GetConnection(String path) {
245 this.path = checkNotNull(path, "path");
246 }
247
248 @Override
249 public void clear() {
250 if (sftp != null)
251 sftp.disconnect();
252 }
253
254 @Override
255 public Payload create() throws Exception {
256 sftp = acquire(sftpConnection);
257 return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
258 }
259
260 @Override
261 public String toString() {
262 return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]";
263 }
264 };
265
/**
 * Retrieves the remote file at {@code path} by acquiring a new {@code GetConnection}.
 * 
 * @param path
 *           remote path to read; a null value is rejected by the {@code GetConnection} constructor
 * @return payload backed by the remote file's stream
 */
public Payload get(String path) {
   return acquire(new GetConnection(path));
}
269
270 class PutConnection implements Connection<Void> {
271 private final String path;
272 private final Payload contents;
273 private ChannelSftp sftp;
274
275 PutConnection(String path, Payload contents) {
276 this.path = checkNotNull(path, "path");
277 this.contents = checkNotNull(contents, "contents");
278 }
279
280 @Override
281 public void clear() {
282 if (sftp != null)
283 sftp.disconnect();
284 }
285
286 @Override
287 public Void create() throws Exception {
288 sftp = acquire(sftpConnection);
289 InputStream is = checkNotNull(contents.getInput(), "inputstream for path %s", path);
290 try {
291 sftp.put(is, path);
292 } finally {
293 Closeables.closeQuietly(contents);
294 }
295 return null;
296 }
297
298 @Override
299 public String toString() {
300 return "Put(" + JschSshClient.this.toString() + ")[" + path + "]";
301 }
302 };
303
/**
 * Uploads {@code contents} to {@code path} by acquiring a new {@code PutConnection}.
 * 
 * @param path
 *           remote destination; a null value is rejected by the {@code PutConnection} constructor
 * @param contents
 *           data to upload; a null value is rejected by the {@code PutConnection} constructor
 */
@Override
public void put(String path, Payload contents) {
   acquire(new PutConnection(path, contents));
}
308
309 @VisibleForTesting
310 boolean shouldRetry(Exception from) {
311 Predicate<Throwable> predicate = retryAuth ? Predicates.<Throwable>or(retryPredicate, instanceOf(AuthorizationException.class))
312 : retryPredicate;
313 if (any(getCausalChain(from), predicate))
314 return true;
315 if (!retryableMessages.equals(""))
316 return any(Splitter.on(",").split(retryableMessages), causalChainHasMessageContaining(from));
317 return false;
318 }
319
320 @VisibleForTesting
321 Predicate<String> causalChainHasMessageContaining(final Exception from) {
322 return new Predicate<String>() {
323
324 @Override
325 public boolean apply(final String input) {
326 return any(getCausalChain(from), new Predicate<Throwable>() {
327
328 @Override
329 public boolean apply(Throwable arg0) {
330 return arg0.getMessage() != null && arg0.getMessage().indexOf(input) != -1;
331 }
332
333 });
334 }
335
336 };
337 }
338
/**
 * Delays the caller before the next retry by delegating to the backoff handler.
 * 
 * @param retryAttempt
 *           1-based number of the attempt that just failed
 * @param message
 *           description passed through to the handler
 */
private void backoffForAttempt(int retryAttempt, String message) {
   // NOTE(review): 200L and 2 are presumably the base delay and the exponent -- confirm against
   // BackoffLimitedRetryHandler.imposeBackoffExponentialDelay
   backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message);
}
342
343 SshException propagate(Exception e, String message) {
344 message += ": " + e.getMessage();
345 if (e.getMessage() != null && e.getMessage().indexOf("Auth fail") != -1)
346 throw new AuthorizationException("(" + toString() + ") " + message, e);
347 throw e instanceof SshException ? SshException.class.cast(e) : new SshException(
348 "(" + toString() + ") " + message, e);
349 }
350
351 @Override
352 public String toString() {
353 return String.format("%s@%s:%d", username, host, port);
354 }
355
/**
 * Disconnects the ssh session if it is currently connected; otherwise does nothing.
 */
@PreDestroy
public void disconnect() {
   sessionConnection.clear();
}
360
361 protected Connection<ChannelExec> execConnection(final String command) {
362 checkNotNull(command, "command");
363 return new Connection<ChannelExec>() {
364
365 private ChannelExec executor = null;
366
367 @Override
368 public void clear() {
369 if (executor != null)
370 executor.disconnect();
371 }
372
373 @Override
374 public ChannelExec create() throws Exception {
375 checkConnected();
376 String channel = "exec";
377 executor = (ChannelExec) session.openChannel(channel);
378 executor.setPty(true);
379 executor.setCommand(command);
380 ByteArrayOutputStream error = new ByteArrayOutputStream();
381 executor.setErrStream(error);
382 executor.connect();
383 return executor;
384 }
385
386 @Override
387 public String toString() {
388 return "ChannelExec(" + JschSshClient.this.toString() + ")";
389 }
390 };
391
392 }
393
394 class ExecConnection implements Connection<ExecResponse> {
395 private final String command;
396 private ChannelExec executor;
397
398 ExecConnection(String command) {
399 this.command = checkNotNull(command, "command");
400 }
401
402 @Override
403 public void clear() {
404 if (executor != null)
405 executor.disconnect();
406 }
407
408 @Override
409 public ExecResponse create() throws Exception {
410 try {
411 executor = acquire(execConnection(command));
412 String outputString = Strings2.toStringAndClose(executor.getInputStream());
413 int errorStatus = executor.getExitStatus();
414 int i = 0;
415 String message = String.format("bad status -1 %s", toString());
416 while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) {
417 logger.warn("<< " + message);
418 backoffForAttempt(++i, message);
419 }
420 if (errorStatus == -1)
421 throw new SshException(message);
422
423
424
425
426
427 String errorString = "";
428 return new ExecResponse(outputString, errorString, errorStatus);
429 } finally {
430 clear();
431 }
432 }
433
434 @Override
435 public String toString() {
436 return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]";
437 }
438 }
439
/**
 * Runs {@code command} on the remote host by acquiring a new {@code ExecConnection}.
 * 
 * @param command
 *           command line to run; a null value is rejected by the {@code ExecConnection} constructor
 * @return the command's output and exit status
 */
public ExecResponse exec(String command) {
   return acquire(new ExecConnection(command));
}
443
444 @Override
445 public String getHostAddress() {
446 return this.host;
447 }
448
449 @Override
450 public String getUsername() {
451 return this.username;
452 }
453
454 }