Merge branch 's3-multipart-uploads' of https://github.com/AmineChikhaoui/nix

Eelco Dolstra 2018-05-10 12:06:02 +02:00
commit 38def17627
GPG Key ID: 8170B4726D7198DE
3 changed files with 68 additions and 20 deletions


@@ -61,7 +61,7 @@ rec {
     ++ lib.optional (stdenv.isLinux || stdenv.isDarwin) libsodium
     ++ lib.optional (stdenv.isLinux || stdenv.isDarwin)
        (aws-sdk-cpp.override {
-         apis = ["s3"];
+         apis = ["s3" "transfer"];
          customMemoryManagement = false;
        });
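The override above builds aws-sdk-cpp with the "transfer" API alongside "s3"; that component provides the Aws::Transfer::TransferManager used further down. A minimal smoke test for the new dependency might look like this (illustrative, not part of the commit; the thread-pool size and the default-constructed S3Client are arbitrary choices):

    #include <aws/core/Aws.h>
    #include <aws/core/utils/threading/Executor.h>
    #include <aws/s3/S3Client.h>
    #include <aws/transfer/TransferManager.h>
    #include <memory>

    // Link with: -laws-cpp-sdk-transfer -laws-cpp-sdk-s3 -laws-cpp-sdk-core
    int main()
    {
        Aws::SDKOptions options;
        Aws::InitAPI(options);
        {
            // A TransferManager needs an executor and an S3 client.
            auto executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(4);
            Aws::Transfer::TransferManagerConfiguration config(executor.get());
            config.s3Client = std::make_shared<Aws::S3::S3Client>();
            auto transferManager = Aws::Transfer::TransferManager::Create(config);
        }
        Aws::ShutdownAPI(options);
        return 0;
    }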


@@ -18,7 +18,7 @@ libstore_FILES = sandbox-defaults.sb sandbox-minimal.sb sandbox-network.sb
 $(foreach file,$(libstore_FILES),$(eval $(call install-data-in,$(d)/$(file),$(datadir)/nix/sandbox)))
 
 ifeq ($(ENABLE_S3), 1)
-  libstore_LDFLAGS += -laws-cpp-sdk-s3 -laws-cpp-sdk-core
+  libstore_LDFLAGS += -laws-cpp-sdk-transfer -laws-cpp-sdk-s3 -laws-cpp-sdk-core
 endif
 
 ifeq ($(OS), SunOS)


@@ -17,6 +17,7 @@
 #include <aws/core/client/DefaultRetryStrategy.h>
 #include <aws/core/utils/logging/FormattedLogSystem.h>
 #include <aws/core/utils/logging/LogMacros.h>
+#include <aws/core/utils/threading/Executor.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/CreateBucketRequest.h>
 #include <aws/s3/model/GetBucketLocationRequest.h>
@@ -24,6 +25,9 @@
 #include <aws/s3/model/HeadObjectRequest.h>
 #include <aws/s3/model/ListObjectsRequest.h>
 #include <aws/s3/model/PutObjectRequest.h>
+#include <aws/transfer/TransferManager.h>
+
+using namespace Aws::Transfer;
 
 namespace nix {
@@ -169,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
     const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
     const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
     const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
+    const Setting<uint64_t> bufferSize{
+        this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads. defaults to 5Mb"};
 
     std::string bucketName;
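The new buffer-size setting (5 * 1024 * 1024 bytes, i.e. 5 MiB) is the part size handed to the TransferManager below. S3 requires every part except the last to be at least 5 MiB and allows at most 10,000 parts per multipart upload, so the default caps a single object at roughly 48.8 GiB; bigger NARs would need a larger buffer-size. Illustrative arithmetic (the 10 GiB object size is a made-up example):

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        const uint64_t bufferSize = 5 * 1024 * 1024; // default part size in bytes
        const uint64_t maxParts   = 10000;           // S3 per-upload part limit
        const uint64_t objectSize = 10ULL << 30;     // hypothetical 10 GiB NAR

        // Ceiling division: number of parts the upload splits into.
        uint64_t parts = (objectSize + bufferSize - 1) / bufferSize;

        printf("%llu parts of %llu bytes each; largest object at this part size: %llu GiB\n",
               (unsigned long long) parts,
               (unsigned long long) bufferSize,
               (unsigned long long) ((bufferSize * maxParts) >> 30));
        return 0;
    }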
@@ -271,34 +277,76 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         const std::string & mimeType,
         const std::string & contentEncoding)
     {
-        auto request =
-            Aws::S3::Model::PutObjectRequest()
-            .WithBucket(bucketName)
-            .WithKey(path);
-
-        request.SetContentType(mimeType);
-
-        if (contentEncoding != "")
-            request.SetContentEncoding(contentEncoding);
-
         auto stream = std::make_shared<istringstream_nocopy>(data);
 
-        request.SetBody(stream);
-
-        stats.put++;
-        stats.putBytes += data.size();
+        auto maxThreads = std::thread::hardware_concurrency();
+
+        static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
+            executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
+
+        TransferManagerConfiguration transferConfig(executor.get());
+
+        transferConfig.s3Client = s3Helper.client;
+        transferConfig.bufferSize = bufferSize;
+
+        if (contentEncoding != "")
+            transferConfig.createMultipartUploadTemplate.SetContentEncoding(
+                contentEncoding);
+
+        transferConfig.uploadProgressCallback =
+            [&](const TransferManager *transferManager,
+                const std::shared_ptr<const TransferHandle>
+                    &transferHandle) {
+              //FIXME: find a way to properly abort the multipart upload.
+              checkInterrupt();
+              printTalkative("upload progress ('%s'): '%d' of '%d' bytes",
+                             path,
+                             transferHandle->GetBytesTransferred(),
+                             transferHandle->GetBytesTotalSize());
+            };
+
+        transferConfig.transferStatusUpdatedCallback =
+            [&](const TransferManager *,
+                const std::shared_ptr<const TransferHandle>
+                    &transferHandle) {
+              switch (transferHandle->GetStatus()) {
+              case TransferStatus::COMPLETED:
+                printTalkative("upload of '%s' completed", path);
+                stats.put++;
+                stats.putBytes += data.size();
+                break;
+              case TransferStatus::IN_PROGRESS:
+                break;
+              case TransferStatus::FAILED:
+                throw Error("AWS error: failed to upload 's3://%s/%s'",
+                            bucketName, path);
+                break;
+              default:
+                throw Error("AWS error: transfer status of 's3://%s/%s' "
+                            "in unexpected state",
+                            bucketName, path);
+              };
+            };
+
+        std::shared_ptr<TransferManager> transferManager =
+            TransferManager::Create(transferConfig);
 
         auto now1 = std::chrono::steady_clock::now();
 
-        auto result = checkAws(format("AWS error uploading '%s'") % path,
-            s3Helper.client->PutObject(request));
+        std::shared_ptr<TransferHandle> transferHandle =
+            transferManager->UploadFile(stream, bucketName, path, mimeType,
+                                        Aws::Map<Aws::String, Aws::String>());
+
+        transferHandle->WaitUntilFinished();
 
         auto now2 = std::chrono::steady_clock::now();
 
-        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+        auto duration =
+            std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
+                .count();
 
-        printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms")
-            % bucketName % path % data.size() % duration);
+        printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
+                  bucketName % path % data.size() % duration);
 
         stats.putTimeMs += duration;
     }
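Putting the pieces together, the upload path now follows the standard TransferManager pattern: build a pooled executor, configure the manager with the S3 client and part size, start the upload, and block until it finishes. A condensed self-contained sketch (illustrative; the bucket, key, and payload are hypothetical, and error handling is trimmed to a status check):

    #include <aws/core/Aws.h>
    #include <aws/core/utils/threading/Executor.h>
    #include <aws/s3/S3Client.h>
    #include <aws/transfer/TransferManager.h>
    #include <cstdio>
    #include <memory>
    #include <sstream>
    #include <thread>

    using namespace Aws::Transfer;

    int main()
    {
        Aws::SDKOptions options;
        Aws::InitAPI(options);
        {
            auto executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(
                std::thread::hardware_concurrency());

            TransferManagerConfiguration config(executor.get());
            config.s3Client = std::make_shared<Aws::S3::S3Client>();
            config.bufferSize = 5 * 1024 * 1024; // part size, matching the default above

            auto transferManager = TransferManager::Create(config);

            // UploadFile takes any std::iostream; a stringstream stands in for NAR data here.
            auto stream = std::make_shared<std::stringstream>("hello multipart world");

            auto handle = transferManager->UploadFile(
                stream, "my-bucket", "my-key", "application/octet-stream",
                Aws::Map<Aws::String, Aws::String>());
            handle->WaitUntilFinished(); // blocks; the commit's callbacks report progress instead

            if (handle->GetStatus() != TransferStatus::COMPLETED)
                fprintf(stderr, "upload failed\n");
        }
        Aws::ShutdownAPI(options);
        return 0;
    }

Note that the commit makes the executor static, so one thread pool is shared across all uploads of the store instance rather than being created per call.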