.. _program_listing_file_src_training_graph_group.cpp:

Program Listing for File graph_group.cpp
========================================

|exhale_lsh| :ref:`Return to documentation for file <file_src_training_graph_group.cpp>` (``src/training/graph_group.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "training/graph_group.h"

   namespace marian {

   GraphGroup::GraphGroup(Ptr<Options> options, Ptr<IMPIWrapper> mpi)
     : options_(options), mpi_(mpi),
       devices_(Config::getDevices(options, mpi->myMPIRank(), mpi->numMPIProcesses())),
       shardingMode_(getShardingMode(options_, mpi)),
       mbRoundUp_(options_->get<bool>("mini-batch-round-up", true)) {
     if(options_->hasAndNotEmpty("cost-scaling")) {
       auto vcs = options_->get<std::vector<std::string>>("cost-scaling");

       costScaling_ = true;
       costScalingFactor_ = std::stof(vcs[0]);
       if(vcs.size() > 1) costScalingFreq_          = std::stoul(vcs[1]);
       if(vcs.size() > 2) costScalingMultiplier_    = std::stof(vcs[2]);
       if(vcs.size() > 3) costScalingFactorMinimum_ = std::stof(vcs[3]);

       LOG_ONCE(info,
                "Training with cost scaling - factor: {}, frequency: {}, multiplier: {}, minimum: {}",
                costScalingFactor_,
                costScalingFreq_,
                costScalingMultiplier_,
                costScalingFactorMinimum_);
     }

     if(options_->hasAndNotEmpty("dynamic-gradient-scaling")) {
       auto vgc = options_->get<std::vector<std::string>>("dynamic-gradient-scaling");
       dynamicGradientScaling_ = true;

       if(vgc.size() > 0) dynamicGradientScalingFactor_  = std::stof(vgc[0]);
       if(vgc.size() > 1) dynamicGradientScalingUseLogs_ = vgc[1] == "log";
       if(vgc.size() > 2) dynamicGradientScalingFadeout_ = std::stoul(vgc[2]);

       LOG_ONCE(info,
                "Re-scaling gradient to have average gradient norm if (log={}) gradient norm diverges from average by {} sigmas",
                dynamicGradientScalingUseLogs_,
                dynamicGradientScalingFactor_);

       if(dynamicGradientScalingFadeout_ > 0)
         LOG_ONCE(info,
                  "Dynamic gradient re-scaling will fade out linearly after {} updates",
                  dynamicGradientScalingFadeout_);
     }

     if(options_->get<bool>("check-gradient-nan")) {
       checkGradientNan_ = true;
       LOG_ONCE(info, "Checking gradient for NaN");
     }

     initGraphsAndOpts();

     // Note: We may well end up with only one MPI process or only one graph per worker.
     // This part of the code will not special-case any of this here.
     // Rather, it is assumed that the communicator knows to reduce unnecessary transfers to no-ops.
     // @TODO: createCommunicator(options, ...)
     comm_ = createCommunicator(graphs_,
                                /*noNccl=*/options_->get<bool>("no-nccl", false),
                                shardingMode_,
                                /*mpi=*/mpi_);

     auto formattedDeviceType = utils::utf8ToUpper(devices_.front().typeAsString()) + "s";
     if(mpi_->numMPIProcesses() > 1)
       LOG(info, "[training] Using {} {}, distributed over {} MPI processes",
           mpi_->numMPIProcesses() * devices_.size(),
           formattedDeviceType,
           mpi_->numMPIProcesses());
     else
       LOG(info, "[training] Using {} {}", devices_.size(), formattedDeviceType);
   }

   void GraphGroup::initGraphsAndOpts() {
     for(auto device : devices_) {
       auto graph = New<ExpressionGraph>();

       // @TODO: validate precisions in config
       auto precisions = options_->get<std::vector<std::string>>("precision");
       Type parameterType = typeFromString(precisions[0]);
       graph->setDefaultElementType(parameterType);
       graph->setCheckpointing(options_->get<bool>("gradient-checkpointing"));

       if(options_->get<bool>("check-nan")) // @TODO: add to other places
         graph->setThrowNaN(true);

       graph->setDevice(device);
       graph->reserveWorkspaceMB(options_->get<size_t>("workspace"));

       graphs_.push_back(graph);
       optimizerShards_.push_back(Optimizer(options_));
       models_.push_back(models::createCriterionFunctionFromOptions(options_, models::usage::training));
     }
   }
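   // Illustrative sketch (not part of the original source): the cost-scaling policy
   // implemented by the two functions below, reduced to a single update over a
   // hypothetical state struct. All names here are invented for illustration only,
   // and the counter bookkeeping is simplified.
   //
   //   struct CostScale { float factor, multiplier, minimum; size_t freq, noNanSeen; };
   //   void updateCostScale(CostScale& s, bool sawNan) {
   //     if(sawNan) {                            // NaN/Inf seen: back off, update is skipped
   //       if(s.factor > s.minimum)
   //         s.factor /= s.multiplier;
   //       s.noNanSeen = 0;
   //     } else if(++s.noNanSeen % s.freq == 0) { // stable for `freq` updates: scale up
   //       s.factor *= s.multiplier;
   //       s.noNanSeen = 0;
   //     }
   //   }
   //
   // With a multiplier of 2, each increase adds one bit of headroom to the scaled
   // fp16 loss, and each decrease halves it again after an overflow.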
   // increase cost-scaling factor if no NaN has been detected for a
   // given number of iterations. Usually we increase by 2 which adds
   // one more bit for precision.
   void GraphGroup::increaseCostScaleFactor() {
     if(!costScaling_)
       return;

     noNanSeen_++;

     size_t total = nanSeen_ + noNanSeen_;

     if(noNanSeen_ % costScalingFreq_ == 0) {
       costScalingFactor_ *= costScalingMultiplier_;
       if(isMainProcess())
         LOG(debug, "No NaN/Inf after {} gradient updates. Increasing cost-scaling factor to {}", total, costScalingFactor_);

       // Resetting counts after cost-scale change
       noNanSeen_ = 0;
       nanSeen_ = 0;
     }
   }

   // call when a NaN was seen to decrease cost-scaling factor
   void GraphGroup::decreaseCostScaleFactor() {
     if(!costScaling_)
       return;

     nanSeen_++;

     size_t total = nanSeen_ + noNanSeen_;

     // do not reduce cost-scaling factor below minimum
     if(costScalingFactor_ > costScalingFactorMinimum_)
       costScalingFactor_ /= costScalingMultiplier_;

     if(isMainProcess()) {
       if(costScalingFactor_ > costScalingFactorMinimum_)
         LOG(debug, "Seen NaN/Inf after {} gradient updates. Reduced cost-scaling factor to {}", total, costScalingFactor_);
       else
         LOG(debug, "Seen NaN/Inf after {} gradient updates. Reduced cost-scaling factor to minimum {}. Pruning NaNs now.", total, costScalingFactor_);
     }

     // Resetting counts after cost-scale change
     noNanSeen_ = 0;
     nanSeen_ = 0;
   }

   float GraphGroup::checkNanOrNorm(size_t i, size_t begin, size_t end) {
     auto curGrad = graphs_[i]->params()->grads()->subtensor(begin, end-begin);

     // If costScaling_ then check for NaN values if the costScalingFactor_ is larger than
     // the minimum. If a NaN value is seen we exit here and will reduce the factor next and
     // this skips an update.
     // If costScalingFactor_ is already at the minimum, prune the NaN values away. This replaces
     // NaNs with 0. Updates are not skipped any more.
     // Regardless of NaNs, we clip +/-inf to the largest corresponding values for the gradient value type.
     // This changes the gradient but seems to be quite stable. In effect, for fp16 this is equivalent
     // to gradient clipping at (65504.f / costScalingFactor_) which in most cases is still large.
     if(costScaling_ || checkGradientNan_) {
       bool pruneNaN = !checkGradientNan_ && costScalingFactor_ == costScalingFactorMinimum_;
       bool clipInf  = !checkGradientNan_;
       bool saneGradient = SanitizeGradient(curGrad, graphs_[i]->allocator(), pruneNaN, clipInf);

       // This should never happen, if it does, something is wrong with the kernel above and needs to be fixed.
       ABORT_IF(pruneNaN && clipInf && !saneGradient, "We are removing NaNs and clipping Infs, but gradient is still not sane??");

       if(!saneGradient) {
         LOG(debug, "Found NaN");
         return std::numeric_limits<float>::quiet_NaN();
       }
     }

     // The optional clipping above will affect the norm here. The norm can be non-finite despite the above
     // gradient sanitization, hence check again and propagate a NaN.
     if(dynamicGradientScaling_) {
       auto gNorm = L2Norm(curGrad, graphs_[i]->allocator());
       if(isFinite(gNorm) && gNorm > 0.0)
         return gNorm;
       else
         return std::numeric_limits<float>::quiet_NaN();
     }

     return 0.f;
   };
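   // Note on the reduction below (explanatory, not from the original source): L2 norms
   // over disjoint parameter shards combine as the square root of the sum of squares.
   // For example, two processes reporting local norms 3.0 and 4.0 yield a global norm
   // of sqrt(3^2 + 4^2) = 5.0, which is why the code squares the local result, sums
   // the squares via MPI, and takes the square root again. A NaN in any shard
   // propagates through to the combined result.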
   float GraphGroup::executeAndCollectNorm(const std::function<float(size_t, size_t, size_t)>& task) {
     auto gradNorm = comm_->foreach(task, accNanOrNorm, 0.f);
     if(mpi_) { // accumulate gradientNorm from subprocesses
       auto gradNormSquared = gradNorm * gradNorm; // undo sqrt
       mpi_->allReduce(&gradNormSquared, &gradNormSquared, 1, MPI_FLOAT, MPI_SUM); // sum all

       if(shardingMode_ == ShardingMode::local) // we already have the correct norm on one device, but we also need to check for NaN
         gradNormSquared /= (float)mpi_->numMPIProcesses();

       gradNorm = std::sqrt(gradNormSquared); // redo sqrt
     }
     return gradNorm;
   }

   float GraphGroup::computeNormalizationFactor(float gNorm, size_t updateTrgWords) {
     float normalizationFactor = 1.f;

     if(costScaling_)
       normalizationFactor *= costScalingFactor_;

     if(options_->get<bool>("normalize-gradient"))
       normalizationFactor *= updateTrgWords;

     if(!isFinite(gNorm)) // we are checking the sanity of the gradient elsewhere
       return normalizationFactor;

     if(dynamicGradientScaling_) {
       // make gradient norm invariant to changes in costScalingFactor_, luckily norm(c * g) = c * norm(g)
       if(costScaling_)
         gNorm = gNorm / costScalingFactor_;

       // Normalize gradient norm w.r.t. number of labels in batch for statistics,
       // there should be no gradient normalization before this point, @TODO: check this
       gNorm = gNorm / updateTrgWords;

       size_t window;
       float gNormAvgTransform, gNormVarTransform, gNormTransform, gNormAvg;
       if(dynamicGradientScalingUseLogs_) {
         // tracking the log of the gradient norms rather than the norms themselves results in a larger standard deviation as the actual
         // gradient norms go towards 0. From observation, the STD (of log norms) tends to become near constant after some time while the averages keep decreasing.
         std::tie(window, gNormAvgTransform, gNormVarTransform) = scheduler_->getLogGradientNormStats();
         gNormTransform = std::log(gNorm);             // we are using the average of log norms, so we need to compute the log
         gNormAvg       = std::exp(gNormAvgTransform); // for rescaling we need to undo the log, we assume avg(log(norm)) is roughly log(avg(norm))
       } else {
         std::tie(window, gNormAvgTransform, gNormVarTransform) = scheduler_->getGradientNormStats();
         gNormTransform = gNorm;             // we are not using logs, so we can just use the normal gradient norm
         gNormAvg       = gNormAvgTransform; // we are getting the actual running average of gradient norms, no transformation needed.
       }

       auto deltaTransform    = gNormTransform - gNormAvgTransform; // compute the difference between the current transformed gradient norm and the running average.
       auto gNormStdTransform = std::sqrt(gNormVarTransform);       // compute STD for the running average of (log) gradient norms.

       float fadeoutMultiplier = 1.f;
       if(dynamicGradientScalingFadeout_ > 0ul) // fade out linearly after that many updates @TODO: allow units other than updates
         fadeoutMultiplier = (float)std::max(dynamicGradientScalingFadeout_, scheduler_->numberOfBatches()) / (float)dynamicGradientScalingFadeout_;

       float dynamicGradientScalingFactorWithFadeout = dynamicGradientScalingFactor_ * fadeoutMultiplier; // as fadeoutMultiplier increases, dynamic gradient scaling becomes less and less likely to happen over time.
       // delta of (log) gradient norm vs (log) gradient norm average is larger than N standard deviations,
       // hence rescale gradient using the average.
       if(scheduler_->numberOfBatches() >= window && deltaTransform > dynamicGradientScalingFactorWithFadeout * gNormStdTransform) {
         if(isMainProcess())
           LOG(debug, "log gradient norms: {} :: {:.4f} - {:.4f} = {:.4f} > {:.4f} * {:.4f} - scaling gradient by {:.4f}",
               dynamicGradientScalingUseLogs_, gNormTransform, gNormAvgTransform, deltaTransform, dynamicGradientScalingFactorWithFadeout, gNormStdTransform, gNormAvg / gNorm);

         normalizationFactor *= gNorm / gNormAvg; // since we later do gradient / normalizationFactor this divides by norm and multiplies by the average, rescaling to the average.
       }
     }

     return normalizationFactor;
   };
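   // Worked example for the outlier test above (illustrative numbers, not from the
   // original source), using the non-log branch: with a running average gNormAvg = 0.50,
   // variance 0.0025 (std 0.05) and dynamicGradientScalingFactor_ = 2.0, an incoming
   // per-label norm of 0.70 gives delta = 0.20 > 2.0 * 0.05, so normalizationFactor is
   // multiplied by 0.70 / 0.50 = 1.4; dividing the gradient by this factor shrinks its
   // norm back to the running average 0.50.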
   void GraphGroup::load() {
     validate();

     auto scatterFn = [&](const io::Item& optimizerState, const OptimizerBase::ScatterStateSetFunc& setShardFn) {
       comm_->scatterState(optimizerState, setShardFn);
     };

     load(scatterFn);
   }

   void GraphGroup::save(bool isFinal) /*override*/ {
     auto gatherOpt = [&](const OptimizerBase::GatherStateGetFunc& getShardFn) {
       return comm_->gatherState(getShardFn);
     };

     save(isFinal, gatherOpt);
   }

   void GraphGroup::load(const OptimizerBase::ScatterStateFunc& scatterFn) {
     /*
     if not no-reload (=> i.e. do reload):
       restore scheduler
       if checkpoint is available or not no-reload-checkpoint:
         reload from checkpoint
       else if model is available:
         reload from model, but warn that no checkpoint was used and the model could be smoothed
     else if pretrained-model path given:
       initialize matching weights from pretrained model
     else:
       (implicitly) don't do anything => initialize randomly later
     */
     if(!options_->get<bool>("no-reload")) {
       std::string modelFileName = options_->get<std::string>("model");

       if(filesystem::exists(modelFileName)) {
         if(scheduler_)
           scheduler_->load(modelFileName);

         // we just load it N times from disk (it'll be in disk cache after the first)
         // this also allocates memory correctly when calling forward() inside restoreFromCheckpoint
         size_t i = 0;
         for(auto graph : graphs_)
           models_[i++]->load(graph, modelFileName);

         // try to restore everything from checkpoint now
         restoreFromCheckpoint(modelFileName, scatterFn);
       } else if(options_->hasAndNotEmpty("pretrained-model")) {
         std::string nameInit = options_->get<std::string>("pretrained-model");
         LOG(info, "[training] Initializing model weights with pre-trained model {}", nameInit);

         size_t i = 0;
         for(auto graph : graphs_)
           models_[i++]->load(graph, nameInit, false);
       }
     }
   }
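   // Background note (explanatory, not from the original source): a resumable training
   // state on disk consists of the inference model given by --model (e.g. model.npz)
   // plus a checkpoint stored next to it with the additional suffix ".optimizer.npz"
   // (see checkpointName below). The checkpoint holds the sharded optimizer state and,
   // when the optimizer keeps one (e.g. for mixed-precision training), an fp32 master
   // copy of the parameters under the item name "master_parameters".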
   bool GraphGroup::restoreFromCheckpoint(const std::string& modelFileName,
                                          const OptimizerBase::ScatterStateFunc& scatterFn) {
     /*
     if model checkpoint is available:
       - load model from checkpoint, not from model.npz
       - abort if checkpoint model and graph size do not match, probably due to different model or precision
     */

     std::string checkpointName = modelFileName + ".optimizer.npz"; // @TODO: change to .checkpoint.npz, would break backwards compat

     if(!filesystem::exists(checkpointName)) {
       LOG(warn, "No checkpoint found, parameters reloaded from last inference model");
       return false; // failed to restore
     }

     auto items = io::loadItems(checkpointName);

     // make sure all nodes see the same checkpoint data, may not be the case with distributed file systems
     // when there was a delay in updating the caches across nodes. So here node 0 sends its data to all.
     // We still load them all from disk, but that serves more as a trick to allocate the correct memory.
     if(mpi_)
       for(auto& item : items)
         mpi_->bCast(item);

     // @TODO: probably we want to have the list of DeviceIds as an attribute
     std::vector<Ptr<Backend>> backends;
     for(auto graph : graphs_)
       backends.push_back(graph->getBackend());
     optimizerShards_[0]->load(items, optimizerShards_, backends, scatterFn, isMainProcess());

     // restore the graph parameters from the checkpoint master copy.
     auto found = std::find_if(items.begin(), items.end(),
       [](const io::Item& item) { return item.name == "master_parameters"; });

     if(found == items.end()) {
       LOG(warn, "No master parameters found in checkpoint, parameters reloaded from last inference model");
       return false; // failed to restore
     }

     auto& masterParameters = *found;
     for(auto graph : graphs_) {
       // allocate graph parameter memory and initialize parameters from the inference model. This needs to
       // run a full forward pass over the parameters to allocate the parameter values in order (by parameter name).
       // Just doing graph->params()->allocateForward() is not sufficient.
       graph->forward();

       ABORT_IF(graph->params()->vals()->shape() != masterParameters.shape,
                "Graph parameter sizes and master copy parameter sizes in checkpoint do not match");

       // Convert type of io::Item to match graph parameter type.
       if(masterParameters.type != graph->params()->vals()->type())
         masterParameters.convert(graph->params()->vals()->type());

       graph->params()->vals()->set(masterParameters);
       graph->clear();
     }

     LOG(info, "[training] Master parameters and optimizers restored from training checkpoint {} and {}",
         modelFileName, checkpointName);

     return true; // succeeded to restore
   }

   void GraphGroup::saveCheckpoint(const std::string& modelFileName,
                                   const OptimizerBase::GatherStateFunc& gatherFn) {
     std::string checkpointName = modelFileName + ".optimizer.npz"; // @TODO: change to .checkpoint.npz, would break backwards compat

     std::vector<io::Item> items;
     optimizerShards_[0]->save(items,
                               optimizerShards_,
                               gatherFn,
                               isMainProcess());

     if(isMainProcess()) { // only main process does the actual saving
       auto found = std::find_if(items.begin(), items.end(),
         [](const io::Item& item) { return item.name == "master_parameters"; });

       if(found == items.end()) {
         // if the optimizer does not provide a master parameters copy (the default when training with full precision)
         // then dump the parameters of graphs_[0] into the checkpoint. This should be called when the original parameters
         // are in the graph, not the smoothed version. Here we are getting called after a double swap, so that should be
         // the case.
         io::Item masterParameters;
         graphs_[0]->params()->vals()->get(masterParameters, "master_parameters");
         items.push_back(masterParameters);
       }

       LOG(info, "[training] Saving training checkpoint to {} and {}", modelFileName, checkpointName);
       io::saveItems(checkpointName, items);
     }
   }
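   // Note on the save sequence below (explanatory, not from the original source): all
   // model writing is bracketed by two swapWithSmoothed() calls. The first swap brings
   // the smoothed (averaged) parameters into the graphs so that the saved model files
   // and any final validation use them; the second swap restores the raw training
   // parameters so that saveCheckpoint() records the true optimizer state.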
   void GraphGroup::save(bool isFinal,
                         const OptimizerBase::GatherStateFunc& gatherOptimizerStateFn) {
     barrier(); // (for better grouping of log messages)
     // bring the smoothed model in
     // Note that it is sharded. For multi-node, it is sharded over multiple machines, so this is a network access.
     // Also note that the swap must run on all MPI processes concurrently, although only one actually validates.
     swapWithSmoothed();

     if(isFinal && scheduler_)
       scheduler_->validate(graphs_, isFinal);

     barrier(); // (for better grouping of log messages)

     std::string modelFileName = options_->get<std::string>("model");
     if(isMainProcess()) { // save main model file
       if(options_->get<bool>("overwrite")) {
         models_[0]->save(graphs_[0], modelFileName, /*saveTranslatorConfig=*/true);
         // save scheduler-related state
         if(scheduler_)
           scheduler_->save(modelFileName);
       } else {
         if(!isFinal) { // save a model with iteration number
           std::string numberOfBatches = scheduler_ ? std::to_string(scheduler_->numberOfBatches()) : "unknown";
           std::string nameOverwrite = modelFileName;
           nameOverwrite.replace(modelFileName.size() - 4, 4, ".iter" + numberOfBatches + ".npz");
           models_[0]->save(graphs_[0], nameOverwrite);
         }

         models_[0]->save(graphs_[0], modelFileName, /*saveTranslatorConfig=*/true);
         // save scheduler-related state
         if(scheduler_)
           scheduler_->save(modelFileName);
       }
     }

     swapWithSmoothed();
     saveCheckpoint(modelFileName, gatherOptimizerStateFn);

     barrier(); // (for better grouping of log messages)
   }

   void GraphGroup::swapWithSmoothed() {
     auto swap = [&](size_t i, size_t begin, size_t end) {
       auto curParam = graphs_[i]->params()->vals()->subtensor(begin, end-begin);
       optimizerShards_[i]->swapWithSmoothed(curParam);
       return true; // dummy success
     };
     comm_->foreach(swap);
     comm_->allGatherParams();

     if(shardingMode_ == ShardingMode::local)
       comm_->broadcastParams();

     barrier();
   }

   void GraphGroup::validate() { // @TODO: rename this function to something less confusing.
     ABORT_IF(finalized_, "Training has already finished.");
   }

   void GraphGroup::finalize() {
     finalized_ = true;
   }

   // @TODO: Can this be made const? It seems wrong to have a stateful method that still returns a result.
   Ptr<data::BatchStats> GraphGroup::collectStats(Ptr<ExpressionGraph> graph,
                                                  Ptr<models::ICriterionFunction> model,
                                                  const std::vector<Ptr<Vocab>>& vocabs,
                                                  double multiplier) {
     // this runs with fake values, we do not care for overflow/underflow
     bool throwNan = graph->getThrowNaN();
     graph->setThrowNaN(false);

     auto stats = New<data::BatchStats>();

     size_t numFiles = numberOfInputFiles();

     // Initialize first batch to step size
     size_t first = options_->get<size_t>("mini-batch-fit-step");

     // Increase batch size and sentence length by this step size
     size_t step = options_->get<size_t>("mini-batch-fit-step");

     size_t maxLength = options_->get<size_t>("max-length");
     maxLength = (size_t)(std::ceil(maxLength / (float)step) * step);

     // this should be only one class label per line on input, hence restricting length to 1
     std::vector<size_t> localMaxes(numFiles, maxLength);
     auto inputTypes = options_->get<std::vector<std::string>>("input-types", {});
     for(int i = 0; i < inputTypes.size(); ++i)
       if(inputTypes[i] == "class")
         localMaxes[i] = 1;

     size_t maxBatch = 512;
     bool fits = true;
     while(fits) {
       std::vector<size_t> lengths(numFiles, first);
       for(int j = 0; j < lengths.size(); ++j) // apply length restrictions
         lengths[j] = std::min(lengths[j], localMaxes[j]);

       auto batch = data::CorpusBatch::fakeBatch(lengths, vocabs, maxBatch, options_);
       auto loss = model->build(graph, batch);
       fits = graph->fits();
       if(fits)
         maxBatch *= 2;
     }
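     // Illustrative trace (made-up numbers, not from the original source): at the
     // minimum length the doubling loop above might find that batches of 512, 1024
     // and 2048 sentences fit but 4096 does not, leaving maxBatch = 4096 as the upper
     // bound. The binary search below then pins down the exact largest fitting batch
     // size for each tested length in O(log maxBatch) graph builds, and carries the
     // shrinking bound forward since longer sentences fit fewer sentences per batch.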
     // Do a binary search for the maximum batch size that fits into the given workspace memory
     // for a tested sentence length.
     for(size_t i = step; i <= maxLength; i += step) {
       size_t start = 1;
       size_t end = maxBatch;

       std::vector<size_t> lengths(numFiles, i);
       for(int j = 0; j < lengths.size(); ++j) // apply length restrictions
         lengths[j] = std::min(lengths[j], localMaxes[j]);
       fits = true;

       do {
         size_t current = (start + end) / 2;
         auto batch = data::CorpusBatch::fakeBatch(lengths, vocabs, current, options_);
         auto loss = model->build(graph, batch);
         fits = graph->fits();

         LOG(debug, "[batching] length: {} - size: {} - fits: {}", lengths[0], current, fits);

         if(fits) {
           stats->add(batch, multiplier);
           start = current + 1;
         } else {
           end = current - 1;
         }
       } while(end >= start);

       maxBatch = start;
     }

     // set back to original value for aborting on NaN or Inf
     graph->setThrowNaN(throwNan);

     return stats;
   }

   void GraphGroup::setTypicalTrgBatchWords(size_t typicalTrgBatchWords) { // needed for dynamic MB scaling
     typicalTrgBatchWords_ = (double)typicalTrgBatchWords;
   }

   double GraphGroup::getTypicalTrgBatchWords() {
     return typicalTrgBatchWords_;
   }

   void GraphGroup::updateAverageTrgBatchWords(size_t trgBatchWords) {
     typicalTrgBatchWords_ = 0.99 * typicalTrgBatchWords_ + 0.01 * (double)trgBatchWords; // record a running average of the batch size, factors are chosen empirically.
   }

   size_t GraphGroup::numberOfInputFiles() {
     if(options_->get<bool>("tsv", false)) {
       size_t n = options_->get<size_t>("tsv-fields");
       if(n > 0 && options_->get("guided-alignment", std::string("none")) != "none")
         --n;
       if(n > 0 && options_->hasAndNotEmpty("data-weighting"))
         --n;
       return n;
     }
     return options_->get<std::vector<std::string>>("train-sets").size();
   }

   }  // namespace marian