Using Flink Accumulators and Counters, and Implementing Custom Accumulators and Counters
1. Custom Accumulators
- There are two ways to define a custom accumulator; apart from the type parameters they require, the two are identical
- Accumulator[V, R]: V is the type of the values passed to add, R is the type of the result
- SimpleAccumulator[T]: T is the type of both the added values and the result
- Here we implement against Accumulator; a sketch of the SimpleAccumulator variant follows the implementation below
devBase\Int2LongAccumulator.scala
package devBase

import org.apache.flink.api.common.accumulators.Accumulator

// The result type R must be serializable; scala.Long does not satisfy
// Flink's R <: java.io.Serializable bound, so BigInt is used instead
class Int2LongAccumulator extends Accumulator[Int, BigInt] {

  private var local_value = 0L

  def this(value: Int) = {
    this()
    local_value = value
  }

  override def resetLocal(): Unit = {
    local_value = 0
  }

  override def add(value: Int): Unit = {
    local_value += value
  }

  // merge must fold the other accumulator's value into local_value;
  // building a BigInt without assigning it would discard the result
  override def merge(other: Accumulator[Int, BigInt]): Unit = {
    local_value += other.getLocalValue.toLong
  }

  override def getLocalValue: BigInt = {
    BigInt(local_value)
  }

  override def clone(): Accumulator[Int, BigInt] = {
    val result = new Int2LongAccumulator()
    result.local_value = local_value
    result
  }
}
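For comparison, here is a minimal sketch of the same counter written against SimpleAccumulator, where the added values and the result share one type. The class name LongSumAccumulator, its file path, and the choice of java.lang.Long (scala.Long again fails the Serializable bound) are our own assumptions, not part of the original article.
devBase\LongSumAccumulator.scala
package devBase

import org.apache.flink.api.common.accumulators.{Accumulator, SimpleAccumulator}

// SimpleAccumulator[T] fixes add's input type and the result type to the
// same T; T must still be serializable, hence java.lang.Long here
class LongSumAccumulator extends SimpleAccumulator[java.lang.Long] {

  private var local_value = 0L

  override def add(value: java.lang.Long): Unit = {
    local_value += value
  }

  override def getLocalValue: java.lang.Long = local_value

  override def resetLocal(): Unit = {
    local_value = 0L
  }

  override def merge(other: Accumulator[java.lang.Long, java.lang.Long]): Unit = {
    local_value += other.getLocalValue
  }

  override def clone(): LongSumAccumulator = {
    val result = new LongSumAccumulator()
    result.local_value = local_value
    result
  }
}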
2. Using Accumulators
devBase\AccumulatorTest.scala
package devBase

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.Configuration

class AccumulatorMapFunction extends RichMapFunction[String, String] {

  val int2LongAccumulator = new Int2LongAccumulator(0)

  override def map(value: String): String = {
    int2LongAccumulator.add(1)
    value
  }

  override def open(parameters: Configuration): Unit = {
    // Register the accumulator with the runtime under a job-wide name
    getRuntimeContext.addAccumulator("int2LongAccumulator", int2LongAccumulator)
    int2LongAccumulator.add(1)
    super.open(parameters)
  }

  override def close(): Unit = {
    int2LongAccumulator.add(1)
    super.close()
  }
}

object AccumulatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text: DataSet[String] = env.fromElements("flink", "spark", "hadoop", "clickhouse")
    val ds: DataSet[String] = text
      .map(new AccumulatorMapFunction()).setParallelism(10)
    ds.print()

    // The accumulator result only becomes available after the job has finished
    val jobExecutionResult = env.getLastJobExecutionResult
    val accumulatorResult = jobExecutionResult.getAccumulatorResult("int2LongAccumulator")
      .asInstanceOf[BigInt].toLong
    println("accumulatorResult: " + accumulatorResult)
  }
}
Running the program produces the following output:
flink
spark
hadoop
clickhouse
accumulatorResult: 3
accumulatorResult should be 6 (1 added in open, 4 in map — one per input element — and 1 in close). The discrepancy arises because setParallelism on the map is greater than 1; with a parallelism above 1 the accumulator result is not accurate.
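As a quick check (our own variation, not run in the original article), dropping the map's parallelism to 1 leaves a single subtask, so open and close each fire exactly once and the count should come out to the expected 6:
// In AccumulatorTest.main, replace the map line with a single-subtask
// version: open adds 1, the four elements add 4, close adds 1, giving 6
val ds: DataSet[String] = text
  .map(new AccumulatorMapFunction()).setParallelism(1)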
3. Built-in Accumulators
- Counter: counters, provided as IntCounter, LongCounter, and DoubleCounter; a usage sketch follows this list
- Histogram: a histogram over discrete data; internally a TreeMap from an integer value (the x-axis; this is the Accumulator's V) to that value's count (the y-axis), and the result type R is the whole TreeMap[Integer, Integer]
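A minimal sketch of the built-in IntCounter (the accumulator name "record-count", the class name CountingMapFunction, and the file path are our own choices, not from the original article):
devBase\CountingMapFunction.scala
package devBase

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class CountingMapFunction extends RichMapFunction[String, String] {

  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // Register the built-in counter under a job-wide name
    getRuntimeContext.addAccumulator("record-count", counter)
    super.open(parameters)
  }

  override def map(value: String): String = {
    counter.add(1) // count every record this subtask processes
    value
  }
}
After the job finishes, the merged count can be read the same way as before, e.g. env.getLastJobExecutionResult.getAccumulatorResult[java.lang.Integer]("record-count").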