Flink DataStream的多流、键控流、窗口、连接、物理分区转换算子的使用

无情 阅读:247 2022-06-06 14:17:37 评论:0

1. 多流转换算子

1.1 union

package devBase 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input1 = senv.fromElements(("Liming",16), ("Zhangsan", 30)) 
    val input2 = senv.fromElements(("Zhaosi",40), ("Wangwu", 58)) 
 
    val output = input1.union(input2) 
    output.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果:

3> (Wangwu,58) 
4> (Zhangsan,30) 
2> (Zhaosi,40) 
3> (Liming,16) 
  • 支持union多个DataStream: def union(dataStreams: DataStream[T]*): DataStream[T] = ......

1.2 connect

package devBase 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input1 = senv.fromElements(("Liming",16), ("Zhangsan", 30)) 
    val input2 = senv.fromElements(("Zhaosi","Beijing"), ("Wangwu", "Shanghai")) 
 
    // 虽然在同一个流,但两个源数据流的数据格式没变 
    val connectedStream = input1.connect(input2) 
    // 将流中的数据格式变成一样 
    val output = connectedStream.map( 
      // 如果是input1的数据,用此转换函数 
      input1_stream => (input1_stream._1, input1_stream._2.toString), 
      // 如果是input2的数据,用此转换函数 
      input2_stream => (input2_stream._1, input2_stream._2) 
    ) 
    output.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果:

2> (Liming,16) 
1> (Wangwu,Shanghai) 
8> (Zhaosi,Beijing) 
3> (Zhangsan,30) 

2. 键控流转换算子

2.1 keyBy和min、max、sum、minBy、maxBy

package devBase 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements( 
      ("Liming",20, 21), 
      ("Liming",10, 11), 
      ("Liming",30, 31), 
      ("Zhangsan", 200, 210), 
      ("Zhangsan", 100, 110), 
      ("Zhangsan", 300, 310) 
    ) 
 
    val output_min = input.keyBy(tuple => tuple._1).min(1) 
    output_min.print("output_min") 
    /* 执行结果如下: 
    output_min:8> (Zhangsan,200,210) 
    output_min:8> (Zhangsan,100,210) 
    output_min:2> (Liming,20,21) 
    output_min:8> (Zhangsan,100,210) 
    output_min:2> (Liming,10,21) 
    output_min:2> (Liming,10,21) 
     */ 
 
    val output_max = input.keyBy(tuple => tuple._1).max(1) 
    output_max.print("output_max") 
    /* 执行结果如下: 
    output_max:8> (Zhangsan,200,210) 
    output_max:2> (Liming,20,21) 
    output_max:8> (Zhangsan,200,210) 
    output_max:2> (Liming,20,21) 
    output_max:8> (Zhangsan,300,210) 
    output_max:2> (Liming,30,21) 
     */ 
 
    val output_sum = input.keyBy(tuple => tuple._1).sum(1) 
    output_sum.print("output_sum") 
    /* 执行结果如下: 
    output_sum:2> (Liming,20,21) 
    output_sum:8> (Zhangsan,200,210) 
    output_sum:2> (Liming,30,21) 
    output_sum:2> (Liming,60,21) 
    output_sum:8> (Zhangsan,300,210) 
    output_sum:8> (Zhangsan,600,210) 
     */ 
 
    val output_minBy = input.keyBy(tuple => tuple._1).minBy(1) 
    output_minBy.print("output_minBy") 
    /* 执行结果如下: 
    output_minBy:8> (Zhangsan,200,210) 
    output_minBy:2> (Liming,20,21) 
    output_minBy:8> (Zhangsan,100,110) 
    output_minBy:2> (Liming,10,11) 
    output_minBy:2> (Liming,10,11) 
    output_minBy:8> (Zhangsan,100,110) 
     */ 
 
    val output_maxBy = input.keyBy(tuple => tuple._1).maxBy(1) 
    output_maxBy.print("output_maxBy") 
    /* 执行结果如下: 
    output_maxBy:8> (Zhangsan,200,210) 
    output_maxBy:2> (Liming,20,21) 
    output_maxBy:8> (Zhangsan,200,210) 
    output_maxBy:2> (Liming,20,21) 
    output_maxBy:8> (Zhangsan,300,310) 
    output_maxBy:2> (Liming,30,31) 
     */ 
 
    senv.execute() 
 
  } 
 
} 
  • min / max / sum根据选取的字段,迭代求最小 / 最大 / 求和, 其它字段选第一行
  • minBy / maxBy
    1. 根据选取的字段,迭代求最小 / 最大
    2. 其它字段,选第1步迭代求到最小 / 最大的那一行

