累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。
scalaval rdd = sc.makeRDD(List(1,2,3,4,5)) // 声明累加器 var sum = sc.longAccumulator("sum"); rdd.foreach( num => { // 使用累加器 sum.add(num) } ) // 获取累加器的值 println("sum = " + sum.value)
scala// 自定义累加器 // 1. 继承 AccumulatorV2,并设定泛型 // 2. 重写累加器的抽象方法 class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{ var map : mutable.Map[String, Long] = mutable.Map() // 累加器是否为初始状态 override def isZero: Boolean = { map.isEmpty } // 复制累加器 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new WordCountAccumulator } // 重置累加器 override def reset(): Unit = { map.clear() } // 向累加器中增加数据 (In) override def add(word: String): Unit = { // 查询 map 中是否存在相同的单词 // 如果有相同的单词,那么单词的数量加 1 // 如果没有相同的单词,那么在 map 中增加这个单词 map(word) = map.getOrElse(word, 0L) + 1L } // 合并累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = map val map2 = other.value // 两个 Map 的合并 map = map1.foldLeft(map2)( ( innerMap, kv ) => { innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2 innerMap } ) } // 返回累加器的结果 (Out) override def value: mutable.Map[String, Long] = map }
本文作者:Dewar
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!