View Javadoc

1   /**
2    *
3    * Copyright (C) 2011 Cloud Conscious, LLC. <info@cloudconscious.com>
4    *
5    * ====================================================================
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   * ====================================================================
18   */
19  package org.jclouds.ssh.jsch;
20  
21  import static com.google.common.base.Preconditions.checkArgument;
22  import static com.google.common.base.Preconditions.checkNotNull;
23  import static com.google.common.base.Preconditions.checkState;
24  import static com.google.common.base.Predicates.instanceOf;
25  import static com.google.common.base.Predicates.or;
26  import static com.google.common.base.Throwables.getCausalChain;
27  import static com.google.common.base.Throwables.getRootCause;
28  import static com.google.common.collect.Iterables.any;
29  
30  import java.io.IOException;
31  import java.io.InputStream;
32  import java.net.ConnectException;
33  import java.util.Arrays;
34  
35  import javax.annotation.PostConstruct;
36  import javax.annotation.PreDestroy;
37  import javax.annotation.Resource;
38  import javax.inject.Named;
39  
40  import org.apache.commons.io.input.ProxyInputStream;
41  import org.apache.commons.io.output.ByteArrayOutputStream;
42  import org.jclouds.compute.domain.ExecResponse;
43  import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
44  import org.jclouds.io.Payload;
45  import org.jclouds.io.Payloads;
46  import org.jclouds.logging.Logger;
47  import org.jclouds.net.IPSocket;
48  import org.jclouds.ssh.SshClient;
49  import org.jclouds.ssh.SshException;
50  import org.jclouds.util.Strings2;
51  
52  import com.google.common.annotations.VisibleForTesting;
53  import com.google.common.base.Predicate;
54  import com.google.common.base.Splitter;
55  import com.google.common.collect.Iterables;
56  import com.google.common.io.Closeables;
57  import com.google.inject.Inject;
58  import com.jcraft.jsch.ChannelExec;
59  import com.jcraft.jsch.ChannelSftp;
60  import com.jcraft.jsch.JSch;
61  import com.jcraft.jsch.Session;
62  
63  /**
64   * This class needs refactoring. It is not thread safe.
65   * 
66   * @author Adrian Cole
67   */
68  public class JschSshClient implements SshClient {
69  
70     private final class CloseFtpChannelOnCloseInputStream extends ProxyInputStream {
71  
72        private final ChannelSftp sftp;
73  
74        private CloseFtpChannelOnCloseInputStream(InputStream proxy, ChannelSftp sftp) {
75           super(proxy);
76           this.sftp = sftp;
77        }
78  
79        @Override
80        public void close() throws IOException {
81           super.close();
82           if (sftp != null)
83              sftp.disconnect();
84        }
85     }
86  
87     private final String host;
88     private final int port;
89     private final String username;
90     private final String password;
91  
92     @Inject(optional = true)
93     @Named("jclouds.ssh.max_retries")
94     @VisibleForTesting
95     int sshRetries = 5;
96  
97     @Inject(optional = true)
98     @Named("jclouds.ssh.retryable_messages")
99     @VisibleForTesting
100    String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established";
101 
102    @Inject(optional = true)
103    @Named("jclouds.ssh.retry_predicate")
104    private Predicate<Throwable> retryPredicate = or(instanceOf(ConnectException.class), instanceOf(IOException.class));
105 
106    @Resource
107    @Named("jclouds.ssh")
108    protected Logger logger = Logger.NULL;
109 
110    private Session session;
111    private final byte[] privateKey;
112    final byte[] emptyPassPhrase = new byte[0];
113    private final int timeout;
114    private final BackoffLimitedRetryHandler backoffLimitedRetryHandler;
115 
116    public JschSshClient(BackoffLimitedRetryHandler backoffLimitedRetryHandler, IPSocket socket, int timeout,
117             String username, String password, byte[] privateKey) {
118       this.host = checkNotNull(socket, "socket").getAddress();
119       checkArgument(socket.getPort() > 0, "ssh port must be greater then zero" + socket.getPort());
120       checkArgument(password != null || privateKey != null, "you must specify a password or a key");
121       this.port = socket.getPort();
122       this.username = checkNotNull(username, "username");
123       this.backoffLimitedRetryHandler = checkNotNull(backoffLimitedRetryHandler, "backoffLimitedRetryHandler");
124       this.timeout = timeout;
125       this.password = password;
126       this.privateKey = privateKey;
127    }
128 
129    @Override
130    public void put(String path, String contents) {
131       put(path, Payloads.newStringPayload(checkNotNull(contents, "contents")));
132    }
133 
134    private void checkConnected() {
135       checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString()));
136    }
137 
138    public static interface Connection<T> {
139       void clear();
140 
141       T create() throws Exception;
142    }
143 
144    Connection<Session> sessionConnection = new Connection<Session>() {
145 
146       @Override
147       public void clear() {
148          if (session != null && session.isConnected()) {
149             session.disconnect();
150             session = null;
151          }
152       }
153 
154       @Override
155       public Session create() throws Exception {
156          JSch jsch = new JSch();
157          session = jsch.getSession(username, host, port);
158          if (timeout != 0)
159             session.setTimeout(timeout);
160          if (password != null) {
161             session.setPassword(password);
162          } else {
163             // jsch wipes out your private key
164             jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
165          }
166          java.util.Properties config = new java.util.Properties();
167          config.put("StrictHostKeyChecking", "no");
168          session.setConfig(config);
169          session.connect();
170          return session;
171       }
172 
173       @Override
174       public String toString() {
175          return String.format("Session(%s)", JschSshClient.this.toString());
176       }
177    };
178 
179    protected <T, C extends Connection<T>> T acquire(C connection) {
180       connection.clear();
181       Exception e = null;
182       String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
183       for (int i = 0; i < sshRetries; i++) {
184          try {
185             logger.debug(">> (%s) acquiring %s", toString(), connection);
186             T returnVal = connection.create();
187             logger.debug("<< (%s) acquired %s", toString(), returnVal);
188             return returnVal;
189          } catch (Exception from) {
190             e = from;
191             connection.clear();
192 
193             if (i == sshRetries)
194                throw propagate(from, errorMessage);
195 
196             if (shouldRetry(from)) {
197                logger.warn("<< " + errorMessage + ": " + from.getMessage());
198                backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage());
199                continue;
200             }
201             throw propagate(from, errorMessage);
202          }
203       }
204       if (e != null)
205          throw propagate(e, errorMessage);
206       return null;
207    }
208 
209    @PostConstruct
210    public void connect() {
211       acquire(sessionConnection);
212    }
213 
214    Connection<ChannelSftp> sftpConnection = new Connection<ChannelSftp>() {
215 
216       private ChannelSftp sftp;
217 
218       @Override
219       public void clear() {
220          if (sftp != null)
221             sftp.disconnect();
222       }
223 
224       @Override
225       public ChannelSftp create() throws Exception {
226          checkConnected();
227          sftp = (ChannelSftp) session.openChannel("sftp");
228          sftp.connect();
229          return sftp;
230       }
231 
232       @Override
233       public String toString() {
234          return "ChannelSftp(" + JschSshClient.this.toString() + ")";
235       }
236    };
237 
238    class GetConnection implements Connection<Payload> {
239       private final String path;
240       private ChannelSftp sftp;
241 
242       GetConnection(String path) {
243          this.path = checkNotNull(path, "path");
244       }
245 
246       @Override
247       public void clear() {
248          if (sftp != null)
249             sftp.disconnect();
250       }
251 
252       @Override
253       public Payload create() throws Exception {
254          sftp = acquire(sftpConnection);
255          return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
256       }
257 
258       @Override
259       public String toString() {
260          return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]";
261       }
262    };
263 
264    public Payload get(String path) {
265       return acquire(new GetConnection(path));
266    }
267 
268    class PutConnection implements Connection<Void> {
269       private final String path;
270       private final Payload contents;
271       private ChannelSftp sftp;
272 
273       PutConnection(String path, Payload contents) {
274          this.path = checkNotNull(path, "path");
275          this.contents = checkNotNull(contents, "contents");
276       }
277 
278       @Override
279       public void clear() {
280          if (sftp != null)
281             sftp.disconnect();
282       }
283 
284       @Override
285       public Void create() throws Exception {
286          sftp = acquire(sftpConnection);
287          try {
288             sftp.put(contents.getInput(), path);
289          } finally {
290             Closeables.closeQuietly(contents);
291             clear();
292          }
293          return null;
294       }
295 
296       @Override
297       public String toString() {
298          return "Put(" + JschSshClient.this.toString() + ")[" + path + "]";
299       }
300    };
301 
302    @Override
303    public void put(String path, Payload contents) {
304       acquire(new PutConnection(path, contents));
305    }
306 
307    @VisibleForTesting
308    boolean shouldRetry(Exception from) {
309       final String rootMessage = getRootCause(from).getMessage();
310       return any(getCausalChain(from), retryPredicate)
311                || Iterables.any(Splitter.on(",").split(retryableMessages), new Predicate<String>() {
312 
313                   @Override
314                   public boolean apply(String input) {
315                      return rootMessage.indexOf(input) != -1;
316                   }
317 
318                });
319    }
320 
321    private void backoffForAttempt(int retryAttempt, String message) {
322       backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message);
323    }
324 
325    private SshException propagate(Exception e, String message) {
326       message += ": " + e.getMessage();
327       logger.error(e, "<< " + message);
328       throw new SshException(message, e);
329    }
330 
331    @Override
332    public String toString() {
333       return String.format("%s@%s:%d", username, host, port);
334    }
335 
336    @PreDestroy
337    public void disconnect() {
338       sessionConnection.clear();
339    }
340 
341    Connection<ChannelExec> execConnection = new Connection<ChannelExec>() {
342 
343       private ChannelExec executor = null;
344 
345       @Override
346       public void clear() {
347          if (executor != null)
348             executor.disconnect();
349       }
350 
351       @Override
352       public ChannelExec create() throws Exception {
353          checkConnected();
354          executor = (ChannelExec) session.openChannel("exec");
355          executor.setPty(true);
356          return executor;
357       }
358 
359       @Override
360       public String toString() {
361          return "ChannelExec(" + JschSshClient.this.toString() + ")";
362       }
363 
364    };
365 
366    class ExecConnection implements Connection<ExecResponse> {
367       private final String command;
368       private ChannelExec executor;
369 
370       ExecConnection(String command) {
371          this.command = checkNotNull(command, "command");
372       }
373 
374       @Override
375       public void clear() {
376          if (executor != null)
377             executor.disconnect();
378       }
379 
380       @Override
381       public ExecResponse create() throws Exception {
382          executor = acquire(execConnection);
383          executor.setCommand(command);
384          ByteArrayOutputStream error = new ByteArrayOutputStream();
385          executor.setErrStream(error);
386          try {
387             executor.connect();
388             String outputString = Strings2.toStringAndClose(executor.getInputStream());
389             String errorString = error.toString();
390             int errorStatus = executor.getExitStatus();
391             int i = 0;
392             String message = String.format("bad status -1 %s", toString());
393             while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) {
394                logger.warn("<< " + message);
395                backoffForAttempt(++i, message);
396             }
397             if (errorStatus == -1)
398                throw new SshException(message);
399             return new ExecResponse(outputString, errorString, errorStatus);
400          } finally {
401             if (executor != null)
402                executor.disconnect();
403          }
404       }
405 
406       @Override
407       public String toString() {
408          return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]";
409       }
410 
411    };
412 
413    public ExecResponse exec(String command) {
414       return acquire(new ExecConnection(command));
415    }
416 
417    @Override
418    public String getHostAddress() {
419       return this.host;
420    }
421 
422    @Override
423    public String getUsername() {
424       return this.username;
425    }
426 
427 }