libstore S3: fix progress bar and make file transfers interruptible

(cherry picked from commit 9da01e69f9)
This commit is contained in:
Philipp Otterbein 2025-02-19 18:51:02 +01:00 committed by Mergify
parent 4d990f1459
commit beab9eb978
2 changed files with 101 additions and 18 deletions

View File

@ -789,10 +789,6 @@ struct curlFileTransfer : public FileTransfer
S3Helper s3Helper(profile, region, scheme, endpoint);
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", request.uri),
{request.uri}, request.parentAct);
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
FileTransferResult res;

View File

@ -160,7 +160,10 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
S3Helper::FileTransferResult S3Helper::getObject(
const std::string & bucketName, const std::string & key)
{
debug("fetching 's3://%s/%s'...", bucketName, key);
std::string uri = "s3://" + bucketName + "/" + key;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("downloading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
auto request =
Aws::S3::Model::GetObjectRequest()
@ -171,6 +174,26 @@ S3Helper::FileTransferResult S3Helper::getObject(
return Aws::New<std::stringstream>("STRINGSTREAM");
});
size_t bytesDone = 0;
size_t bytesExpected = 0;
request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) {
if (!bytesExpected && resp->HasHeader("Content-Length")) {
if (auto length = string2Int<size_t>(resp->GetHeader("Content-Length"))) {
bytesExpected = *length;
}
}
bytesDone += l;
act.progress(bytesDone, bytesExpected);
});
request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
try {
checkInterrupt();
return true;
} catch(...) {}
return false;
});
FileTransferResult res;
auto now1 = std::chrono::steady_clock::now();
@ -180,6 +203,8 @@ S3Helper::FileTransferResult S3Helper::getObject(
auto result = checkAws(fmt("AWS error fetching '%s'", key),
client->GetObject(request));
act.progress(result.GetContentLength(), result.GetContentLength());
res.data = decompress(result.GetContentEncoding(),
dynamic_cast<std::stringstream &>(result.GetBody()).str());
@ -307,11 +332,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
std::shared_ptr<TransferManager> transferManager;
std::once_flag transferManagerCreated;
struct AsyncContext : public Aws::Client::AsyncCallerContext
{
mutable std::mutex mutex;
mutable std::condition_variable cv;
const Activity & act;
void notify() const
{
cv.notify_one();
}
void wait() const
{
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
}
AsyncContext(const Activity & act) : act(act) {}
};
void uploadFile(const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
const std::string & contentEncoding)
{
std::string uri = "s3://" + bucketName + "/" + path;
Activity act(*logger, lvlTalkative, actFileTransfer,
fmt("uploading '%s'", uri),
Logger::Fields{uri}, getCurActivity());
istream->seekg(0, istream->end);
auto size = istream->tellg();
istream->seekg(0, istream->beg);
@ -330,16 +379,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
transferConfig.bufferSize = bufferSize;
transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager,
const std::shared_ptr<const TransferHandle>
&transferHandle)
[](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> & transferHandle)
{
//FIXME: find a way to properly abort the multipart upload.
//checkInterrupt();
debug("upload progress ('%s'): '%d' of '%d' bytes",
transferHandle->GetKey(),
transferHandle->GetBytesTransferred(),
transferHandle->GetBytesTotalSize());
auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
size_t bytesDone = transferHandle->GetBytesTransferred();
size_t bytesTotal = transferHandle->GetBytesTotalSize();
try {
checkInterrupt();
context->act.progress(bytesDone, bytesTotal);
} catch (...) {
context->notify();
}
};
transferConfig.transferStatusUpdatedCallback =
[](const TransferManager * transferManager,
const std::shared_ptr<const TransferHandle> & transferHandle)
{
auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
context->notify();
};
transferManager = TransferManager::Create(transferConfig);
@ -353,29 +411,56 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
if (contentEncoding != "")
throw Error("setting a content encoding is not supported with S3 multi-part uploads");
auto context = std::make_shared<AsyncContext>(act);
std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile(
istream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(),
nullptr /*, contentEncoding */);
context /*, contentEncoding */);
transferHandle->WaitUntilFinished();
TransferStatus status = transferHandle->GetStatus();
while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) {
try {
checkInterrupt();
context->wait();
} catch (...) {
transferHandle->Cancel();
transferHandle->WaitUntilFinished();
}
status = transferHandle->GetStatus();
}
act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize());
if (transferHandle->GetStatus() == TransferStatus::FAILED)
if (status == TransferStatus::FAILED)
throw Error("AWS error: failed to upload 's3://%s/%s': %s",
bucketName, path, transferHandle->GetLastError().GetMessage());
if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
if (status != TransferStatus::COMPLETED)
throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
bucketName, path);
} else {
act.progress(0, size);
auto request =
Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName)
.WithKey(path);
size_t bytesSent = 0;
request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) {
bytesSent += l;
act.progress(bytesSent, size);
});
request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
try {
checkInterrupt();
return true;
} catch(...) {}
return false;
});
request.SetContentType(mimeType);
if (contentEncoding != "")
@ -385,6 +470,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
auto result = checkAws(fmt("AWS error uploading '%s'", path),
s3Helper.client->PutObject(request));
act.progress(size, size);
}
auto now2 = std::chrono::steady_clock::now();