2.2 keyBy和reduce

package devBase 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements( 
      ("Liming",20, 21), 
      ("Liming",10, 11), 
      ("Liming",30, 31), 
      ("Zhangsan", 200, 210), 
      ("Zhangsan", 100, 110), 
      ("Zhangsan", 300, 310) 
    ) 
 
    val output_min = input.keyBy(tuple => tuple._1).reduce( 
      (v1, v2) => (v1._1, v1._2 + v2._2, v1._3 + v2._3) 
    ) 
    output_min.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果如下:

8> (Zhangsan,200,210) 
2> (Liming,20,21) 
8> (Zhangsan,300,320) 
2> (Liming,30,32) 
2> (Liming,60,63) 
8> (Zhangsan,600,630) 

3. 窗口转换算子

3.1 WindowedStream.apply

package devBase 
 
import apiTest.WordSourceFunction 
import org.apache.flink.streaming.api.scala.function.WindowFunction 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.addSource(new WordSourceFunction()) 
 
    val output = input.map(word => (word, 1)) 
      .keyBy(tuple => tuple._1) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10L))) 
      .apply(new WindowFunction[(String, Int), (String, Int), String, TimeWindow] { 
        override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { 
 
          var count = 0 
          for (tuple <- input) { 
            count += tuple._2 
          } 
          out.collect((key, count)) 
 
        } 
      }) 
 
    output.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果

1> (table,1) 
8> (stream,1) 
7> (flink,1) 
7> (batch,2) 
4> (sql,2) 
5> (world,1) 
1> (table,1) 
3> (hello,2) 
8> (stream,2) 
5> (world,3) 
7> (batch,1) 
4> (sql,3) 
8> (stream,1) 
3> (hello,2) 
4> (sql,1) 
5> (world,2) 
3> (hello,1) 
8> (stream,3) 
7> (batch,1) 
7> (flink,2) 
......省略部分...... 
  • 1.13版本不用执行senv.setStreamTimeCharacteristic, 因为默认就是EventTime, 且processing time、ingestion time也可以在该时间语义下工作

3.2 AllWindowedStream.apply

package devBase 
 
import apiTest.WordSourceFunction 
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction} 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.addSource(new WordSourceFunction()) 
 
    val output = input.map(word => (word, 1)) 
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L))) 
      .apply(new AllWindowFunction[(String, Int), (String,Int),TimeWindow] { 
        override def apply(window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { 
 
          var window_word = "" 
          var count = 0 
          for (tuple <- input) { 
            window_word = tuple._1 
            count += tuple._2 
          } 
          out.collect((window_word, count)) 
 
        } 
      }) 
 
    output.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果:

2> (stream,9) 
3> (world,10) 
4> (batch,10) 
5> (stream,10) 
6> (hello,10) 
7> (flink,10) 
......省略部分...... 

3.3 DataStream.coGroup(DataStream)

package devBase 
 
import apiTest.WordSourceFunction 
import org.apache.flink.api.common.functions.CoGroupFunction 
import org.apache.flink.streaming.api.scala.function.AllWindowFunction 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 
 
 
import java.lang 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.addSource(new WordSourceFunction()) 
 
    val output = input.map(word => (word, 1)) 
      .coGroup( 
        input.map(word => (word, word + "666")) 
      ).where(tuple => tuple._1).equalTo(tuple => tuple._1) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10L))) 
      .apply(new CoGroupFunction[(String, Int),(String, String), String] { 
        override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, String)], out: Collector[String]): Unit = { 
          val first_iter = first.iterator() 
          val second_iter = second.iterator() 
 
          while(first_iter.hasNext) {out.collect(first_iter.next()._1)} 
          while(second_iter.hasNext) {out.collect(second_iter.next()._2)} 
 
        } 
      }) 
 
    output.print() 
 
    senv.execute() 
 
  } 
 
} 

执行结果:

7> batch 
7> batch 
7> batch666 
7> batch666 
3> hello 
3> hello666 
8> stream 
......省略部分...... 

