Flink Accumulators and Counters: Usage and Implementing Custom Accumulators and Counters


1. Custom Accumulators

  • There are two interfaces for implementing a custom accumulator; apart from the type parameters they require, they work the same way
    1. Accumulator<V, R>: V is the type of the values passed to add, R is the type of the result
    2. SimpleAccumulator<T>: T is the type of both the added values and the result (a minimal sketch based on it follows the implementation below)
  • Here we implement one with the Accumulator interface

devBase\Int2LongAccumulator.scala

package devBase 
 
import org.apache.flink.api.common.accumulators.Accumulator
 
// The result type must satisfy the Serializable bound; Scala's Long is a primitive and does not, hence BigInt
class Int2LongAccumulator extends Accumulator[Int, BigInt] { 
 
  private var local_value = 0L 
 
  def this(value:Int) = { 
    this() 
    local_value = value 
  } 
 
 
  override def resetLocal(): Unit = {
    local_value = 0
  }

  override def add(value: Int): Unit = { 
    local_value += value 
  } 
 
  override def merge(other: Accumulator[Int, BigInt]): Unit = {
    // Fold the other accumulator's value into the local state
    local_value += other.getLocalValue.toLong
  }

  override def getLocalValue: BigInt = {
    BigInt(local_value)
  }

  override def clone(): Accumulator[Int, BigInt] = {
    val result = new Int2LongAccumulator()
    result.local_value = local_value
    result
  }
}
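For comparison, here is a minimal sketch of the same counter idea built on SimpleAccumulator<T>, which is simply Accumulator<T, T>. The file name and class name (BigIntAccumulator) are our own illustration, not part of the original code:

devBase\BigIntAccumulator.scala

package devBase

import org.apache.flink.api.common.accumulators.{Accumulator, SimpleAccumulator}

// With SimpleAccumulator the added values and the result share one type T;
// BigInt satisfies the Serializable bound required of the result type
class BigIntAccumulator extends SimpleAccumulator[BigInt] {

  private var local_value = BigInt(0)

  override def add(value: BigInt): Unit = {
    local_value += value
  }

  override def getLocalValue: BigInt = local_value

  override def resetLocal(): Unit = {
    local_value = BigInt(0)
  }

  override def merge(other: Accumulator[BigInt, BigInt]): Unit = {
    // Fold the other accumulator's value into the local state
    local_value += other.getLocalValue
  }

  override def clone(): Accumulator[BigInt, BigInt] = {
    val result = new BigIntAccumulator()
    result.local_value = local_value
    result
  }
}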

2. Using Accumulators

devBase\AccumulatorTest.scala

package devBase 
 
import org.apache.flink.api.common.functions.RichMapFunction 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.configuration.Configuration 
 
 
class AccumulatorMapFunction extends RichMapFunction[String, String] { 
 
  val int2LongAccumulator = new Int2LongAccumulator(0) 
 
  // Called once per record: contributes 1 for each element
  override def map(value: String): String = {
    int2LongAccumulator.add(1)
    value
  }
 
  // Called once per parallel subtask: registers the accumulator and contributes 1
  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.addAccumulator("int2LongAccumulator", int2LongAccumulator)
    int2LongAccumulator.add(1)
    super.open(parameters)
  }
 
 
  // Called once per parallel subtask: contributes 1
  override def close(): Unit = {
    int2LongAccumulator.add(1)
    super.close()
  }
} 
 
 
object AccumulatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
 
    val text: DataSet[String] = env.fromElements("flink", "spark", "hadoop", "clickhouse") 
 
    val ds: DataSet[String] = text 
      .map(new AccumulatorMapFunction()).setParallelism(10) 
 
    ds.print() 
 
    // The accumulator result only becomes available after the job has finished
    val jobExecutionResult = env.getLastJobExecutionResult 
    val accumulatorResult = jobExecutionResult.getAccumulatorResult("int2LongAccumulator") 
      .asInstanceOf[BigInt].toLong 
    println("accumulatorResult: " + accumulatorResult) 
 
  } 
} 

Running the program prints the following:

flink 
spark 
hadoop 
accumulatorResult: 3 

accumulatorResult should be 6: with a single parallel instance, open contributes 1, the four map calls contribute 4, and close contributes 1. Because setParallelism on the map function is greater than 1, each parallel subtask runs its own open and close and registers its own accumulator copy, so the merged result is not accurate.

3. Built-in Accumulators

  • Counter: a simple counter; the built-in variants are IntCounter, LongCounter and DoubleCounter
  • Histogram: a histogram over discrete integer values; V in Accumulator is the Int value being added, and the result R is the whole map from each value to its occurrence count (a usage sketch follows this list)
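
Below is a minimal sketch of both built-in accumulators in use, in the same style as the earlier example. The file name, class names, accumulator names, and the word-length histogram are our own illustration, not from the original article:

devBase\BuiltinAccumulatorTest.scala

package devBase

import org.apache.flink.api.common.accumulators.{Histogram, IntCounter}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.Configuration

// Counts records with an IntCounter and tracks word lengths with a Histogram
class BuiltinAccumulatorMapFunction extends RichMapFunction[String, String] {

  private val wordCounter = new IntCounter()
  private val lengthHistogram = new Histogram()

  override def open(parameters: Configuration): Unit = {
    // Register both accumulators once per parallel subtask
    getRuntimeContext.addAccumulator("wordCounter", wordCounter)
    getRuntimeContext.addAccumulator("lengthHistogram", lengthHistogram)
    super.open(parameters)
  }

  override def map(value: String): String = {
    wordCounter.add(1)                 // one per record
    lengthHistogram.add(value.length)  // histogram key: word length
    value
  }
}

object BuiltinAccumulatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = env.fromElements("flink", "spark", "hadoop", "clickhouse")

    text.map(new BuiltinAccumulatorMapFunction()).print()

    val jobExecutionResult = env.getLastJobExecutionResult
    val count = jobExecutionResult.getAccumulatorResult[java.lang.Integer]("wordCounter")
    val histogram = jobExecutionResult.getAccumulatorResult[java.util.TreeMap[Integer, Integer]]("lengthHistogram")
    println("wordCounter: " + count)
    println("lengthHistogram: " + histogram)
  }
}

Here the IntCounter result comes back as a java.lang.Integer and the Histogram result as a java.util.TreeMap mapping each added value to its occurrence count.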
