1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.atmos.blobstore;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static org.jclouds.atmos.options.PutOptions.Builder.publicRead;
23
24 import java.net.URI;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29
30 import javax.inject.Inject;
31 import javax.inject.Named;
32 import javax.inject.Provider;
33 import javax.inject.Singleton;
34
35 import org.jclouds.Constants;
36 import org.jclouds.atmos.AtmosAsyncClient;
37 import org.jclouds.atmos.AtmosClient;
38 import org.jclouds.atmos.blobstore.functions.BlobStoreListOptionsToListOptions;
39 import org.jclouds.atmos.blobstore.functions.BlobToObject;
40 import org.jclouds.atmos.blobstore.functions.DirectoryEntryListToResourceMetadataList;
41 import org.jclouds.atmos.blobstore.functions.ObjectToBlob;
42 import org.jclouds.atmos.blobstore.functions.ObjectToBlobMetadata;
43 import org.jclouds.atmos.domain.AtmosObject;
44 import org.jclouds.atmos.domain.BoundedSet;
45 import org.jclouds.atmos.domain.DirectoryEntry;
46 import org.jclouds.atmos.options.ListOptions;
47 import org.jclouds.atmos.util.AtmosUtils;
48 import org.jclouds.blobstore.BlobStoreContext;
49 import org.jclouds.blobstore.domain.Blob;
50 import org.jclouds.blobstore.domain.BlobMetadata;
51 import org.jclouds.blobstore.domain.PageSet;
52 import org.jclouds.blobstore.domain.StorageMetadata;
53 import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
54 import org.jclouds.blobstore.internal.BaseAsyncBlobStore;
55 import org.jclouds.blobstore.options.CreateContainerOptions;
56 import org.jclouds.blobstore.options.PutOptions;
57 import org.jclouds.blobstore.strategy.internal.FetchBlobMetadata;
58 import org.jclouds.blobstore.util.BlobUtils;
59 import org.jclouds.collect.Memoized;
60 import org.jclouds.concurrent.Futures;
61 import org.jclouds.crypto.Crypto;
62 import org.jclouds.domain.Location;
63 import org.jclouds.http.options.GetOptions;
64
65 import com.google.common.base.Function;
66 import com.google.common.base.Supplier;
67 import com.google.common.util.concurrent.ListenableFuture;
68
69
70
71
72 @Singleton
73 public class AtmosAsyncBlobStore extends BaseAsyncBlobStore {
74 private final AtmosAsyncClient async;
75 private final AtmosClient sync;
76 private final ObjectToBlob object2Blob;
77 private final ObjectToBlobMetadata object2BlobMd;
78 private final BlobToObject blob2Object;
79 private final BlobStoreListOptionsToListOptions container2ContainerListOptions;
80 private final DirectoryEntryListToResourceMetadataList container2ResourceList;
81 private final Crypto crypto;
82 private final BlobToHttpGetOptions blob2ObjectGetOptions;
83 private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
84 private final Map<String, Boolean> isPublic;
85
86 @Inject
87 AtmosAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
88 @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service, Supplier<Location> defaultLocation,
89 @Memoized Supplier<Set<? extends Location>> locations, AtmosAsyncClient async, AtmosClient sync,
90 ObjectToBlob object2Blob, ObjectToBlobMetadata object2BlobMd, BlobToObject blob2Object,
91 BlobStoreListOptionsToListOptions container2ContainerListOptions,
92 DirectoryEntryListToResourceMetadataList container2ResourceList, Crypto crypto,
93 BlobToHttpGetOptions blob2ObjectGetOptions, Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
94 Map<String, Boolean> isPublic) {
95 super(context, blobUtils, service, defaultLocation, locations);
96 this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
97 this.sync = checkNotNull(sync, "sync");
98 this.async = checkNotNull(async, "async");
99 this.container2ContainerListOptions = checkNotNull(container2ContainerListOptions,
100 "container2ContainerListOptions");
101 this.container2ResourceList = checkNotNull(container2ResourceList, "container2ResourceList");
102 this.object2Blob = checkNotNull(object2Blob, "object2Blob");
103 this.blob2Object = checkNotNull(blob2Object, "blob2Object");
104 this.object2BlobMd = checkNotNull(object2BlobMd, "object2BlobMd");
105 this.crypto = checkNotNull(crypto, "crypto");
106 this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
107 this.isPublic = checkNotNull(isPublic, "isPublic");
108 }
109
110
111
112
113 @Override
114 public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
115 return Futures.compose(async.headFile(container + "/" + key), new Function<AtmosObject, BlobMetadata>() {
116 @Override
117 public BlobMetadata apply(AtmosObject from) {
118 return object2BlobMd.apply(from);
119 }
120 }, service);
121 }
122
123
124
125
126
127
128 @Override
129 public ListenableFuture<Boolean> createContainerInLocation(Location location, String container) {
130 return Futures.compose(async.createDirectory(container), new Function<URI, Boolean>() {
131
132 public Boolean apply(URI from) {
133 return true;
134 }
135
136 }, service);
137 }
138
139
140
141
142 @Override
143 public ListenableFuture<Void> createDirectory(String container, String directory) {
144 return Futures.compose(async.createDirectory(container + "/" + directory), new Function<URI, Void>() {
145
146 public Void apply(URI from) {
147 return null;
148 }
149
150 }, service);
151 }
152
153
154
155
156
157 protected boolean deleteAndVerifyContainerGone(final String container) {
158 sync.deletePath(container + "/");
159 return !sync.pathExists(container + "/");
160 }
161
162
163
164
165 @Override
166 public ListenableFuture<Boolean> containerExists(String container) {
167 return async.pathExists(container + "/");
168 }
169
170
171
172
173 @Override
174 public ListenableFuture<Boolean> directoryExists(String container, String directory) {
175 return async.pathExists(container + "/" + directory + "/");
176 }
177
178
179
180
181 @Override
182 public ListenableFuture<Void> deleteDirectory(String containerName, String directory) {
183 return removeBlob(containerName, directory + "/");
184 }
185
186
187
188
189
190
191
192
193
194 @Override
195 public ListenableFuture<Boolean> blobExists(String container, String key) {
196 return async.pathExists(container + "/" + key);
197 }
198
199
200
201
202 @Override
203 public ListenableFuture<Blob> getBlob(String container, String key, org.jclouds.blobstore.options.GetOptions options) {
204 GetOptions httpOptions = blob2ObjectGetOptions.apply(options);
205 ListenableFuture<AtmosObject> returnVal = async.readFile(container + "/" + key, httpOptions);
206 return Futures.compose(returnVal, object2Blob, service);
207 }
208
209
210
211
212 @Override
213 public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
214 return Futures.compose(async.listDirectories(), container2ResourceList, service);
215 }
216
217
218
219
220 @Override
221 public ListenableFuture<PageSet<? extends StorageMetadata>> list(String container,
222 org.jclouds.blobstore.options.ListContainerOptions options) {
223 container = AtmosUtils.adjustContainerIfDirOptionPresent(container, options);
224 ListOptions nativeOptions = container2ContainerListOptions.apply(options);
225 ListenableFuture<BoundedSet<? extends DirectoryEntry>> returnVal = async.listDirectory(container, nativeOptions);
226 ListenableFuture<PageSet<? extends StorageMetadata>> list = Futures.compose(returnVal, container2ResourceList,
227 service);
228 return (ListenableFuture<PageSet<? extends StorageMetadata>>) (options.isDetailed() ? Futures.compose(list,
229 fetchBlobMetadataProvider.get().setContainerName(container), service) : list);
230 }
231
232
233
234
235
236
237 @Override
238 public ListenableFuture<String> putBlob(final String container, final Blob blob) {
239 final org.jclouds.atmos.options.PutOptions options = new org.jclouds.atmos.options.PutOptions();
240 try {
241 if (isPublic.get(container + "/"))
242 options.publicRead();
243 } catch (NullPointerException e) {
244
245 }
246 return Futures.makeListenable(service.submit(new Callable<String>() {
247
248 @Override
249 public String call() throws Exception {
250 return AtmosUtils.putBlob(sync, crypto, blob2Object, container, blob, options);
251 }
252
253 @Override
254 public String toString() {
255 return "putBlob(" + container + "," + blob.getMetadata().getName() + ")";
256 }
257 }), service);
258
259 }
260
261
262
263
264 @Override
265 public ListenableFuture<Void> removeBlob(String container, String key) {
266 return async.deletePath(container + "/" + key);
267 }
268
269 @Override
270 public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
271
272 return putBlob(container, blob);
273 }
274
275 @Override
276 public ListenableFuture<Boolean> createContainerInLocation(Location location, String container,
277 CreateContainerOptions options) {
278 if (options.isPublicRead())
279 return Futures.compose(async.createDirectory(container, publicRead()), new Function<URI, Boolean>() {
280
281 public Boolean apply(URI from) {
282 return true;
283 }
284
285 }, service);
286 return createContainerInLocation(location, container);
287 }
288
289 }