4. 连接转换算子

4.1 窗口连接

package devBase 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.util.Collector 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1)) 
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
 
  } 
 
} 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input1 = senv.fromElements( 
      ("A", 110, "2021-09-01 08:00:11"), 
      ("A", 120, "2021-09-01 08:00:12"), 
      ("B", 130, "2021-09-01 08:00:13"), 
      ("B", 140, "2021-09-01 08:00:14"), 
      ("C", 210, "2021-09-01 08:00:21"), 
      ("C", 220, "2021-09-01 08:00:22"), 
      ("D", 310, "2021-09-01 08:00:31"), 
      ("D", 320, "2021-09-01 08:00:32") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val input2 = senv.fromElements( 
      ("A", 110, "2021-09-01 08:00:11"), 
      ("A", 120, "2021-09-01 08:00:12"), 
      ("B", 130, "2021-09-01 08:00:13"), 
      ("B", 140, "2021-09-01 08:00:14"), 
      ("C", 210, "2021-09-01 08:00:21") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val output = input1.join(input2) 
      .where(event => event._1).equalTo((event => event._1)) 
      .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 
      .apply((left: (String, Int, String), right: (String, Int, String), collector: Collector[(String, Int, String)]) => { 
        val maxDatetime = if (left._3 > right._3) left._3 else right._3 
        collector.collect(left._1, left._2 + right._2, maxDatetime) 
      }) 
 
 
    output.print() 
 
 
    senv.execute() 
 
 
  } 
 
} 

执行结果:

7> (A,220,2021-09-01 08:00:11) 
7> (A,230,2021-09-01 08:00:12) 
7> (A,230,2021-09-01 08:00:12) 
7> (A,240,2021-09-01 08:00:12) 
2> (B,260,2021-09-01 08:00:13) 
2> (B,270,2021-09-01 08:00:14) 
2> (B,270,2021-09-01 08:00:14) 
2> (B,280,2021-09-01 08:00:14) 
2> (C,420,2021-09-01 08:00:21) 
2> (C,430,2021-09-01 08:00:22) 
  • 对于键B,在窗口[08:00:10, 08:00:20)中,apply函数的所有结果元素的timestamp为最大的timestamp 08:00:14
  • 滑动窗口:SlidingEventTimeWindows.of(Time.seconds(10L),Time.seconds(1L))
  • 会话窗口:EventTimeSessionWindows.withGap(Time.seconds(10L))

4.2 间隔连接

package devBase 
 
import org.apache.commons.lang3.time.FastDateFormat 
import org.apache.flink.api.common.eventtime._ 
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.util.Collector 
 
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
 
  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = { 
 
    fdf.parse(element._3).getTime 
 
  } 
 
} 
 
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] { 
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") 
  var maxTimestamp: Long = _ 
  val maxOutofOrderness = 0 
 
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = { 
 
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp) 
 
  } 
 
  override def onPeriodicEmit(output: WatermarkOutput): Unit = { 
 
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1)) 
  } 
} 
 
 
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] { 
 
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = { 
 
    new RecordTimestampAssigner() 
  } 
 
  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = { 
    new PeriodWatermarkGenerator() 
 
 
  } 
 
} 
 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input1 = senv.fromElements( 
      ("A", 110, "2021-09-01 08:00:11"), 
      ("A", 120, "2021-09-01 08:00:12"), 
      ("B", 130, "2021-09-01 08:00:13"), 
      ("B", 140, "2021-09-01 08:00:14"), 
      ("C", 210, "2021-09-01 08:00:21"), 
      ("C", 220, "2021-09-01 08:00:22"), 
      ("D", 310, "2021-09-01 08:00:31"), 
      ("D", 320, "2021-09-01 08:00:32") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val input2 = senv.fromElements( 
      ("A", 110, "2021-09-01 08:00:11"), 
      ("A", 120, "2021-09-01 08:00:12"), 
      ("B", 130, "2021-09-01 08:00:13"), 
      ("B", 140, "2021-09-01 08:00:14"), 
      ("C", 210, "2021-09-01 08:00:21") 
    ).assignTimestampsAndWatermarks(new MyWatermarkStrategy()) 
 
    val output = input1.keyBy(_._1) 
      .intervalJoin(input2.keyBy(_._1)) 
      .between(Time.seconds(-2L), Time.seconds(2L)) 
      .lowerBoundExclusive().upperBoundExclusive() 
      .process(new ProcessJoinFunction[(String, Int, String), (String, Int, String), (String, Int, String)] { 
        override def processElement(in1: (String, Int, String), in2: (String, Int, String), context: ProcessJoinFunction[(String, Int, String), (String, Int, String), (String, Int, String)]#Context, collector: Collector[(String, Int, String)]): Unit = { 
 
          val maxDatetime = if (in1._3 > in2._3) in1._3 else in2._3 
          collector.collect((in1._1, in1._2 + in2._2, maxDatetime)) 
        } 
      }) 
 
 
    output.print() 
 
 
    senv.execute() 
 
 
  } 
 
} 

