From 9da01e69f96346d73c2d1c03adce109f3e57a9a4 Mon Sep 17 00:00:00 2001
From: Philipp Otterbein <potterbein@blockstream.com>
Date: Wed, 19 Feb 2025 18:51:02 +0100
Subject: [PATCH 1/3] libstore S3: fix progress bar and make file transfers
 interruptible

---
 src/libstore/filetransfer.cc          |   4 -
 src/libstore/s3-binary-cache-store.cc | 115 ++++++++++++++++++++++----
 2 files changed, 101 insertions(+), 18 deletions(-)

diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 475637d74..d00330601 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -789,10 +789,6 @@ struct curlFileTransfer : public FileTransfer
 
                 S3Helper s3Helper(profile, region, scheme, endpoint);
 
-                Activity act(*logger, lvlTalkative, actFileTransfer,
-                    fmt("downloading '%s'", request.uri),
-                    {request.uri}, request.parentAct);
-
                 // FIXME: implement ETag
                 auto s3Res = s3Helper.getObject(bucketName, key);
                 FileTransferResult res;
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 87f5feb45..ca03c7cd8 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -160,7 +160,10 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
 S3Helper::FileTransferResult S3Helper::getObject(
     const std::string & bucketName, const std::string & key)
 {
-    debug("fetching 's3://%s/%s'...", bucketName, key);
+    std::string uri = "s3://" + bucketName + "/" + key;
+    Activity act(*logger, lvlTalkative, actFileTransfer,
+        fmt("downloading '%s'", uri),
+        Logger::Fields{uri}, getCurActivity());
 
     auto request =
         Aws::S3::Model::GetObjectRequest()
@@ -171,6 +174,26 @@ S3Helper::FileTransferResult S3Helper::getObject(
         return Aws::New<std::stringstream>("STRINGSTREAM");
     });
 
+    size_t bytesDone = 0;
+    size_t bytesExpected = 0;
+    request.SetDataReceivedEventHandler([&](const Aws::Http::HttpRequest * req, Aws::Http::HttpResponse * resp, long long l) {
+        if (!bytesExpected && resp->HasHeader("Content-Length")) {
+            if (auto length = string2Int<size_t>(resp->GetHeader("Content-Length"))) {
+                bytesExpected = *length;
+            }
+        }
+        bytesDone += l;
+        act.progress(bytesDone, bytesExpected);
+    });
+
+    request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
+        try {
+            checkInterrupt();
+            return true;
+        } catch(...) {}
+        return false;
+    });
+
     FileTransferResult res;
 
     auto now1 = std::chrono::steady_clock::now();
@@ -180,6 +203,8 @@ S3Helper::FileTransferResult S3Helper::getObject(
         auto result = checkAws(fmt("AWS error fetching '%s'", key),
             client->GetObject(request));
 
+        act.progress(result.GetContentLength(), result.GetContentLength());
+
         res.data = decompress(result.GetContentEncoding(),
             dynamic_cast<std::stringstream &>(result.GetBody()).str());
 
@@ -307,11 +332,35 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
     std::shared_ptr<TransferManager> transferManager;
     std::once_flag transferManagerCreated;
 
+    struct AsyncContext : public Aws::Client::AsyncCallerContext
+    {
+        mutable std::mutex mutex;
+        mutable std::condition_variable cv;
+        const Activity & act;
+
+        void notify() const
+        {
+            cv.notify_one();
+        }
+
+        void wait() const
+        {
+            std::unique_lock<std::mutex> lk(mutex);
+            cv.wait(lk);
+        }
+
+        AsyncContext(const Activity & act) : act(act) {}
+    };
+
     void uploadFile(const std::string & path,
         std::shared_ptr<std::basic_iostream<char>> istream,
         const std::string & mimeType,
         const std::string & contentEncoding)
     {
+        std::string uri = "s3://" + bucketName + "/" + path;
+        Activity act(*logger, lvlTalkative, actFileTransfer,
+            fmt("uploading '%s'", uri),
+            Logger::Fields{uri}, getCurActivity());
         istream->seekg(0, istream->end);
         auto size = istream->tellg();
         istream->seekg(0, istream->beg);
@@ -330,16 +379,25 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
                 transferConfig.bufferSize = bufferSize;
 
                 transferConfig.uploadProgressCallback =
-                    [](const TransferManager *transferManager,
-                        const std::shared_ptr<const TransferHandle>
-                        &transferHandle)
+                    [](const TransferManager * transferManager,
+                        const std::shared_ptr<const TransferHandle> & transferHandle)
                     {
-                        //FIXME: find a way to properly abort the multipart upload.
-                        //checkInterrupt();
-                        debug("upload progress ('%s'): '%d' of '%d' bytes",
-                            transferHandle->GetKey(),
-                            transferHandle->GetBytesTransferred(),
-                            transferHandle->GetBytesTotalSize());
+                        auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
+                        size_t bytesDone = transferHandle->GetBytesTransferred();
+                        size_t bytesTotal = transferHandle->GetBytesTotalSize();
+                        try {
+                            checkInterrupt();
+                            context->act.progress(bytesDone, bytesTotal);
+                        } catch (...) {
+                            context->notify();
+                        }
+                    };
+                transferConfig.transferStatusUpdatedCallback =
+                    [](const TransferManager * transferManager,
+                        const std::shared_ptr<const TransferHandle> & transferHandle)
+                    {
+                        auto context = std::dynamic_pointer_cast<const AsyncContext>(transferHandle->GetContext());
+                        context->notify();
                     };
 
                 transferManager = TransferManager::Create(transferConfig);
@@ -353,29 +411,56 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
             if (contentEncoding != "")
                 throw Error("setting a content encoding is not supported with S3 multi-part uploads");
 
+            auto context = std::make_shared<AsyncContext>(act);
             std::shared_ptr<TransferHandle> transferHandle =
                 transferManager->UploadFile(
                     istream, bucketName, path, mimeType,
                     Aws::Map<Aws::String, Aws::String>(),
-                    nullptr /*, contentEncoding */);
+                    context /*, contentEncoding */);
 
-            transferHandle->WaitUntilFinished();
+            TransferStatus status = transferHandle->GetStatus();
+            while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) {
+                try {
+                    checkInterrupt();
+                    context->wait();
+                } catch (...) {
+                    transferHandle->Cancel();
+                    transferHandle->WaitUntilFinished();
+                }
+                status = transferHandle->GetStatus();
+            }
+            act.progress(transferHandle->GetBytesTransferred(), transferHandle->GetBytesTotalSize());
 
