Program Listing for File corpus.cpp
↰ Return to documentation for file (src/data/corpus.cpp)
#include "data/corpus.h"
#include <numeric>
#include <random>
#include "common/utils.h"
#include "common/filesystem.h"
#include "data/corpus.h"
#include "data/factored_vocab.h"
namespace marian {
namespace data {
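// Construct a corpus for training (or translation, if 'translate' is set) from
// the paths and vocabularies resolved by CorpusBase. Also reads the optional
// data-augmentation settings and, if data-threads > 1, creates the thread pool
// used by next() to build sentence tuples in parallel.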
Corpus::Corpus(Ptr<Options> options, bool translate /*= false*/, size_t seed /*= Config::seed*/)
: CorpusBase(options, translate, seed),
shuffleInRAM_(options_->get<bool>("shuffle-in-ram", false)),
allCapsEvery_(options_->get<size_t>("all-caps-every", 0)),
titleCaseEvery_(options_->get<size_t>("english-title-case-every", 0)) {
auto numThreads = options_->get<size_t>("data-threads", 1);
if(numThreads > 1)
threadPool_.reset(new ThreadPool(numThreads));
}
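// Same as above, but with explicitly given paths and vocabularies instead of
// taking them from the options.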
Corpus::Corpus(std::vector<std::string> paths,
std::vector<Ptr<Vocab>> vocabs,
Ptr<Options> options,
size_t seed /*= Config::seed*/)
: CorpusBase(paths, vocabs, options, seed),
shuffleInRAM_(options_->get<bool>("shuffle-in-ram", false)),
allCapsEvery_(options_->get<size_t>("all-caps-every", 0)),
titleCaseEvery_(options_->get<size_t>("english-title-case-every", 0)) {
auto numThreads = options_->get<size_t>("data-threads", 1);
if(numThreads > 1)
threadPool_.reset(new ThreadPool(numThreads));
}
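// Apply rare all-caps or English-title-case augmentation to a training line.
// Every allCapsEvery_-th sentence is upper-cased in all streams; every
// titleCaseEvery_-th source sentence is title-cased. 'altered' reports whether
// the token sequence actually changed (factored vocabularies encode casing as
// factors, so the lemmas are unchanged there).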
void Corpus::preprocessLine(std::string& line, size_t streamId, bool& altered) {
bool isFactoredVocab = vocabs_.back()->tryAs<FactoredVocab>() != nullptr;
altered = false;
if (allCapsEvery_ != 0 && pos_ % allCapsEvery_ == 0 && !inference_) {
line = vocabs_[streamId]->toUpper(line);
if (streamId == 0)
LOG_ONCE(info, "[data] Source all-caps'ed line to: {}", line);
else
LOG_ONCE(info, "[data] Target all-caps'ed line to: {}", line);
altered = !isFactoredVocab; // a factored vocab does not really "alter" the token lemma for all caps
}
else if (titleCaseEvery_ != 0 && pos_ % titleCaseEvery_ == 1 && !inference_ && streamId == 0) {
// Only applied to stream 0 (source) since this feature is aimed at robustness against
// title case in the source (and not at translating into title case).
// Note: It is the user's responsibility not to enable this if the source language is not English.
line = vocabs_[streamId]->toEnglishTitleCase(line);
if (streamId == 0)
LOG_ONCE(info, "[data] Source English-title-case'd line to: {}", line);
else
LOG_ONCE(info, "[data] Target English-title-case'd line to: {}", line);
altered = !isFactoredVocab; // a factored vocab does not really "alter" the token lemma for title casing
}
}
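// Fetch the next sentence tuple, reading one line from each input stream (or
// from the RAM cache when shuffle-in-ram is used). The actual tuple
// construction (TSV splitting, preprocessing, tokenization) is wrapped in a
// lambda so that it can run asynchronously on the thread pool when
// data-threads > 1.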
SentenceTuple Corpus::next() {
size_t numStreams = corpusInRAM_.empty() ? files_.size() : corpusInRAM_.size();
std::vector<std::string> fields(numStreams);
while(true) { // retry loop
// get index of the current sentence
size_t curId = pos_; // note: at end, pos_ == total size
// if corpus has been shuffled, ids_ contains sentence indexes
if(pos_ < ids_.size())
curId = ids_[pos_];
pos_++;
size_t eofsHit = 0;
for(size_t i = 0; i < numStreams; ++i) { // loop over all streams
// fetch line, from cached copy in RAM or actual file
if (!corpusInRAM_.empty()) {
if (curId < corpusInRAM_[i].size())
fields[i] = corpusInRAM_[i][curId];
else {
eofsHit++;
continue;
}
}
else {
bool gotLine = io::getline(*files_[i], fields[i]).good();
if(!gotLine) {
eofsHit++;
continue;
}
}
}
if(eofsHit == numStreams)
return SentenceTuple(); // uninitialized SentenceTuple which will be invalid when tested
ABORT_IF(eofsHit != 0, "not all input files have the same number of lines");
auto makeSentenceTuple = [this](size_t curId, std::vector<std::string> fields) {
if(tsv_) {
// with TSV input data, there is only one input stream, hence we only have one field
// which needs to be tokenized into tab-separated fields
ABORT_IF(fields.size() != 1, "Reading TSV file, but we don't have exactly one stream??");
size_t numAllFields = tsvNumInputFields_;
if(alignFileIdx_ > -1)
++numAllFields;
if(weightFileIdx_ > -1)
++numAllFields;
// replace single-element fields array with extracted tsv fields
std::vector<std::string> tmpFields;
utils::splitTsv(fields[0], tmpFields, numAllFields); // this verifies the number of fields
fields.swap(tmpFields);
}
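// Illustrative (hypothetical) TSV layout: if the alignment field comes first,
// e.g. "0-0 1-1<TAB>source tokens<TAB>target tokens" with alignFileIdx_ == 0,
// the loop below skips field 0 and maps fields 1 and 2 to vocab indexes
// 0 and 1 via 'shift'.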
// fill up the sentence tuple with sentences from all input files
SentenceTupleImpl tup(curId);
size_t shift = 0;
for(size_t i = 0; i < fields.size(); ++i) {
// index j needs to be shifted to get the proper vocab index if guided-alignment or
// data-weighting are preceding source or target sequences in TSV input
if(i == alignFileIdx_ || i == weightFileIdx_) {
++shift;
} else {
size_t vocabId = i - shift;
bool altered;
preprocessLine(fields[i], vocabId, /*out=*/altered);
if (altered)
tup.markAltered();
addWordsToSentenceTuple(fields[i], vocabId, tup);
}
}
// weights are added last to the sentence tuple, because this runs a validation that needs
// the length of the target sequence
if(alignFileIdx_ > -1)
addAlignmentToSentenceTuple(fields[alignFileIdx_], tup);
if(weightFileIdx_ > -1)
addWeightsToSentenceTuple(fields[weightFileIdx_], tup);
// check if all streams are valid, that is, non-empty and no longer than the maximum allowed length
if(std::all_of(tup.begin(), tup.end(), [=](const Words& words) {
return words.size() > 0 && words.size() <= maxLength_;
})) {
return tup;
} else {
return SentenceTupleImpl(); // return an empty tuple if the above test does not pass
}
};
if(threadPool_) { // use thread pool if available
return SentenceTuple(threadPool_->enqueue(makeSentenceTuple, curId, fields));
} else { // otherwise launch here and just pass the result into the wrapper
auto tup = makeSentenceTuple(curId, fields);
if(!tup.empty())
return SentenceTuple(tup);
}
} // end of retry loop
}
// reset and initialize shuffled reading
// Call either reset() or shuffle().
// @TODO: merge with reset() below to clarify their mutual exclusiveness
void Corpus::shuffle() {
shuffleData(paths_);
}
// reset to regular, non-shuffled reading
// Call either reset() or shuffle().
// @TODO: make shuffle() private, instead pass a shuffle() flag to reset(), to clarify mutual
// exclusiveness with shuffle()
void Corpus::reset() {
corpusInRAM_.clear();
ids_.clear();
if (pos_ == 0) // no data read yet
return;
pos_ = 0;
for (size_t i = 0; i < paths_.size(); ++i) {
if(paths_[i] == "stdin" || paths_[i] == "-") {
std::cin.tie(0);
std::ios_base::sync_with_stdio(false);
files_[i].reset(new std::istream(std::cin.rdbuf()));
// Probably not necessary, unless there are some buffers
// that we want flushed.
}
else {
ABORT_IF(files_[i] && filesystem::is_fifo(paths_[i]),
"File '", paths_[i], "' is a pipe and cannot be re-opened.");
// Do NOT reset named pipes; that closes them and triggers a SIGPIPE
// (lost pipe) at the writing end, which may do whatever it wants
// in this situation.
files_[i].reset(new io::InputFileStream(paths_[i]));
}
}
}
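// Restore the corpus RNG state from a saved training state, so that resumed
// training continues with the same shuffling order.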
void Corpus::restore(Ptr<TrainingState> ts) {
setRNGState(ts->seedCorpus);
}
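// Read the full corpus into RAM (or reuse the existing RAM cache), draw a
// random permutation of sentence ids with eng_, and then either keep the data
// in RAM and index it through ids_ (--shuffle-in-ram) or write it in shuffled
// order to temporary files that replace files_.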
void Corpus::shuffleData(const std::vector<std::string>& paths) {
LOG(info, "[data] Shuffling data");
ABORT_IF(tsv_ && (paths[0] == "stdin" || paths[0] == "-"),
"Shuffling training data from STDIN is not supported. Add --no-shuffle or provide "
"training sets with --train-sets");
size_t numStreams = paths.size();
size_t numSentences;
std::vector<std::vector<std::string>> corpus(numStreams); // [stream][id]
if (!corpusInRAM_.empty()) { // when caching, we use what we have instead
corpus = std::move(corpusInRAM_); // temporarily move ownership here, will be moved back
numSentences = corpus[0].size();
}
else {
files_.resize(numStreams);
for(size_t i = 0; i < numStreams; ++i) {
UPtr<io::InputFileStream> strm(new io::InputFileStream(paths[i]));
strm->setbufsize(10000000); // huge read-ahead buffer to avoid network round-trips
files_[i] = std::move(strm);
}
// read entire corpus into RAM
std::string lineBuf;
for (;;) {
size_t eofsHit = 0;
for(size_t i = 0; i < numStreams; ++i) {
bool gotLine = io::getline(*files_[i], lineBuf).good();
if (gotLine)
corpus[i].push_back(lineBuf);
else
eofsHit++;
}
if (eofsHit == numStreams)
break;
ABORT_IF(eofsHit != 0, "Not all input files have the same number of lines");
}
files_.clear();
numSentences = corpus[0].size();
LOG(info, "[data] Done reading {} sentences", utils::withCommas(numSentences));
}
// randomize sequence ids, and remember them
ids_.resize(numSentences);
std::iota(ids_.begin(), ids_.end(), 0);
std::shuffle(ids_.begin(), ids_.end(), eng_);
if (shuffleInRAM_) {
// when shuffling in RAM, we keep no files_, but instead the data itself
corpusInRAM_ = std::move(corpus);
LOG(info, "[data] Done shuffling {} sentences (cached in RAM)", utils::withCommas(numSentences));
}
else {
// create temp files that contain the data in randomized order
tempFiles_.resize(numStreams);
for(size_t i = 0; i < numStreams; ++i) {
tempFiles_[i].reset(new io::TemporaryFile(options_->get<std::string>("tempdir")));
io::TemporaryFile &out = *tempFiles_[i];
const auto& corpusStream = corpus[i];
for(auto id : ids_) {
out << corpusStream[id] << std::endl;
}
}
// replace files_[] by the tempfiles we just created
files_.resize(numStreams);
for(size_t i = 0; i < numStreams; ++i) {
auto inputStream = tempFiles_[i]->getInputStream();
inputStream->setbufsize(10000000);
files_[i] = std::move(inputStream);
}
LOG(info, "[data] Done shuffling {} sentences to temp files", utils::withCommas(numSentences));
}
pos_ = 0;
}
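// Pack a vector of samples into a batch: determine the maximum sentence length
// per stream, allocate one SubBatch per stream, and copy words and masks into
// the SubBatch storage (see SubBatch::locate for the layout). Alignments and
// weights are attached if configured.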
CorpusBase::batch_ptr Corpus::toBatch(const std::vector<Sample>& batchVector) {
size_t batchSize = batchVector.size();
std::vector<size_t> sentenceIds;
std::vector<int> maxDims; // @TODO: What's this? widths? maxLengths?
for(auto& ex : batchVector) { // @TODO: rename 'ex' to 'sample' or 'sentenceTuple'
if(maxDims.size() < ex.size())
maxDims.resize(ex.size(), 0);
for(size_t i = 0; i < ex.size(); ++i) {
if(ex[i].size() > (size_t)maxDims[i])
maxDims[i] = (int)ex[i].size();
}
sentenceIds.push_back(ex.getId());
}
std::vector<Ptr<SubBatch>> subBatches;
for(size_t j = 0; j < maxDims.size(); ++j) {
subBatches.emplace_back(New<SubBatch>(batchSize, maxDims[j], vocabs_[j]));
}
std::vector<size_t> words(maxDims.size(), 0);
for(size_t b = 0; b < batchSize; ++b) { // loop over batch entries
for(size_t j = 0; j < maxDims.size(); ++j) { // loop over streams
auto subBatch = subBatches[j];
for(size_t s = 0; s < batchVector[b][j].size(); ++s) { // loop over word positions
subBatch->data()[subBatch->locate(/*batchIdx=*/b, /*wordPos=*/s)/*s * batchSize + b*/] = batchVector[b][j][s];
subBatch->mask()[subBatch->locate(/*batchIdx=*/b, /*wordPos=*/s)/*s * batchSize + b*/] = 1.f;
words[j]++;
}
}
}
for(size_t j = 0; j < maxDims.size(); ++j)
subBatches[j]->setWords(words[j]);
auto batch = batch_ptr(new batch_type(subBatches));
batch->setSentenceIds(sentenceIds);
// Add prepared word alignments and weights if they are available
if(alignFileIdx_ > -1 && options_->get("guided-alignment", std::string("none")) != "none")
addAlignmentsToBatch(batch, batchVector);
if(weightFileIdx_ > -1 && options_->hasAndNotEmpty("data-weighting"))
addWeightsToBatch(batch, batchVector);
return batch;
}
} // namespace data
} // namespace marian
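// Usage sketch (not part of this file): a minimal example of how a Corpus is
// typically driven, assuming marian's BatchGenerator API:
//
//   auto corpus = New<Corpus>(options);
//   corpus->shuffle();                        // or corpus->reset() for in-order reading
//   auto gen = New<BatchGenerator<Corpus>>(corpus, options);
//   gen->prepare();
//   for(auto batch : *gen)
//     /* feed batch to the training graph */;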