前言:本文是对 知乎@张宇杭 系列文章的补充,只对核心逻辑进行拆解。阅读后可继续通过 知乎@张宇杭 的 NCCL 源码阅读补充更多知识。


在使用 PyTorch 作为深度学习框架时,如果涉及模型的切分,需要创建一个“通信组”(process_group)。后续的所有模型操作需要在通信组内进行同步操作。以 DP 为例,我们在模型构建时会使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist

class TinyModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, 2))
def forward(self, x):
return self.net(x)

def main():
# 1. 初始化分布式环境
dist.init_process_group(backend='gloo')
rank = dist.get_rank() # 当前卡号 (0 或 1)
world_size = dist.get_world_size() # 总卡数 (2)

print(f"[初始化] 我是进程 {rank}/{world_size}")

# 2. 数据准备与切分
dataset = TensorDataset(torch.randn(100, 16), torch.randn(100, 2))
# DistributedSampler 内部会使用 rank 和 world_size 来切分数据
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=10, sampler=sampler)

# 3. 模型初始化与 DDP 包裹
model = TinyModel()
ddp_model = nn.parallel.DistributedDataParallel(model)
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
criterion = nn.MSELoss()

# 4. 训练
for batch_idx, (data, target) in enumerate(dataloader):
optimizer.zero_grad()
output = ddp_model(data)
loss = criterion(output, target)
loss.backward() # 在这里,PyTorch 会自动在所有 rank 之间 All-Reduce 梯度
optimizer.step()

print(f"[进程 {rank}] 正在处理属于我的数据, Loss: {loss.item():.4f}")

dist.destroy_process_group()

if __name__ == "__main__":
main()

其中 dist.init_process_group 建立了一个进程组,告诉 PyTorch 参与的机器数量和自身编号。以DP为例,DistributedSampler 只负责按 rank/world_size 切分数据,不参与梯度同步。梯度同步由 DistributedDataParallel 完成:DDP 在构造时为模型参数注册 autograd hook,在反向传播过程中,当参数梯度就绪时触发 all-reduce,从而在进程组内同步梯度。运行完成后调用 dist.destroy_process_group() 销毁通信组。

本文会简要分析NCCL通信组建立的过程。

应用层

在应用层,用户调用了 dist.init_process_group,传入自身机器编号(rank)和总机器数(world size)。这是一个阻塞的函数,PyTorch 会等待所有机器全部加入进程组后返回。这个函数会创建进程组,用于给服务同一个模型的所有进程一个轻量的通信接口;在进程组建立之后,实际的NCCL通信发生在同一个进程组的不同通信组里。

一般通过 torchrun 启动程序:torchrun --nproc_per_node=2 dp.py。torchrun 会自动分配好 MASTER_ADDRMASTER_PORTRANKWORLD_SIZE并写入环境变量。当一个进程同时属于多个通信组时,MASTER_ADDR/MASTER_PORT 只负责 rendezvous,也就是让进程们先找到彼此;它们不等价于通信组本身。init_process_group 建立的是默认进程组,后续如果需要更多通信组,应通过 dist.new_group(ranks=...) 创建子组,并在 collective 调用中显式传入 group 参数,例如:

1
2
3
4
default_pg = dist.group.WORLD
tp_group = dist.new_group(ranks=[0, 1])
dp_group = dist.new_group(ranks=[0, 2])
dist.all_reduce(tensor, group=dp_group)

同一台机器可以同时参与多个通信组:每个组在 PyTorch 内部对应一个 ProcessGroup 对象,collective 操作根据传入的 group 选择通信域;如果不传,则默认使用整个进程组作为通信组。

PyTorch 层

dist.init_process_group 位于 src/distributed/distribued_c10d.py:1664

1
2
3
4
5
6
7
8
9
10
11
12
def init_process_group(
backend: str | None = None,
init_method: str | None = None,
timeout: timedelta | None = None,
world_size: int = -1,
rank: int = -1,
store: Store | None = None,
group_name: str = "",
pg_options: Any | None = None,
device_id: torch.device | int | None = None,
_ranks: list[int] | None = None,
) -> None:

