PyTorch 分布式教程
官方 DDP/FSDP 教程
观察 DP/TP/PP/FSDP 四种并行策略如何切分模型和数据。
💡 调整 GPU 数量观察切分方式变化
一句话定义:分布式训练是将模型或数据分散到多个 GPU/节点上并行计算的技术,用于训练单卡装不下或训练时间过长的大模型。
| 并行方式 | 类比 | 适用场景 |
|---|---|---|
| 数据并行 | 多条相同生产线 | 模型小,数据多 |
| 流水线并行 | 一条线分多个工序 | 模型深,按层切 |
| 张量并行 | 每个工序多人协作 | 单层太大 |
| 混合并行 | 大工厂综合管理 | 超大模型 |
| 模型 | 参数量 | FP16 显存 (仅模型) | 训练显存 |
|---|---|---|---|
| 7B | 70亿 | 14 GB | ~60 GB |
| 70B | 700亿 | 140 GB | ~600 GB |
| 175B | 1750亿 | 350 GB | ~1.4 TB |
A100 80GB 单卡根本装不下 70B 模型的训练!
GPT-3 (175B) 训练:- 单卡 A100: ~355 年- 1024 卡: ~34 天 ← 分布式训练最简单的分布式方法:
┌─────────────────────────────────────────────────────┐│ 数据并行 (DP) │├─────────────────────────────────────────────────────┤│ GPU 0: 完整模型 + 数据 batch 0 → 梯度 0 ││ GPU 1: 完整模型 + 数据 batch 1 → 梯度 1 ││ GPU 2: 完整模型 + 数据 batch 2 → 梯度 2 ││ GPU 3: 完整模型 + 数据 batch 3 → 梯度 3 ││ ↓ ││ AllReduce (梯度同步) ││ ↓ ││ 所有 GPU 更新权重 │└─────────────────────────────────────────────────────┘所有 GPU 的梯度求平均后同步更新。
| 方法 | 显存 | 通信量 | 复杂度 |
|---|---|---|---|
| DDP | 每卡完整模型 | 梯度 | 低 |
| FSDP | 分片模型+梯度+优化器 | 参数+梯度 | 中 |
按层切分模型:
┌────────────────────────────────────────────────────┐│ 流水线并行 (PP) │├────────────────────────────────────────────────────┤│ GPU 0: Layer 0-7 ──┐ ││ GPU 1: Layer 8-15 ──┼── 流水线执行 ││ GPU 2: Layer 16-23 ──┼── (micro-batch) ││ GPU 3: Layer 24-31 ──┘ │└────────────────────────────────────────────────────┘
时间 →GPU 0: [F0][F1][F2][F3][ ][ ][ ][ ][B3][B2][B1][B0]GPU 1: [ ][F0][F1][F2][F3][ ][ ][B3][B2][B1][B0][ ]GPU 2: [ ][ ][F0][F1][F2][F3][B3][B2][B1][B0][ ][ ]GPU 3: [ ][ ][ ][F0][F1][F2][B2][B1][B0][ ][ ][ ]
F = Forward, B = Backward问题: 流水线气泡(GPU 空闲时间)
切分单个矩阵运算:
┌────────────────────────────────────────────────────┐│ 张量并行 (TP) │├────────────────────────────────────────────────────┤│ 矩阵 A × B = C ││ ││ 切分 B 为 [B1, B2]: ││ GPU 0: A × B1 = C1 ││ GPU 1: A × B2 = C2 ││ 结果: C = [C1, C2] │└────────────────────────────────────────────────────┘大模型训练通常组合使用:
┌─────────────────────────────────────────────────────┐│ 3D 并行 (DP + PP + TP) │├─────────────────────────────────────────────────────┤│ ││ ┌─DP─┐ ┌─DP─┐ ┌─DP─┐ ┌─DP─┐ ││ │ │ │ │ │ │ │ │ ││ │ TP │ → │ TP │ → │ TP │ → │ TP │ ← PP ││ │ │ │ │ │ │ │ │ ││ └────┘ └────┘ └────┘ └────┘ ││ ││ 示例: 64 GPU = 8 DP × 4 PP × 2 TP │└─────────────────────────────────────────────────────┘import torchimport torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size): """初始化分布式环境""" dist.init_process_group( backend="nccl", # GPU 用 nccl rank=rank, world_size=world_size ) torch.cuda.set_device(rank)
def train(rank, world_size): setup(rank, world_size)
# 创建模型并移到对应 GPU model = MyModel().to(rank)
# 包装为 DDP model = DDP(model, device_ids=[rank])
# 分布式数据采样器 sampler = torch.utils.data.distributed.DistributedSampler( dataset, num_replicas=world_size, rank=rank ) dataloader = DataLoader(dataset, sampler=sampler, batch_size=32)
# 训练循环 for epoch in range(epochs): sampler.set_epoch(epoch) # 确保每 epoch shuffle 不同 for batch in dataloader: optimizer.zero_grad() loss = model(batch) loss.backward() # DDP 自动同步梯度 optimizer.step()
# 启动多进程if __name__ == "__main__": world_size = torch.cuda.device_count() torch.multiprocessing.spawn( train, args=(world_size,), nprocs=world_size )from torch.distributed.fsdp import ( FullyShardedDataParallel as FSDP, MixedPrecision, ShardingStrategy)from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
# 混合精度配置mixed_precision = MixedPrecision( param_dtype=torch.bfloat16, reduce_dtype=torch.bfloat16, buffer_dtype=torch.bfloat16,)
# 自动包装策略 (按 Transformer Block)auto_wrap_policy = transformer_auto_wrap_policy( transformer_layer_cls={TransformerBlock})
# 创建 FSDP 模型model = FSDP( model, sharding_strategy=ShardingStrategy.FULL_SHARD, mixed_precision=mixed_precision, auto_wrap_policy=auto_wrap_policy, device_id=torch.cuda.current_device(),)
# 训练与 DDP 类似for batch in dataloader: loss = model(batch) loss.backward() optimizer.step()import deepspeed
# DeepSpeed 配置ds_config = { "train_batch_size": 256, "gradient_accumulation_steps": 4, "fp16": {"enabled": True}, "zero_optimization": { "stage": 3, # ZeRO-3: 完全分片 "offload_optimizer": {"device": "cpu"}, # 可选: CPU offload "offload_param": {"device": "cpu"}, }, "optimizer": { "type": "AdamW", "params": {"lr": 1e-4, "betas": [0.9, 0.999]} }}
# 初始化model, optimizer, _, _ = deepspeed.initialize( model=model, config=ds_config, model_parameters=model.parameters())
# 训练for batch in dataloader: loss = model(batch) model.backward(loss) model.step()# PyTorch DDP 单机多卡torchrun --nproc_per_node=4 train.py
# PyTorch DDP 多机多卡torchrun \ --nnodes=2 \ --nproc_per_node=8 \ --node_rank=0 \ --master_addr="192.168.1.1" \ --master_port=29500 \ train.py
# DeepSpeeddeepspeed --num_gpus=8 train.py --deepspeed_config ds_config.json
# Accelerate (Hugging Face)accelerate launch --multi_gpu --num_processes=4 train.py| 框架 | 易用性 | 功能 | 适用场景 |
|---|---|---|---|
| PyTorch DDP | ⭐⭐⭐⭐⭐ | 数据并行 | 简单多卡 |
| PyTorch FSDP | ⭐⭐⭐⭐ | ZeRO-3 | 大模型 |
| DeepSpeed | ⭐⭐⭐ | 全面 | 超大模型 |
| Megatron-LM | ⭐⭐ | 3D 并行 | 训练 GPT |
| Accelerate | ⭐⭐⭐⭐⭐ | 封装各种后端 | 快速上手 |