Program Listing for File communicator_nccl.h
↰ Return to documentation for file (src/training/communicator_nccl.h)
// Note: This must only be included if defined(CUDA_FOUND) && defined(USE_NCCL)
#include "training/communicator.h"
#include "3rd_party/threadpool.h"
#include "tensors/tensor_operators.h"
#include "tensors/gpu/cuda_helpers.h"
#include "common/timer.h"
// Generated by the NCCL makefiles in build/nccl/include;
// the include dir has been set in the CMake files. NCCL adds version numbers etc.
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4505) // "unreferenced local function has been removed" in cuda_fp16.hpp
#endif
#include "nccl.h"
#include <cuda_runtime.h>
#if (NCCL_MAJOR<3 || NCCL_MINOR<2)
#define ncclGetVersion(pv) (*(pv) = (NCCL_MAJOR * 1000 + NCCL_MINOR * 100 + NCCL_PATCH))
#endif
#include <signal.h> // HACK
#include <sys/types.h>
#include <sys/syscall.h>
pid_t gettid(void){ return syscall(SYS_gettid); }
namespace marian {
class NCCLCommunicator : public ICommunicator {
private:
ShardingMode shardingMode_{ShardingMode::global};
std::vector<ncclComm_t> globalComms_; // [device index]
std::vector<ncclComm_t> localComms_; // [device index]
std::vector<cudaStream_t> streams_; // [device index]
std::vector<int> devices_; // [device index]
Ptr<IMPIWrapper> mpi_; // (may be null)
mutable ThreadPool threadPool_;
void groupStart() const { NCCL_CHECK(ncclGroupStart()); } // helpers to make sure we check the error
void groupEnd() const { NCCL_CHECK(ncclGroupEnd()); }
void synchronizeAll() const {
for(int i = 0; i < graphs_.size(); ++i) {
CUDA_CHECK(cudaSetDevice(devices_[i]));
CUDA_CHECK(cudaStreamSynchronize(streams_[i]));
// @TODO: why do we sync the CPU, and not the GPU?
// - cudaEventRecord() an event on the nccl stream
// - submit a cudaStreamWaitEvent() into our compute stream (=NULL stream)
// cf. https://github.com/pytorch/pytorch/blob/master/torch/lib/c10d/ProcessGroupNCCL.cpp
}
}
void synchronizeAllOnNullStream() const {
for (int i = 0; i < graphs_.size(); ++i) {
auto backend = graphs_[i]->getBackend();
backend->setDevice();
backend->synchronize(); // note: synchronize() does not set the device by itself
}
}
std::string mpiIdStr() const { // (for logging)
return mpi_ ? mpi_->idStr() : "";
}
size_t numLocalRanks() const {
return devices_.size();
}
size_t myLocalRank(size_t localDeviceIndex) const { // map local device index to a local rank
return localDeviceIndex; // do nothing
}
size_t numNcclRanks() const { // total number of devices across all MPI processes
if (mpi_)
return mpi_->numMPIProcesses() * numLocalRanks();
else
return numLocalRanks();
}
size_t myNcclRank(size_t localDeviceIndex) const { // map local device index to a global rank
if (mpi_)
return mpi_->myMPIRank() * numLocalRanks() + myLocalRank(localDeviceIndex);
else
return myLocalRank(localDeviceIndex);
}
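// Example: with 2 MPI processes and 4 local GPUs each, there are 8 NCCL ranks in total;
// local device 2 on MPI process 1 maps to NCCL rank 1 * 4 + 2 = 6.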
size_t dataSize() const { // total number of floats that comprise the concatenated parameter and gradient vector
return graphs_[0]->params()->vals()->size();
}
// determine the (max) shard size
// All shards except the last one have this size.
// Presently, even the last shard must have this same size, due to a limitation in NCCL that we have not yet worked around.
size_t shardSize() const {
size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks();
size_t size = (dataSize() + numShards - 1) / numShards;
#if 1 // for now, all shards must have the same size, since NCCL does not allow a sub-slice for the last shard
ABORT_IF(size * numShards != dataSize(), "presently, all shards must have the same size");
#endif
return size;
}
// determine the index range (begin, end) of a shard
std::pair<size_t, size_t> ncclRankShardRange(size_t rank) const {
size_t size = shardSize();
size_t begin = rank * size;
size_t end = begin + size;
end = std::min(end, dataSize()); // clip last shard. Note: Presently this never happens, since shardSize() enforces uniform shard size.
return{ begin, end };
}
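// Example: with dataSize() == 1024 elements and 4 NCCL ranks, shardSize() == 256, so rank 0
// owns [0, 256), rank 1 owns [256, 512), rank 2 owns [512, 768), and rank 3 owns [768, 1024).
// If dataSize() were not divisible by the number of ranks, the ABORT_IF in shardSize() would trigger.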
// determine the index range (begin, end) of a shard
std::pair<size_t, size_t> localShardRange(size_t localDeviceIndex) const {
return ncclRankShardRange(shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex));
}
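// Format the NCCL version code as a dotted string, e.g. a version code of 2708 becomes "2.7.8".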
static std::string ncclVersionString() {
int ncclVersion = 0;
ncclGetVersion(&ncclVersion);
return std::to_string(ncclVersion/1000) + "." + std::to_string((ncclVersion/100)%10) + "." + std::to_string(ncclVersion%100);
}
void mpiBarrier() const {
if (mpi_)
mpi_->barrier();
}
public:
// a NCCLCommunicator is bound to a set of graphs, one per GPU device
// If MPI is used, then each MPI process has an instance of this class for its specific
// set of GPU devices, which are communicating with each other. The total number of GPUs
// involved in the NCCL communication setup is (#MPI processes) x (#GPUs per process).
NCCLCommunicator(const std::vector<Ptr<ExpressionGraph>>& graphs, ShardingMode shardingMode, Ptr<IMPIWrapper> mpi)
: ICommunicator(graphs),
shardingMode_(shardingMode),
globalComms_(graphs.size()),
localComms_(graphs.size()),
streams_(graphs.size()),
devices_(graphs.size()),
mpi_(mpi),
threadPool_(graphs.size(), graphs.size()) {
mpiBarrier(); // barrier to group the multiple log messages from MPI processes
LOG(info, "[comm] Using NCCL {} {}for GPU communication",
ncclVersionString(),
(mpi_ && mpi_->numMPIProcesses() > 1) ? "and MPI " : "");
mpiBarrier(); // (synchronize the log messages)
// set up our local devices
for(int i = 0; i < graphs_.size(); ++i) {
auto device = graphs_[i]->getBackend()->getDeviceId();
ABORT_IF(device.type != DeviceType::gpu,
"NCCL communicator can only be used with GPUs");
devices_[i] = device.no;
CUDA_CHECK(cudaSetDevice(devices_[i]));
CUDA_CHECK(cudaStreamCreate(&streams_[i]));
}
// set up NCCL
// Since we want to use MPI, we cannot use NCCL's handy convenience function. Instead, we must go the laborious route.
// cf. https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/index.html#multidevprothrd
// generate NCCL unique ID at one process and broadcast to all
if(!mpi_) // without MPI local and global is the same, so only handle global
shardingMode_ = ShardingMode::global;
mpiBarrier();
LOG(info, "[comm] Using {} sharding", shardingMode_ == ShardingMode::global ? "global" : "local");
mpiBarrier();
// Create the NCCL unique ids and communicators. In global mode we only need one unique id, which is broadcast to all processes.
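// In local mode we instead need one id per local device (tying the i-th device of every process
// together into globalComms_[i]) plus one id per process (tying all devices of that process
// together into localComms_[i]); see the else branch below.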
if(shardingMode_ == ShardingMode::global) {
ncclUniqueId uniqueId = { 0 };
if (!mpi_ || mpi_->myMPIRank() == 0) {
NCCL_CHECK(ncclGetUniqueId(&uniqueId));
}
if(mpi_) {
static_assert(sizeof(uniqueId) == NCCL_UNIQUE_ID_BYTES, "wrong NCCL_UNIQUE_ID_BYTES??"); // (this value is used in NVidia examples)
mpi_->bCast(&uniqueId, sizeof(uniqueId), MPI_BYTE, 0);
}
groupStart();
for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], numNcclRanks(), uniqueId, myNcclRank(localDeviceIndex)));
}
groupEnd();
} else {
ABORT_IF(shardingMode_ == ShardingMode::local && !mpi_, "Local sharding mode is only implemented for MPI; without MPI, local and global sharding are the same");
std::vector<ncclUniqueId> globalUniqueIds(numLocalRanks(), {0}); // one per local device, binds numProcesses shards at the same location in the processes together
std::vector<ncclUniqueId> localUniqueIds(mpi_->numMPIProcesses(), {0}); // one per process, binds all shards within one process, stays local to process
if(mpi_->myMPIRank() == 0) {
for(auto& id : globalUniqueIds)
NCCL_CHECK(ncclGetUniqueId(&id));
for(auto& id : localUniqueIds)
NCCL_CHECK(ncclGetUniqueId(&id));
}
for(auto& id : globalUniqueIds)
mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);
for(auto& id : localUniqueIds)
mpi_->bCast(&id, sizeof(id), MPI_BYTE, 0);
groupStart();
for(int localDeviceIndex = 0; localDeviceIndex < numLocalRanks(); localDeviceIndex++) {
CUDA_CHECK(cudaSetDevice(devices_[localDeviceIndex]));
NCCL_CHECK(ncclCommInitRank(&globalComms_[localDeviceIndex], mpi_->numMPIProcesses(), globalUniqueIds[localDeviceIndex], mpi_->myMPIRank()));
NCCL_CHECK(ncclCommInitRank( &localComms_[localDeviceIndex], numLocalRanks(), localUniqueIds[mpi_->myMPIRank()], localDeviceIndex));
}
groupEnd();
}
mpiBarrier(); // (synchronize the log messages)
LOG(info, "[comm] NCCLCommunicators constructed successfully");
mpiBarrier(); // (synchronize the log messages)
}
~NCCLCommunicator() override {
for(int i = 0; i < devices_.size(); ++i) {
cudaSetDevice(devices_[i]);
cudaStreamDestroy(streams_[i]);
ncclCommDestroy(globalComms_[i]);
if(shardingMode_ == ShardingMode::local)
ncclCommDestroy(localComms_[i]);
}
}
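// Run 'func' over each local device's shard range and fold the per-device results into a single
// value via 'acc'. If 'parallel' is set and there is more than one graph, the calls are dispatched
// through the thread pool; otherwise they run sequentially on the calling thread.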
template <typename Ret>
Ret foreachAcc(const ForeachFunc<Ret>& func, const AccFunc<Ret>& acc, Ret init, bool parallel = true) const {
parallel &= graphs_.size() > 1;
Ret retValue = init;
std::vector<std::future<Ret>> threadResults(graphs_.size()); // [device index]
for(size_t i = 0; i < graphs_.size(); ++i) {
size_t begin, end; std::tie(begin, end) = localShardRange(i);
if (parallel)
threadResults[i] = threadPool_.enqueue(func, i, begin, end);
else
acc(retValue, func(i, begin, end));
}
if(parallel)
for(auto& task : threadResults)
acc(retValue, task.get());
return retValue;
}
float foreach(const ForeachFunc<float>& func, AccFunc<float> acc, float init, bool parallel = true) const override {
return foreachAcc(func, acc, init, parallel);
}
bool foreach(const ForeachFunc<bool>& func, bool parallel = true) const override {
AccFunc<bool> allTrue = [](bool& x, bool y) { x = x && y; };
return foreachAcc(func, allTrue, true, parallel);
}
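// Reduce (sum) gradients across all devices and processes so that each local device ends up with
// the fully summed gradient for its own shard [begin, end); gradients outside that shard are then
// reset to 0 via resetGrads below.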
void scatterReduceAndResetGrads() const override {
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < graphs_.size(); ++i) {
size_t begin, end; std::tie(begin, end) = localShardRange(i);
auto grads = graphs_[i]->params()->grads();
const auto* sendbuf = grads->data();
auto* recvbuf = grads->subtensor(begin, end-begin)->data();
size_t bufsize = shardSize();
ABORT_IF(grads->subtensor(begin, end-begin)->size() != bufsize, "unexpected subtensor size??");
ncclDataType_t ncclFloatType = ncclFloat32;
if(grads->type() == Type::float16)
ncclFloatType = ncclFloat16;
if(shardingMode_ == ShardingMode::global) {
NCCL_CHECK(ncclAllReduce(grads->data(), grads->data(), grads->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // apparently this is somehow faster??
// NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
} else {
NCCL_CHECK(ncclReduceScatter(sendbuf, recvbuf, bufsize, ncclFloatType, ncclSum, localComms_[i], streams_[i])); // reduceScatter locally
NCCL_CHECK( ncclAllReduce(recvbuf, recvbuf, bufsize, ncclFloatType, ncclSum, globalComms_[i], streams_[i])); // then do tuple-wise allReduce across processes
}
}
groupEnd();
synchronizeAll();
// reset gradients outside the shards we reduce in
// In the future, we can keep quantization residuals here straight in the grads themselves.
// @TODO: all the different places where gradients get reset are confusing
auto resetGrads = [&](size_t i, size_t begin, size_t end) {
auto grads = graphs_[i]->params()->grads();
auto size = grads->size();
// reset everything outside the shard that we reduce in
if (begin > 0)
grads->subtensor(0, begin)->set(0.f);
if (end < size)
grads->subtensor(end, size - end)->set(0.f);
return true; // dummy success
};
foreach(resetGrads);
}
// This distributes all 64 model shards to all 64 GPUs.
// @TODO: For unknown reasons, this takes longer than any other operation incl. scatterReduceAndResetGrads().
// But both should have the same number of data transfers of the same size.
void allGatherParams() const override {
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < graphs_.size(); ++i) {
size_t begin, end; std::tie(begin, end) = localShardRange(i);
auto vals = graphs_[i]->params()->vals();
const auto* sendbuf = vals->subtensor(begin, end-begin)->data();
void* recvbuf = vals->data();
size_t bufsize = shardSize();
ncclDataType_t ncclFloatType = ncclFloat32;
if(vals->type() == Type::float16)
ncclFloatType = ncclFloat16;
auto& comms = shardingMode_ == ShardingMode::global ? globalComms_ : localComms_;
NCCL_CHECK(ncclAllGather(sendbuf, recvbuf, bufsize, ncclFloatType, comms[i], streams_[i]));
}
groupEnd();
synchronizeAll();
}
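// Make parameters identical on all devices and processes: either broadcast rank 0's parameters or,
// with average=true, sum them with ncclAllReduce and divide by the number of MPI processes.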
void broadcastParams(bool average = false) const override {
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < graphs_.size(); ++i) {
auto vals = graphs_[i]->params()->vals();
ncclDataType_t ncclFloatType = ncclFloat32;
if(vals->type() == Type::float16)
ncclFloatType = ncclFloat16;
if(average)
NCCL_CHECK(ncclAllReduce(vals->data(), vals->data(), vals->size(), ncclFloatType, ncclSum, globalComms_[i], streams_[i]));
else
NCCL_CHECK(ncclBroadcast(vals->data(), vals->data(), vals->size(), ncclFloatType, 0, globalComms_[i], streams_[i]));
}
groupEnd();
synchronizeAll();
if(average) {
auto avg = [&](size_t i, size_t /*begin*/, size_t /*end*/) {
auto vals = graphs_[i]->params()->vals();
using namespace functional;
Element(_1 = _1 / (float)mpi_->numMPIProcesses(), vals);
return true; // dummy success
};
foreach(avg);
}
}
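// Synchronize the optimizer shards held by the different processes. In global sharding mode each
// shard exists exactly once, so there is nothing to do; in local mode every process keeps its own
// copy of each shard, and those copies are broadcast (or averaged) across processes here.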
void broadcastShards(const std::vector<Ptr<OptimizerBase>>& opts, bool average = false) const override {
if(shardingMode_ == ShardingMode::global)
return; // nothing to do, shards are independent
auto floatType = [](Tensor tensor) {
ncclDataType_t ncclFloatType = ncclFloat32;
if(tensor->type() == Type::float16)
ncclFloatType = ncclFloat16;
return ncclFloatType;
};
// if we are here we use local mode and shards are process-wise copies
synchronizeAllOnNullStream();
groupStart();
for(int i = 0; i < opts.size(); ++i) {
for(auto shard : opts[i]->getShards()) {
if(shard) {
if(average) {
NCCL_CHECK(ncclAllReduce(shard->data(),
shard->data(),
shard->size(),
floatType(shard),
ncclSum,
globalComms_[i],
streams_[i]));
using namespace functional;
Element(_1 = _1 / (float)mpi_->numMPIProcesses(), shard);
} else {
NCCL_CHECK(ncclBroadcast(shard->data(),
shard->data(),
shard->size(),
floatType(shard),
0,
globalComms_[i],
streams_[i]));
}
}
}
}
groupEnd();
synchronizeAll();
}
// Distribute a single CPU-side io::Item to shards across multiple devices and MPI processes.
// This is used when restoring optimizer state, which is sharded.
// It is assumed that all MPI processes get the same data() passed. Hence, no MPI transfers are needed here.
void scatterState(const io::Item& data, const OptimizerBase::ScatterStateSetFunc& setFn) const override {
size_t dataSize = data.size();
size_t numShards = shardingMode_ == ShardingMode::global ? numNcclRanks() : numLocalRanks(); // @TODO: numShards() function
size_t shardSize = (dataSize + numShards - 1) / numShards;
for(size_t localDeviceIndex = 0; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
// We only slice out data that is kept in our MPI process. Remember that all MPI processes receive the same, complete data.
auto ncclRank = shardingMode_ == ShardingMode::global ? myNcclRank(localDeviceIndex) : myLocalRank(localDeviceIndex);
size_t begin = ncclRank * shardSize;
size_t end = std::min(begin + shardSize, dataSize);
setFn(localDeviceIndex, data.bytes.data() + begin, data.bytes.data() + end);
}
}
// Collect shards across multiple devices and MPI processes in the NCCL configuration into a single CPU-side io::Item.
// This is used when persisting optimizer state which is sharded.
io::Item gatherState(const OptimizerBase::GatherStateGetFunc& getFn) const override {
// first, concatenate over all local devices
io::Item localData = getFn(0);
for(size_t localDeviceIndex = 1; localDeviceIndex < graphs_.size(); localDeviceIndex++) {
localData.append(getFn(localDeviceIndex));
}
// localData now contains a concatenation of all local data
// second, concatenate across MPI processes
// Note that all local devices occupy consecutive ncclRanks in order.
io::Item data;
if (mpi_ && shardingMode_ == ShardingMode::global) {
io::Item tmp = localData; // temp buffer used multiple times; assign localData for initialization
// push one rank's data at a time using broadcast
for(size_t mpiRank = 0; mpiRank < mpi_->numMPIProcesses(); mpiRank++) {
// broadcast mpiRank's localData to all
if(mpiRank == mpi_->myMPIRank()) {
tmp = localData;
}
mpi_->bCast(tmp, /*rootRank=*/mpiRank);
// now all ranks have the same slice: concatenate (we will end up with the same on all MPI processes)
if(mpiRank == 0)
data = tmp;
else
data.append(tmp);
}
}
else { // no MPI: localData is the complete result already
data = std::move(localData);
}
return data;
}
};
} // namespace marian