init_process_group主要负责:

  • 检查没初始化过
  • 确定 backend / timeout / device
  • 通过 env:// 或 tcp:// 等 rendezvous 找到其他进程
  • 建立共享 Store
  • 创建 ProcessGroup
  • 创建 Gloo/NCCL 等具体通信后端
  • 注册到默认 WORLD group
  • 让后续 dist API 默认使用它通信

在运行之后,所有进程都注册到了同一个进程组;进程组中的每一个进程都获得了一个唯一的 rank。

但 NCCL 层的通信组建立是懒加载的,所以此时还没有建立具体的通信组。PyTorch 里 csrc/distributed/c10d/ProcessGroupNCCL.cpp:3757collective 函数是进行所有集合通信函数的 wrapper。在 3786 行有:

1
2
3
4
5
const auto key = getKeyFromDevice(device);
std::shared_ptr<NCCLComm> ncclComm = getNCCLComm(key);
if (ncclComm == nullptr) {
ncclComm = initNCCLComm(key, device, opType);
}

即每次尝试进行集合通讯时,都会判断当前通信组是否建立。如果没有建立则调用 initNCCLComm建立通信组获得一个 NCCLComm对象。在进行一些检查之后,通过 ncclComm_t comm = ncclComm->getNcclComm(); 获得一个真正存储了通信组元信息的 ncclComm_t 结构体。在调用 post(ncclStream, work); 之后,把这个结构体保存到 work 中并完成其余初始化,最后将带有通信组元信息的 work 对象加入工作队列。

initNCCLComm 是 PyTorch 里真正建立通信组的函数,位于 PyTorch 的 csrc/distributed/c10d/ProcessGroupNCCL.cpp:3052,签名是:

1
2
3
4
5
6
std::shared_ptr<NCCLComm> ProcessGroupNCCL::initNCCLComm(
const std::string& deviceKey,
at::Device& device,
OpType opType,
int p2pRank,
bool isSendRecvSelf)

参数含义:

  • deviceKey: 当前 CUDA device 的字符串 key,比如某个 GPU 对应的 key,用来缓存 communicator。
  • device: 当前 tensor 所在 CUDA device。
  • opType: 触发初始化的操作类型,比如 ALLREDUCE、SEND、RECV 等。
  • p2pRank: 单个 P2P 操作时,本 rank 在两端通信里的局部 rank,通常是 0 或 1。
  • isSendRecvSelf: 是否是同一个进程自己给自己 send/recv。
    运行结束之后会返回一个 NCCLComm 对象,这个对象是对 NCCL 层 ncclComm_t 对象的简单封装,可以用于获取 ncclComm_t 对象或在获取失败时提供错误信息。