-            if (transferHandle->GetStatus() == TransferStatus::FAILED)
+            if (status == TransferStatus::FAILED)
                 throw Error("AWS error: failed to upload 's3://%s/%s': %s",
                     bucketName, path, transferHandle->GetLastError().GetMessage());
 
-            if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+            if (status != TransferStatus::COMPLETED)
                 throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
                     bucketName, path);
 
         } else {
+            act.progress(0, size);
 
             auto request =
                 Aws::S3::Model::PutObjectRequest()
                 .WithBucket(bucketName)
                 .WithKey(path);
 
+            size_t bytesSent = 0;
+            request.SetDataSentEventHandler([&](const Aws::Http::HttpRequest * req, long long l) {
+                bytesSent += l;
+                act.progress(bytesSent, size);
+            });
+
+            request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
+                try {
+                    checkInterrupt();
+                    return true;
+                } catch(...) {}
+                return false;
+            });
+
             request.SetContentType(mimeType);
 
             if (contentEncoding != "")
@@ -385,6 +470,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
 
             auto result = checkAws(fmt("AWS error uploading '%s'", path),
                 s3Helper.client->PutObject(request));
+
+            act.progress(size, size);
         }
 
         auto now2 = std::chrono::steady_clock::now();

From db297d3dda12306459341da01e9892b4df2d6d37 Mon Sep 17 00:00:00 2001
From: Philipp Otterbein <potterbein@blockstream.com>
Date: Wed, 12 Mar 2025 00:50:20 +0100
Subject: [PATCH 2/3] libstore: same progress bar behavior for PUT and POST
 requests

- no differentiation between uploads and downloads in CLI
---
 src/libstore/filetransfer.cc                  | 24 +++++--------------
 .../include/nix/store/filetransfer.hh         |  2 +-
 2 files changed, 7 insertions(+), 19 deletions(-)

diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index d00330601..a917188d9 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -93,7 +93,7 @@ struct curlFileTransfer : public FileTransfer
             : fileTransfer(fileTransfer)
             , request(request)
             , act(*logger, lvlTalkative, actFileTransfer,
-                request.post ? "" : fmt(request.data ?  "uploading '%s'" : "downloading '%s'", request.uri),
+                fmt("%sing '%s'", request.verb(), request.uri),
                 {request.uri}, request.parentAct)
             , callback(std::move(callback))
             , finalSink([this](std::string_view data) {
@@ -270,19 +270,11 @@ struct curlFileTransfer : public FileTransfer
             return getInterrupted();
         }
 
-        int silentProgressCallback(curl_off_t dltotal, curl_off_t dlnow)
-        {
-            return getInterrupted();
-        }
-
         static int progressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
         {
-            return ((TransferItem *) userp)->progressCallback(dltotal, dlnow);
-        }
-
-        static int silentProgressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
-        {
-            return ((TransferItem *) userp)->silentProgressCallback(dltotal, dlnow);
+            auto & item = *static_cast<TransferItem *>(userp);
+            auto isUpload = bool(item.request.data);
+            return item.progressCallback(isUpload ? ultotal : dltotal, isUpload ? ulnow : dlnow);
         }
 
         static int debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
@@ -349,10 +341,7 @@ struct curlFileTransfer : public FileTransfer
             curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
             curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
 
-            if (request.post)
-                curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, silentProgressCallbackWrapper);
-            else
-                curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper);
+            curl_easy_setopt(req, CURLOPT_XFERINFOFUNCTION, progressCallbackWrapper);
             curl_easy_setopt(req, CURLOPT_XFERINFODATA, this);
             curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
 
