Get rid of LocalStore::addToStoreCommon

I got it to just become `LocalStore::addToStoreFromDump`, cleanly taking
a store and then doing nothing too fancy with it.

`LocalStore::addToStore(...Path...)` is now just a simple wrapper with a
bare-bones sinkToSource of the right dump command.
This commit is contained in:
John Ericson 2020-07-15 23:14:30 +00:00
parent 64b7421741
commit bc109648c4
4 changed files with 67 additions and 58 deletions

View File

@ -1033,38 +1033,22 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
} }
StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
{
return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & wanted) {
while (1) {
constexpr size_t bufSize = 1024;
uint8_t buf[bufSize];
auto n = dump.read(buf, std::min(wanted, bufSize));
sink(buf, n);
// when control is yielded back to us wanted will be updated.
}
});
}
StorePath LocalStore::addToStore(const string & name, const Path & _srcPath, StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair) FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
{ {
Path srcPath(absPath(_srcPath)); Path srcPath(absPath(_srcPath));
auto source = sinkToSource([&](Sink & sink, size_t & wanted) {
return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & _) {
if (method == FileIngestionMethod::Recursive) if (method == FileIngestionMethod::Recursive)
dumpPath(srcPath, sink, filter); dumpPath(srcPath, sink, filter);
else else
readFile(srcPath, sink); readFile(srcPath, sink);
}); });
return addToStoreFromDump(*source, name, method, hashAlgo, repair);
} }
StorePath LocalStore::addToStoreCommon( StorePath LocalStore::addToStoreFromDump(Source & source, const string & name,
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
std::function<void(Sink &, size_t &)> demux)
{ {
/* For computing the store path. */ /* For computing the store path. */
auto hashSink = std::make_unique<HashSink>(hashAlgo); auto hashSink = std::make_unique<HashSink>(hashAlgo);
@ -1075,50 +1059,53 @@ StorePath LocalStore::addToStoreCommon(
destination store path is already valid, we just delete the destination store path is already valid, we just delete the
temporary path. Otherwise, we move it to the destination store temporary path. Otherwise, we move it to the destination store
path. */ path. */
bool inMemory = true; bool inMemory = false;
std::string dump; std::string dump;
auto source = sinkToSource([&](Sink & sink, size_t & wanted) { /* Fill out buffer, and decide whether we are working strictly in
LambdaSink sink2([&](const unsigned char * buf, size_t len) { memory based on whether we break out because the buffer is full
(*hashSink)(buf, len); or the original source is empty */
while (dump.size() < settings.narBufferSize) {
if (inMemory) { auto oldSize = dump.size();
if (dump.size() + len > settings.narBufferSize) { constexpr size_t chunkSize = 1024;
inMemory = false; auto want = std::min(chunkSize, settings.narBufferSize - oldSize);
sink << 1; dump.resize(oldSize + want);
sink((const unsigned char *) dump.data(), dump.size()); auto got = 0;
dump.clear(); try {
} else { got = source.read((uint8_t *) dump.data() + oldSize, want);
dump.append((const char *) buf, len); } catch (EndOfFile &) {
} inMemory = true;
} break;
}
if (!inMemory) sink(buf, len); /* Start hashing as we get data */
}); (*hashSink)((const uint8_t *) dump.data() + oldSize, got);
demux(sink2, wanted); dump.resize(oldSize + got);
}); }
std::unique_ptr<AutoDelete> delTempDir; std::unique_ptr<AutoDelete> delTempDir;
Path tempPath; Path tempPath;
try { if (!inMemory) {
/* Wait for the source coroutine to give us some dummy StringSource dumpSource { dump };
data. This is so that we don't create the temporary TeeSource rest { source, *hashSink };
directory if the NAR fits in memory. */ ChainSource bothSource {
readInt(*source); .source1 = dumpSource,
/* Continue hashing what's left, but don't rehash what we
already did. */
.source2 = rest,
};
auto tempDir = createTempDir(realStoreDir, "add"); auto tempDir = createTempDir(realStoreDir, "add");
delTempDir = std::make_unique<AutoDelete>(tempDir); delTempDir = std::make_unique<AutoDelete>(tempDir);
tempPath = tempDir + "/x"; tempPath = tempDir + "/x";
if (method == FileIngestionMethod::Recursive) if (method == FileIngestionMethod::Recursive)
restorePath(tempPath, *source); restorePath(tempPath, bothSource);
else else
writeFile(tempPath, *source); writeFile(tempPath, bothSource);
} catch (EndOfFile &) { dump.clear();
if (!inMemory) throw;
/* The NAR fits in memory, so we didn't do restorePath(). */
} }
auto [hash, size] = hashSink->finish(); auto [hash, size] = hashSink->finish();
@ -1143,12 +1130,12 @@ StorePath LocalStore::addToStoreCommon(
autoGC(); autoGC();
if (inMemory) { if (inMemory) {
StringSource dumpSource { dump };
/* Restore from the NAR in memory. */ /* Restore from the NAR in memory. */
StringSource source(dump);
if (method == FileIngestionMethod::Recursive) if (method == FileIngestionMethod::Recursive)
restorePath(realPath, source); restorePath(realPath, dumpSource);
else else
writeFile(realPath, source); writeFile(realPath, dumpSource);
} else { } else {
/* Move the temporary path we restored above. */ /* Move the temporary path we restored above. */
if (rename(tempPath.c_str(), realPath.c_str())) if (rename(tempPath.c_str(), realPath.c_str()))

View File

@ -290,10 +290,6 @@ private:
specified by the secret-key-files option. */ specified by the secret-key-files option. */
void signPathInfo(ValidPathInfo & info); void signPathInfo(ValidPathInfo & info);
StorePath addToStoreCommon(
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
std::function<void(Sink &, size_t &)> demux);
Path getRealStoreDir() override { return realStoreDir; } Path getRealStoreDir() override { return realStoreDir; }
void createUser(const std::string & userName, uid_t userId) override; void createUser(const std::string & userName, uid_t userId) override;

View File

@ -329,5 +329,18 @@ void StringSink::operator () (const unsigned char * data, size_t len)
s->append((const char *) data, len); s->append((const char *) data, len);
} }
size_t ChainSource::read(unsigned char * data, size_t len)
{
if (useSecond) {
return source2.read(data, len);
} else {
try {
return source1.read(data, len);
} catch (EndOfFile &) {
useSecond = true;
return this->read(data, len);
}
}
}
} }

View File

@ -256,6 +256,19 @@ struct LambdaSource : Source
} }
}; };
/* Chain two sources together so after the first is exhausted, the second is
used */
struct ChainSource : Source
{
Source & source1, & source2;
bool useSecond = false;
ChainSource(Source & s1, Source & s2)
: source1(s1), source2(s2)
{ }
size_t read(unsigned char * data, size_t len) override;
};
/* Convert a function that feeds data into a Sink into a Source. The /* Convert a function that feeds data into a Sink into a Source. The
Source executes the function as a coroutine. */ Source executes the function as a coroutine. */
@ -271,7 +284,7 @@ static inline std::unique_ptr<Source> sinkToSource(
throw EndOfFile("coroutine has finished"); throw EndOfFile("coroutine has finished");
}) })
{ {
return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof); return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof);
} }