2025-09-20
Spark
00
请注意,本文编写于 94 天前,最后修改于 94 天前,其中某些信息可能已经过时。

目录

实现原理
基础编程
系统累加器
自定义累加器

实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。

基础编程

系统累加器

scala
val rdd = sc.makeRDD(List(1,2,3,4,5)) // 声明累加器 var sum = sc.longAccumulator("sum"); rdd.foreach( num => { // 使用累加器 sum.add(num) } ) // 获取累加器的值 println("sum = " + sum.value)

自定义累加器

scala
// 自定义累加器 // 1. 继承 AccumulatorV2,并设定泛型 // 2. 重写累加器的抽象方法 class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{ var map : mutable.Map[String, Long] = mutable.Map() // 累加器是否为初始状态 override def isZero: Boolean = { map.isEmpty } // 复制累加器 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new WordCountAccumulator } // 重置累加器 override def reset(): Unit = { map.clear() } // 向累加器中增加数据 (In) override def add(word: String): Unit = { // 查询 map 中是否存在相同的单词 // 如果有相同的单词,那么单词的数量加 1 // 如果没有相同的单词,那么在 map 中增加这个单词 map(word) = map.getOrElse(word, 0L) + 1L } // 合并累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = map val map2 = other.value // 两个 Map 的合并 map = map1.foldLeft(map2)( ( innerMap, kv ) => { innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2 innerMap } ) } // 返回累加器的结果 (Out) override def value: mutable.Map[String, Long] = map }

本文作者:Dewar

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!