@@ -445,8 +434,7 @@ struct curlFileTransfer : public FileTransfer
                 if (httpStatus == 304 && result.etag == "")
                     result.etag = request.expectedETag;
 
-                if (!request.post)
-                    act.progress(result.bodySize, result.bodySize);
+                act.progress(result.bodySize, result.bodySize);
                 done = true;
                 callback(std::move(result));
             }
diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh
index b0fe8fcce..f9b1f620f 100644
--- a/src/libstore/include/nix/store/filetransfer.hh
+++ b/src/libstore/include/nix/store/filetransfer.hh
@@ -77,7 +77,7 @@ struct FileTransferRequest
     FileTransferRequest(std::string_view uri)
         : uri(uri), parentAct(getCurActivity()) { }
 
-    std::string verb()
+    std::string verb() const
     {
         return data ? "upload" : "download";
     }

From 49f757c24ae10e6d32c19e27fd646fc21aca7679 Mon Sep 17 00:00:00 2001
From: Philipp Otterbein <potterbein@blockstream.com>
Date: Fri, 11 Apr 2025 22:34:15 +0200
Subject: [PATCH 3/3] add isInterrupted() call and replace some
 checkInterrupt() occurrences

---
 src/libstore/s3-binary-cache-store.cc           | 17 ++++-------------
 src/libutil/include/nix/util/signals.hh         |  5 +++++
 .../unix/include/nix/util/signals-impl.hh       | 13 +++++++++----
 .../windows/include/nix/util/signals-impl.hh    |  7 ++++++-
 4 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index ca03c7cd8..f9e583307 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -187,11 +187,7 @@ S3Helper::FileTransferResult S3Helper::getObject(
     });
 
     request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
-        try {
-            checkInterrupt();
-            return true;
-        } catch(...) {}
-        return false;
+        return !isInterrupted();
     });
 
     FileTransferResult res;
@@ -420,10 +416,9 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
 
             TransferStatus status = transferHandle->GetStatus();
             while (status == TransferStatus::IN_PROGRESS || status == TransferStatus::NOT_STARTED) {
-                try {
-                    checkInterrupt();
+                if (!isInterrupted()) {
                     context->wait();
-                } catch (...) {
+                } else {
                     transferHandle->Cancel();
                     transferHandle->WaitUntilFinished();
                 }
@@ -454,11 +449,7 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
             });
 
             request.SetContinueRequestHandler([](const Aws::Http::HttpRequest*) {
-                try {
-                    checkInterrupt();
-                    return true;
-                } catch(...) {}
-                return false;
+                return !isInterrupted();
             });
 
             request.SetContentType(mimeType);
diff --git a/src/libutil/include/nix/util/signals.hh b/src/libutil/include/nix/util/signals.hh
index 45130a90c..5a2ba8e75 100644
--- a/src/libutil/include/nix/util/signals.hh
+++ b/src/libutil/include/nix/util/signals.hh
@@ -26,6 +26,11 @@ static inline bool getInterrupted();
  */
 void setInterruptThrown();
 
+/**
+ * @note Does nothing on Windows
+ */
+static inline bool isInterrupted();
+
 /**
  * @note Does nothing on Windows
  */
diff --git a/src/libutil/unix/include/nix/util/signals-impl.hh b/src/libutil/unix/include/nix/util/signals-impl.hh
index ffa967344..7397744b2 100644
--- a/src/libutil/unix/include/nix/util/signals-impl.hh
+++ b/src/libutil/unix/include/nix/util/signals-impl.hh
@@ -85,17 +85,22 @@ static inline bool getInterrupted()
     return unix::_isInterrupted;
 }
 
+static inline bool isInterrupted()
+{
+    using namespace unix;
+    return _isInterrupted || (interruptCheck && interruptCheck());
+}
+
 /**
  * Throw `Interrupted` exception if the process has been interrupted.
  *
  * Call this in long-running loops and between slow operations to terminate
  * them as needed.
  */
-void inline checkInterrupt()
+inline void checkInterrupt()
 {
-    using namespace unix;
-    if (_isInterrupted || (interruptCheck && interruptCheck()))
-        _interrupted();
+    if (isInterrupted())
+        unix::_interrupted();
 }
 
 /**
diff --git a/src/libutil/windows/include/nix/util/signals-impl.hh b/src/libutil/windows/include/nix/util/signals-impl.hh
index 043f39100..f716ffd1a 100644
--- a/src/libutil/windows/include/nix/util/signals-impl.hh
+++ b/src/libutil/windows/include/nix/util/signals-impl.hh
@@ -22,7 +22,12 @@ inline void setInterruptThrown()
     /* Do nothing for now */
 }
 
-void inline checkInterrupt()
+static inline bool isInterrupted()
+{
+    /* Do nothing for now */
+}
+
+inline void checkInterrupt()
 {
     /* Do nothing for now */
 }