Add support for s3:// URIs

This adds support for s3:// URIs in all places where Nix allows URIs,
e.g. in builtins.fetchurl, builtins.fetchTarball, <nix/fetchurl.nix>
and NIX_PATH. It allows fetching resources from private S3 buckets,
using credentials obtained from the standard places (i.e. AWS_*
environment variables, ~/.aws/credentials and the EC2 metadata
server). This may not be super-useful in general, but since we already
depend on aws-sdk-cpp, it's a cheap feature to add.
This commit is contained in:
Eelco Dolstra 2017-02-14 14:20:00 +01:00
parent 62ff5ad424
commit 9ff9c3f2f8
No known key found for this signature in database
GPG Key ID: 8170B4726D7198DE
5 changed files with 142 additions and 63 deletions

View File

@@ -4,6 +4,7 @@
#include "hash.hh"
#include "store-api.hh"
#include "archive.hh"
#include "s3.hh"
#include <unistd.h>
#include <fcntl.h>
@@ -480,6 +481,31 @@ struct CurlDownloader : public Downloader
std::function<void(const DownloadResult &)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
/* Ugly hack to support s3:// URIs. */
if (hasPrefix(request.uri, "s3://")) {
// FIXME: do this on a worker thread
sync2async<DownloadResult>(success, failure, [&]() {
#ifdef ENABLE_S3
S3Helper s3Helper;
auto slash = request.uri.find('/', 5);
if (slash == std::string::npos)
throw nix::Error("bad S3 URI %s", request.uri);
std::string bucketName(request.uri, 5, slash - 5);
std::string key(request.uri, slash + 1);
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
DownloadResult res;
if (!s3Res.data)
throw DownloadError(NotFound, fmt("S3 object %s does not exist", request.uri));
res.data = s3Res.data;
return res;
#else
throw nix::Error("cannot download %s because Nix is not built with S3 support", request.uri);
#endif
});
return;
}
auto item = std::make_shared<DownloadItem>(*this, request);
item->success = success;
item->failure = failure;
@ -629,7 +655,7 @@ bool isUri(const string & s)
size_t pos = s.find("://");
if (pos == string::npos) return false;
string scheme(s, 0, pos);
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git";
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3";
}

View File

@@ -23,7 +23,7 @@ struct DownloadRequest
struct DownloadResult
{
bool cached;
bool cached = false;
std::string etag;
std::string effectiveUrl;
std::shared_ptr<std::string> data;

View File

@@ -1,6 +1,6 @@
#if ENABLE_S3
#if __linux__
#include "s3.hh"
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include "nar-info-disk-cache.hh"
@@ -18,15 +18,6 @@
namespace nix {
/* An istringstream-alike that reads directly from an existing
   string's buffer instead of copying it; the string must outlive the
   stream.  NOTE(review): pubsetbuf() on a std::stringbuf is
   implementation-defined — this works with libstdc++ but may be a
   no-op elsewhere. */
struct istringstream_nocopy : public std::stringstream
{
istringstream_nocopy(const std::string & s)
{
/* Cast is needed because pubsetbuf() takes char *; the buffer is
   only read from. */
rdbuf()->pubsetbuf(
(char *) s.data(), s.size());
}
};
struct S3Error : public Error
{
Aws::S3::S3Errors err;
@@ -60,21 +51,81 @@ static void initAWS()
});
}
/* Construct an S3 helper.  makeConfig() performs one-time AWS SDK
   initialisation and builds the client configuration; the S3 client
   is then created from that configuration. */
S3Helper::S3Helper()
: config(makeConfig())
, client(make_ref<Aws::S3::S3Client>(*config))
{
}
/* One-time AWS SDK initialisation plus a fresh client configuration
   for talking to S3. */
ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig()
{
    initAWS();
    auto cfg = make_ref<Aws::Client::ClientConfiguration>();
    cfg->requestTimeoutMs = 600 * 1000;
    cfg->region = Aws::Region::US_EAST_1; // FIXME: make configurable
    return cfg;
}
/* Fetch a single object from S3.  On success, `data' holds the
   object contents; if the key does not exist, `data' is left null
   rather than throwing.  Any other AWS error is rethrown.
   `durationMs' records the wall-clock duration of the request. */
S3Helper::DownloadResult S3Helper::getObject(
const std::string & bucketName, const std::string & key)
{
debug("fetching s3://%s/%s...", bucketName, key);
auto request =
Aws::S3::Model::GetObjectRequest()
.WithBucket(bucketName)
.WithKey(key);
/* Have the SDK write the response body into a std::stringstream so
   it can be extracted as a string below. */
request.SetResponseStreamFactory([&]() {
return Aws::New<std::stringstream>("STRINGSTREAM");
});
DownloadResult res;
auto now1 = std::chrono::steady_clock::now();
try {
auto result = checkAws(fmt("AWS error fetching %s", key),
client->GetObject(request));
/* NOTE(review): .str() copies the body out of the stream — an
   extra copy of the whole object. */
res.data = std::make_shared<std::string>(
dynamic_cast<std::stringstream &>(result.GetBody()).str());
} catch (S3Error & e) {
/* Report a missing key via res.data == null; rethrow everything
   else. */
if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
}
auto now2 = std::chrono::steady_clock::now();
res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
return res;
}
#if __linux__
/* An istringstream-alike that reads directly from an existing
   string's buffer instead of copying it.  The string must stay alive
   (and unmodified) for the lifetime of the stream.

   NOTE(review): pubsetbuf() on a std::stringbuf has
   implementation-defined behaviour; this trick works with libstdc++
   (hence the surrounding __linux__ guard) but may be a no-op with
   other standard libraries. */
