From bcdee80a0d80071b05a9c1bc5e649e762dcd1ecc Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 27 May 2024 17:54:02 -0400 Subject: [PATCH] More work on the scheduler for windows - Get a rump derivation goal: hook instance will come later, local derivation goal will come after that. - Start cleaning up the channel / waiting code with an abstraction. --- maintainers/flake-module.nix | 4 +- .../{unix => }/build/derivation-goal.cc | 77 ++++++--- .../{unix => }/build/derivation-goal.hh | 10 +- .../build/drv-output-substitution-goal.hh | 11 +- src/libstore/build/substitution-goal.hh | 11 +- src/libstore/build/worker.cc | 163 ++++-------------- src/libstore/build/worker.hh | 23 +-- src/libutil/muxable-pipe.hh | 82 +++++++++ src/libutil/processes.hh | 4 - src/libutil/unix/muxable-pipe.cc | 47 +++++ src/libutil/windows/muxable-pipe.cc | 70 ++++++++ src/libutil/windows/processes.cc | 26 +-- src/libutil/windows/windows-async-pipe.hh | 7 + 13 files changed, 331 insertions(+), 204 deletions(-) rename src/libstore/{unix => }/build/derivation-goal.cc (97%) rename src/libstore/{unix => }/build/derivation-goal.hh (97%) create mode 100644 src/libutil/muxable-pipe.hh create mode 100644 src/libutil/unix/muxable-pipe.cc create mode 100644 src/libutil/windows/muxable-pipe.cc diff --git a/maintainers/flake-module.nix b/maintainers/flake-module.nix index 1f19c673a..e0d66cbd3 100644 --- a/maintainers/flake-module.nix +++ b/maintainers/flake-module.nix @@ -223,8 +223,8 @@ ''^src/libstore/store-api\.cc$'' ''^src/libstore/store-api\.hh$'' ''^src/libstore/store-dir-config\.hh$'' - ''^src/libstore/unix/build/derivation-goal\.cc$'' - ''^src/libstore/unix/build/derivation-goal\.hh$'' + ''^src/libstore/build/derivation-goal\.cc$'' + ''^src/libstore/build/derivation-goal\.hh$'' ''^src/libstore/build/drv-output-substitution-goal\.cc$'' ''^src/libstore/build/drv-output-substitution-goal\.hh$'' ''^src/libstore/build/entry-points\.cc$'' diff --git a/src/libstore/unix/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc similarity index 97% rename from src/libstore/unix/build/derivation-goal.cc rename to src/libstore/build/derivation-goal.cc index 339f7273b..4226fb61a 100644 --- a/src/libstore/unix/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -1,5 +1,8 @@ #include "derivation-goal.hh" -#include "hook-instance.hh" +#ifndef _WIN32 // TODO enable build hook on Windows +# include "hook-instance.hh" +#endif +#include "processes.hh" #include "worker.hh" #include "builtins.hh" #include "builtins/buildenv.hh" @@ -19,19 +22,8 @@ #include #include -#include -#include -#include -#include #include -#include #include -#include -#include -#include - -#include -#include #include @@ -101,7 +93,9 @@ std::string DerivationGoal::key() void DerivationGoal::killChild() { +#ifndef _WIN32 // TODO enable build hook on Windows hook.reset(); +#endif } @@ -641,9 +635,17 @@ void DerivationGoal::started() buildMode == bmCheck ? "checking outputs of '%s'" : "building '%s'", worker.store.printStorePath(drvPath)); fmt("building '%s'", worker.store.printStorePath(drvPath)); +#ifndef _WIN32 // TODO enable build hook on Windows if (hook) msg += fmt(" on '%s'", machineName); +#endif act = std::make_unique(*logger, lvlInfo, actBuild, msg, - Logger::Fields{worker.store.printStorePath(drvPath), hook ? machineName : "", 1, 1}); + Logger::Fields{worker.store.printStorePath(drvPath), +#ifndef _WIN32 // TODO enable build hook on Windows + hook ? machineName : +#endif + "", + 1, + 1}); mcRunningBuilds = std::make_unique>(worker.runningBuilds); worker.updateProgress(); } @@ -778,7 +780,13 @@ static void movePath(const Path & src, const Path & dst) { auto st = lstat(src); - bool changePerm = (geteuid() && S_ISDIR(st.st_mode) && !(st.st_mode & S_IWUSR)); + bool changePerm = ( +#ifndef _WIN32 + geteuid() +#else + !isRootUser() +#endif + && S_ISDIR(st.st_mode) && !(st.st_mode & S_IWUSR)); if (changePerm) chmod_(src, st.st_mode | S_IWUSR); @@ -796,7 +804,7 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath) tmpPath (the replacement), so we have to move it out of the way first. We'd better not be interrupted here, because if we're repairing (say) Glibc, we end up with a broken system. */ - Path oldPath = fmt("%1%.old-%2%-%3%", storePath, getpid(), random()); + Path oldPath = fmt("%1%.old-%2%-%3%", storePath, getpid(), rand()); if (pathExists(storePath)) movePath(storePath, oldPath); @@ -818,14 +826,20 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath) int DerivationGoal::getChildStatus() { +#ifndef _WIN32 // TODO enable build hook on Windows return hook->pid.kill(); +#else + return 0; +#endif } void DerivationGoal::closeReadPipes() { - hook->builderOut.readSide = -1; - hook->fromHook.readSide = -1; +#ifndef _WIN32 // TODO enable build hook on Windows + hook->builderOut.readSide.close(); + hook->fromHook.readSide.close(); +#endif } @@ -1019,13 +1033,16 @@ void DerivationGoal::buildDone() BuildResult::Status st = BuildResult::MiscFailure; +#ifndef _WIN32 if (hook && WIFEXITED(status) && WEXITSTATUS(status) == 101) st = BuildResult::TimedOut; else if (hook && (!WIFEXITED(status) || WEXITSTATUS(status) != 100)) { } - else { + else +#endif + { assert(derivationType); st = dynamic_cast(&e) ? BuildResult::NotDeterministic : @@ -1112,6 +1129,9 @@ void DerivationGoal::resolvedFinished() HookReply DerivationGoal::tryBuildHook() { +#ifdef _WIN32 // TODO enable build hook on Windows + return rpDecline; +#else if (settings.buildHook.get().empty() || !worker.tryBuildHook || !useDerivation) return rpDecline; if (!worker.hook) @@ -1205,17 +1225,18 @@ HookReply DerivationGoal::tryBuildHook() } hook->sink = FdSink(); - hook->toHook.writeSide = -1; + hook->toHook.writeSide.close(); /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set fds; + std::set fds; fds.insert(hook->fromHook.readSide.get()); fds.insert(hook->builderOut.readSide.get()); worker.childStarted(shared_from_this(), fds, false, false); return rpAccept; +#endif } @@ -1251,7 +1272,11 @@ Path DerivationGoal::openLogFile() Path logFileName = fmt("%s/%s%s", dir, baseName.substr(2), settings.compressLog ? ".bz2" : ""); - fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC, 0666); + fdLogFile = toDescriptor(open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC +#ifndef _WIN32 + | O_CLOEXEC +#endif + , 0666)); if (!fdLogFile) throw SysError("creating log file '%1%'", logFileName); logFileSink = std::make_shared(fdLogFile.get()); @@ -1271,13 +1296,17 @@ void DerivationGoal::closeLogFile() if (logSink2) logSink2->finish(); if (logFileSink) logFileSink->flush(); logSink = logFileSink = 0; - fdLogFile = -1; + fdLogFile.close(); } -bool DerivationGoal::isReadDesc(int fd) +bool DerivationGoal::isReadDesc(Descriptor fd) { +#ifdef _WIN32 // TODO enable build hook on Windows + return false; +#else return fd == hook->builderOut.readSide.get(); +#endif } void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data) @@ -1310,6 +1339,7 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data) if (logSink) (*logSink)(data); } +#ifndef _WIN32 // TODO enable build hook on Windows if (hook && fd == hook->fromHook.readSide.get()) { for (auto c : data) if (c == '\n') { @@ -1344,6 +1374,7 @@ void DerivationGoal::handleChildOutput(Descriptor fd, std::string_view data) } else currentHookLine += c; } +#endif } diff --git a/src/libstore/unix/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh similarity index 97% rename from src/libstore/unix/build/derivation-goal.hh rename to src/libstore/build/derivation-goal.hh index be6eb50c4..04f13aedd 100644 --- a/src/libstore/unix/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -2,7 +2,9 @@ ///@file #include "parsed-derivations.hh" -#include "user-lock.hh" +#ifndef _WIN32 +# include "user-lock.hh" +#endif #include "outputs-spec.hh" #include "store-api.hh" #include "pathlocks.hh" @@ -12,7 +14,9 @@ namespace nix { using std::map; +#ifndef _WIN32 // TODO enable build hook on Windows struct HookInstance; +#endif typedef enum {rpAccept, rpDecline, rpPostpone} HookReply; @@ -178,10 +182,12 @@ struct DerivationGoal : public Goal std::string currentHookLine; +#ifndef _WIN32 // TODO enable build hook on Windows /** * The build hook. */ std::unique_ptr hook; +#endif /** * The sort of derivation we are building. @@ -287,7 +293,7 @@ struct DerivationGoal : public Goal virtual void cleanupPostOutputsRegisteredModeCheck(); virtual void cleanupPostOutputsRegisteredModeNonCheck(); - virtual bool isReadDesc(int fd); + virtual bool isReadDesc(Descriptor fd); /** * Callback used by the worker to write to the log. diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index 4f06a9e9e..6967ca84f 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -7,10 +7,7 @@ #include "store-api.hh" #include "goal.hh" #include "realisation.hh" - -#ifdef _WIN32 -# include "windows-async-pipe.hh" -#endif +#include "muxable-pipe.hh" namespace nix { @@ -48,11 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal { struct DownloadState { -#ifndef _WIN32 - Pipe outPipe; -#else - windows::AsyncPipe outPipe; -#endif + MuxablePipe outPipe; std::promise> promise; }; diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 58d309424..1a051fc1f 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -3,10 +3,7 @@ #include "store-api.hh" #include "goal.hh" - -#ifdef _WIN32 -# include "windows-async-pipe.hh" -#endif +#include "muxable-pipe.hh" namespace nix { @@ -48,11 +45,7 @@ struct PathSubstitutionGoal : public Goal /** * Pipe for the substituter's standard output. */ -#ifndef _WIN32 - Pipe outPipe; -#else - windows::AsyncPipe outPipe; -#endif + MuxablePipe outPipe; /** * The substituter thread. diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ac6f693a2..e6ed80346 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -3,19 +3,13 @@ #include "worker.hh" #include "substitution-goal.hh" #include "drv-output-substitution-goal.hh" +#include "derivation-goal.hh" #ifndef _WIN32 // TODO Enable building on Windows # include "local-derivation-goal.hh" # include "hook-instance.hh" #endif #include "signals.hh" -#ifndef _WIN32 -# include -#else -# include -# include "windows-error.hh" -#endif - namespace nix { Worker::Worker(Store & store, Store & evalStore) @@ -49,7 +43,6 @@ Worker::~Worker() assert(expectedNarSize == 0); } -#ifndef _WIN32 // TODO Enable building on Windows std::shared_ptr Worker::makeDerivationGoalCommon( const StorePath & drvPath, @@ -73,9 +66,13 @@ std::shared_ptr Worker::makeDerivationGoal(const StorePath & drv const OutputsSpec & wantedOutputs, BuildMode buildMode) { return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr { - return !dynamic_cast(&store) - ? std::make_shared(drvPath, wantedOutputs, *this, buildMode) - : std::make_shared(drvPath, wantedOutputs, *this, buildMode); + return +#ifndef _WIN32 // TODO Enable building on Windows + dynamic_cast(&store) + ? std::make_shared(drvPath, wantedOutputs, *this, buildMode) + : +#endif + std::make_shared(drvPath, wantedOutputs, *this, buildMode); }); } @@ -83,14 +80,16 @@ std::shared_ptr Worker::makeBasicDerivationGoal(const StorePath const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) { return makeDerivationGoalCommon(drvPath, wantedOutputs, [&]() -> std::shared_ptr { - return !dynamic_cast(&store) - ? std::make_shared(drvPath, drv, wantedOutputs, *this, buildMode) - : std::make_shared(drvPath, drv, wantedOutputs, *this, buildMode); + return +#ifndef _WIN32 // TODO Enable building on Windows + dynamic_cast(&store) + ? std::make_shared(drvPath, drv, wantedOutputs, *this, buildMode) + : +#endif + std::make_shared(drvPath, drv, wantedOutputs, *this, buildMode); }); } -#endif - std::shared_ptr Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional ca) { @@ -122,14 +121,10 @@ GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) { return std::visit(overloaded { [&](const DerivedPath::Built & bfd) -> GoalPtr { -#ifndef _WIN32 // TODO Enable building on Windows if (auto bop = std::get_if(&*bfd.drvPath)) return makeDerivationGoal(bop->path, bfd.outputs, buildMode); else throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); -#else - throw UnimplementedError("Building derivations not yet implemented on Windows"); -#endif }, [&](const DerivedPath::Opaque & bo) -> GoalPtr { return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); @@ -155,11 +150,9 @@ static void removeGoal(std::shared_ptr goal, std::map> & void Worker::removeGoal(GoalPtr goal) { -#ifndef _WIN32 // TODO Enable building on Windows if (auto drvGoal = std::dynamic_pointer_cast(goal)) nix::removeGoal(drvGoal, derivationGoals); else -#endif if (auto subGoal = std::dynamic_pointer_cast(goal)) nix::removeGoal(subGoal, substitutionGoals); else if (auto subGoal = std::dynamic_pointer_cast(goal)) @@ -204,7 +197,7 @@ unsigned Worker::getNrSubstitutions() } -void Worker::childStarted(GoalPtr goal, const std::set & channels, +void Worker::childStarted(GoalPtr goal, const std::set & channels, bool inBuildSlot, bool respectTimeouts) { Child child; @@ -298,14 +291,12 @@ void Worker::run(const Goals & _topGoals) for (auto & i : _topGoals) { topGoals.insert(i); -#ifndef _WIN32 // TODO Enable building on Windows if (auto goal = dynamic_cast(i.get())) { topPaths.push_back(DerivedPath::Built { .drvPath = makeConstantStorePathRef(goal->drvPath), .outputs = goal->wantedOutputs, }); } else -#endif if (auto goal = dynamic_cast(i.get())) { topPaths.push_back(DerivedPath::Opaque{goal->storePath}); } @@ -428,47 +419,26 @@ void Worker::waitForInput() if (useTimeout) vomit("sleeping %d seconds", timeout); + MuxablePipePollState state; + #ifndef _WIN32 /* Use select() to wait for the input side of any logger pipe to become `available'. Note that `available' (i.e., non-blocking) includes EOF. */ - std::vector pollStatus; - std::map fdToPollStatus; for (auto & i : children) { for (auto & j : i.channels) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; + state.pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); + state.fdToPollStatus[j] = state.pollStatus.size() - 1; } } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } -#else - OVERLAPPED_ENTRY oentries[0x20] = {0}; - ULONG removed; - bool gotEOF = false; - - // we are on at least Windows Vista / Server 2008 and can get many (countof(oentries)) statuses in one API call - if (!GetQueuedCompletionStatusEx( - ioport.get(), - oentries, - sizeof(oentries) / sizeof(*oentries), - &removed, - useTimeout ? timeout * 1000 : INFINITE, - false)) - { - windows::WinError winError("GetQueuedCompletionStatusEx"); - if (winError.lastError != WAIT_TIMEOUT) - throw winError; - assert(removed == 0); - } else { - assert(0 < removed && removed <= sizeof(oentries)/sizeof(*oentries)); - } #endif + state.poll( +#ifdef _WIN32 + ioport.get(), +#endif + useTimeout ? (std::optional { timeout * 1000 }) : std::nullopt); + auto after = steady_time_point::clock::now(); /* Process all available file descriptors. FIXME: this is @@ -482,75 +452,18 @@ void Worker::waitForInput() GoalPtr goal = j->goal.lock(); assert(goal); -#ifndef _WIN32 - std::set fds2(j->channels); - std::vector buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(fromDescriptorReadOnly(k), buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - j->channels.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data((char *) buffer.data(), rd); - j->lastOutput = after; - goal->handleChildOutput(k, data); - } - } - } -#else - decltype(j->channels)::iterator p = j->channels.begin(); - while (p != j->channels.end()) { - decltype(p) nextp = p; - ++nextp; - for (ULONG i = 0; i < removed; i++) { - if (oentries[i].lpCompletionKey == ((ULONG_PTR)((*p)->readSide.get()) ^ 0x5555)) { - printMsg(lvlVomit, "%s: read %s bytes", goal->getName(), oentries[i].dwNumberOfBytesTransferred); - if (oentries[i].dwNumberOfBytesTransferred > 0) { - std::string data { - (char *) (*p)->buffer.data(), - oentries[i].dwNumberOfBytesTransferred, - }; - //std::cerr << "read [" << data << "]" << std::endl; - j->lastOutput = after; - goal->handleChildOutput((*p)->readSide.get(), data); - } - - if (gotEOF) { - debug("%s: got EOF", goal->getName()); - goal->handleEOF((*p)->readSide.get()); - nextp = j->channels.erase(p); // no need to maintain `j->channels` ? - } else { - BOOL rc = ReadFile((*p)->readSide.get(), (*p)->buffer.data(), (*p)->buffer.size(), &(*p)->got, &(*p)->overlapped); - if (rc) { - // here is possible (but not obligatory) to call `goal->handleChildOutput` and repeat ReadFile immediately - } else { - windows::WinError winError("ReadFile(%s, ..)", (*p)->readSide.get()); - if (winError.lastError == ERROR_BROKEN_PIPE) { - debug("%s: got EOF", goal->getName()); - goal->handleEOF((*p)->readSide.get()); - nextp = j->channels.erase(p); // no need to maintain `j->channels` ? - } else if (winError.lastError != ERROR_IO_PENDING) - throw winError; - } - } - break; - } - } - p = nextp; - } -#endif + state.iterate( + j->channels, + [&](Descriptor k, std::string_view data) { + printMsg(lvlVomit, "%1%: read %2% bytes", + goal->getName(), data.size()); + j->lastOutput = after; + goal->handleChildOutput(k, data); + }, + [&](Descriptor k) { + debug("%1%: got EOF", goal->getName()); + goal->handleEOF(k); + }); if (goal->exitCode == Goal::ecBusy && 0 != settings.maxSilentTime && diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index b57734b45..7d67030d7 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -5,10 +5,7 @@ #include "store-api.hh" #include "goal.hh" #include "realisation.hh" - -#ifdef _WIN32 -# include "windows-async-pipe.hh" -#endif +#include "muxable-pipe.hh" #include #include @@ -16,9 +13,7 @@ namespace nix { /* Forward definition. */ -#ifndef _WIN32 // TODO Enable building on Windows struct DerivationGoal; -#endif struct PathSubstitutionGoal; class DrvOutputSubstitutionGoal; @@ -46,17 +41,9 @@ typedef std::chrono::time_point steady_time_point; */ struct Child { - using CommChannel = -#ifndef _WIN32 - Descriptor -#else - windows::AsyncPipe * -#endif - ; - WeakGoalPtr goal; Goal * goal2; // ugly hackery - std::set channels; + std::set channels; bool respectTimeouts; bool inBuildSlot; /** @@ -116,9 +103,7 @@ private: * Maps used to prevent multiple instantiations of a goal for the * same derivation / path. */ -#ifndef _WIN32 // TODO Enable building on Windows std::map> derivationGoals; -#endif std::map> substitutionGoals; std::map> drvOutputSubstitutionGoals; @@ -207,7 +192,6 @@ public: * Make a goal (with caching). */ -#ifndef _WIN32 // TODO Enable building on Windows /** * @ref DerivationGoal "derivation goal" */ @@ -222,7 +206,6 @@ public: std::shared_ptr makeBasicDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal); -#endif /** * @ref SubstitutionGoal "substitution goal" @@ -263,7 +246,7 @@ public: * Registers a running child process. `inBuildSlot` means that * the process counts towards the jobs limit. */ - void childStarted(GoalPtr goal, const std::set & channels, + void childStarted(GoalPtr goal, const std::set & channels, bool inBuildSlot, bool respectTimeouts); /** diff --git a/src/libutil/muxable-pipe.hh b/src/libutil/muxable-pipe.hh new file mode 100644 index 000000000..53ac39170 --- /dev/null +++ b/src/libutil/muxable-pipe.hh @@ -0,0 +1,82 @@ +#pragma once +///@file + +#include "file-descriptor.hh" +#ifdef _WIN32 +# include "windows-async-pipe.hh" +#endif + +#ifndef _WIN32 +# include +#else +# include +# include "windows-error.hh" +#endif + +namespace nix { + +/** + * An "muxable pipe" is a type of pipe supporting endpoints that wait + * for events on multiple pipes at once. + * + * On Unix, this is just a regular anonymous pipe. On Windows, this has + * to be a named pipe because we need I/O Completion Ports to wait on + * multiple pipes. + */ +using MuxablePipe = +#ifndef _WIN32 + Pipe +#else + windows::AsyncPipe +#endif + ; + +/** + * Use pool() (Unix) / I/O Completion Ports (Windows) to wait for the + * input side of any logger pipe to become `available'. Note that + * `available' (i.e., non-blocking) includes EOF. + */ +struct MuxablePipePollState +{ +#ifndef _WIN32 + std::vector pollStatus; + std::map fdToPollStatus; +#else + OVERLAPPED_ENTRY oentries[0x20] = {0}; + ULONG removed; + bool gotEOF = false; + +#endif + + /** + * Check for ready (Unix) / completed (Windows) operations + */ + void poll( +#ifdef _WIN32 + HANDLE ioport, +#endif + std::optional timeout); + + using CommChannel = +#ifndef _WIN32 + Descriptor +#else + windows::AsyncPipe * +#endif + ; + + /** + * Process for ready (Unix) / completed (Windows) operations, + * calling the callbacks as needed. + * + * @param handleRead callback to be passed read data. + * + * @param handleEOF callback for when the `MuxablePipe` has closed. + */ + void iterate( + std::set & channels, + std::function handleRead, + std::function handleEOF); +}; + +} diff --git a/src/libutil/processes.hh b/src/libutil/processes.hh index e319f79e0..9d5367b02 100644 --- a/src/libutil/processes.hh +++ b/src/libutil/processes.hh @@ -118,8 +118,6 @@ public: { } }; -#ifndef _WIN32 - /** * Convert the exit status of a child as returned by wait() into an * error string. @@ -128,6 +126,4 @@ std::string statusToString(int status); bool statusOk(int status); -#endif - } diff --git a/src/libutil/unix/muxable-pipe.cc b/src/libutil/unix/muxable-pipe.cc new file mode 100644 index 000000000..0104663c3 --- /dev/null +++ b/src/libutil/unix/muxable-pipe.cc @@ -0,0 +1,47 @@ +#include + +#include "logging.hh" +#include "util.hh" +#include "muxable-pipe.hh" + +namespace nix { + +void MuxablePipePollState::poll(std::optional timeout) +{ + if (::poll(pollStatus.data(), pollStatus.size(), timeout ? *timeout : -1) == -1) { + if (errno == EINTR) + return; + throw SysError("waiting for input"); + } +} + +void MuxablePipePollState::iterate( + std::set & channels, + std::function handleRead, + std::function handleEOF) +{ + std::set fds2(channels); + std::vector buffer(4096); + for (auto & k : fds2) { + const auto fdPollStatusId = get(fdToPollStatus, k); + assert(fdPollStatusId); + assert(*fdPollStatusId < pollStatus.size()); + if (pollStatus.at(*fdPollStatusId).revents) { + ssize_t rd = ::read(fromDescriptorReadOnly(k), buffer.data(), buffer.size()); + // FIXME: is there a cleaner way to handle pt close + // than EIO? Is this even standard? + if (rd == 0 || (rd == -1 && errno == EIO)) { + handleEOF(k); + channels.erase(k); + } else if (rd == -1) { + if (errno != EINTR) + throw SysError("read failed"); + } else { + std::string_view data((char *) buffer.data(), rd); + handleRead(k, data); + } + } + } +} + +} diff --git a/src/libutil/windows/muxable-pipe.cc b/src/libutil/windows/muxable-pipe.cc new file mode 100644 index 000000000..91a321f7c --- /dev/null +++ b/src/libutil/windows/muxable-pipe.cc @@ -0,0 +1,70 @@ +#include +#include "windows-error.hh" + +#include "logging.hh" +#include "util.hh" +#include "muxable-pipe.hh" + +namespace nix { + +void MuxablePipePollState::poll(HANDLE ioport, std::optional timeout) +{ + /* We are on at least Windows Vista / Server 2008 and can get many + (countof(oentries)) statuses in one API call. */ + if (!GetQueuedCompletionStatusEx( + ioport, oentries, sizeof(oentries) / sizeof(*oentries), &removed, timeout ? *timeout : INFINITE, false)) { + windows::WinError winError("GetQueuedCompletionStatusEx"); + if (winError.lastError != WAIT_TIMEOUT) + throw winError; + assert(removed == 0); + } else { + assert(0 < removed && removed <= sizeof(oentries) / sizeof(*oentries)); + } +} + +void MuxablePipePollState::iterate( + std::set & channels, + std::function handleRead, + std::function handleEOF) +{ + auto p = channels.begin(); + while (p != channels.end()) { + decltype(p) nextp = p; + ++nextp; + for (ULONG i = 0; i < removed; i++) { + if (oentries[i].lpCompletionKey == ((ULONG_PTR) ((*p)->readSide.get()) ^ 0x5555)) { + printMsg(lvlVomit, "read %s bytes", oentries[i].dwNumberOfBytesTransferred); + if (oentries[i].dwNumberOfBytesTransferred > 0) { + std::string data{ + (char *) (*p)->buffer.data(), + oentries[i].dwNumberOfBytesTransferred, + }; + handleRead((*p)->readSide.get(), data); + } + + if (gotEOF) { + handleEOF((*p)->readSide.get()); + nextp = channels.erase(p); // no need to maintain `channels`? + } else { + BOOL rc = ReadFile( + (*p)->readSide.get(), (*p)->buffer.data(), (*p)->buffer.size(), &(*p)->got, &(*p)->overlapped); + if (rc) { + // here is possible (but not obligatory) to call + // `handleRead` and repeat ReadFile immediately + } else { + windows::WinError winError("ReadFile(%s, ..)", (*p)->readSide.get()); + if (winError.lastError == ERROR_BROKEN_PIPE) { + handleEOF((*p)->readSide.get()); + nextp = channels.erase(p); // no need to maintain `channels` ? + } else if (winError.lastError != ERROR_IO_PENDING) + throw winError; + } + } + break; + } + } + p = nextp; + } +} + +} diff --git a/src/libutil/windows/processes.cc b/src/libutil/windows/processes.cc index 5ef4ed1e4..44a32f6a1 100644 --- a/src/libutil/windows/processes.cc +++ b/src/libutil/windows/processes.cc @@ -16,16 +16,6 @@ #include #include -#ifdef __APPLE__ -# include -#endif - -#ifdef __linux__ -# include -# include -#endif - - namespace nix { std::string runProgram(Path program, bool lookupPath, const Strings & args, @@ -34,15 +24,31 @@ std::string runProgram(Path program, bool lookupPath, const Strings & args, throw UnimplementedError("Cannot shell out to git on Windows yet"); } + // Output = error code + "standard out" output stream std::pair runProgram(RunOptions && options) { throw UnimplementedError("Cannot shell out to git on Windows yet"); } + void runProgram2(const RunOptions & options) { throw UnimplementedError("Cannot shell out to git on Windows yet"); } +std::string statusToString(int status) +{ + if (status != 0) + return fmt("with exit code %d", status); + else + return "succeeded"; +} + + +bool statusOk(int status) +{ + return status == 0; +} + } diff --git a/src/libutil/windows/windows-async-pipe.hh b/src/libutil/windows/windows-async-pipe.hh index c980201a8..8f554e403 100644 --- a/src/libutil/windows/windows-async-pipe.hh +++ b/src/libutil/windows/windows-async-pipe.hh @@ -5,6 +5,13 @@ namespace nix::windows { +/*** + * An "async pipe" is a pipe that supports I/O Completion Ports so + * multiple pipes can be listened too. + * + * Unfortunately, only named pipes support that on windows, so we use + * those with randomized temp file names. + */ class AsyncPipe { public: