1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.jclouds.filesystem;
20
21 import static com.google.common.base.Preconditions.checkNotNull;
22 import static com.google.common.base.Preconditions.checkState;
23 import static com.google.common.base.Throwables.getCausalChain;
24 import static com.google.common.base.Throwables.propagate;
25 import static com.google.common.collect.Iterables.filter;
26 import static com.google.common.collect.Iterables.find;
27 import static com.google.common.collect.Iterables.size;
28 import static com.google.common.collect.Iterables.transform;
29 import static com.google.common.collect.Lists.newArrayList;
30 import static com.google.common.collect.Lists.partition;
31 import static com.google.common.collect.Maps.newHashMap;
32 import static com.google.common.collect.Sets.filter;
33 import static com.google.common.collect.Sets.newTreeSet;
34 import static com.google.common.io.ByteStreams.toByteArray;
35 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
36 import static com.google.common.util.concurrent.Futures.immediateFuture;
37
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.File;
41 import java.io.IOException;
42 import java.io.ObjectInput;
43 import java.io.ObjectInputStream;
44 import java.io.ObjectOutput;
45 import java.io.ObjectOutputStream;
46 import java.net.URI;
47 import java.util.Collection;
48 import java.util.Date;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Map.Entry;
52 import java.util.Set;
53 import java.util.SortedSet;
54 import java.util.TreeSet;
55 import java.util.concurrent.ExecutorService;
56
57 import javax.annotation.Nullable;
58 import javax.annotation.Resource;
59 import javax.inject.Inject;
60 import javax.inject.Named;
61 import javax.ws.rs.Path;
62 import javax.ws.rs.PathParam;
63
64 import org.jclouds.Constants;
65 import org.jclouds.blobstore.BlobStoreContext;
66 import org.jclouds.blobstore.ContainerNotFoundException;
67 import org.jclouds.blobstore.KeyNotFoundException;
68 import org.jclouds.blobstore.domain.Blob;
69 import org.jclouds.blobstore.domain.BlobBuilder;
70 import org.jclouds.blobstore.domain.BlobMetadata;
71 import org.jclouds.blobstore.domain.MutableBlobMetadata;
72 import org.jclouds.blobstore.domain.MutableStorageMetadata;
73 import org.jclouds.blobstore.domain.PageSet;
74 import org.jclouds.blobstore.domain.StorageMetadata;
75 import org.jclouds.blobstore.domain.StorageType;
76 import org.jclouds.blobstore.domain.internal.MutableStorageMetadataImpl;
77 import org.jclouds.blobstore.domain.internal.PageSetImpl;
78 import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
79 import org.jclouds.blobstore.internal.BaseAsyncBlobStore;
80 import org.jclouds.blobstore.options.CreateContainerOptions;
81 import org.jclouds.blobstore.options.GetOptions;
82 import org.jclouds.blobstore.options.ListContainerOptions;
83 import org.jclouds.blobstore.options.PutOptions;
84 import org.jclouds.blobstore.strategy.IfDirectoryReturnNameStrategy;
85 import org.jclouds.blobstore.util.BlobUtils;
86 import org.jclouds.collect.Memoized;
87 import org.jclouds.crypto.Crypto;
88 import org.jclouds.crypto.CryptoStreams;
89 import org.jclouds.date.DateService;
90 import org.jclouds.domain.Location;
91 import org.jclouds.filesystem.predicates.validators.FilesystemContainerNameValidator;
92 import org.jclouds.filesystem.strategy.FilesystemStorageStrategy;
93 import org.jclouds.http.HttpCommand;
94 import org.jclouds.http.HttpRequest;
95 import org.jclouds.http.HttpResponse;
96 import org.jclouds.http.HttpResponseException;
97 import org.jclouds.http.options.HttpRequestOptions;
98 import org.jclouds.io.Payloads;
99 import org.jclouds.io.payloads.BaseMutableContentMetadata;
100 import org.jclouds.logging.Logger;
101 import org.jclouds.rest.annotations.ParamValidators;
102
103 import com.google.common.base.Function;
104 import com.google.common.base.Predicate;
105 import com.google.common.base.Supplier;
106 import com.google.common.base.Throwables;
107 import com.google.common.collect.Iterables;
108 import com.google.common.util.concurrent.Futures;
109 import com.google.common.util.concurrent.ListenableFuture;
110
111
112
113
114
115
116
117 public class FilesystemAsyncBlobStore extends BaseAsyncBlobStore {
118
/** Logger annotated for {@code @Resource} injection; a no-op logger until one is supplied. */
@Resource
protected Logger logger = Logger.NULL;

// Collaborators handed in through the constructor.
protected final DateService dateService;
protected final Crypto crypto;
protected final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
protected final IfDirectoryReturnNameStrategy ifDirectoryReturnName;
/** Strategy that performs the actual file and directory operations; never {@code null}. */
protected final FilesystemStorageStrategy storageStrategy;

/**
 * Creates the blob store and wires its collaborators.
 *
 * @throws NullPointerException if {@code storageStrategy} is {@code null}
 */
@Inject
protected FilesystemAsyncBlobStore(BlobStoreContext context, DateService dateService, Crypto crypto,
      HttpGetOptionsListToGetOptions httpGetOptionsConverter, IfDirectoryReturnNameStrategy ifDirectoryReturnName,
      BlobUtils blobUtils, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService service,
      Supplier<Location> defaultLocation, @Memoized Supplier<Set<? extends Location>> locations,
      FilesystemStorageStrategy storageStrategy) {
   super(context, blobUtils, service, defaultLocation, locations);
   // the storage strategy is the only collaborator that is validated
   this.storageStrategy = checkNotNull(storageStrategy, "Storage strategy");
   this.ifDirectoryReturnName = ifDirectoryReturnName;
   this.httpGetOptionsConverter = httpGetOptionsConverter;
   this.crypto = crypto;
   this.dateService = dateService;
}
141
142
143
144
145 @Override
146 public ListenableFuture<PageSet<? extends StorageMetadata>> list(final String container, ListContainerOptions options) {
147
148
149 if (!containerExistsSyncImpl(container)) {
150 return immediateFailedFuture(cnfe(container));
151 }
152
153
154 Iterable<String> blobBelongingToContainer = null;
155 try {
156 blobBelongingToContainer = storageStrategy.getBlobKeysInsideContainer(container);
157 } catch (IOException e) {
158 logger.error(e, "An error occurred loading blobs contained into container %s", container);
159 Throwables.propagate(e);
160 }
161
162 SortedSet<StorageMetadata> contents = newTreeSet(transform(blobBelongingToContainer,
163 new Function<String, StorageMetadata>() {
164 public StorageMetadata apply(String key) {
165 Blob oldBlob = loadFileBlob(container, key);
166
167 checkState(oldBlob != null, "blob " + key + " is not present although it was in the list of "
168 + container);
169 checkState(oldBlob.getMetadata() != null, "blob " + container + "/" + key + " has no metadata");
170 MutableBlobMetadata md = copy(oldBlob.getMetadata());
171 String directoryName = ifDirectoryReturnName.execute(md);
172 if (directoryName != null) {
173 md.setName(directoryName);
174 md.setType(StorageType.RELATIVE_PATH);
175 }
176 return md;
177 }
178 }));
179
180 String marker = null;
181 if (options != null) {
182 if (options.getMarker() != null) {
183 final String finalMarker = options.getMarker();
184 StorageMetadata lastMarkerMetadata = find(contents, new Predicate<StorageMetadata>() {
185 public boolean apply(StorageMetadata metadata) {
186 return metadata.getName().equals(finalMarker);
187 }
188 });
189 contents = contents.tailSet(lastMarkerMetadata);
190 contents.remove(lastMarkerMetadata);
191 }
192
193 final String prefix = options.getDir();
194 if (prefix != null) {
195 contents = newTreeSet(filter(contents, new Predicate<StorageMetadata>() {
196 public boolean apply(StorageMetadata o) {
197 return (o != null && o.getName().startsWith(prefix) && !o.getName().equals(prefix));
198 }
199 }));
200 }
201
202 Integer maxResults = options.getMaxResults() != null ? options.getMaxResults() : 1000;
203 if (contents.size() > 0) {
204 SortedSet<StorageMetadata> contentsSlice = firstSliceOfSize(contents, maxResults);
205 if (!contentsSlice.contains(contents.last())) {
206
207 marker = contentsSlice.last().getName();
208 } else {
209 marker = null;
210 }
211 contents = contentsSlice;
212 }
213
214 final String delimiter = options.isRecursive() ? null : File.separator;
215 if (delimiter != null) {
216 SortedSet<String> commonPrefixes = null;
217 Iterable<String> iterable = transform(contents, new CommonPrefixes(prefix != null ? prefix : null,
218 delimiter));
219 commonPrefixes = iterable != null ? newTreeSet(iterable) : new TreeSet<String>();
220 commonPrefixes.remove(CommonPrefixes.NO_PREFIX);
221
222 contents = newTreeSet(filter(contents, new DelimiterFilter(prefix != null ? prefix : null, delimiter)));
223
224 Iterables.<StorageMetadata> addAll(contents,
225 transform(commonPrefixes, new Function<String, StorageMetadata>() {
226 public StorageMetadata apply(String o) {
227 MutableStorageMetadata md = new MutableStorageMetadataImpl();
228 md.setType(StorageType.RELATIVE_PATH);
229 md.setName(o);
230 return md;
231 }
232 }));
233 }
234
235
236 if (!options.isDetailed()) {
237 for (StorageMetadata md : contents) {
238 md.getUserMetadata().clear();
239 }
240 }
241 }
242
243 return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(contents,
244 marker));
245
246 }
247
248 private ContainerNotFoundException cnfe(String name) {
249 return new ContainerNotFoundException(name, String.format("container %s not in filesystem", name));
250 }
251
252 public static MutableBlobMetadata copy(MutableBlobMetadata in) {
253 ByteArrayOutputStream bout = new ByteArrayOutputStream();
254 ObjectOutput os;
255 try {
256 os = new ObjectOutputStream(bout);
257 os.writeObject(in);
258 ObjectInput is = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()));
259 MutableBlobMetadata metadata = (MutableBlobMetadata) is.readObject();
260 convertUserMetadataKeysToLowercase(metadata);
261 metadata.setContentMetadata(BaseMutableContentMetadata.fromContentMetadata(in.getContentMetadata().toBuilder()
262 .build()));
263 return metadata;
264 } catch (Exception e) {
265 propagate(e);
266 assert false : "exception should have propagated: " + e;
267 return null;
268 }
269 }
270
271 private static void convertUserMetadataKeysToLowercase(MutableBlobMetadata metadata) {
272 Map<String, String> lowerCaseUserMetadata = newHashMap();
273 for (Entry<String, String> entry : metadata.getUserMetadata().entrySet()) {
274 lowerCaseUserMetadata.put(entry.getKey().toLowerCase(), entry.getValue());
275 }
276 metadata.setUserMetadata(lowerCaseUserMetadata);
277 }
278
279 public static MutableBlobMetadata copy(MutableBlobMetadata in, String newKey) {
280 MutableBlobMetadata newMd = copy(in);
281 newMd.setName(newKey);
282 return newMd;
283 }
284
285
286
287
288 @Override
289 public ListenableFuture<Void> removeBlob(String container, String key) {
290 storageStrategy.removeBlob(container, key);
291 return immediateFuture(null);
292 }
293
294
295
296
297 @Override
298 public ListenableFuture<Boolean> containerExists(String containerName) {
299 boolean exists = containerExistsSyncImpl(containerName);
300 return immediateFuture(exists);
301 }
302
303
304
305
306 @Override
307 public ListenableFuture<PageSet<? extends StorageMetadata>> list() {
308 Iterable<String> containers = storageStrategy.getAllContainerNames();
309
310 return Futures.<PageSet<? extends StorageMetadata>> immediateFuture(new PageSetImpl<StorageMetadata>(transform(
311 containers, new Function<String, StorageMetadata>() {
312 public StorageMetadata apply(String name) {
313 MutableStorageMetadata cmd = create();
314 cmd.setName(name);
315 cmd.setType(StorageType.CONTAINER);
316 return cmd;
317 }
318 }), null));
319 }
320
321 protected MutableStorageMetadata create() {
322 return new MutableStorageMetadataImpl();
323 }
324
325
326
327
328 @Path("{container}")
329 @Override
330 public ListenableFuture<Boolean> createContainerInLocation(final Location location,
331 @PathParam("container") @ParamValidators({ FilesystemContainerNameValidator.class }) String name) {
332 boolean result = storageStrategy.createContainer(name);
333 return immediateFuture(result);
334 }
335
336 public String getFirstQueryOrNull(String string, @Nullable HttpRequestOptions options) {
337 if (options == null)
338 return null;
339 Collection<String> values = options.buildQueryParameters().get(string);
340 return (values != null && values.size() >= 1) ? values.iterator().next() : null;
341 }
342
343
344
345
346
347
348
349
350
351
352
353
354
355 private Blob loadFileBlob(String container, String key) {
356 logger.debug("Opening blob in container: %s - %s", container, key);
357 BlobBuilder builder = blobUtils.blobBuilder();
358 builder.name(key);
359 File file = storageStrategy.getFileForBlobKey(container, key);
360 try {
361 builder.payload(file).calculateMD5();
362 } catch (IOException e) {
363 logger.error("An error occurred calculating MD5 for blob %s from container ", key, container);
364 Throwables.propagateIfPossible(e);
365 }
366 Blob blob = builder.build();
367 if (blob.getPayload().getContentMetadata().getContentMD5() != null)
368 blob.getMetadata().setETag(CryptoStreams.hex(blob.getPayload().getContentMetadata().getContentMD5()));
369 return blob;
370 }
371
372 protected static class DelimiterFilter implements Predicate<StorageMetadata> {
373 private final String prefix;
374 private final String delimiter;
375
376 public DelimiterFilter(String prefix, String delimiter) {
377 this.prefix = prefix;
378 this.delimiter = delimiter;
379 }
380
381 public boolean apply(StorageMetadata metadata) {
382 if (prefix == null)
383 return metadata.getName().indexOf(delimiter) == -1;
384
385 String toMatch = prefix.endsWith("/") ? prefix : prefix + delimiter;
386 if (metadata.getName().startsWith(toMatch)) {
387 String unprefixedName = metadata.getName().replaceFirst(toMatch, "");
388 if (unprefixedName.equals("")) {
389
390 return false;
391 }
392 return unprefixedName.indexOf(delimiter) == -1;
393 }
394 return false;
395 }
396 }
397
398 protected static class CommonPrefixes implements Function<StorageMetadata, String> {
399 private final String prefix;
400 private final String delimiter;
401 public static final String NO_PREFIX = "NO_PREFIX";
402
403 public CommonPrefixes(String prefix, String delimiter) {
404 this.prefix = prefix;
405 this.delimiter = delimiter;
406 }
407
408 public String apply(StorageMetadata metadata) {
409 String working = metadata.getName();
410 if (prefix != null) {
411
412 String toMatch = prefix.endsWith("/") ? prefix : prefix + delimiter;
413 if (working.startsWith(toMatch)) {
414 working = working.replaceFirst(toMatch, "");
415 }
416 }
417 if (working.contains(delimiter)) {
418 return working.substring(0, working.indexOf(delimiter));
419 }
420 return NO_PREFIX;
421 }
422 }
423
424 public static <T extends Comparable<?>> SortedSet<T> firstSliceOfSize(Iterable<T> elements, int size) {
425 List<List<T>> slices = partition(newArrayList(elements), size);
426 return newTreeSet(slices.get(0));
427 }
428
429 public static HttpResponseException returnResponseException(int code) {
430 HttpResponse response = null;
431 response = new HttpResponse(code, null, null);
432 return new HttpResponseException(new HttpCommand() {
433
434 public int getRedirectCount() {
435 return 0;
436 }
437
438 public int incrementRedirectCount() {
439 return 0;
440 }
441
442 public boolean isReplayable() {
443 return false;
444 }
445
446 public Exception getException() {
447 return null;
448 }
449
450 public int getFailureCount() {
451 return 0;
452 }
453
454 public int incrementFailureCount() {
455 return 0;
456 }
457
458 public void setException(Exception exception) {
459
460 }
461
462 @Override
463 public HttpRequest getCurrentRequest() {
464 return new HttpRequest("GET", URI.create("http://stub"));
465 }
466
467 @Override
468 public void setCurrentRequest(HttpRequest request) {
469
470 }
471
472 }, response);
473 }
474
475
476
477
478 @Override
479 public ListenableFuture<String> putBlob(String containerName, Blob object) {
480 String blobKey = object.getMetadata().getName();
481
482 logger.debug("Put object with key [%s] to container [%s]", blobKey, containerName);
483 String eTag = getEtag(object);
484 try {
485
486
487
488 storageStrategy.writePayloadOnFile(containerName, blobKey, object.getPayload());
489 } catch (IOException e) {
490 logger.error(e, "An error occurred storing the new object with name [%s] to container [%s].", blobKey,
491 containerName);
492 Throwables.propagate(e);
493 }
494 return immediateFuture(eTag);
495 }
496
497
498
499
500 @Override
501 public ListenableFuture<Boolean> blobExists(final String containerName, final String key) {
502 return immediateFuture(storageStrategy.blobExists(containerName, key));
503 }
504
505
506
507
508 @Override
509 public ListenableFuture<Blob> getBlob(final String containerName, final String key, GetOptions options) {
510 logger.debug("Retrieving blob with key %s from container %s", key, containerName);
511
512 if (!containerExistsSyncImpl(containerName)) {
513 logger.debug("Container %s does not exist", containerName);
514 return immediateFailedFuture(cnfe(containerName));
515 }
516
517 if (!storageStrategy.blobExists(containerName, key)) {
518 logger.debug("Item %s does not exist in container %s", key, containerName);
519 return immediateFuture(null);
520 }
521
522 Blob blob = loadFileBlob(containerName, key);
523
524 if (options != null) {
525 if (options.getIfMatch() != null) {
526 if (!blob.getMetadata().getETag().equals(options.getIfMatch()))
527 return immediateFailedFuture(returnResponseException(412));
528 }
529 if (options.getIfNoneMatch() != null) {
530 if (blob.getMetadata().getETag().equals(options.getIfNoneMatch()))
531 return immediateFailedFuture(returnResponseException(304));
532 }
533 if (options.getIfModifiedSince() != null) {
534 Date modifiedSince = options.getIfModifiedSince();
535 if (blob.getMetadata().getLastModified().before(modifiedSince)) {
536 HttpResponse response = new HttpResponse(304, null, null);
537 return immediateFailedFuture(new HttpResponseException(String.format("%1$s is before %2$s", blob
538 .getMetadata().getLastModified(), modifiedSince), null, response));
539 }
540
541 }
542 if (options.getIfUnmodifiedSince() != null) {
543 Date unmodifiedSince = options.getIfUnmodifiedSince();
544 if (blob.getMetadata().getLastModified().after(unmodifiedSince)) {
545 HttpResponse response = new HttpResponse(412, null, null);
546 return immediateFailedFuture(new HttpResponseException(String.format("%1$s is after %2$s", blob
547 .getMetadata().getLastModified(), unmodifiedSince), null, response));
548 }
549 }
550
551 if (options.getRanges() != null && options.getRanges().size() > 0) {
552 byte[] data;
553 try {
554 data = toByteArray(blob.getPayload().getInput());
555 } catch (IOException e) {
556 return immediateFailedFuture(new RuntimeException(e));
557 }
558 ByteArrayOutputStream out = new ByteArrayOutputStream();
559 for (String s : options.getRanges()) {
560 if (s.startsWith("-")) {
561 int length = Integer.parseInt(s.substring(1));
562 out.write(data, data.length - length, length);
563 } else if (s.endsWith("-")) {
564 int offset = Integer.parseInt(s.substring(0, s.length() - 1));
565 out.write(data, offset, data.length - offset);
566 } else if (s.contains("-")) {
567 String[] firstLast = s.split("\\-");
568 int offset = Integer.parseInt(firstLast[0]);
569 int last = Integer.parseInt(firstLast[1]);
570 int length = (last < data.length) ? last + 1 : data.length - offset;
571 out.write(data, offset, length);
572 } else {
573 return immediateFailedFuture(new IllegalArgumentException("first and last were null!"));
574 }
575
576 }
577 blob.setPayload(out.toByteArray());
578 blob.getMetadata().getContentMetadata().setContentLength(new Long(data.length));
579 }
580 }
581 checkNotNull(blob.getPayload(), "payload " + blob);
582 return immediateFuture(blob);
583 }
584
585
586
587
588 @Override
589 public ListenableFuture<BlobMetadata> blobMetadata(String container, String key) {
590 try {
591 Blob blob = getBlob(container, key).get();
592 return Futures.<BlobMetadata> immediateFuture(blob != null ? blob.getMetadata() : null);
593 } catch (Exception e) {
594 if (size(filter(getCausalChain(e), KeyNotFoundException.class)) >= 1)
595 return immediateFuture(null);
596 return immediateFailedFuture(e);
597 }
598 }
599
600 @Override
601 protected boolean deleteAndVerifyContainerGone(String container) {
602 storageStrategy.deleteContainer(container);
603 return containerExistsSyncImpl(container);
604 }
605
606
607
608
609
610
611
612
613
614 @Override
615 public ListenableFuture<Void> deleteContainer(String container) {
616 deleteAndVerifyContainerGone(container);
617 return immediateFuture(null);
618 }
619
620
621
622
623
624
625
626
627 private boolean containerExistsSyncImpl(String containerName) {
628 return storageStrategy.containerExists(containerName);
629 }
630
631
632
633
634
635
636
637
638 private String getEtag(Blob object) {
639 try {
640 Payloads.calculateMD5(object, crypto.md5());
641 } catch (IOException ex) {
642 logger.error(ex, "An error occurred calculating MD5 for object with name %s.", object.getMetadata().getName());
643 Throwables.propagate(ex);
644 }
645
646 String eTag = CryptoStreams.hex(object.getPayload().getContentMetadata().getContentMD5());
647 return eTag;
648 }
649
650 @Override
651 public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
652
653 return putBlob(container, blob);
654 }
655
656 @Override
657 public ListenableFuture<Boolean> createContainerInLocation(Location location, String container,
658 CreateContainerOptions options) {
659 if (options.isPublicRead())
660 throw new UnsupportedOperationException("publicRead");
661 return createContainerInLocation(location, container);
662 }
663 }