From beab9eb978105cccafd0710f06408b41d872395e Mon Sep 17 00:00:00 2001 From: Philipp Otterbein Date: Wed, 19 Feb 2025 18:51:02 +0100 Subject: [PATCH 1/3] libstore S3: fix progress bar and make file transfers interruptible (cherry picked from commit 9da01e69f96346d73c2d1c03adce109f3e57a9a4) --- src/libstore/filetransfer.cc | 4 - src/libstore/s3-binary-cache-store.cc | 115 ++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 18 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 49453f6df..485250a6b 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -789,10 +789,6 @@ struct curlFileTransfer : public FileTransfer S3Helper s3Helper(profile, region, scheme, endpoint); - Activity act(*logger, lvlTalkative, actFileTransfer, - fmt("downloading '%s'", request.uri), - {request.uri}, request.parentAct); - // FIXME: implement ETag auto s3Res = s3Helper.getObject(bucketName, key); FileTransferResult res; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 87f5feb45..ca03c7cd8 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -160,7 +160,10 @@ ref S3Helper::makeConfig( S3Helper::FileTransferResult S3Helper::getObject( const std::string & bucketName, const std::string & key) { - debug("fetching 's3://%s/%s'...", bucketName, key); + std::string uri = "s3://" + bucketName + "/" + key; + Activity act(*logger, lvlTalkative, actFileTransfer, + fmt("downloading '%s'", uri), + Logger::Fields{uri}, getCurActivity()); auto request = Aws::S3::Model::GetObjectRequest() @@ -171,6 +174,26 @@ S3Helper::FileTransferResult S3Helper::getObject( return Aws::New("STRINGSTREAM"); }); + size_t bytesDone = 0; + size_t bytesExpected = 0; + request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) { + if (!bytesExpected && resp->HasHeader("Content-Length")) { + if (auto length = string2Int(resp->GetHeader("Content-Length"))) { + bytesExpected = *length; + } + } + bytesDone += l; + act.progress(bytesDone, bytesExpected); + }); + + request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { + try { + checkInterrupt(); + return true; + } catch(...) {} + return false; + }); + FileTransferResult res; auto now1 = std::chrono::steady_clock::now(); @@ -180,6 +203,8 @@ S3Helper::FileTransferResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); + act.progress(result.GetContentLength(), result.GetContentLength()); + res.data = decompress(result.GetContentEncoding(), dynamic_cast(result.GetBody()).str()); @@ -307,11 +332,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual std::shared_ptr transferManager; std::once_flag transferManagerCreated; + struct AsyncContext : public Aws::Client::AsyncCallerContext + { + mutable std::mutex mutex; + mutable std::condition_variable cv; + const Activity & act; + + void notify() const + { + cv.notify_one(); + } + + void wait() const + { + std::unique_lock lk(mutex); + cv.wait(lk); + } + + AsyncContext(const Activity & act) : act(act) {} + }; + void uploadFile(const std::string & path, std::shared_ptr> istream, const std::string & mimeType, const std::string & contentEncoding) { + std::string uri = "s3://" + bucketName + "/" + path; + Activity act(*logger, lvlTalkative, actFileTransfer, + fmt("uploading '%s'", uri), + Logger::Fields{uri}, getCurActivity()); istream->seekg(0, istream->end); auto size = istream->tellg(); istream->seekg(0, istream->beg); @@ -330,16 +379,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual transferConfig.bufferSize = bufferSize; transferConfig.uploadProgressCallback = - [](const TransferManager *transferManager, - const std::shared_ptr - &transferHandle) + [](const TransferManager * transferManager, + const std::shared_ptr & transferHandle) { - //FIXME: find a way to properly abort the multipart upload. - //checkInterrupt(); - debug("upload progress ('%s'): '%d' of '%d' bytes", - transferHandle->GetKey(), - transferHandle->GetBytesTransferred(), - transferHandle->GetBytesTotalSize()); + auto context = std::dynamic_pointer_cast(transferHandle->GetContext()); + size_t bytesDone = transferHandle->GetBytesTransferred(); + size_t bytesTotal = transferHandle->GetBytesTotalSize(); + try { + checkInterrupt(); + context->act.progress(bytesDone, bytesTotal); + } catch (...) { + context->notify(); + } + }; + transferConfig.transferStatusUpdatedCallback = + [](const TransferManager * transferManager, + const std::shared_ptr & transferHandle) + { + auto context = std::dynamic_pointer_cast(transferHandle->GetContext()); + context->notify(); }; transferManager = TransferManager::Create(transferConfig); @@ -353,29 +411,56 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual if (contentEncoding != "") throw Error("setting a content encoding is not supported with S3 multi-part uploads"); + auto context = std::make_shared(act); std::shared_ptr transferHandle = transferManager->UploadFile( istream, bucketName, path, mimeType, Aws::Map(), - nullptr /*, contentEncoding */); + context /*, contentEncoding */); - transferHandle->WaitUntilFinished(); + TransferStatus status = transferHandle->GetStatus(); + while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) { + try { + checkInterrupt(); + context->wait(); + } catch (...) { + transferHandle->Cancel(); + transferHandle->WaitUntilFinished(); + } + status = transferHandle->GetStatus(); + } + act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize()); - if (transferHandle->GetStatus() == TransferStatus::FAILED) + if (status == TransferStatus::FAILED) throw Error("AWS error: failed to upload 's3://%s/%s': %s", bucketName, path, transferHandle->GetLastError().GetMessage()); - if (transferHandle->GetStatus() != TransferStatus::COMPLETED) + if (status != TransferStatus::COMPLETED) throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", bucketName, path); } else { + act.progress(0, size); auto request = Aws::S3::Model::PutObjectRequest() .WithBucket(bucketName) .WithKey(path); + size_t bytesSent = 0; + request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) { + bytesSent += l; + act.progress(bytesSent, size); + }); + + request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { + try { + checkInterrupt(); + return true; + } catch(...) {} + return false; + }); + request.SetContentType(mimeType); if (contentEncoding != "") @@ -385,6 +470,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual auto result = checkAws(fmt("AWS error uploading '%s'", path), s3Helper.client->PutObject(request)); + + act.progress(size, size); } auto now2 = std::chrono::steady_clock::now(); From c53bd8905b239bf341df39d6488008f36abd6f8d Mon Sep 17 00:00:00 2001 From: Philipp Otterbein Date: Wed, 12 Mar 2025 00:50:20 +0100 Subject: [PATCH 2/3] libstore: same progress bar behavior for PUT and POST requests - no differentiation between uploads and downloads in CLI (cherry picked from commit db297d3dda12306459341da01e9892b4df2d6d37) --- src/libstore/filetransfer.cc | 24 +++++-------------- .../include/nix/store/filetransfer.hh | 2 +- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 485250a6b..08c782139 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -95,7 +95,7 @@ struct curlFileTransfer : public FileTransfer : fileTransfer(fileTransfer) , request(request) , act(*logger, lvlTalkative, actFileTransfer, - request.post ? "" : fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), + fmt("%sing '%s'", request.verb(), request.uri), {request.uri}, request.parentAct) , callback(std::move(callback)) , finalSink([this](std::string_view data) { @@ -272,19 +272,11 @@ struct curlFileTransfer : public FileTransfer return getInterrupted(); } - int silentProgressCallback(curl_off_t dltotal, curl_off_t dlnow) - { - return getInterrupted(); - } - static int progressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) { - return ((TransferItem *) userp)->progressCallback(dltotal, dlnow); - } - - static int silentProgressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) - { - return ((TransferItem *) userp)->silentProgressCallback(dltotal, dlnow); + auto & item = *static_cast(userp); + auto isUpload = bool(item.request.data); + return item.progressCallback(isUpload ? ultotal : dltotal, isUpload ? ulnow : dlnow); } static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr) @@ -351,10 +343,7 @@ struct curlFileTransfer : public FileTransfer curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper); curl_easy_setopt(req, CURLOPT_HEADERDATA, this); - if (request.post) - curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, silentProgressCallbackWrapper); - else - curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper); + curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper); curl_easy_setopt(req, CURLOPT_XFERINFODATA, this); curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0); @@ -447,8 +436,7 @@ struct curlFileTransfer : public FileTransfer if (httpStatus == 304 && result.etag == "") result.etag = request.expectedETag; - if (!request.post) - act.progress(result.bodySize, result.bodySize); + act.progress(result.bodySize, result.bodySize); done = true; callback(std::move(result)); } diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 217c52d77..f87f68e7f 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -77,7 +77,7 @@ struct FileTransferRequest FileTransferRequest(std::string_view uri) : uri(uri), parentAct(getCurActivity()) { } - std::string verb() + std::string verb() const { return data ? "upload" : "download"; } From 61bb40583987ccc2738f488de4f2e24b7cab0c2a Mon Sep 17 00:00:00 2001 From: Philipp Otterbein Date: Fri, 11 Apr 2025 22:34:15 +0200 Subject: [PATCH 3/3] add isInterrupted() call and replace some checkInterrupt() occurrences (cherry picked from commit 49f757c24ae10e6d32c19e27fd646fc21aca7679) --- src/libstore/s3-binary-cache-store.cc | 17 ++++------------- src/libutil/include/nix/util/signals.hh | 5 +++++ .../unix/include/nix/util/signals-impl.hh | 13 +++++++++---- .../windows/include/nix/util/signals-impl.hh | 7 ++++++- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ca03c7cd8..f9e583307 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -187,11 +187,7 @@ S3Helper::FileTransferResult S3Helper::getObject( }); request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { - try { - checkInterrupt(); - return true; - } catch(...) {} - return false; + return !isInterrupted(); }); FileTransferResult res; @@ -420,10 +416,9 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual TransferStatus status = transferHandle->GetStatus(); while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) { - try { - checkInterrupt(); + if (!isInterrupted()) { context->wait(); - } catch (...) { + } else { transferHandle->Cancel(); transferHandle->WaitUntilFinished(); } @@ -454,11 +449,7 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual }); request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) { - try { - checkInterrupt(); - return true; - } catch(...) {} - return false; + return !isInterrupted(); }); request.SetContentType(mimeType); diff --git a/src/libutil/include/nix/util/signals.hh b/src/libutil/include/nix/util/signals.hh index 45130a90c..5a2ba8e75 100644 --- a/src/libutil/include/nix/util/signals.hh +++ b/src/libutil/include/nix/util/signals.hh @@ -26,6 +26,11 @@ static inline bool getInterrupted(); */ void setInterruptThrown(); +/** + * @note Does nothing on Windows + */ +static inline bool isInterrupted(); + /** * @note Does nothing on Windows */ diff --git a/src/libutil/unix/include/nix/util/signals-impl.hh b/src/libutil/unix/include/nix/util/signals-impl.hh index ffa967344..7397744b2 100644 --- a/src/libutil/unix/include/nix/util/signals-impl.hh +++ b/src/libutil/unix/include/nix/util/signals-impl.hh @@ -85,17 +85,22 @@ static inline bool getInterrupted() return unix::_isInterrupted; } +static inline bool isInterrupted() +{ + using namespace unix; + return _isInterrupted || (interruptCheck && interruptCheck()); +} + /** * Throw `Interrupted` exception if the process has been interrupted. * * Call this in long-running loops and between slow operations to terminate * them as needed. */ -void inline checkInterrupt() +inline void checkInterrupt() { - using namespace unix; - if (_isInterrupted || (interruptCheck && interruptCheck())) - _interrupted(); + if (isInterrupted()) + unix::_interrupted(); } /** diff --git a/src/libutil/windows/include/nix/util/signals-impl.hh b/src/libutil/windows/include/nix/util/signals-impl.hh index 043f39100..f716ffd1a 100644 --- a/src/libutil/windows/include/nix/util/signals-impl.hh +++ b/src/libutil/windows/include/nix/util/signals-impl.hh @@ -22,7 +22,12 @@ inline void setInterruptThrown() /* Do nothing for now */ } -void inline checkInterrupt() +static inline bool isInterrupted() +{ + /* Do nothing for now */ +} + +inline void checkInterrupt() { /* Do nothing for now */ }