.. _program_listing_file_src_training_communicator_nccl.h:

Program Listing for File communicator_nccl.h
============================================

|exhale_lsh| :ref:`Return to documentation for file <file_src_training_communicator_nccl.h>` (``src/training/communicator_nccl.h``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // Note: This must only be included if defined(CUDA_FOUND) && defined(USE_NCCL)

   #include "training/communicator.h"
   #include "3rd_party/threadpool.h"
   #include "tensors/tensor_operators.h"
   #include "tensors/gpu/cuda_helpers.h"
   #include "common/timer.h"

   // Generated by NCCL make files in build/nccl/include;
   // include dir has been set in CMake files. NCCL adds version number etc.
   #ifdef _MSC_VER
   #pragma warning(push)
   #pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda_fp16.hpp
   #endif
   #include "nccl.h"
   #include <cuda_runtime.h>

   #if (NCCL_MAJOR<3 || NCCL_MINOR<2)
   #define ncclGetVersion(pv) (*(pv) = (NCCL_MAJOR * 1000 + NCCL_MINOR * 100 + NCCL_PATCH))
   #endif

   #include <unistd.h> // HACK
   #include <sys/types.h>
   #include <sys/syscall.h>
   pid_t gettid(void){ return syscall(SYS_gettid); }

   namespace marian {

   class NCCLCommunicator : public ICommunicator {
   private:
     ShardingMode shardingMode_{ShardingMode::global};

     std::vector<ncclComm_t> globalComms_; // [device index]
     std::vector<ncclComm_t> localComms_;  // [device index]
     std::vector<cudaStream_t> streams_;   // [device index]
     std::vector<size_t> devices_;         // [device index]
     Ptr<IMPIWrapper> mpi_;                // (may be null)
     mutable ThreadPool threadPool_;

     void groupStart() const { NCCL_CHECK(ncclGroupStart()); } // helpers to make sure we check the error
     void groupEnd() const { NCCL_CHECK(ncclGroupEnd()); }

     void synchronizeAll() const {
       for(int i = 0; i < graphs_.size(); ++i) {
         CUDA_CHECK(cudaSetDevice(devices_[i]));
         CUDA_CHECK(cudaStreamSynchronize(streams_[i]));
         // @TODO: why do we sync the CPU, and not the GPU?
         //  - cudaEventRecord() an event on the nccl stream
         //  - submit a cudaStreamWaitEvent() into our compute stream (=NULL stream)
         // cf. https://github.com/pytorch/pytorch/blob/master/torch/lib/c10d/ProcessGroupNCCL.cpp
       }
     }

     void synchronizeAllOnNullStream() const {
       for (int i = 0; i < graphs_.size(); ++i) {
         auto backend = graphs_[i]->getBackend();
         backend->setDevice();
         backend->synchronize(); // note: synchronize() does not set the device by itself
       }
     }

     std::string mpiIdStr() const { // (for logging)
       return mpi_ ? mpi_->idStr() : "";
     }

     size_t numLocalRanks() const {
       return devices_.size();
     }

     size_t myLocalRank(size_t localDeviceIndex) const { // map local device index to a local rank
       return localDeviceIndex; // do nothing
     }

     size_t numNcclRanks() const { // total number of devices across all MPI processes
       if (mpi_)
         return mpi_->numMPIProcesses() * numLocalRanks();
       else
         return numLocalRanks();
     }

     size_t myNcclRank(size_t localDeviceIndex) const { // map local device index to a global rank
       if (mpi_)
         return mpi_->myMPIRank() * numLocalRanks() + myLocalRank(localDeviceIndex);
       else
         return myLocalRank(localDeviceIndex);
     }

     size_t dataSize() const { // total number of floats that comprise the concatenated parameter and gradient vector
       return graphs_[0]->params()->vals()->size();
     }
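     // Illustrative example of the rank mapping above (hypothetical numbers): with 2 MPI
     // processes and 4 local GPUs each, numNcclRanks() == 8, and the device at
     // localDeviceIndex 2 on MPI rank 1 is NCCL rank 1 * 4 + 2 == 6; under global sharding,
     // its shard of the concatenated parameter/gradient vector is [6 * shardSize(), 7 * shardSize()).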
     // determine the (max) shard size
     // All shards except the last one have this size.
     // Presently, even all shards must have identical size, due to a limitation in NCCL we have not yet worked around.
     size_t shardSize() const {
       size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks();
       size_t size = (dataSize() + numShards - 1) / numShards;
   #if 1 // for now, all shards must have the same size, since NCCL does not allow a sub-slice for the last shard
       ABORT_IF(size * numShards != dataSize(), "presently, all shards must have the same size");
   #endif
       return size;
     }

     // determine the index range (begin, end) of a shard
     std::pair<size_t, size_t> ncclRankShardRange(size_t rank) const {
       size_t size = shardSize();
       size_t begin = rank * size;
       size_t end = begin + size;
       end = std::min(end, dataSize()); // clip last shard. Note: Presently this never happens, since shardSize() enforces uniform shard size.
       return{ begin, end };
     }

     // determine the index range (begin, end) of a shard
     std::pair<size_t, size_t> localShardRange(size_t localDeviceIndex) const {
       return ncclRankShardRange(shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex));
     }

     static std::string ncclVersionString() {
       int ncclVersion = 0;
       ncclGetVersion(&ncclVersion);
       return std::to_string(ncclVersion/1000) + "." + std::to_string((ncclVersion/100)%10) + "." + std::to_string(ncclVersion%100);
     }

     void mpiBarrier() const {
       if (mpi_)
         mpi_->barrier();
     }

   public:
     // a NCCLCommunicator is bound to a set of graphs, one per GPU device.
     // If MPI is used, then each MPI process has an instance of this class for its specific
     // set of GPU devices, which are communicating with each other. The total number of GPUs
     // involved in the NCCL communication setup is (#MPI processes) x (#GPUs per process).
     NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, ShardingMode shardingMode, Ptr<IMPIWrapper> mpi)
         : ICommunicator(graphs),
           shardingMode_(shardingMode),
           globalComms_(graphs.size()),
           localComms_(graphs.size()),
           streams_(graphs.size()),
           devices_(graphs.size()),
           mpi_(mpi),
           threadPool_(graphs.size(), graphs.size()) {
       mpiBarrier(); // barrier to group the multiple log messages from MPI processes
       LOG(info, "[comm] Using NCCL {} {}for GPU communication",
           ncclVersionString(),
           (mpi_ && mpi_->numMPIProcesses() > 1) ? "and MPI " : "");
       mpiBarrier(); // (synchronize the log messages)

       // set up our local devices
       for(int i = 0; i < graphs_.size(); ++i) {
         auto device = graphs_[i]->getBackend()->getDeviceId();

         ABORT_IF(device.type != DeviceType::gpu, "NCCL communicator can only be used with GPUs");

         devices_[i] = device.no;
         CUDA_CHECK(cudaSetDevice(devices_[i]));
         CUDA_CHECK(cudaStreamCreate(&streams_[i]));
       }

       // set up NCCL
       // Since we want to use MPI, we cannot use NCCL's handy convenience function. Instead, we must go the laborious route.
       // cf. https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/index.html#multidevprothrd

       // generate the NCCL unique ID at one process and broadcast it to all
       if(!mpi_) // without MPI, local and global are the same, so only handle global
         shardingMode_ = ShardingMode::global;

       mpiBarrier();
       LOG(info, "[comm] Using {} sharding", shardingMode_ == ShardingMode::global ? "global" : "local");
       mpiBarrier();

       // Create unique ids for NCCL as well as communicators. If global, we only need one unique id and broadcast it to all processes.
       if(shardingMode_ == ShardingMode::global) {
         ncclUniqueId uniqueId = { 0 };
         if (!mpi_ || mpi_->myMPIRank() == 0) {
           NCCL_CHECK(ncclGetUniqueId(&uniqueId));
         }

         if(mpi_) {
           static_assert(sizeof(uniqueId) == NCCL_UNIQUE_ID_BYTES, "wrong NCCL_UNIQUE_ID_BYTES??"); // (this value is used in NVidia examples)
           mpi_->bCast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0);
         }

         groupStart();
         for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
           CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
           NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], numNcclRanks(), uniqueId, myNcclRank(localDeviceIndex)));
         }
         groupEnd();
       } else {
         ABORT_IF(shardingMode_ == ShardingMode::local && !mpi_, "Local/global sharding only implemented for MPI, global is same as local with no MPI");

         std::vector<ncclUniqueId> globalUniqueIds(numLocalRanks(), {0});        // one per local device, binds numProcesses shards at the same location in the processes together
         std::vector<ncclUniqueId> localUniqueIds(mpi_->numMPIProcesses(), {0}); // one per process, binds all shards within one process, stays local to process

         if(mpi->myMPIRank() == 0) {
           for(auto& id : globalUniqueIds)
             NCCL_CHECK(ncclGetUniqueId(&id));
           for(auto& id : localUniqueIds)
             NCCL_CHECK(ncclGetUniqueId(&id));
         }

         for(auto& id : globalUniqueIds)
           mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);
         for(auto& id : localUniqueIds)
           mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);

         groupStart();
         for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
           CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
           NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], mpi_->numMPIProcesses(), globalUniqueIds[localDeviceIndex], mpi_->myMPIRank()));
           NCCL_CHECK(ncclCommInitRank(&localComms_[localDeviceIndex], numLocalRanks(), localUniqueIds[mpi_->myMPIRank()], localDeviceIndex));
         }
         groupEnd();
       }

       mpiBarrier(); // (synchronize the log messages)
       LOG(info, "[comm] NCCLCommunicators constructed successfully");
       mpiBarrier(); // (synchronize the log messages)
     }

     ~NCCLCommunicator() override {
       for(int i = 0; i < devices_.size(); ++i) {
         cudaSetDevice(devices_[i]);
         cudaStreamDestroy(streams_[i]);
         ncclCommDestroy(globalComms_[i]);
         if(shardingMode_ == ShardingMode::local)
           ncclCommDestroy(localComms_[i]);
       }
     }
     template <typename Ret>
     Ret foreachAcc(const ForeachFunc& func, const AccFunc<Ret>& acc, Ret init, bool parallel = true) const {
       parallel &= graphs_.size() > 1;

       Ret retValue = init;
       std::vector<std::future<Ret>> threadResults(graphs_.size()); // [device index]
       for(size_t i = 0; i < graphs_.size(); ++i) {
         size_t begin, end;
         std::tie(begin, end) = localShardRange(i);
         if (parallel)
           threadResults[i] = threadPool_.enqueue(func, i, begin, end);
         else
           acc(retValue, func(i, begin, end));
       }
       if(parallel)
         for(auto& task : threadResults)
           acc(retValue, task.get());

       return retValue;
     }

     float foreach(const ForeachFunc& func, AccFunc<float> acc, float init, bool parallel = true) const override {
       return foreachAcc(func, acc, init, parallel);
     }

     bool foreach(const ForeachFunc& func, bool parallel = true) const override {
       AccFunc<bool> allTrue = [](bool& x, bool y) { x = x && y; };
       return foreachAcc(func, allTrue, true, parallel);
     }

     void scatterReduceAndResetGrads() const override {
       synchronizeAllOnNullStream();

       groupStart();
       for(int i = 0; i < graphs_.size(); ++i) {
         size_t begin, end;
         std::tie(begin, end) = localShardRange(i);

         auto grads = graphs_[i]->params()->grads();
         const auto* sendbuf = grads->data();
         auto*       recvbuf = grads->subtensor(begin, end-begin)->data();
         size_t      bufsize = shardSize();
         ABORT_IF(grads->subtensor(begin, end-begin)->size() != bufsize, "unexpected subtensor size??");

         ncclDataType_t ncclFloatType = ncclFloat32;
         if(grads->type() == Type::float16)
           ncclFloatType = ncclFloat16;

         if(shardingMode_ == ShardingMode::global) {
           NCCL_CHECK(ncclAllReduce(grads->data(), grads->data(), grads->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // apparently this is somehow faster??
           // NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
         } else {
           NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, localComms_[i],  streams_[i])); // reduceScatter locally
           NCCL_CHECK(ncclAllReduce(    recvbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // then do tuple-wise allReduce across processes
         }
       }
       groupEnd();
       synchronizeAll();

       // reset gradients outside the shards we reduce in
       // In the future, we can keep quantization residuals here straight in the grads themselves.
       // @TODO: all the different places where gradients get reset are confusing
       auto resetGrads = [&](size_t i, size_t begin, size_t end) {
         auto grads = graphs_[i]->params()->grads();
         auto size = grads->size();
         // reset everything outside the shard that we reduce in
         if (begin > 0)
           grads->subtensor(0, begin)->set(0.f);
         if (end < size)
           grads->subtensor(end, size - end)->set(0.f);

         return true; // dummy success
       };
       foreach(resetGrads);
     }
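     // Summary of the data flow in scatterReduceAndResetGrads() above:
     //  - global sharding: a single ncclAllReduce over globalComms_ (all (#MPI processes) x
     //    (#local GPUs) ranks); every device then holds the full summed gradient, and
     //    resetGrads() zeroes everything outside its own shard.
     //  - local sharding: ncclReduceScatter over localComms_ within the process, followed by
     //    ncclAllReduce of each local shard over globalComms_ across processes.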
     // This distributes all 64 model shards to all 64 GPUs.
     // @TODO: For unknown reasons, this takes longer than any other operation incl. scatterReduceAndResetGrads().
     //        But both should have the same number of data transfers of the same size.
     void allGatherParams() const override {
       synchronizeAllOnNullStream();

       groupStart();
       for(int i = 0; i < graphs_.size(); ++i) {
         size_t begin, end;
         std::tie(begin, end) = localShardRange(i);

         auto vals = graphs_[i]->params()->vals();
         const auto* sendbuf = vals->subtensor(begin, end-begin)->data();
         void*       recvbuf = vals->data();
         size_t      bufsize = shardSize();

         ncclDataType_t ncclFloatType = ncclFloat32;
         if(vals->type() == Type::float16)
           ncclFloatType = ncclFloat16;

         auto& comms = shardingMode_ == ShardingMode::global ? globalComms_ : localComms_;
         NCCL_CHECK(ncclAllGather(sendbuf, recvbuf, bufsize, ncclFloatType, comms[i], streams_[i]));
       }
       groupEnd();
       synchronizeAll();
     }

     void broadcastParams(bool average = false) const override {
       synchronizeAllOnNullStream();

       groupStart();
       for(int i = 0; i < graphs_.size(); ++i) {
         auto vals = graphs_[i]->params()->vals();

         ncclDataType_t ncclFloatType = ncclFloat32;
         if(vals->type() == Type::float16)
           ncclFloatType = ncclFloat16;

         if(average)
           NCCL_CHECK(ncclAllReduce(vals->data(), vals->data(), vals->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
         else
           NCCL_CHECK(ncclBroadcast(vals->data(), vals->data(), vals->size(), ncclFloatType, 0, globalComms_[i], streams_[i]));
       }
       groupEnd();
       synchronizeAll();

       if(average) {
         auto avg = [&](size_t i, size_t /*begin*/, size_t /*end*/) {
           auto vals = graphs_[i]->params()->vals();
           using namespace functional;
           Element(_1 = _1 / (float)mpi_->numMPIProcesses(), vals);
           return true; // dummy success
         };
         foreach(avg);
       }
     }

     void broadcastShards(const std::vector<Ptr<OptimizerBase>>& opts, bool average = false) const override {
       if(shardingMode_ == ShardingMode::global)
         return; // nothing to do, shards are independent

       auto floatType = [](Tensor tensor) {
         ncclDataType_t ncclFloatType = ncclFloat32;
         if(tensor->type() == Type::float16)
           ncclFloatType = ncclFloat16;
         return ncclFloatType;
       };

       // if we are here we use local mode and shards are process-wise copies
       synchronizeAllOnNullStream();
       groupStart();
       for(int i = 0; i < opts.size(); ++i) {
         for(auto shard : opts[i]->getShards()) {
           if(shard) {
             if(average) {
               NCCL_CHECK(ncclAllReduce(shard->data(), shard->data(), shard->size(), floatType(shard), ncclSum, globalComms_[i], streams_[i]));
               using namespace functional;
               Element(_1 = _1 / (float)mpi_->numMPIProcesses(), shard);
             } else {
               NCCL_CHECK(ncclBroadcast(shard->data(), shard->data(), shard->size(), floatType(shard), 0, globalComms_[i], streams_[i]));
             }
           }
         }
       }
       groupEnd();
       synchronizeAll();
     }

     // Distribute a single CPU-side io::Item to shards across multiple devices and MPI processes.
     // This is used when restoring optimizer state, which is sharded.
     // It is assumed that all MPI processes get the same data() passed. Hence, no MPI transfers are needed here.
     void scatterState(const io::Item& data, const OptimizerBase::ScatterStateSetFunc& setFn) const override {
       size_t dataSize = data.size();
       size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks(); // @TODO: numShards() function
       size_t shardSize = (dataSize + numShards - 1) / numShards;
       for(size_t localDeviceIndex = 0; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
         // We only slice out data that is kept in our MPI process. Remember that all MPI processes receive the same, complete data.
         auto ncclRank = shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex);
         size_t begin = ncclRank * shardSize;
         size_t end   = std::min(begin + shardSize, dataSize);
         setFn(localDeviceIndex, data.bytes.data() + begin, data.bytes.data() + end);
       }
     }
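     // Illustrative example for the slicing in scatterState() above (hypothetical sizes):
     // with 4 shards and data.size() == 1000, shardSize == 250, so the shard for rank 2
     // is the range [500, 750) of data.bytes.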
     // Collect shards across multiple devices and MPI processes in the NCCL configuration into a single CPU-side io::Item.
     // This is used when persisting optimizer state, which is sharded.
     io::Item gatherState(const OptimizerBase::GatherStateGetFunc& getFn) const override {
       // first, concatenate over all local devices
       io::Item localData = getFn(0);
       for(size_t localDeviceIndex = 1; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
         localData.append(getFn(localDeviceIndex));
       }
       // localData now contains a concatenation of all local data

       // second, concatenate across MPI processes
       // Note that all local devices occupy consecutive ncclRanks in order.
       io::Item data;
       if (mpi_ && shardingMode_ == ShardingMode::global) {
         io::Item tmp = localData; // temp buffer used multiple times; assign localData for initialization
         // push one rank's data at a time using broadcast
         for(size_t mpiRank = 0; mpiRank < mpi_->numMPIProcesses(); mpiRank++) {
           // broadcast mpiRank's localData to all
           if(mpiRank == mpi_->myMPIRank()) {
             tmp = localData;
           }
           mpi_->bCast(tmp, /*rootRank=*/mpiRank);
           // now all ranks have the same slice: concatenate (we will end up with the same on all MPI processes)
           if(mpiRank == 0)
             data = tmp;
           else
             data.append(tmp);
         }
       } else { // no MPI: localData is the complete result already
         data = std::move(localData);
       }
       return data;
     }
   };

   }  // namespace marian
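
The constructor above follows the multi-process NCCL bootstrap described in the NCCL developer guide it links to: one rank generates an ``ncclUniqueId``, the id is broadcast over MPI, and every device then joins the communicator with ``ncclCommInitRank``. The following is a minimal standalone sketch of that pattern only, assuming plain MPI with one GPU per process (device index equal to the MPI rank) rather than Marian's ``IMPIWrapper`` and multi-device setup, and omitting error checking.

.. code-block:: cpp

   #include <mpi.h>
   #include <nccl.h>
   #include <cuda_runtime.h>

   int main(int argc, char** argv) {
     MPI_Init(&argc, &argv);
     int rank = 0, nRanks = 1;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &nRanks);

     cudaSetDevice(rank); // assumption: one GPU per process, device index == MPI rank

     // rank 0 creates the unique id; all other ranks receive it via an MPI broadcast
     ncclUniqueId id;
     if(rank == 0)
       ncclGetUniqueId(&id);
     MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);

     // every rank joins the same communicator under its global rank
     ncclComm_t comm;
     ncclCommInitRank(&comm, nRanks, id, rank);

     // ... collectives such as ncclAllReduce() would run here ...

     ncclCommDestroy(comm);
     MPI_Finalize();
     return 0;
   }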