1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.s3.blobstore;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ExecutorService;
26
27 import javax.inject.Inject;
28 import javax.inject.Named;
29 import javax.inject.Provider;
30 import javax.inject.Singleton;
31
32 import org.jclouds.Constants;
33 import org.jclouds.blobstore.BlobStoreContext;
34 import org.jclouds.blobstore.domain.Blob;
35 import org.jclouds.blobstore.domain.BlobMetadata;
36 import org.jclouds.blobstore.domain.PageSet;
37 import org.jclouds.blobstore.domain.StorageMetadata;
38 import org.jclouds.blobstore.domain.internal.PageSetImpl;
39 import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
40 import org.jclouds.blobstore.internal.BaseAsyncBlobStore;
41 import org.jclouds.blobstore.options.CreateContainerOptions;
42 import org.jclouds.blobstore.options.ListContainerOptions;
43 import org.jclouds.blobstore.options.PutOptions;
44 import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
45 import org.jclouds.blobstore.util.BlobUtils;
46 import org.jclouds.collect.Memoized;
47 import org.jclouds.concurrent.Futures;
48 import org.jclouds.domain.Location;
49 import org.jclouds.http.options.GetOptions;
50 import org.jclouds.s3.S3AsyncClient;
51 import org.jclouds.s3.S3Client;
52 import org.jclouds.s3.blobstore.functions.BlobToObject;
53 import org.jclouds.s3.blobstore.functions.BucketToResourceList;
54 import org.jclouds.s3.blobstore.functions.BucketToResourceMetadata;
55 import org.jclouds.s3.blobstore.functions.ContainerToBucketListOptions;
56 import org.jclouds.s3.blobstore.functions.ObjectToBlob;
57 import org.jclouds.s3.blobstore.functions.ObjectToBlobMetadata;
58 import org.jclouds.s3.domain.AccessControlList;
59 import org.jclouds.s3.domain.AccessControlList.GroupGranteeURI;
60 import org.jclouds.s3.domain.AccessControlList.Permission;
61 import org.jclouds.s3.domain.BucketMetadata;
62 import org.jclouds.s3.domain.CannedAccessPolicy;
63 import org.jclouds.s3.domain.ListBucketResponse;
64 import org.jclouds.s3.domain.ObjectMetadata;
65 import org.jclouds.s3.options.ListBucketOptions;
66 import org.jclouds.s3.options.PutBucketOptions;
67 import org.jclouds.s3.options.PutObjectOptions;
68 import org.jclouds.s3.util.S3Utils;
69
70 import com.google.common.base.Function;
71 import com.google.common.base.Supplier;
72 import com.google.common.collect.Iterables;
73 import com.google.common.util.concurrent.ListenableFuture;
74
75
76
77
78
79 @Singleton
80 public class S3AsyncBlobStore extends BaseAsyncBlobStore {
81
82 private final S3AsyncClient async;
83 private final S3Client sync;
84 private final BucketToResourceMetadata bucket2ResourceMd;
85 private final ContainerToBucketListOptions container2BucketListOptions;
86 private final BlobToHttpGetOptions blob2ObjectGetOptions;
87 private final BucketToResourceList bucket2ResourceList;
88 private final ObjectToBlob object2Blob;
89 private final BlobToObject blob2Object;
90 private final ObjectToBlobMetadata object2BlobMd;
91 private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
92 private final Map<String, AccessControlList> bucketAcls;
93
94 @Inject
95 protected S3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
96 @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
97 @Memoized Supplier<Set<? extends Location>> locations, S3AsyncClient async, S3Client sync,
98 BucketToResourceMetadata bucket2ResourceMd, ContainerToBucketListOptions container2BucketListOptions,
99 BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
100 BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
101 Provider<FetchBlobMetadata> fetchBlobMetadataProvider, Map<String, AccessControlList> bucketAcls) {
102 super(context, blobUtils, service, defaultLocation, locations);
103 this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
104 this.async = checkNotNull(async, "async");
105 this.sync = checkNotNull(sync, "sync");
106 this.bucket2ResourceMd = checkNotNull(bucket2ResourceMd, "bucket2ResourceMd");
107 this.container2BucketListOptions = checkNotNull(container2BucketListOptions, "container2BucketListOptions");
108 this.bucket2ResourceList = checkNotNull(bucket2ResourceList, "bucket2ResourceList");
109 this.object2Blob = checkNotNull(object2Blob, "object2Blob");
110 this.blob2Object = checkNotNull(blob2Object, "blob2Object");
111 this.object2BlobMd = checkNotNull(object2BlobMd, "object2BlobMd");
112 this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
113 this.bucketAcls = checkNotNull(bucketAcls, "bucketAcls");
114 }
115
116
117
118
119 @Override
120 public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
121 return Futures.compose(async.listOwnedBuckets(),
122 new Function<Set<BucketMetadata>, org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata>>() {
123 public org.jclouds.blobstore.domain.PageSet<? extends StorageMetadata> apply(Set<BucketMetadata> from) {
124 return new PageSetImpl<StorageMetadata>(Iterables.transform(from, bucket2ResourceMd), null);
125 }
126 }, service);
127 }
128
129
130
131
132
133
134
135 @Override
136 public ListenableFuture<Boolean> containerExists(String container) {
137 return async.bucketExists(container);
138 }
139
140
141
142
143
144
145
146
147
148 @Override
149 public ListenableFuture<Boolean> createContainerInLocation(Location location, String container) {
150 return createContainerInLocation(location, container, CreateContainerOptions.NONE);
151 }
152
153
154
155
156
157
158
159 @Override
160
161 public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container, ListContainerOptions options) {
162 ListBucketOptions httpOptions = container2BucketListOptions.apply(options);
163 ListenableFuture<ListBucketResponse> returnVal = async.listBucket(container, httpOptions);
164 ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, bucket2ResourceList,
165 service);
166 return (options.isDetailed()) ? Futures.compose(list,
167 fetchBlobMetadataProvider.get().setContainerName(container), service) : list;
168 }
169
170
171
172
173 protected boolean deleteAndVerifyContainerGone(final String container) {
174 return S3Utils.deleteAndVerifyContainerGone(sync, container);
175 }
176
177
178
179
180
181
182
183
184
185 @Override
186 public ListenableFuture<Boolean> blobExists(String container, String key) {
187 return async.objectExists(container, key);
188 }
189
190
191
192
193
194
195
196
197
198 @Override
199 public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
200 return Futures.compose(async.headObject(container, key), new Function<ObjectMetadata, BlobMetadata>() {
201
202 @Override
203 public BlobMetadata apply(ObjectMetadata from) {
204 return object2BlobMd.apply(from);
205 }
206
207 }, service);
208 }
209
210
211
212
213
214
215
216
217
218 @Override
219 public ListenableFuture<Blob> getBlob(String container, String key, org.jclouds.blobstore.options.GetOptions options) {
220 GetOptions httpOptions = blob2ObjectGetOptions.apply(options);
221 return Futures.compose(async.getObject(container, key, httpOptions), object2Blob, service);
222 }
223
224
225
226
227
228
229
230
231
232 @Override
233 public ListenableFuture<String> putBlob(String container, Blob blob) {
234 PutObjectOptions options = new PutObjectOptions();
235 try {
236 AccessControlList acl = bucketAcls.get(container);
237 if (acl != null && acl.hasPermission(GroupGranteeURI.ALL_USERS, Permission.READ))
238 options.withAcl(CannedAccessPolicy.PUBLIC_READ);
239 } catch (NullPointerException e) {
240
241 }
242 return async.putObject(container, blob2Object.apply(blob), options);
243 }
244
245
246
247
248
249
250
251
252
253 @Override
254 public ListenableFuture<Void> removeBlob(String container, String key) {
255 return async.deleteObject(container, key);
256 }
257
258 @Override
259 public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
260
261 return putBlob(container, blob);
262 }
263
264 @Override
265 public ListenableFuture<Boolean> createContainerInLocation(Location location, String container,
266 CreateContainerOptions options) {
267 PutBucketOptions putBucketOptions = new PutBucketOptions();
268 if (options.isPublicRead())
269 putBucketOptions.withBucketAcl(CannedAccessPolicy.PUBLIC_READ);
270 location = location != null ? location : defaultLocation.get();
271 return async.putBucketInRegion(location.getId(), container, putBucketOptions);
272 }
273
274 }