Flink DataStream: Keyed and Non-Keyed Windows, WindowAssigner, Window Lifecycle, Window Functions, and Handling Late Data and Window Results

Author: 虾米姐, 2022-06-06 14:17:30

1. Keyed windows (window) and Non-Keyed windows (windowAll)

1.1 Keyed windows
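val input: DataStream[T1] = ...
val result: DataStream[T2] = input
  .keyBy(...)                 // group the stream by key
  .window(...)                // WindowAssigner: how the elements of one key are split into windows
  [.trigger(...)]             // optional, else the assigner's default trigger; decides when the window function fires and can purge the window's contents at any point before the window is removed
  [.evictor(...)]             // optional, else no evictor; removes elements from the window after the trigger fires, before and/or after the window function runs
  [.allowedLateness(...)]     // optional, else the allowed lateness is 0
  [.sideOutputLateData(...)]  // optional, else late elements are dropped
  .reduce/aggregate/apply()   // window function applied to the elements of one window of one key

val lateStream: DataStream[T1] = result.getSideOutput(...)  // optional, uses the same OutputTag as sideOutputLateData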
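1.2 Non-Keyed windows
Non-keyed windows differ from keyed windows in three ways:
(1) The stream is not grouped with keyBy.
(2) windowAll is used instead of window, so the window assignment applies to the whole stream rather than to each key.
(3) The reduce/aggregate/apply function processes all elements of a window regardless of key, and the windowAll operator runs with parallelism 1.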
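A plain-Scala model (no Flink dependency) can make the difference concrete. It assumes 10-second tumbling windows and made-up (key, value, timestampMillis) records.

- Keyed windows yield one sum per (key, window).
- The windowAll variant yields one sum per window across all keys.

The model only mimics the grouping. In Flink, the keyed version is spread across parallel subtasks by key, whereas windowAll runs in a single task.

```scala
// Plain-Scala model of keyed vs non-keyed tumbling windows (no Flink involved).
// Records are (key, value, timestampMillis); the 10 s window size is an assumption.
object KeyedVsNonKeyed {
  val windowSize = 10000L

  // Start of the tumbling window that contains ts (offset 0, ts >= 0 assumed).
  def windowStart(ts: Long): Long = ts - ts % windowSize

  // Mimics the grouping of keyBy(_._1).window(...).sum(1): one sum per (key, window start).
  def keyedSums(records: Seq[(String, Int, Long)]): Map[(String, Long), Int] =
    records.groupBy { case (key, _, ts) => (key, windowStart(ts)) }
      .map { case (kw, rs) => kw -> rs.map(_._2).sum }

  // Mimics the grouping of windowAll(...).sum(1): keys are ignored, one sum per window start.
  def nonKeyedSums(records: Seq[(String, Int, Long)]): Map[Long, Int] =
    records.groupBy { case (_, _, ts) => windowStart(ts) }
      .map { case (w, rs) => w -> rs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val records = Seq(("A", 10, 1000L), ("B", 100, 2000L), ("A", 20, 12000L))
    println(keyedSums(records))    // per key and window
    println(nonKeyedSums(records)) // per window only
  }
}
```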

2. WindowAssigner (time-based)

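Flink provides four kinds of built-in WindowAssigner: tumbling, sliding, session, and global windows. You can also implement your own WindowAssigner. Time windows are half-open: a window covers [start, end), so it includes its start timestamp but not its end timestamp.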

2.1 Day-aligned windows in UTC+8 need an offset of Time.hours(-8L)

First, let's look at what happens without an offset:

package datastreamApi 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 // allowed out-of-orderness in ms (none in this demo)
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    // watermark = largest timestamp seen - allowed out-of-orderness - 1 ms
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
  } 
 
} 
 
 
object WindowTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements( 
      ("A", 10, "2021-09-08 22:00:00"), 
      ("A", 20, "2021-09-08 23:00:00"), 
      ("A", 30, "2021-09-09 06:00:00"), 
      ("B", 100, "2021-09-08 22:00:00"), 
      ("B", 200, "2021-09-08 23:00:00"), 
      ("B", 300, "2021-09-09 06:00:00") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val result: DataStream[(String, Int, String)] = input.keyBy(_._1) 
      .window(TumblingEventTimeWindows.of(Time.days(1L))) // 1-day windows, no offset
      .sum(1) 
 
    result.print("result") 
 
    senv.execute("WindowTest") 
 
 
  } 
 
} 

Output:

result:7> (A,60,2021-09-08 22:00:00) 
result:2> (B,600,2021-09-08 22:00:00) 

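As the output shows, each window ran from 08:00 on one day to 08:00 on the next, whereas we wanted windows from 00:00 to 00:00.

The cause is that Flink aligns time windows to the Unix epoch (1970-01-01 00:00:00 UTC). Without an offset, a 1-day window therefore starts at 00:00 UTC, which is 08:00 in UTC+8. These results assume the JVM's default time zone is UTC+8, because FastDateFormat parses the datetime strings in the default time zone.

To get windows from 00:00 to 00:00 local time, replace window(TumblingEventTimeWindows.of(Time.days(1L))) in the code above with window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8L))). Running it again gives: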

result:2> (B,300,2021-09-08 22:00:00) 
result:2> (B,300,2021-09-09 06:00:00) 
result:7> (A,30,2021-09-08 22:00:00) 
result:7> (A,30,2021-09-09 06:00:00) 
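The numbers above follow from how Flink computes a tumbling window's start. The plain-Scala sketch below reproduces that arithmetic. It assumes the formula mirrors Flink's TimeWindow.getWindowStartWithOffset: the remainder of (timestamp - offset) modulo the window size, adjusted for negative values.

The timestamps are epoch milliseconds for 2021-09-08 22:00:00 and 2021-09-09 06:00:00 in UTC+8. With an offset of -8 h, the first window starts at 1631030400000, the same start that appears in the TimeWindow output of section 5.

```scala
// Plain-Scala sketch of tumbling-window start computation with an offset (no Flink involved).
object WindowStartWithOffset {
  val Day: Long = 24L * 60 * 60 * 1000
  val Hour: Long = 60L * 60 * 1000

  // Start of the window of length `size` that contains `timestamp`, shifted by `offset` (all ms).
  def windowStart(timestamp: Long, offset: Long, size: Long): Long = {
    val remainder = (timestamp - offset) % size
    // % keeps the sign of the dividend, so a negative remainder is shifted up by one size
    if (remainder < 0) timestamp - (remainder + size) else timestamp - remainder
  }

  def main(args: Array[String]): Unit = {
    val ts = 1631109600000L // 2021-09-08 22:00:00 UTC+8 (= 14:00:00 UTC)
    println(windowStart(ts, 0L, Day))        // 1631059200000: 00:00 UTC = 08:00 UTC+8
    println(windowStart(ts, -8 * Hour, Day)) // 1631030400000: 00:00 UTC+8
  }
}
```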

2.2 Tumbling Windows


// window size, offset
window(TumblingEventTimeWindows.of(Time.days(1L),Time.hours(-8L))) 
window(TumblingProcessingTimeWindows.of(Time.days(1L),Time.hours(-8L))) 

2.3 Sliding Windows


// window size, slide, offset
window(SlidingEventTimeWindows.of(Time.minutes(10L), Time.minutes(5L),Time.hours(-8L))) 
window(SlidingProcessingTimeWindows.of(Time.minutes(10L), Time.minutes(5L),Time.hours(-8L))) 
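To see which windows an element belongs to, here is a plain-Scala sketch of sliding-window assignment. It assumes the logic mirrors Flink's SlidingEventTimeWindows:

1. Take the window start computed with the slide as the step.
2. Step back by one slide at a time while the window still covers the timestamp.

With a size of 10 minutes and a slide of 5 minutes, each element lands in size / slide = 2 windows. This is why section 7 warns that sliding windows copy elements.

```scala
// Plain-Scala sketch of sliding-window assignment (no Flink involved).
object SlidingAssignment {
  // Latest window start at or before `timestamp` on the grid defined by `slide` and `offset`.
  def windowStart(timestamp: Long, offset: Long, slide: Long): Long = {
    val remainder = (timestamp - offset) % slide
    if (remainder < 0) timestamp - (remainder + slide) else timestamp - remainder
  }

  // Every [start, end) window of length `size` that contains `timestamp`, latest first.
  def assign(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): List[(Long, Long)] =
    Iterator.iterate(windowStart(timestamp, offset, slide))(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => (start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    // size 10 min, slide 5 min: the element at minute 7 belongs to [5, 15) and [0, 10)
    println(assign(7 * minute, 10 * minute, 5 * minute))
  }
}
```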

2.4 Session Windows

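For each key, every arriving element gets its own new window. If two elements are closer together than the session gap, their windows are merged. Because windows must be merged, session windows need a trigger that supports merging and a window function that can work with merged windows, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction.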


// fixed session gap
window(EventTimeSessionWindows.withGap(Time.minutes(5L))) 
window(ProcessingTimeSessionWindows.withGap(Time.minutes(5L))) 
 
// dynamic session gap: extract() returns the gap for each element in milliseconds (5 ms or 10 ms here)
window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int, String)]{ 
  override def extract(element: (String, Int, String)): Long = { 
    if(element._3 >= "2021-01-01 00:00:00") 5L else 10L 
  } 
})) 
 
window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int, String)]{ 
  override def extract(element: (String, Int, String)): Long = { 
    if(element._3 >= "2021-01-01 00:00:00") 5L else 10L 
  } 
})) 
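The merging described above can be modelled in plain Scala. Each timestamp opens a window [ts, ts + gap), and overlapping windows are merged. This is a sketch, not Flink's MergingWindowAssigner. Treating windows that merely touch as overlapping is our reading of Flink's TimeWindow.intersects.

The second call shows how one extra element between two sessions bridges them. This is the effect that the allowedLateness note in section 6 describes for late data.

```scala
// Plain-Scala sketch of session-window merging for one key (no Flink involved).
object SessionMerge {
  // Each timestamp opens [ts, ts + gap); windows that overlap are merged into one session.
  // Windows that merely touch (next start == current end) are merged too, following our
  // reading of Flink's TimeWindow.intersects; the sample data avoids that boundary.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted
      .map(ts => (ts, ts + gap))
      .foldLeft(List.empty[(Long, Long)]) {
        case ((start, end) :: done, (s, e)) if s <= end => (start, math.max(end, e)) :: done
        case (acc, window) => window :: acc
      }
      .reverse

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    println(sessions(Seq(0L, 1 * minute, 8 * minute), 5 * minute))             // two sessions
    println(sessions(Seq(0L, 1 * minute, 8 * minute, 4 * minute), 5 * minute)) // minute 4 bridges them
  }
}
```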

2.5 Global Windows

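This WindowAssigner puts all elements of a key into a single global window and does not depend on timestamps. Its default trigger never fires, so you must specify a trigger; otherwise no results are produced.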

window(GlobalWindows.create()) 
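The role of the trigger can be sketched in plain Scala. One global window buffers every element of a key, and a trigger predicate sees the buffered count. When it fires, the sketch emits the sum and then purges the buffer. The predicate and the purge-after-fire behaviour are illustrative assumptions. The first call stands in for the default trigger that never fires.

```scala
import scala.collection.mutable

// Plain-Scala sketch of one global window with a pluggable trigger (no Flink involved).
object GlobalWindowTrigger {
  // Every element is buffered; `trigger` sees the buffered count and decides whether to fire.
  // Firing emits the sum of the buffer (the "window function") and then purges the buffer.
  def run(values: Seq[Int], trigger: Int => Boolean): List[Int] = {
    val buffer = mutable.ListBuffer.empty[Int]
    val out = mutable.ListBuffer.empty[Int]
    for (v <- values) {
      buffer += v
      if (trigger(buffer.size)) {
        out += buffer.sum
        buffer.clear()
      }
    }
    out.toList // elements still buffered never produce output
  }

  def main(args: Array[String]): Unit = {
    val values = 1 to 7
    println(run(values, _ => false)) // never fires: no results at all
    println(run(values, _ >= 3))     // fires every 3 elements
  }
}
```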

3. WindowAssigner (count-based)

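  • Count windows are driven by element counts, so timestamps and watermarks do not need to be assigned with DataStream.assignTimestampsAndWatermarks
// ============== Keyed windows ============== 
// window size in elements 
countWindow(10L) 
// window size in elements, slide in elements 
countWindow(10L, 5L) 
 
// ============== Non-Keyed windows ============== 
countWindowAll(10L) 
countWindowAll(10L, 5L) 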
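Count windows are built on global windows. The plain-Scala sketch below lists what each firing sees. It assumes the following composition, which is how Flink's KeyedStream builds them as we understand it:

- countWindow(size) = GlobalWindows + a purging CountTrigger.of(size)
- countWindow(size, slide) = GlobalWindows + CountEvictor.of(size) + CountTrigger.of(slide)

Note that the sliding variant first fires after slide elements, when the window still holds fewer than size elements.

```scala
// Plain-Scala sketch of count-window firings for one key (no Flink involved).
object CountWindows {
  // countWindow(size): fire every `size` elements, then purge; an incomplete tail never fires.
  def tumbling[T](elements: Seq[T], size: Int): List[List[T]] =
    elements.grouped(size).filter(_.size == size).map(_.toList).toList

  // countWindow(size, slide): fire every `slide` elements over at most the last `size` elements.
  def sliding[T](elements: Seq[T], size: Int, slide: Int): List[List[T]] =
    (slide to elements.size by slide).map(n => elements.take(n).takeRight(size).toList).toList

  def main(args: Array[String]): Unit = {
    val xs = 1 to 12
    println(tumbling(xs, 5))   // two firings; elements 11 and 12 are still waiting
    println(sliding(xs, 4, 2)) // a firing every 2 elements, each over at most 4 elements
  }
}
```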

4. The window lifecycle

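  1. Creation: a window is created lazily, when the first element that belongs to it arrives
  2. Removal:
    1. Event-time windows: a window and its state are removed once the watermark reaches (window end timestamp + allowedLateness - 1), i.e. the window's maxTimestamp() plus allowedLateness. Processing-time windows are removed once processing time passes the window end, because allowedLateness only applies to event time
    2. Global windows and count windows (which are built on global windows): the window is never removed; its contents are only cleared when the trigger purges them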
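For event-time windows, the rules above reduce to two comparisons against the watermark. The plain-Scala sketch below rests on two assumptions, both our reading of Flink's EventTimeTrigger and WindowOperator:

- The default event-time trigger fires once the watermark reaches maxTimestamp = end - 1.
- Window state is cleaned up at maxTimestamp + allowedLateness.

The 10 s window and 3 s lateness match the example in section 6.

```scala
// Plain-Scala sketch of when an event-time window fires and when it is removed (no Flink involved).
object WindowLifecycle {
  // A time window covers [start, end); its maxTimestamp is end - 1.
  final case class TimeWin(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
  }

  // Default event-time trigger: fire once the watermark reaches maxTimestamp.
  def fires(w: TimeWin, watermark: Long): Boolean = watermark >= w.maxTimestamp

  // Window state is removed once the watermark reaches maxTimestamp + allowedLateness.
  def isRemoved(w: TimeWin, watermark: Long, allowedLateness: Long): Boolean =
    watermark >= w.maxTimestamp + allowedLateness

  def main(args: Array[String]): Unit = {
    val w = TimeWin(0L, 10000L) // a hypothetical 10 s window
    for (wm <- Seq(9998L, 9999L, 12998L, 12999L))
      println(s"watermark=$wm fires=${fires(w, wm)} removed=${isRemoved(w, wm, 3000L)}")
  }
}
```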

5. Window functions

5.1 ReduceFunction和ProcessWindowFunction

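  • ReduceFunction aggregates incrementally: each arriving element is combined with the current result, so only one value is kept per key and window
  • ProcessWindowFunction receives all elements of one window of one key as an Iterable. This is less efficient because the elements must be buffered, but it gets a Context with window metadata (e.g. context.window) and access to the RuntimeContext
  • When the two are combined, the ReduceFunction pre-aggregates incrementally and, when the window fires, passes its single final result to the ProcessWindowFunction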
package datastreamApi 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1)) 
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
  } 
 
} 
 
 
object WindowTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements( 
      ("A", 10, "2021-09-08 01:00:00"), 
      ("A", 20, "2021-09-08 02:00:00"), 
      ("A", 30, "2021-09-08 03:00:00"), 
      ("B", 100, "2021-09-08 01:00:00"), 
      ("B", 200, "2021-09-08 02:00:00"), 
      ("B", 300, "2021-09-08 03:00:00") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val result: DataStream[String] = input.keyBy(_._1) 
      .window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8L))) 
      // incremental ReduceFunction: sum field 2 and keep the later datetime
      .reduce((x1: (String, Int, String), x2: (String, Int, String)) => {
        val datetime1 = x1._3 
        val datetime2 = x2._3 
        val datetime = if (datetime1 > datetime2) datetime1 else datetime2 
 
        (x1._1, x1._2 + x2._2, datetime) 
      }, new ProcessWindowFunction[(String, Int, String), String, String, TimeWindow] { 
        override def process(key: String, context: Context, elements: Iterable[(String, Int, String)], out: Collector[String]): Unit = { 
 
          val element_str = elements.mkString(", ") 
          out.collect(s"=========窗口信息:${context.window}========${element_str}=====") 
 
        } 
      } 
      ) 
 
 
    result.print("result") 
 
    senv.execute("WindowTest") 
 
  } 
 
} 

Output:

result:2> =========窗口信息:TimeWindow{start=1631030400000, end=1631116800000}========(B,600,2021-09-08 03:00:00)===== 
result:7> =========窗口信息:TimeWindow{start=1631030400000, end=1631116800000}========(A,60,2021-09-08 03:00:00)===== 
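In plain Scala, the combination above works like the following sketch:

1. The reduce step runs as each element arrives, so only one record is kept.
2. When the window fires, the process step receives an Iterable that contains just that record.

The record type, the window bounds (taken from the output above), and the output format are illustrative. The sketch does not call Flink.

```scala
// Plain-Scala sketch of ReduceFunction + ProcessWindowFunction for one key and window (no Flink).
object ReduceThenProcess {
  type Rec = (String, Int, String)

  // Incremental step (like the ReduceFunction above): sum field 2, keep the later datetime.
  def reduce(x1: Rec, x2: Rec): Rec =
    (x1._1, x1._2 + x2._2, if (x1._3 > x2._3) x1._3 else x2._3)

  // Final step (like the ProcessWindowFunction): gets an Iterable with only the reduced record.
  def process(key: String, start: Long, end: Long, elements: Iterable[Rec]): String =
    s"key=$key window=[$start, $end) elements=${elements.mkString(", ")}"

  // Reduce on every arrival, then call process once when the window fires.
  def runWindow(key: String, start: Long, end: Long, records: Seq[Rec]): String = {
    var acc: Option[Rec] = None
    for (r <- records) acc = Some(acc.fold(r)(a => reduce(a, r)))
    process(key, start, end, acc.toList)
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(("A", 10, "2021-09-08 01:00:00"), ("A", 20, "2021-09-08 02:00:00"), ("A", 30, "2021-09-08 03:00:00"))
    println(runWindow("A", 1631030400000L, 1631116800000L, records))
  }
}
```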

5.2 AggregateFunction和ProcessWindowFunction

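  • AggregateFunction aggregates incrementally through an accumulator (createAccumulator, add, merge, getResult)
  • ProcessWindowFunction receives all elements of one window of one key as an Iterable. This is less efficient because the elements must be buffered, but it gets a Context with window metadata and access to the RuntimeContext
  • When the two are combined, the AggregateFunction's final result (from getResult) is passed to the ProcessWindowFunction when the window fires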
package datastreamApi 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.api.common.functions.AggregateFunction 
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1)) 
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
  } 
 
} 
 
class MyAggregateFunction extends AggregateFunction[(String, Int, String), (String, Int, String), (String, Int, String)] { 
 
  def maxDatetime(datetime1: String, datetime2: String) = { 
    if (datetime1 > datetime2) datetime1 else datetime2 
  } 
 
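  // the accumulator starts with an empty key, a sum of 0 and a datetime that sorts before any real one
  override def createAccumulator(): (String, Int, String) = {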
    ("", 0, "0000-00-00 00:00:00") 
  } 
 
  override def add(in: (String, Int, String), acc: (String, Int, String)): (String, Int, String) = { 
 
    (in._1, in._2 + acc._2, maxDatetime(in._3, acc._3)) 
 
  } 
 
  override def merge(acc1: (String, Int, String), acc2: (String, Int, String)): (String, Int, String) = { 
 
    (acc1._1, acc1._2 + acc2._2, maxDatetime(acc1._3, acc2._3)) 
  } 
 
  override def getResult(acc: (String, Int, String)): (String, Int, String) = { 
    acc 
  } 
} 
 
object WindowTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements( 
      ("A", 10, "2021-09-08 01:00:00"), 
      ("A", 20, "2021-09-08 02:00:00"), 
      ("A", 30, "2021-09-08 03:00:00"), 
      ("B", 100, "2021-09-08 01:00:00"), 
      ("B", 200, "2021-09-08 02:00:00"), 
      ("B", 300, "2021-09-08 03:00:00") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val result: DataStream[String] = input.keyBy(_._1) 
      .window(TumblingEventTimeWindows.of(Time.days(1L), Time.hours(-8L))) 
      .aggregate(new MyAggregateFunction(), 
        new ProcessWindowFunction[(String, Int, String), String, String, TimeWindow] { 
          override def process(key: String, context: Context, elements: Iterable[(String, Int, String)], out: Collector[String]): Unit = { 
 
            val element_str = elements.mkString(", ") 
            out.collect(s"=========窗口信息:${context.window}========${element_str}=====") 
 
          } 
        } 
      ) 
 
 
    result.print("result") 
 
    senv.execute("WindowTest") 
 
  } 
 
} 

Output:

result:2> =========窗口信息:TimeWindow{start=1631030400000, end=1631116800000}========(B,600,2021-09-08 03:00:00)===== 
result:7> =========窗口信息:TimeWindow{start=1631030400000, end=1631116800000}========(A,60,2021-09-08 03:00:00)===== 
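The example above keeps the same type throughout, but the two functions differ here. ReduceFunction's input and output types must be the same. AggregateFunction's accumulator (ACC) and result (OUT) types may differ from the input type (IN).

The plain-Scala sketch below mirrors the four methods of Flink's AggregateFunction without depending on Flink. The trait name Aggregate and the averaging logic are hypothetical illustrations. merge is exercised directly in the test, because Flink only calls it when windows are merged, e.g. for session windows.

```scala
// Plain-Scala mirror of the four methods of Flink's AggregateFunction[IN, ACC, OUT].
// The trait name `Aggregate` is hypothetical; it is not a Flink type.
trait Aggregate[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(in: IN, acc: ACC): ACC
  def merge(a: ACC, b: ACC): ACC
  def getResult(acc: ACC): OUT
}

// Hypothetical example: average of the Int field. IN = record, ACC = (sum, count), OUT = Double.
object AverageAggregate extends Aggregate[(String, Int, String), (Long, Long), Double] {
  def createAccumulator(): (Long, Long) = (0L, 0L)
  def add(in: (String, Int, String), acc: (Long, Long)): (Long, Long) = (acc._1 + in._2, acc._2 + 1)
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
  def getResult(acc: (Long, Long)): Double = if (acc._2 == 0) 0.0 else acc._1.toDouble / acc._2
}

object AggregateDemo {
  // One key, one window: add() per arriving element, getResult() when the window fires.
  def runWindow(records: Seq[(String, Int, String)]): Double = {
    val acc = records.foldLeft(AverageAggregate.createAccumulator())((a, r) => AverageAggregate.add(r, a))
    AverageAggregate.getResult(acc)
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(("A", 10, "2021-09-08 01:00:00"), ("A", 20, "2021-09-08 02:00:00"), ("A", 30, "2021-09-08 03:00:00"))
    println(runWindow(records)) // 20.0
  }
}
```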

6. Handling late data

Data sent with ncat:

[root@bigdata005 ~]# nc -lk 9998 
A,10,2021-09-08 01:00:05 
A,20,2021-09-08 01:00:06 
A,30,2021-09-08 01:00:12 
A,40,2021-09-08 01:00:07 
A,30,2021-09-08 01:00:13 
A,40,2021-09-08 01:00:08 
 

Example program:

package datastreamApi 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1)) 
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
  } 
 
} 
 
 
object WindowTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.socketTextStream("192.168.23.51", 9998) 
      .map(line => { 
        val words = line.split(",") 
        (words(0), words(1).toInt, words(2)) 
      }) 
      .assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
      .setParallelism(1) // a single watermark assigner (see the notes below)
 
    val myOutputTag = new OutputTag[(String, Int, String)]("my_output_tag") 
    val result = input.keyBy(_._1) 
      .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 
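      .allowedLateness(Time.seconds(3L)) // keep window state for 3 s after the window end
      .sideOutputLateData(myOutputTag)   // elements that arrive later than that go to myOutputTag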
      .sum(1) 
 
    result.print("result") 
 
    val output_result: DataStream[(String, Int, String)] = result.getSideOutput(myOutputTag) 
    output_result.print("output_result") 
 
    senv.execute("WindowTest") 
  } 
 
} 

Output:

result:7> (A,30,2021-09-08 01:00:05) 
result:7> (A,70,2021-09-08 01:00:05) 
output_result:7> (A,40,2021-09-08 01:00:08) 

Notes on the program:

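  • The parallelism of assignTimestampsAndWatermarks is set to 1 because this senv's default parallelism is 8. With 8 parallel watermark assigners, the instances that receive no elements keep emitting a watermark derived from the initial maxTimestamp of 0 (i.e. -1). Since an operator's watermark is the minimum over all of its inputs, the watermark reaching the window operator never advances and no window is ever triggered
  • Window A fires once the watermark reaches its (endTimestamp - 1), i.e. watermark >= endTimestamp - 1. Here window [01:00:00, 01:00:10) fires when the element at 01:00:12 raises the watermark to 01:00:11.999, producing the sum 30
  • While the watermark is still below window A's (endTimestamp + allowedLateness - 1), a new element that falls into window A is added and the window fires again (the element at 01:00:07 yields the updated sum 70). The same window is therefore emitted more than once, so downstream consumers must deduplicate or overwrite earlier results
  • Once the watermark reaches window A's (endTimestamp + allowedLateness - 1), window A is removed, and a new element that falls into it is emitted through sideOutputLateData. After the element at 01:00:13 the watermark is exactly 01:00:12.999, so the element at 01:00:08 goes to the side output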
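The three rules above can be checked by replaying the ncat input in plain Scala. This sketch does not use Flink and makes these assumptions:

- Windows are 10-second tumbling windows with offset 0, and allowedLateness is 3 s.
- A watermark of (largest timestamp seen - 1 ms) is emitted after every line. In other words, the lines are typed slowly enough for a periodic watermark to be emitted in between.
- Timestamps are in milliseconds counted from 2021-09-08 01:00:00, which is itself a multiple of 10 s since the epoch.

The sketch reports only the window start and the sum, not the whole tuple that sum(1) emits.

```scala
import scala.collection.mutable

// Plain-Scala replay of the late-data example for one key (no Flink involved).
object LateDataReplay {
  val size = 10000L     // 10 s tumbling windows
  val lateness = 3000L  // allowedLateness(Time.seconds(3L))

  sealed trait Out
  final case class Fired(windowStart: Long, sum: Int) extends Out // main output
  final case class Late(ts: Long, value: Int) extends Out         // side output

  // events: (value, timestamp in ms); returns main and side outputs in emission order.
  def replay(events: Seq[(Int, Long)]): List[Out] = {
    val sums = mutable.Map.empty[Long, Int] // live windows: start -> running sum
    val fired = mutable.Set.empty[Long]     // windows that have fired at least once
    val out = mutable.ListBuffer.empty[Out]
    var maxTs = Long.MinValue
    var watermark = Long.MinValue
    for ((value, ts) <- events) {
      val start = ts - ts % size
      val maxTimestamp = start + size - 1
      if (watermark >= maxTimestamp + lateness) {
        out += Late(ts, value) // window already cleaned up: route to the side output
      } else {
        sums(start) = sums.getOrElse(start, 0) + value
        if (watermark >= maxTimestamp) { // window end already passed: fire again right away
          fired += start
          out += Fired(start, sums(start))
        }
      }
      maxTs = math.max(maxTs, ts)
      watermark = maxTs - 1 // the next periodic watermark
      for (s <- sums.keys.toList.sorted) {
        val mt = s + size - 1
        if (!fired(s) && watermark >= mt) { fired += s; out += Fired(s, sums(s)) }
        if (watermark >= mt + lateness) sums -= s // state removed after the allowed lateness
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // the six ncat lines: 01:00:05, :06, :12, :07, :13, :08
    val events = Seq((10, 5000L), (20, 6000L), (30, 12000L), (40, 7000L), (30, 13000L), (40, 8000L))
    replay(events).foreach(println)
  }
}
```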

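Notes on allowedLateness:

  • For session windows, a late element that is still within allowedLateness triggers a new computation and can bridge the gap between two existing sessions, merging them into one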

7. Handling window results

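  • The result of a window function is again a DataStream, and the timestamp of each result element is set to the window's (endTimestamp - 1), i.e. window.maxTimestamp()
  • Therefore, when one windowed operation feeds another, the downstream window size should preferably be a multiple of the upstream window size (e.g. an upstream window(TumblingEventTimeWindows.of(Time.seconds(10L))) followed by a downstream window(TumblingEventTimeWindows.of(Time.seconds(10L)))). Each upstream result then lands in the downstream window that covers the whole upstream window
  • When estimating how much state windows need, consider:
    1. For sliding windows, Flink stores one copy of each element per window it belongs to, so an element that belongs to 3 windows is stored 3 times
    2. The size of the state kept for each window
    3. Whether the window function is incremental (ReduceFunction/AggregateFunction keep one accumulator per window) or full (ProcessWindowFunction buffers every element)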
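Why the multiple matters can be checked in plain Scala. The sketch below assumes tumbling windows with offset 0 on both sides and a result timestamp of end - 1. For every upstream window, it tests whether the downstream window that receives the result also covers the whole upstream window.

With a 15 s downstream window, the upstream window [10 s, 20 s) has its result stamped 19.999 s, which falls into [15 s, 30 s). Its data from 10 s to 15 s is therefore attributed to the wrong downstream window.

```scala
// Plain-Scala sketch of chaining tumbling windows (no Flink involved).
object ChainedWindows {
  // Tumbling window start for offset 0 and non-negative timestamps.
  def windowStart(ts: Long, size: Long): Long = ts - ts % size

  // Timestamp of a window's result: end - 1, i.e. the window's maxTimestamp.
  def resultTimestamp(start: Long, size: Long): Long = start + size - 1

  // True if, for every upstream window below `horizon`, the downstream window that receives
  // its result also covers the whole upstream window.
  def aligned(upSize: Long, downSize: Long, horizon: Long): Boolean =
    (0L until horizon by upSize).forall { upStart =>
      val downStart = windowStart(resultTimestamp(upStart, upSize), downSize)
      downStart <= upStart && upStart + upSize <= downStart + downSize
    }

  def main(args: Array[String]): Unit = {
    println(aligned(10000L, 10000L, 600000L)) // same size
    println(aligned(10000L, 60000L, 600000L)) // downstream is 6x upstream
    println(aligned(10000L, 15000L, 600000L)) // not a multiple
  }
}
```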