下面详细对 initNCCLComm 进行拆解。(注意,此时仍在 PyTorch 层,所有信息均由 PyTorch 维护)

  1. 首先进行一些检查

    1
    2
    3
    4
    5
    6
    7
    8
    if (deviceKey.empty()) {
    ... // deviceKey 不能为空,即需要当前设备在之前 init_process_group 时被注册过
    }
    if (bound_device_id_) {
    if (*bound_device_id_ != device) {
    ... // 如果当前被强制规定在某个 device 上初始化,当前 tensor 也要在同一个 device 上
    }
    }
  2. 记录当前的通信组使用过了哪些 device

    1
    usedDeviceIdxs_.insert(device.index());
  3. 判断当前是否是一个P2P通信,分两种情况:

    1
    2
    bool batchP2P = ncclActiveGroupCounter_ > 0;
    bool singleP2POp = isP2POp(opType, batchP2P);

    区分的原因是 NCCL communicator 的 rank/size 对 collective 和单个 P2P 不一样:

  • collective / batch P2P:整个 process group 都参与
  • single P2P:只有两个 rank 参与
  • send-to-self:只有一个 rank
    后面会根据这个设置:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (!singleP2POp) {
    numRanks = getSize();
    rank = getRank();
    } else if (isSendRecvSelf) {
    numRanks = 1;
    rank = 0;
    } else {
    numRanks = 2;
    rank = p2pRank;
    }
    可以看出:NCCL communicator 里的 rank 不一定总是全局 rank,也不一定总是 ProcessGroup rank。单个 P2P 时,它是两端通信里的 0/1。
  1. 处理已有的 NCCL Group。这里的原因在源码中给出了解释:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // [Group Start/End Note] This is used to ensure that nccl communicator will
    // be created before communication primitives are called. Let's look at this
    // example: Using the batch_isend_irecv to send a tensor to a target process.
    // On the sender side, the corresponding underlying NCCL calls will look like
    // ncclGroupStart() // This is in batch_isend_irecv
    // ncclCommInitRank() // Inside NCCLComm::create
    // ncclSend()
    // ncclGroupEnd() // This is in batch_isend_irecv
    // With this pattern, the nccl communicator will be created in the last
    // ncclGroupEnd which means when ncclSend is processed, the passed
    // communicator argument is NULL which will lead to runtime error. So we need
    // to "close" all active nccl groups to ensure nccl communicator is actually
    // created before encountering any communication calls. This is why we need
    // the following for loop.
    for (const auto i : c10::irange(ncclActiveGroupCounter_)) {
    (void)i;
    // comms have not been initiated yet, so can only check in blocking-way
    C10D_NCCL_CHECK(ncclGroupEnd(), std::nullopt);
    }

    简而言之,如果当前已经处在 ncclGroupStart() / ncclGroupEnd() 包裹的 batch P2P 中,ncclCommInitRank() 如果也被放进 group 里,可能导致 communicator 还没真正创建,后面 ncclSend / ncclRecv 就拿到空 communicator。所以这里先临时结束 active NCCL group,确保 communicator 先初始化完。初始化完成后,后面会再 ncclGroupStart() 恢复回来。

  2. 尝试通过 ncclCommSplit 创建通信组
    如果这个 PG 是从已有 PG split 出来的,并且不是单个 P2P,它会优先尝试:

    1
    2
    ncclComm = NCCLComm::split(
    parentComm.get(), options_->split_color, rank, options_->config);

    split 中会从一个已经创建好的通信组里fork出一份新的子通信组。通过这种方式创建子通信组更快,因为后面不需要再获取 UniqueId 并进行所有 rank 的同步了,但需要父 communicator 存在且还没有被 abort。

  3. 尝试通过 ncclCommInitRankScalable 进行大规模初始化
    和后面的逻辑是一样的。区别在于会生成多个 UniqueId 多点并行地进行初始化。

  4. 获取 UniqueId 并分发

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (getCvarBool(TORCH_NCCL_BCAST_UNIQUEID, true) && !isSendRecvSelf) {
    // For point-to-point communication, lower rank of the two will get
    // unique id.
    if (rank_ == 0 || (singleP2POp && p2pRank == 0)) {
    C10D_NCCL_CHECK(ncclGetUniqueId(&ncclID), std::nullopt);
    }

    // Broadcast so that each process can have a unique NCCL ID
    broadcastUniqueNCCLID(&ncclID, singleP2POp, deviceKey, p2pRank);
    }

    rank 0 会生成一个属于该通信组的临时 UniqueId,并将它广播给所有属于该通信组的 rank。这里 rank 0 会在 ncclGetUniqueId 里新开一个线程调用 NCCL 层的 bootstrapCreateRoot。这个新开的线程会阻塞住,等待其他 rank 主动发起连接;但 rank 0 的主线程会继续执行下面 UniqueId 的分发广播。 有关 bootstrap 的流程会在 NCCL 层解读。

对于广播而言,在 broadcaseUniqueNCCLID 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (!isSingleP2POp) {
storeKey = std::to_string(ncclCommCounter_++); // ncclCommCounter_ 用于记录当前这个进程里,这是第几次初始化
} else {
storeKey = p2pKey;
}

