View Javadoc

1   /**
2    * Licensed to jclouds, Inc. (jclouds) under one or more
3    * contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  jclouds licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.jclouds.http.internal;
20  
21  import static com.google.common.base.Preconditions.checkNotNull;
22  import static com.google.common.io.ByteStreams.copy;
23  import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
24  import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
25  
26  import java.io.FilterInputStream;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  
33  import javax.annotation.Resource;
34  import javax.inject.Inject;
35  import javax.inject.Named;
36  import javax.net.ssl.SSLException;
37  
38  import org.jclouds.Constants;
39  import org.jclouds.http.HttpCommand;
40  import org.jclouds.http.HttpCommandExecutorService;
41  import org.jclouds.http.HttpRequest;
42  import org.jclouds.http.HttpRequestFilter;
43  import org.jclouds.http.HttpResponse;
44  import org.jclouds.http.HttpResponseException;
45  import org.jclouds.http.HttpUtils;
46  import org.jclouds.http.IOExceptionRetryHandler;
47  import org.jclouds.http.handlers.DelegatingErrorHandler;
48  import org.jclouds.http.handlers.DelegatingRetryHandler;
49  import org.jclouds.logging.Logger;
50  import org.jclouds.rest.AuthorizationException;
51  import org.jclouds.util.Throwables2;
52  
53  import com.google.common.io.NullOutputStream;
54  
55  /**
56   * 
57   * @author Adrian Cole
58   */
59  public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
60     protected final HttpUtils utils;
61  
62     private final DelegatingRetryHandler retryHandler;
63     private final IOExceptionRetryHandler ioRetryHandler;
64     private final DelegatingErrorHandler errorHandler;
65     private final ExecutorService ioWorkerExecutor;
66  
67     @Resource
68     protected Logger logger = Logger.NULL;
69     @Resource
70     @Named(Constants.LOGGER_HTTP_HEADERS)
71     protected Logger headerLog = Logger.NULL;
72  
73     protected final HttpWire wire;
74  
75     @Inject
76     protected BaseHttpCommandExecutorService(HttpUtils utils,
77              @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor,
78              DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
79              DelegatingErrorHandler errorHandler, HttpWire wire) {
80        this.utils = checkNotNull(utils, "utils");
81        this.retryHandler = checkNotNull(retryHandler, "retryHandler");
82        this.ioRetryHandler = checkNotNull(ioRetryHandler, "ioRetryHandler");
83        this.errorHandler = checkNotNull(errorHandler, "errorHandler");
84        this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
85        this.wire = checkNotNull(wire, "wire");
86     }
87  
88     public static InputStream consumeOnClose(InputStream in) {
89        return new ConsumeOnCloseInputStream(in);
90     }
91  
92     /**
93      * Ensures the content is always flushed.
94      * 
95      */
96     static class ConsumeOnCloseInputStream extends FilterInputStream {
97  
98        protected ConsumeOnCloseInputStream(InputStream in) {
99           super(in);
100       }
101 
102       boolean closed;
103 
104       @Override
105       public void close() throws IOException {
106          if (!closed) {
107             try {
108                copy(this, new NullOutputStream());
109             } catch (IOException e) {
110             } finally {
111                closed = true;
112                super.close();
113             }
114          }
115       }
116 
117       @Override
118       protected void finalize() throws Throwable {
119          close();
120          super.finalize();
121       }
122 
123    }
124 
125    @Override
126    public Future<HttpResponse> submit(HttpCommand command) {
127       HttpRequest request = command.getCurrentRequest();
128       checkRequestHasContentLengthOrChunkedEncoding(request,
129                "if the request has a payload, it must be set to chunked encoding or specify a content length: "
130                         + request);
131       return ioWorkerExecutor.submit(new HttpResponseCallable(command));
132    }
133 
134    public class HttpResponseCallable implements Callable<HttpResponse> {
135       private final HttpCommand command;
136 
137       public HttpResponseCallable(HttpCommand command) {
138          this.command = command;
139       }
140 
141       public HttpResponse call() throws Exception {
142 
143          HttpResponse response = null;
144          for (;;) {
145             HttpRequest request = command.getCurrentRequest();
146             Q nativeRequest = null;
147             try {
148                for (HttpRequestFilter filter : request.getFilters()) {
149                   request = filter.filter(request);
150                }
151                checkRequestHasContentLengthOrChunkedEncoding(request,
152                         "After filtering, the request has neither chunked encoding nor content length: " + request);
153                logger.debug("Sending request %s: %s", request.hashCode(), request.getRequestLine());
154                wirePayloadIfEnabled(wire, request);
155                utils.logRequest(headerLog, request, ">>");
156                nativeRequest = convert(request);
157                response = invoke(nativeRequest);
158 
159                logger.debug("Receiving response %s: %s", request.hashCode(), response.getStatusLine());
160                utils.logResponse(headerLog, response, "<<");
161                if (response.getPayload() != null && wire.enabled())
162                   wire.input(response);
163                int statusCode = response.getStatusCode();
164                if (statusCode >= 300) {
165                   if (shouldContinue(response))
166                      continue;
167                   else
168                      break;
169                } else {
170                   break;
171                }
172             } catch (Exception e) {
173                IOException ioe = Throwables2.getFirstThrowableOfType(e, IOException.class);
174                if (ioe != null) {
175                   if (ioe instanceof SSLException) {
176                      command.setException(new AuthorizationException(e.getMessage() + " connecting to "
177                               + command.getCurrentRequest().getRequestLine(), e));
178                      break;
179                   } else if (ioRetryHandler.shouldRetryRequest(command, ioe)) {
180                      continue;
181                   }
182                }
183                command.setException(new HttpResponseException(e.getMessage() + " connecting to "
184                         + command.getCurrentRequest().getRequestLine(), command, null, e));
185                break;
186             } finally {
187                cleanup(nativeRequest);
188             }
189          }
190          if (command.getException() != null)
191             throw command.getException();
192          return response;
193       }
194 
195       private boolean shouldContinue(HttpResponse response) {
196          boolean shouldContinue = false;
197          if (retryHandler.shouldRetryRequest(command, response)) {
198             shouldContinue = true;
199          } else {
200             errorHandler.handleError(command, response);
201          }
202          return shouldContinue;
203       }
204 
205       @Override
206       public String toString() {
207          return command.toString();
208       }
209 
210    }
211 
212    protected abstract Q convert(HttpRequest request) throws IOException, InterruptedException;
213 
214    protected abstract HttpResponse invoke(Q nativeRequest) throws IOException, InterruptedException;
215 
216    protected abstract void cleanup(Q nativeResponse);
217 
218 }