From beab9eb978105cccafd0710f06408b41d872395e Mon Sep 17 00:00:00 2001 From: Philipp Otterbein Date: Wed, 19 Feb 2025 18:51:02 +0100 Subject: [PATCH] libstore S3: fix progress bar and make file transfers interruptible (cherry picked from commit 9da01e69f96346d73c2d1c03adce109f3e57a9a4) --- src/libstore/filetransfer.cc | 4 - src/libstore/s3-binary-cache-store.cc | 115 ++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 18 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 49453f6df..485250a6b 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -789,10 +789,6 @@ struct curlFileTransfer : public FileTransfer S3Helper s3Helper(profile, region, scheme, endpoint); - Activity act(*logger, lvlTalkative, actFileTransfer, - fmt("downloading '%s'", request.uri), - {request.uri}, request.parentAct); - // FIXME: implement ETag auto s3Res = s3Helper.getObject(bucketName, key); FileTransferResult res; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 87f5feb45..ca03c7cd8 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -160,7 +160,10 @@ ref S3Helper::makeConfig( S3Helper::FileTransferResult S3Helper::getObject( const std::string & bucketName, const std::string & key) { - debug("fetching 's3://%s/%s'...", bucketName, key); + std::string uri = "s3://" + bucketName + "/" + key; + Activity act(*logger, lvlTalkative, actFileTransfer, + fmt("downloading '%s'", uri), + Logger::Fields{uri}, getCurActivity()); auto request = Aws::S3::Model::GetObjectRequest() @@ -171,6 +174,26 @@ S3Helper::FileTransferResult S3Helper::getObject( return Aws::New("STRINGSTREAM"); }); + size_t bytesDone = 0; + size_t bytesExpected = 0; + request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) { + if (!bytesExpected && resp->HasHeader("Content-Length")) { + if (auto length = string2Int(resp->GetHeader("Content-Length"))) { + bytesExpected = *length; + } + } + bytesDone += l; + act.progress(bytesDone, bytesExpected); + }); + + request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { + try { + checkInterrupt(); + return true; + } catch(...) {} + return false; + }); + FileTransferResult res; auto now1 = std::chrono::steady_clock::now(); @@ -180,6 +203,8 @@ S3Helper::FileTransferResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); + act.progress(result.GetContentLength(), result.GetContentLength()); + res.data = decompress(result.GetContentEncoding(), dynamic_cast(result.GetBody()).str()); @@ -307,11 +332,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual std::shared_ptr transferManager; std::once_flag transferManagerCreated; + struct AsyncContext : public Aws::Client::AsyncCallerContext + { + mutable std::mutex mutex; + mutable std::condition_variable cv; + const Activity & act; + + void notify() const + { + cv.notify_one(); + } + + void wait() const + { + std::unique_lock lk(mutex); + cv.wait(lk); + } + + AsyncContext(const Activity & act) : act(act) {} + }; + void uploadFile(const std::string & path, std::shared_ptr> istream, const std::string & mimeType, const std::string & contentEncoding) { + std::string uri = "s3://" + bucketName + "/" + path; + Activity act(*logger, lvlTalkative, actFileTransfer, + fmt("uploading '%s'", uri), + Logger::Fields{uri}, getCurActivity()); istream->seekg(0, istream->end); auto size = istream->tellg(); istream->seekg(0, istream->beg); @@ -330,16 +379,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual transferConfig.bufferSize = bufferSize; transferConfig.uploadProgressCallback = - [](const TransferManager *transferManager, - const std::shared_ptr - &transferHandle) + [](const TransferManager * transferManager, + const std::shared_ptr & transferHandle) { - //FIXME: find a way to properly abort the multipart upload. - //checkInterrupt(); - debug("upload progress ('%s'): '%d' of '%d' bytes", - transferHandle->GetKey(), - transferHandle->GetBytesTransferred(), - transferHandle->GetBytesTotalSize()); + auto context = std::dynamic_pointer_cast(transferHandle->GetContext()); + size_t bytesDone = transferHandle->GetBytesTransferred(); + size_t bytesTotal = transferHandle->GetBytesTotalSize(); + try { + checkInterrupt(); + context->act.progress(bytesDone, bytesTotal); + } catch (...) { + context->notify(); + } + }; + transferConfig.transferStatusUpdatedCallback = + [](const TransferManager * transferManager, + const std::shared_ptr & transferHandle) + { + auto context = std::dynamic_pointer_cast(transferHandle->GetContext()); + context->notify(); }; transferManager = TransferManager::Create(transferConfig); @@ -353,29 +411,56 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual if (contentEncoding != "") throw Error("setting a content encoding is not supported with S3 multi-part uploads"); + auto context = std::make_shared(act); std::shared_ptr transferHandle = transferManager->UploadFile( istream, bucketName, path, mimeType, Aws::Map(), - nullptr /*, contentEncoding */); + context /*, contentEncoding */); - transferHandle->WaitUntilFinished(); + TransferStatus status = transferHandle->GetStatus(); + while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) { + try { + checkInterrupt(); + context->wait(); + } catch (...) { + transferHandle->Cancel(); + transferHandle->WaitUntilFinished(); + } + status = transferHandle->GetStatus(); + } + act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize()); - if (transferHandle->GetStatus() == TransferStatus::FAILED) + if (status == TransferStatus::FAILED) throw Error("AWS error: failed to upload 's3://%s/%s': %s", bucketName, path, transferHandle->GetLastError().GetMessage()); - if (transferHandle->GetStatus() != TransferStatus::COMPLETED) + if (status != TransferStatus::COMPLETED) throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", bucketName, path); } else { + act.progress(0, size); auto request = Aws::S3::Model::PutObjectRequest() .WithBucket(bucketName) .WithKey(path); + size_t bytesSent = 0; + request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) { + bytesSent += l; + act.progress(bytesSent, size); + }); + + request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { + try { + checkInterrupt(); + return true; + } catch(...) {} + return false; + }); + request.SetContentType(mimeType); if (contentEncoding != "") @@ -385,6 +470,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual auto result = checkAws(fmt("AWS error uploading '%s'", path), s3Helper.client->PutObject(request)); + + act.progress(size, size); } auto now2 = std::chrono::steady_clock::now();