if (rank_ == 0 || (isSingleP2POp && p2pRank == 0)) {
// 如果是 rank 0,就把自己的 UniqueId 存到 store 里
auto vec = std::vector<uint8_t>(
reinterpret_cast<uint8_t*>(ncclID),
reinterpret_cast<uint8_t*>(ncclID) + NCCL_UNIQUE_ID_BYTES
);
store_->set(storeKey, vec);
} else {
try {
// 如果不是 rank 0,就从 store 里读取 rank 0 写入的 UniqueId
auto vec = store_->get(storeKey);
std::memcpy(ncclID, vec.data(), vec.size());
} catch (const std::exception& e) { ... }
}

ncclCommCounter_ 用于记录当前这个进程里,这是第几次初始化。也就是说,PyTorch 依赖初始化的顺序严格一致来保证通信组建立的一致性,这也是为什么我们需要单独给 P2P 通信做特判:P2P 在通信组里发生的顺序是不一致的。

  1. 创建 NCCL Communicator

    1
    2
    3
    4
    5
    #ifdef NCCL_HAS_CONFIG
    ncclComm = NCCLComm::create(numRanks, rank, ncclID, deviceIndex, options_->config);
    #else
    ncclComm = NCCLComm::create(numRanks, rank, ncclID, deviceIndex);
    #endif // NCCL_HAS_CONFIG

    这里调用了 NCCLComm::create,在其中会调用 ncclCommInitRankncclCommInitRankConfig。在其中非 rank 0 的其他 rank 会主动向 rank 0 提供自身的信息,将在 NCCL 层中介绍。

  2. 创建 CUDA stream
    创建 communicator 后,PyTorch 为这个 device 取一个 CUDA stream:

    1
    2
    auto streamVal = at::cuda::getStreamFromPool(
    options_->is_high_priority_stream || force_high);

    之后实际发生的 NCCL collective 会在这个 stream 上排队;stream 相当于一个任务队列,CUDA 会依次从中获取通信任务并执行。

  3. 临时放入 inInitializationCommMap_

    1
    inInitializationCommMap_.emplace(deviceKey, ncclComm);
  4. 与4对应,恢复 Active Group 的状态

    1
    2
    3
    4
    for (const auto i : c10::irange(ncclActiveGroupCounter_)) {
    (void)i;
    C10D_NCCL_CHECK(ncclGroupStart(), std::nullopt);
    }
  5. 保存 stream 和 event

    1
    2
    ncclStreams_.emplace(deviceKey, streamVal);
    ncclEvents_.emplace(deviceKey, at::cuda::CUDAEvent(cudaEventDisableTiming));

    这些后续用于 stream 同步、work 完成检查等。

  6. 移入正式 communicator cache
    最后把 communicator 从初始化 map 移到正式 map:

    1
    2
    devNCCLCommMap_.emplace(deviceKey, std::move(it->second));
    inInitializationCommMap_.erase(deviceKey);

    后续同一个 PG、同一个 device 再做 collective,就会通过 getNCCLComm(deviceKey) 直接复用 communicator,不会重复初始化。

走到这里,在 PyTorch 层面就已经建立了可用的 NCCL 通信组,后续可以直接指定这一批 rank 组成的通信组进行符合拓扑条件的任何集合通信了。

NCCL 层

在 PyTorch 层的第 7 步和第 8 步,我们分别调用了 NCCL 层的 ncclGetUniqueIdncclCommInitRank/ncclCommInitRankConfig。NCCL 层正是在这两个函数中通过 bootstrap 机制完成了 NCCL 通信组的建立,包括通信组的识别和拓扑构建。本节将拆解这一流程。

本文中我们先关注 ncclGetUniqueId(这个函数只有 rank 0 会调用)。 它实现在 NCCL 的 src/init.cc:180

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NCCL_API(ncclResult_t, ncclGetUniqueId, ncclUniqueId* out);
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
NCCLCHECK(ncclInitEnv());
NCCLCHECK(ncclInit());
NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));
struct ncclBootstrapHandle handle;
NCCLCHECK(bootstrapGetUniqueId(&handle, NULL));
// ncclUniqueId and bootstrapHandle don't have the same size and alignment
// reset to 0 to avoid undefined data
memset(out, 0, sizeof(*out));
// copy to avoid alignment mismatch
memcpy(out, &handle, sizeof(handle));
TRACE_CALL("ncclGetUniqueId(0x%llx)", (unsigned long long)getHash(out->internal, NCCL_UNIQUE_ID_BYTES));
return ncclSuccess;
}

