1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.http.internal;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static com.google.common.io.ByteStreams.copy;
23 import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
24 import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
25
26 import java.io.FilterInputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Future;
32
33 import javax.annotation.Resource;
34 import javax.inject.Inject;
35 import javax.inject.Named;
36 import javax.net.ssl.SSLException;
37
38 import org.jclouds.Constants;
39 import org.jclouds.http.HttpCommand;
40 import org.jclouds.http.HttpCommandExecutorService;
41 import org.jclouds.http.HttpRequest;
42 import org.jclouds.http.HttpRequestFilter;
43 import org.jclouds.http.HttpResponse;
44 import org.jclouds.http.HttpResponseException;
45 import org.jclouds.http.HttpUtils;
46 import org.jclouds.http.IOExceptionRetryHandler;
47 import org.jclouds.http.handlers.DelegatingErrorHandler;
48 import org.jclouds.http.handlers.DelegatingRetryHandler;
49 import org.jclouds.logging.Logger;
50 import org.jclouds.rest.AuthorizationException;
51 import org.jclouds.util.Throwables2;
52
53 import com.google.common.io.NullOutputStream;
54
55
56
57
58
59 public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
60 protected final HttpUtils utils;
61
62 private final DelegatingRetryHandler retryHandler;
63 private final IOExceptionRetryHandler ioRetryHandler;
64 private final DelegatingErrorHandler errorHandler;
65 private final ExecutorService ioWorkerExecutor;
66
67 @Resource
68 protected Logger logger = Logger.NULL;
69 @Resource
70 @Named(Constants.LOGGER_HTTP_HEADERS)
71 protected Logger headerLog = Logger.NULL;
72
73 protected final HttpWire wire;
74
75 @Inject
76 protected BaseHttpCommandExecutorService(HttpUtils utils,
77 @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor,
78 DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
79 DelegatingErrorHandler errorHandler, HttpWire wire) {
80 this.utils = checkNotNull(utils, "utils");
81 this.retryHandler = checkNotNull(retryHandler, "retryHandler");
82 this.ioRetryHandler = checkNotNull(ioRetryHandler, "ioRetryHandler");
83 this.errorHandler = checkNotNull(errorHandler, "errorHandler");
84 this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
85 this.wire = checkNotNull(wire, "wire");
86 }
87
88 public static InputStream consumeOnClose(InputStream in) {
89 return new ConsumeOnCloseInputStream(in);
90 }
91
92
93
94
95
96 static class ConsumeOnCloseInputStream extends FilterInputStream {
97
98 protected ConsumeOnCloseInputStream(InputStream in) {
99 super(in);
100 }
101
102 boolean closed;
103
104 @Override
105 public void close() throws IOException {
106 if (!closed) {
107 try {
108 copy(this, new NullOutputStream());
109 } catch (IOException e) {
110 } finally {
111 closed = true;
112 super.close();
113 }
114 }
115 }
116
117 @Override
118 protected void finalize() throws Throwable {
119 close();
120 super.finalize();
121 }
122
123 }
124
125 @Override
126 public Future<HttpResponse> submit(HttpCommand command) {
127 HttpRequest request = command.getCurrentRequest();
128 checkRequestHasContentLengthOrChunkedEncoding(request,
129 "if the request has a payload, it must be set to chunked encoding or specify a content length: "
130 + request);
131 return ioWorkerExecutor.submit(new HttpResponseCallable(command));
132 }
133
134 public class HttpResponseCallable implements Callable<HttpResponse> {
135 private final HttpCommand command;
136
137 public HttpResponseCallable(HttpCommand command) {
138 this.command = command;
139 }
140
141 public HttpResponse call() throws Exception {
142
143 HttpResponse response = null;
144 for (;;) {
145 HttpRequest request = command.getCurrentRequest();
146 Q nativeRequest = null;
147 try {
148 for (HttpRequestFilter filter : request.getFilters()) {
149 request = filter.filter(request);
150 }
151 checkRequestHasContentLengthOrChunkedEncoding(request,
152 "After filtering, the request has neither chunked encoding nor content length: " + request);
153 logger.debug("Sending request %s: %s", request.hashCode(), request.getRequestLine());
154 wirePayloadIfEnabled(wire, request);
155 utils.logRequest(headerLog, request, ">>");
156 nativeRequest = convert(request);
157 response = invoke(nativeRequest);
158
159 logger.debug("Receiving response %s: %s", request.hashCode(), response.getStatusLine());
160 utils.logResponse(headerLog, response, "<<");
161 if (response.getPayload() != null && wire.enabled())
162 wire.input(response);
163 int statusCode = response.getStatusCode();
164 if (statusCode >= 300) {
165 if (shouldContinue(response))
166 continue;
167 else
168 break;
169 } else {
170 break;
171 }
172 } catch (Exception e) {
173 IOException ioe = Throwables2.getFirstThrowableOfType(e, IOException.class);
174 if (ioe != null) {
175 if (ioe instanceof SSLException) {
176 command.setException(new AuthorizationException(e.getMessage() + " connecting to "
177 + command.getCurrentRequest().getRequestLine(), e));
178 break;
179 } else if (ioRetryHandler.shouldRetryRequest(command, ioe)) {
180 continue;
181 }
182 }
183 command.setException(new HttpResponseException(e.getMessage() + " connecting to "
184 + command.getCurrentRequest().getRequestLine(), command, null, e));
185 break;
186 } finally {
187 cleanup(nativeRequest);
188 }
189 }
190 if (command.getException() != null)
191 throw command.getException();
192 return response;
193 }
194
195 private boolean shouldContinue(HttpResponse response) {
196 boolean shouldContinue = false;
197 if (retryHandler.shouldRetryRequest(command, response)) {
198 shouldContinue = true;
199 } else {
200 errorHandler.handleError(command, response);
201 }
202 return shouldContinue;
203 }
204
205 @Override
206 public String toString() {
207 return command.toString();
208 }
209
210 }
211
212 protected abstract Q convert(HttpRequest request) throws IOException, InterruptedException;
213
214 protected abstract HttpResponse invoke(Q nativeRequest) throws IOException, InterruptedException;
215
216 protected abstract void cleanup(Q nativeResponse);
217
218 }