问题表现
- 总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时
- 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常
问题原因
在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
解决方法
| 适用场景 | 实现原理 | 方案优点 | 方案缺点 |
|---|---|---|---|
| 导致倾斜的key就少数几个 | 取数据量最多的key过滤掉即可 | 实现简单 | 适用场景不多 |
| 默认并行度较小(200) | 提高shuffle操作的并行度,让每个task处理比原来更少的数据 | 实现简单 | 只是缓解 |
| reduceByKey 和 groupByKey 操作 | 1. 先局部聚合: 给每个key都打上一个随机数, 再进行聚合操作 2. 后全局聚合: 将各个key的前缀给去掉,再进行一次聚合操作 | 对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的 | 不适用与 join类的shuffle操作 |
| 1. join 操作 2. 其中一个表数据量较小 | 不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作 | 对join操作导致的数据倾斜,效果非常好 | 只适用于一个大表和一个小表的情况, 并不适合两个都是大表的情况 |
| 1. join 操作 2. 某几个key导致了倾斜 | 将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join 另一份数据(这份数据要先膨胀成n条数据) | 在适用场景下效果较好 | 不适用于导致倾斜的key特别多的情况 |
| 1. join 操作 2. 大量key导致了倾斜 | 将所有key附加随机前缀打散成n份去进行join 另一份数据(这份数据要先膨胀成n条数据) | 对join类型的数据倾斜基本都可以处理 | 需要对整个RDD进行扩容,对内存资源要求很高 |
代码
参考资料
文档信息
- 本文作者:joey zhou
- 本文链接:https://joeyzyz.github.io/2021/09/05/spark%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)