它的作用是:生成一个 ncclUniqueId,给后续 ncclCommInitRank 用。 典型流程是 rank 0 调一次 ncclGetUniqueId(&id),然后通过 MPI、文件、socket、环境变量等方式(在这里是 PyTorch 使用 store)把这个 id 广播给所有 rank,所有 rank 再用同一个 id 调 ncclCommInitRank(...),这样 NCCL 才知道这些进程属于同一个 communicator。

ncclUniqueId 的类型是:typedef struct { char internal[NCCL_UNIQUE_ID_BYTES]; } ncclUniqueId;,里面实际上塞的是 ncclBoostrapHandle

1
2
3
4
5
struct ncclBootstrapHandle {
uint64_t magic;
union ncclSocketAddress addr;
int nRanks; // number of existing ranks
};

也就是说,UniqueId 不是单纯随机数,而是 bootstrap 阶段需要的连接信息:一个 magic 标识、root 的 socket 地址,以及已有 rank 数量。后续 rank 连接 rank 0 时会用到这些信息(用 socket 找到 rank 0,用 magic 确保在同一个会话,rank 0 用 rank 数量确认所有人都到齐)。

ncclGetUniqueId 中的核心步骤是 bootstrapGetUniqueId,在这里面会进行具体的 UniqueId 生成和拓扑逻辑生成,位于 src/bootstrap.cc:426

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ncclResult_t bootstrapGetUniqueId(struct ncclBootstrapHandle* handle, struct ncclComm* comm) {
memset(handle, 0, sizeof(ncclBootstrapHandle));

const char* env = ncclGetEnv("NCCL_COMM_ID");
if (env) {
// If comm is provided (grow operation), NCCL_COMM_ID should not be set
if (comm) {
WARN("ncclCommGetUniqueId should not be called when NCCL_COMM_ID is set");
return ncclInvalidUsage;
}
// Normal init: use NCCL_COMM_ID from environment
INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
if (ncclSocketGetAddrFromString(&handle->addr, env) != ncclSuccess) {
WARN("Invalid NCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
return ncclInvalidArgument;
}
handle->magic = NCCL_MAGIC;
} else {
if (comm) {
// comm->childCount will be increment in ncclCommGrow for all existing ranks, use +1 here
handle->magic = hashCombine(comm->magic, comm->childCount + 1);
} else {
NCCLCHECK(getRandomData(&handle->magic, sizeof(handle->magic)));
}
handle->nRanks = comm ? comm->nRanks : 0;
memcpy(&handle->addr, &bootstrapNetIfAddr, sizeof(union ncclSocketAddress));
NCCLCHECK(bootstrapCreateRoot(handle, false));
}

return ncclSuccess;
}

定位到其中的 bootstrapCreateRoot,位于 src/bootstrap.cc:401

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ncclResult_t bootstrapCreateRoot(struct ncclBootstrapHandle* handle, bool idFromEnv) {
ncclResult_t ret = ncclSuccess;
struct ncclSocket* listenSock = NULL;
struct bootstrapRootArgs* args = NULL;
std::thread thread;

NCCLCHECK(ncclCalloc(&listenSock, 1));
NCCLCHECKGOTO(ncclSocketInit(listenSock, &handle->addr, handle->magic, ncclSocketTypeBootstrap, NULL, 0), ret, fail);
NCCLCHECKGOTO(ncclSocketListen(listenSock), ret, fail);
NCCLCHECKGOTO(ncclSocketGetAddr(listenSock, &handle->addr), ret, fail);

NCCLCHECKGOTO(ncclCalloc(&args, 1), ret, fail);
args->listenSock = listenSock;
args->magic = handle->magic;
thread = std::thread(bootstrapRoot, args);
ncclSetThreadName(thread, "NCCL BootstrapR");
thread.detach();
exit:
return ret;
fail:
if (listenSock) free(listenSock);
if (args) free(args);
goto exit;
}

在这里为 rank 0 创建了一个 LISTEN socket,这个 socket 就是 UniqueId 里携带的 socket,其他 rank 可以根据这个 socket 发送数据给 rank 0;此外完成了必要的信息填充。而后对当前 rank 0 启动了一个新的线程 bootstrapRoot,rank 0 的主线程可以直接返回

从这里开始,我们就真正涉及到了通信环建立的具体过程。NCCL 的 bootstrap 过程是主要是在所有的 rank 之间建立一个环,并且交换各自的通信元数据。这个函数就是 rank 0 在建立通信环中需要做的事。 对于一个通信环,我们期望达到下图的效果:
387
NCCL 采用的方案是:让每一个 rank 都把自己的 LISTEN socket 发给 root,再由 root 分发每个 rank 的 SEND socket。 NCCL 在 bootstrapRoot 里完成了 rank 0 对所有 socket 的监听和分发。

回想之前我们在 PyTorch 层面生成和广播的 UniqueId:其中包含了 rank 0 的 LISTEN socket,所以每个 rank 都可以向 root(rank 0) 发送信息。但 root 并不知道其他 rank 的 LISTEN socket,所以也没有办法把每个 rank 对应的 SEND socket 分发下去。为了解决这个问题,NCCL 会让其余 rank 额外创建一个 LISTEN socket 专门用于和 root 的通信,并在向 root 发送自己在通信环中的 LISTEN socket 时,顺便也把专用于和 root 通信的 LISTEN socket 也附带上。

具体到代码实现,在 bootstrapInit 里:

  • 每个 rank 都会先创建一个监听端口 info.connectInfo,代表“别人要连我进行 bootstrap ring 通信时需要的信息”
  • 每个 rank 还会创建一个给 root 回连自己的监听 socket info.listenRootAddress,代表“root 要把结果发回给我时用的地址”

每个 rank 把这两个信息打包成 extInfo 发给 root:

1
2
3
4
5
6
7
8
9
struct extInfo {
int rank;
int nranks;
int iroot;
int nroots;
int offset;
union ncclSocketAddress listenRootAddress;
union ringConnectInfo connectInfo;
};

root 收齐以后把对应每个人发送目标的 LISTEN socket 分发下去。

bootstrapRoot 函数位于 src/bootstrap.cc:287

1
2
3
4
5
6
7
8
9
10
11
12
do {
struct ncclSocket sock;
NCCLCHECKGOTO(ncclSocketInit(&sock), res, out);
NCCLCHECKGOTO(ncclSocketAccept(&sock, listenSock), res, out);
NCCLCHECKGOTO(socketRecv(&sock, &info, sizeof(info)), res, out);
NCCLCHECKGOTO(ncclSocketClose(&sock), res, out);

...

++c;
TRACE(NCCL_BOOTSTRAP, "Received connect from rank %d total %d/%d", info.rank, c, nrecv);
} while (c < nrecv);

root 会循环使用自己的 LISTEN socket 建立连接接受其他 rank 的数据,处理完成后退出循环。每一次循环内的具体流程如下:

  1. 第一次收到信息时的初始化
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    if (c == 0) {
    BOOTSTRAP_PROF_CLOSE(timers[BOOTSTRAP_INIT_ROOT_WAIT]);
    BOOTSTRAP_PROF_OPEN(timers[BOOTSTRAP_INIT_ROOT_RECV]);
    nranks = info.nranks;
    iroot = info.iroot;
    nroots = info.nroots;
    offset = info.offset;
    // if the number of root > 1, we will receive one extra info from the first local_id of the next root
    n2send = nRankFromRoot(iroot, nranks, nroots, offset);
    // offset>0 automatically means that we need to switch to the multiroot logic
    nrecv = n2send + ((offset > 0 || nroots > 1) ? 1 : 0);
    NCCLCHECKGOTO(ncclCalloc(&rankInfo, nrecv), res, out);
    NCCLCHECKGOTO(ncclCalloc(&rankAddressesRoot, nrecv), res, out);
    }
    几个变量含义:
  • nranks:总 rank 数。
  • nroots:有多少个 bootstrap root。普通 ncclGetUniqueId 通常是 1;ncclCommInitRankScalable 可能多个。
  • iroot:当前这个 root 是第几个 root。
  • n2send:这个 root 正常负责的 rank 数量。
  • nrecv:这个 root 实际要接收的信息数量。多 root 或 grow 场景下,可能要额外接收一个边界 rank 的信息,用来跨 root 拼 ring。
  1. 看能不能把下一个/当前 rank 的信息发给当前/上一个 rank
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // if the previous has already checked in, send the newly received handle, if not save the handle for later
    // if we have more than 1 root, I do not own the previous of local_id = 0
    // if we have prev > n2send, we do not send anything
    int prev = (nroots > 1) ? (localId - 1) : BOOTSTRAP_PID(localId - 1, nrecv);
    if (prev >= 0 && prev < n2send && memcmp(&zeroAddress, &rankAddressesRoot[prev], sizeof(union ncclSocketAddress)) != 0) {
    NCCLCHECKGOTO(rootSend(&rankAddressesRoot[prev], magic, &info.connectInfo), res, out);
    } else {
    memcpy(&rankInfo[localId], &info.connectInfo, sizeof(union ringConnectInfo));
    }
    // if the next rank has checked in, send the newly received info, if not save the addr for later
    // for nroots >=1, I will always own the information of the next connection
    // if the local_id id must be [0 ; n2send[ otherwise we do not answer
    int next = BOOTSTRAP_PID(localId + 1, nrecv);
    if (localId >= 0 && localId < n2send && memcmp(&zeroInfo, &rankInfo[next], sizeof(union ringConnectInfo)) != 0) {
    NCCLCHECKGOTO(rootSend(&info.listenRootAddress, magic, &rankInfo[next]), res, out);
    } else {
    memcpy(rankAddressesRoot + localId, &info.listenRootAddress, sizeof(union ncclSocketAddress));
    }
    root 不是先收集完成所有人再统一下发,而是边收边发。因为早点发可以更早结束目标 rank 的初始化阻塞部分,让他快点进入后面的通信组建立/具体计算过程。

循环结束之后有可能还有漏的,此时统一进行补发:

1
2
3
4
5
6
7
8
for (int r = 0; r < n2send; ++r) {
// use nrecv to periodize: if 1 root, we will send the first one to the last one, if >1 roots we will send the additional one we have received
int next = BOOTSTRAP_PID(r + 1, nrecv);
if (memcmp(&zeroAddress, &rankAddressesRoot[r], sizeof(union ncclSocketAddress)) != 0 &&
memcmp(&zeroInfo, &rankInfo[next], sizeof(union ringConnectInfo)) != 0) {
NCCLCHECKGOTO(rootSend(&rankAddressesRoot[r], magic, &rankInfo[next]), res, out);
}
}

举个例子。在上面 world_size = 4 的例子中,rank 0 会开一个线程进入 bootstrapRoot 并开始阻塞监听,把监听 socket 通过 UniqueId 在 Torch 层分发给了 rank 1/2/3。随后 4 个 rank 都会在 ncclCommInitRank 中向 rank 0 进行:

  • 发送自己的 bootstrap LISTEN socket
  • 发送自己专门为 rank 0 通信准备的 listenRootAddress
    rank 0 在收到 0/1/2/3 的 bootstrap LISTEN socket 后,把 rank 1 的 bootstrap LISTEN socket 发给 rank 0(自己)、把 rank 2 的 bootstrap LISTEN socket 通过 rank 1 的 listenRootAddress 发给 rank 1 ……以此类推。

程序运行至此就完成了整个通信组的建立。但我们还没有分析到其他 rank 是如何把 bootstrap LISTEN socketlistenRootAddress 发给 root的。这部分将在后续分析 ncclCommInitRank 时进行拆解。