struct istringstream_nocopy : public std::stringstream
{
    istringstream_nocopy(const std::string & s)
    {
        /* const_cast (instead of a C-style cast) makes explicit that
           we are only casting away constness, as required by the
           pubsetbuf() signature; the buffer is never written to. */
        rdbuf()->pubsetbuf(const_cast<char *>(s.data()), s.size());
    }
};
struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
std::string bucketName;
ref<Aws::Client::ClientConfiguration> config;
ref<Aws::S3::S3Client> client;
Stats stats;
S3Helper s3Helper;
S3BinaryCacheStoreImpl(
const Params & params, const std::string & bucketName)
: S3BinaryCacheStore(params)
, bucketName(bucketName)
, config(makeConfig())
, client(make_ref<Aws::S3::S3Client>(*config))
{
diskCache = getNarInfoDiskCache();
}
@@ -84,15 +135,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return "s3://" + bucketName;
}
ref<Aws::Client::ClientConfiguration> makeConfig()
{
initAWS();
auto res = make_ref<Aws::Client::ClientConfiguration>();
res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
res->requestTimeoutMs = 600 * 1000;
return res;
}
void init() override
{
if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
@@ -100,7 +142,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
/* Create the bucket if it doesn't already exist. */
// FIXME: HeadBucket would be more appropriate, but doesn't return
// an easily parsed 404 message.
auto res = client->GetBucketLocation(
auto res = s3Helper.client->GetBucketLocation(
Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
if (!res.IsSuccess()) {
@@ -108,7 +150,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
throw Error(format("AWS error checking bucket %s: %s") % bucketName % res.GetError().GetMessage());
checkAws(format("AWS error creating bucket %s") % bucketName,
client->CreateBucket(
s3Helper.client->CreateBucket(
Aws::S3::Model::CreateBucketRequest()
.WithBucket(bucketName)
.WithCreateBucketConfiguration(
@@ -146,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
stats.head++;
auto res = client->HeadObject(
auto res = s3Helper.client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
.WithBucket(bucketName)
.WithKey(path));
@@ -179,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
auto now1 = std::chrono::steady_clock::now();
auto result = checkAws(format("AWS error uploading %s") % path,
client->PutObject(request));
s3Helper.client->PutObject(request));
auto now2 = std::chrono::steady_clock::now();
@@ -198,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
debug(format("fetching s3://%1%/%2%...") % bucketName % path);
auto request =
Aws::S3::Model::GetObjectRequest()
.WithBucket(bucketName)
.WithKey(path);
request.SetResponseStreamFactory([&]() {
return Aws::New<std::stringstream>("STRINGSTREAM");
});
stats.get++;
try {
auto res = s3Helper.getObject(bucketName, path);
auto now1 = std::chrono::steady_clock::now();
stats.getBytes += res.data ? res.data->size() : 0;
stats.getTimeMs += res.durationMs;
auto result = checkAws(format("AWS error fetching %s") % path,
client->GetObject(request));
if (res.data)
printTalkative("downloaded s3://%s/%s (%d bytes) in %d ms",
bucketName, path, res.data->size(), res.durationMs);
auto now2 = std::chrono::steady_clock::now();
auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
printMsg(lvlTalkative, format("downloaded s3://%1%/%2% (%3% bytes) in %4% ms")
% bucketName % path % res.size() % duration);
stats.getBytes += res.size();
stats.getTimeMs += duration;
return std::make_shared<std::string>(res);
} catch (S3Error & e) {
if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
throw;
}
return res.data;
});
}
@@ -246,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
debug(format("listing bucket s3://%s from key %s...") % bucketName % marker);
auto res = checkAws(format("AWS error listing bucket %s") % bucketName,
client->ListObjects(
s3Helper.client->ListObjects(
Aws::S3::Model::ListObjectsRequest()
.WithBucket(bucketName)
.WithDelimiter("/")
@@ -281,7 +299,8 @@ static RegisterStoreImplementation regStore([](
return store;
});
#endif
}
#endif
#endif

33
src/libstore/s3.hh Normal file
View File

@@ -0,0 +1,33 @@
#pragma once
#if ENABLE_S3
#include "ref.hh"
namespace Aws { namespace Client { class ClientConfiguration; } }
namespace Aws { namespace S3 { class S3Client; } }
namespace nix {
/* Small wrapper around the AWS SDK: owns a client configuration and
   an S3 client, shared by the S3 binary cache store and the
   downloader's s3:// support. */
struct S3Helper
{
    ref<Aws::Client::ClientConfiguration> config;
    ref<Aws::S3::S3Client> client;

    S3Helper();

    /* One-time AWS SDK initialisation + default client configuration. */
    ref<Aws::Client::ClientConfiguration> makeConfig();

    struct DownloadResult
    {
        /* Null if the requested key does not exist. */
        std::shared_ptr<std::string> data;
        /* Wall-clock duration of the GET in milliseconds; zero on a
           default-constructed result (was previously left
           uninitialized, consistent with the `cached = false' fix in
           this same commit). */
        unsigned int durationMs = 0;
    };

    DownloadResult getObject(
        const std::string & bucketName, const std::string & key);
};
}
#endif

View File

@@ -78,6 +78,7 @@ extern Verbosity verbosity; /* suppress msgs > this */
/* Convenience wrappers around printMsg() for each verbosity level
   (GNU variadic-macro syntax).  This commit adds debug(). */
#define printError(args...) printMsg(lvlError, args)
#define printInfo(args...) printMsg(lvlInfo, args)
#define printTalkative(args...) printMsg(lvlTalkative, args)
#define debug(args...) printMsg(lvlDebug, args)
#define vomit(args...) printMsg(lvlVomit, args)