View Javadoc

1   /**
2    *
3    * Copyright (C) 2011 Cloud Conscious, LLC. <info@cloudconscious.com>
4    *
5    * ====================================================================
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   * ====================================================================
18   */
19  package org.jclouds.ssh.jsch;
20  
21  import static com.google.common.base.Preconditions.checkArgument;
22  import static com.google.common.base.Preconditions.checkNotNull;
23  import static com.google.common.base.Preconditions.checkState;
24  import static com.google.common.base.Predicates.instanceOf;
25  import static com.google.common.base.Predicates.or;
26  import static com.google.common.base.Throwables.getCausalChain;
27  import static com.google.common.collect.Iterables.any;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.net.ConnectException;
32  import java.util.Arrays;
33  
34  import javax.annotation.PostConstruct;
35  import javax.annotation.PreDestroy;
36  import javax.annotation.Resource;
37  import javax.inject.Named;
38  
39  import org.apache.commons.io.input.ProxyInputStream;
40  import org.apache.commons.io.output.ByteArrayOutputStream;
41  import org.jclouds.compute.domain.ExecResponse;
42  import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
43  import org.jclouds.io.Payload;
44  import org.jclouds.io.Payloads;
45  import org.jclouds.logging.Logger;
46  import org.jclouds.net.IPSocket;
47  import org.jclouds.rest.AuthorizationException;
48  import org.jclouds.ssh.SshClient;
49  import org.jclouds.ssh.SshException;
50  import org.jclouds.util.Strings2;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Predicate;
54  import com.google.common.base.Predicates;
55  import com.google.common.base.Splitter;
56  import com.google.common.io.Closeables;
57  import com.google.inject.Inject;
58  import com.jcraft.jsch.ChannelExec;
59  import com.jcraft.jsch.ChannelSftp;
60  import com.jcraft.jsch.JSch;
61  import com.jcraft.jsch.JSchException;
62  import com.jcraft.jsch.Session;
63  
64  /**
65   * This class needs refactoring. It is not thread safe.
66   * 
67   * @author Adrian Cole
68   */
69  public class JschSshClient implements SshClient {
70  
71     private final class CloseFtpChannelOnCloseInputStream extends ProxyInputStream {
72  
73        private final ChannelSftp sftp;
74  
75        private CloseFtpChannelOnCloseInputStream(InputStream proxy, ChannelSftp sftp) {
76           super(proxy);
77           this.sftp = sftp;
78        }
79  
80        @Override
81        public void close() throws IOException {
82           super.close();
83           if (sftp != null)
84              sftp.disconnect();
85        }
86     }
87  
88     private final String host;
89     private final int port;
90     private final String username;
91     private final String password;
92  
93     @Inject(optional = true)
94     @Named("jclouds.ssh.max-retries")
95     @VisibleForTesting
96     int sshRetries = 5;
97  
98     @Inject(optional = true)
99     @Named("jclouds.ssh.retry-auth")
100    @VisibleForTesting
101    boolean retryAuth;
102 
103    @Inject(optional = true)
104    @Named("jclouds.ssh.retryable-messages")
105    @VisibleForTesting
106    String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established";
107 
108    @Inject(optional = true)
109    @Named("jclouds.ssh.retry-predicate")
110    Predicate<Throwable> retryPredicate = or(instanceOf(ConnectException.class), instanceOf(IOException.class));
111 
112    @Resource
113    @Named("jclouds.ssh")
114    protected Logger logger = Logger.NULL;
115 
116    private Session session;
117    private final byte[] privateKey;
118    final byte[] emptyPassPhrase = new byte[0];
119    private final int timeout;
120    private final BackoffLimitedRetryHandler backoffLimitedRetryHandler;
121 
122    public JschSshClient(BackoffLimitedRetryHandler backoffLimitedRetryHandler, IPSocket socket, int timeout,
123          String username, String password, byte[] privateKey) {
124       this.host = checkNotNull(socket, "socket").getAddress();
125       checkArgument(socket.getPort() > 0, "ssh port must be greater then zero" + socket.getPort());
126       checkArgument(password != null || privateKey != null, "you must specify a password or a key");
127       this.port = socket.getPort();
128       this.username = checkNotNull(username, "username");
129       this.backoffLimitedRetryHandler = checkNotNull(backoffLimitedRetryHandler, "backoffLimitedRetryHandler");
130       this.timeout = timeout;
131       this.password = password;
132       this.privateKey = privateKey;
133    }
134 
135    @Override
136    public void put(String path, String contents) {
137       put(path, Payloads.newStringPayload(checkNotNull(contents, "contents")));
138    }
139 
140    private void checkConnected() {
141       checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString()));
142    }
143 
144    public static interface Connection<T> {
145       void clear();
146 
147       T create() throws Exception;
148    }
149 
150    Connection<Session> sessionConnection = new Connection<Session>() {
151 
152       @Override
153       public void clear() {
154          if (session != null && session.isConnected()) {
155             session.disconnect();
156             session = null;
157          }
158       }
159 
160       @Override
161       public Session create() throws Exception {
162          JSch jsch = new JSch();
163          session = jsch.getSession(username, host, port);
164          if (timeout != 0)
165             session.setTimeout(timeout);
166          if (password != null) {
167             session.setPassword(password);
168          } else {
169             // jsch wipes out your private key
170             jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
171          }
172          java.util.Properties config = new java.util.Properties();
173          config.put("StrictHostKeyChecking", "no");
174          session.setConfig(config);
175          session.connect(timeout);
176          return session;
177       }
178 
179       @Override
180       public String toString() {
181          return String.format("Session(%s)", JschSshClient.this.toString());
182       }
183    };
184 
185    protected <T, C extends Connection<T>> T acquire(C connection) {
186       connection.clear();
187       String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
188       for (int i = 0; i < sshRetries; i++) {
189          try {
190             logger.debug(">> (%s) acquiring %s", toString(), connection);
191             T returnVal = connection.create();
192             logger.debug("<< (%s) acquired %s", toString(), returnVal);
193             return returnVal;
194          } catch (Exception from) {
195             connection.clear();
196 
197             if (i + 1 == sshRetries) {
198                throw propagate(from, errorMessage);
199             } else if (shouldRetry(from)) {
200                logger.warn(from, "<< " + errorMessage + ": " + from.getMessage());
201                backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage());
202                continue;
203             }
204          }
205       }
206       assert false : "should not reach here";
207       return null;
208    }
209 
210    @PostConstruct
211    public void connect() {
212       acquire(sessionConnection);
213    }
214 
215    Connection<ChannelSftp> sftpConnection = new Connection<ChannelSftp>() {
216 
217       private ChannelSftp sftp;
218 
219       @Override
220       public void clear() {
221          if (sftp != null)
222             sftp.disconnect();
223       }
224 
225       @Override
226       public ChannelSftp create() throws JSchException {
227          checkConnected();
228          String channel = "sftp";
229          sftp = (ChannelSftp) session.openChannel(channel);
230          sftp.connect();
231          return sftp;
232       }
233 
234       @Override
235       public String toString() {
236          return "ChannelSftp(" + JschSshClient.this.toString() + ")";
237       }
238    };
239 
240    class GetConnection implements Connection<Payload> {
241       private final String path;
242       private ChannelSftp sftp;
243 
244       GetConnection(String path) {
245          this.path = checkNotNull(path, "path");
246       }
247 
248       @Override
249       public void clear() {
250          if (sftp != null)
251             sftp.disconnect();
252       }
253 
254       @Override
255       public Payload create() throws Exception {
256          sftp = acquire(sftpConnection);
257          return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
258       }
259 
260       @Override
261       public String toString() {
262          return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]";
263       }
264    };
265 
266    public Payload get(String path) {
267       return acquire(new GetConnection(path));
268    }
269 
270    class PutConnection implements Connection<Void> {
271       private final String path;
272       private final Payload contents;
273       private ChannelSftp sftp;
274 
275       PutConnection(String path, Payload contents) {
276          this.path = checkNotNull(path, "path");
277          this.contents = checkNotNull(contents, "contents");
278       }
279 
280       @Override
281       public void clear() {
282          if (sftp != null)
283             sftp.disconnect();
284       }
285 
286       @Override
287       public Void create() throws Exception {
288          sftp = acquire(sftpConnection);
289          InputStream is = checkNotNull(contents.getInput(), "inputstream for path %s", path);
290          try {
291             sftp.put(is, path);
292          } finally {
293             Closeables.closeQuietly(contents);
294          }
295          return null;
296       }
297 
298       @Override
299       public String toString() {
300          return "Put(" + JschSshClient.this.toString() + ")[" + path + "]";
301       }
302    };
303 
304    @Override
305    public void put(String path, Payload contents) {
306       acquire(new PutConnection(path, contents));
307    }
308 
309    @VisibleForTesting
310    boolean shouldRetry(Exception from) {
311       Predicate<Throwable> predicate = retryAuth ?  Predicates.<Throwable>or(retryPredicate, instanceOf(AuthorizationException.class))
312             : retryPredicate;
313       if (any(getCausalChain(from), predicate))
314          return true;
315       if (!retryableMessages.equals(""))
316          return any(Splitter.on(",").split(retryableMessages), causalChainHasMessageContaining(from));
317       return false;
318    }
319 
320    @VisibleForTesting
321    Predicate<String> causalChainHasMessageContaining(final Exception from) {
322       return new Predicate<String>() {
323 
324          @Override
325          public boolean apply(final String input) {
326             return any(getCausalChain(from), new Predicate<Throwable>() {
327 
328                @Override
329                public boolean apply(Throwable arg0) {
330                   return arg0.getMessage() != null && arg0.getMessage().indexOf(input) != -1;
331                }
332 
333             });
334          }
335 
336       };
337    }
338 
339    private void backoffForAttempt(int retryAttempt, String message) {
340       backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message);
341    }
342 
343    SshException propagate(Exception e, String message) {
344       message += ": " + e.getMessage();
345       if (e.getMessage() != null && e.getMessage().indexOf("Auth fail") != -1)
346          throw new AuthorizationException("(" + toString() + ") " + message, e);
347       throw e instanceof SshException ? SshException.class.cast(e) : new SshException(
348             "(" + toString() + ") " + message, e);
349    }
350 
351    @Override
352    public String toString() {
353       return String.format("%s@%s:%d", username, host, port);
354    }
355 
356    @PreDestroy
357    public void disconnect() {
358       sessionConnection.clear();
359    }
360 
361    protected Connection<ChannelExec> execConnection(final String command) {
362       checkNotNull(command, "command");
363       return new Connection<ChannelExec>() {
364 
365          private ChannelExec executor = null;
366 
367          @Override
368          public void clear() {
369             if (executor != null)
370                executor.disconnect();
371          }
372 
373          @Override
374          public ChannelExec create() throws Exception {
375             checkConnected();
376             String channel = "exec";
377             executor = (ChannelExec) session.openChannel(channel);
378             executor.setPty(true);
379             executor.setCommand(command);
380             ByteArrayOutputStream error = new ByteArrayOutputStream();
381             executor.setErrStream(error);
382             executor.connect();
383             return executor;
384          }
385 
386          @Override
387          public String toString() {
388             return "ChannelExec(" + JschSshClient.this.toString() + ")";
389          }
390       };
391 
392    }
393 
394    class ExecConnection implements Connection<ExecResponse> {
395       private final String command;
396       private ChannelExec executor;
397 
398       ExecConnection(String command) {
399          this.command = checkNotNull(command, "command");
400       }
401 
402       @Override
403       public void clear() {
404          if (executor != null)
405             executor.disconnect();
406       }
407 
408       @Override
409       public ExecResponse create() throws Exception {
410          try {
411             executor = acquire(execConnection(command));
412             String outputString = Strings2.toStringAndClose(executor.getInputStream());
413             int errorStatus = executor.getExitStatus();
414             int i = 0;
415             String message = String.format("bad status -1 %s", toString());
416             while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) {
417                logger.warn("<< " + message);
418                backoffForAttempt(++i, message);
419             }
420             if (errorStatus == -1)
421                throw new SshException(message);
422             // be careful as this can hang reading
423             // com.jcraft.jsch.Channel$MyPipedInputStream when there's a slow
424             // network connection
425             // String errorString =
426             // Strings2.toStringAndClose(executor.getErrStream());
427             String errorString = "";
428             return new ExecResponse(outputString, errorString, errorStatus);
429          } finally {
430             clear();
431          }
432       }
433 
434       @Override
435       public String toString() {
436          return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]";
437       }
438    }
439 
440    public ExecResponse exec(String command) {
441       return acquire(new ExecConnection(command));
442    }
443 
444    @Override
445    public String getHostAddress() {
446       return this.host;
447    }
448 
449    @Override
450    public String getUsername() {
451       return this.username;
452    }
453 
454 }