执行结果

2> (B,260,2021-09-01 08:00:13) 
2> (B,270,2021-09-01 08:00:14) 
2> (B,270,2021-09-01 08:00:14) 
2> (B,280,2021-09-01 08:00:14) 
7> (A,220,2021-09-01 08:00:11) 
2> (C,420,2021-09-01 08:00:21) 
7> (A,230,2021-09-01 08:00:12) 
7> (A,230,2021-09-01 08:00:12) 
7> (A,240,2021-09-01 08:00:12) 
2> (C,430,2021-09-01 08:00:22) 
  • 连接条件是:left_key == right_key && left_timestamp - 2 < right_timestamp < left_timestamp + 2,默认是包含上下界的
  • 只支持EventTime模式
  • processElement函数的结果元素timestamp为in1、in2的最大timestamp,即ProcessJoinFunction.Context.getTimestamp函数的返回结果

5. 物理分区算子

package devBase 
 
import org.apache.flink.api.common.functions.Partitioner 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
class StringHashPartitioner extends Partitioner[String] { 
  override def partition(key: String, numPartitions: Int): Int = { 
 
    (key.hashCode) % numPartitions 
 
  } 
} 
 
object TranformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    println("parallelism: " + senv.getParallelism)    // parallelism: 8 
 
    val input = senv.fromElements(("key1","value1"),("key2","value2"),("key3","value3")) 
    input.print("input") 
    /* 
    input:5> (key1,value1) 
    input:7> (key3,value3) 
    input:6> (key2,value2) 
     */ 
 
    val partitionCustom_output = input.partitionCustom(new StringHashPartitioner(),tuple => tuple._1) 
    partitionCustom_output.print("partitionCustom_output") 
    /* 
    partitionCustom_output:5> (key3,value3) 
    partitionCustom_output:3> (key1,value1) 
    partitionCustom_output:4> (key2,value2) 
     */ 
 
 
    val shuffle_output = input.shuffle 
    shuffle_output.print("shuffle_output") 
    /* 
    shuffle_output:8> (key2,value2) 
    shuffle_output:7> (key3,value3) 
    shuffle_output:3> (key1,value1) 
     */ 
 
    val rebalance_output = input.rebalance 
    rebalance_output.print("rebalance_output") 
    /* 
    rebalance_output:1> (key2,value2) 
    rebalance_output:8> (key1,value1) 
    rebalance_output:2> (key3,value3) 
    */ 
 
    val rescale_output = input.rescale 
    rescale_output.print("rescale_output") 
    /* 
    rescale_output:1> (key1,value1) 
    rescale_output:2> (key2,value2) 
    rescale_output:3> (key3,value3) 
     */ 
 
    val broadcast_output = input.broadcast 
    broadcast_output.print("broadcast_output") 
    /* 
    broadcast_output:1> (key1,value1) 
    broadcast_output:1> (key2,value2) 
    broadcast_output:1> (key3,value3) 
    broadcast_output:8> (key1,value1) 
    broadcast_output:8> (key2,value2) 
    broadcast_output:8> (key3,value3) 
    ......省略16条输出...... 
    */ 
 
    senv.execute() 
 
 
  } 
 
} 
  • shuffle将元素进行随机分区,可能会导致某些分区随机的数据量多
  • rebalance进行轮询分区,不用计算随机数,性能更好
  • rescale只在同一服务器进行轮询分区,不产生网络传输
  • broadcast将元素进行广播,例如dataStreamA有3条数据,senv环境有8个分区,则广播后有24条数据

标签:程序员
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号