Using Flink DataSet Transformation Operators: Aggregation, Partitioning, Sorting, and Joins

虾米姐 · 2022-06-06 14:18:29

1. Aggregation Transformation Operators

1.1 reduce

package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
 
object TransformationOperatorTest { 
   
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements(1,2,3,4) 
    val output = input.reduce((x1, x2) => x1+x2) 
    output.print() 
 
  } 
 
} 

Output:

10 
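
reduce can also be applied per group after a groupBy. A minimal sketch (hedged; it reuses the env created above with a small inline keyed dataset):

// Hedged sketch: per-key reduce, summing the second tuple field within each group 
val grouped = env.fromElements(("a", 1), ("a", 2), ("b", 3)) 
  .groupBy(0) 
  .reduce((t1, t2) => (t1._1, t1._2 + t2._2)) 
grouped.print()   // prints (a,3) and (b,3) 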

1.2 aggregate

  • aggregate can only be applied to Tuple data types

package devBase 
 
import org.apache.flink.api.java.aggregation.Aggregations.{SUM,MIN,MAX} 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements( 
      ("Liming", 10, 100), 
      ("Liming", 11, 110), 
      ("Zhangsan", 50, 500), 
      ("Zhangsan", 51, 510) 
    ) 
    val output = input.groupBy(0) 
      // Only one aggregation can be applied to a given field; otherwise the last one applied takes effect 
      .aggregate(SUM,1).and(MIN,2).and(MAX, 2) 
    output.print() 
 
  } 
 
} 

Output:

(Zhangsan,101,510) 
(Liming,21,110) 
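
The Scala API also offers shorthand aggregation methods that avoid the explicit Aggregations enum; a minimal sketch, assuming the same input dataset as above:

// Hedged sketch: roughly equivalent shorthand for aggregate(SUM,1).and(MIN,2) 
val shorthand = input.groupBy(0).sum(1).andMin(2) 
shorthand.print()   // per-key sum of field 1 and min of field 2 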

1.3 distinct

package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements( 
      ("Liming", 10, 100), 
      ("Liming", 10, 110), 
      ("Liming", 10, 110), 
      ("Zhangsan", 50, 510), 
      ("Zhangsan", 50, 500), 
      ("Zhangsan", 50, 500) 
    ) 
 
    val output1 = input.distinct() 
    output1.print() 
    println("====================") 
    // Deduplicate by the given key fields; for records with the same key, the first one is kept 
    val output2 = input.distinct(0,1) 
    output2.print() 
 
 
 
  } 
 
} 

Output:

(Zhangsan,50,500) 
(Zhangsan,50,510) 
(Liming,10,110) 
(Liming,10,100) 
==================== 
(Liming,10,100) 
(Zhangsan,50,510) 
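
distinct also accepts a key-selector function instead of field positions; a hedged sketch over the same input:

// Hedged sketch: deduplicate by a derived key (here, the name field only) 
val byName = input.distinct(_._1) 
byName.print()   // keeps one record per distinct name 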

2. Partitioning Transformation Operators

2.1 partitionByHash, partitionByRange, sortPartition

package devBase 
 
import org.apache.flink.api.common.operators.Order 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements( 
      ("Liming", 10, 100), 
      ("Liming", 15, 150), 
      ("Liming", 13, 130), 
      ("Zhangsan", 50, 500), 
      ("Zhangsan", 80, 800), 
      ("Zhangsan", 60, 600) 
    ).setParallelism(2) 
    input.printOnTaskManager("input") 
 
    // Hash-partition by the first field 
    val output1=input.partitionByHash(0) 
    output1.printOnTaskManager("output1") 
    // Range-partition by the value of the third field 
    val output2=input.partitionByRange("_3") 
    output2.printOnTaskManager("output2") 
    // sortPartition only sorts the data within each partition 
    val output3 = input.sortPartition(1, Order.ASCENDING) 
      .sortPartition(2,Order.DESCENDING) 
    output3.printOnTaskManager("output3") 
 
    env.execute() 
  } 
 
} 

Output:

input:1> (Liming,10,100) 
input:1> (Liming,13,130) 
input:1> (Zhangsan,80,800) 
input:2> (Liming,15,150) 
input:2> (Zhangsan,50,500) 
input:2> (Zhangsan,60,600) 
output1:5> (Liming,10,100) 
output1:5> (Liming,15,150) 
output1:5> (Liming,13,130) 
output1:2> (Zhangsan,50,500) 
output1:2> (Zhangsan,80,800) 
output1:2> (Zhangsan,60,600) 
output3:1> (Liming,10,100) 
output3:1> (Liming,13,130) 
output3:1> (Zhangsan,80,800) 
output3:2> (Liming,15,150) 
output3:2> (Zhangsan,50,500) 
output3:2> (Zhangsan,60,600) 
output2:7> (Zhangsan,80,800) 
output2:4> (Zhangsan,50,500) 
output2:3> (Liming,15,150) 
output2:1> (Liming,10,100) 
output2:6> (Zhangsan,60,600) 
output2:2> (Liming,13,130) 
  • This example was run inside IDEA, so the printOnTaskManager output lands directly in the console (on a cluster it would go to the TaskManager logs)
  • The interleaved output also shows that the three result sinks execute asynchronously with respect to one another
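
Beyond these three operators, a user-defined Partitioner can be plugged in via partitionCustom; a hedged sketch over the same input (the modulo scheme is just an illustration):

import org.apache.flink.api.common.functions.Partitioner 

// Hedged sketch: route records using a custom Partitioner on the first field 
val output4 = input.partitionCustom(new Partitioner[String] { 
  override def partition(key: String, numPartitions: Int): Int = 
    Math.floorMod(key.hashCode, numPartitions)   // floorMod keeps the index non-negative 
}, 0) 
output4.printOnTaskManager("output4") 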

2.2 mapPartition

package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.util.Collector 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements(1, 2) 
      .setParallelism(2) 
    input.printOnTaskManager("input") 
 
    val output = input.mapPartition((iterator:Iterator[Int], out:Collector[(Int, String)]) => { 
      iterator.foreach(x => out.collect((x, "value"+x))) 
      out.collect((3, "value3")) 
    }) 
 
    output.printOnTaskManager("output") 
    env.execute() 
 
  } 
 
} 

Output:

input:2> 2 
input:1> 1 
output:2> (2,value2) 
output:1> (1,value1) 
output:2> (3,value3) 
output:1> (3,value3) 
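
mapPartition is mainly worthwhile when per-record setup would be expensive, because the function runs once per partition rather than once per element. A hedged sketch (the "resource" below is a placeholder, not a real connection):

// Hedged sketch: amortize expensive setup by doing it once per partition 
val enriched = input.mapPartition((it: Iterator[Int], out: Collector[String]) => { 
  val resource = "conn"   // placeholder: e.g. open a database connection here 
  it.foreach(x => out.collect(resource + "->" + x)) 
  // placeholder: release the resource here, once per partition 
}) 
enriched.printOnTaskManager("enriched") 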

3. Sorting Transformation Operators

3.1 minBy / maxBy, First-N

package devBase 
 
import org.apache.flink.api.common.operators.Order 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input = env.fromElements( 
      ("Liming", 10, 99, 1100), 
      ("Liming", 10, 99, 1000), 
      ("Liming", 10, 100, 1000), 
      ("Liming", 11, 110, 1100), 
      ("Zhangsan", 50, 500, 5000), 
      ("Zhangsan", 51, 510, 5100), 
      ("Zhangsan", 51, 520, 5100), 
      ("Zhangsan", 51, 520, 5000) 
    ) 
 
    // Group first, then take the record with the minimum or maximum of the selected fields; if several records tie on those fields, the first one is taken 
    val output_min = input.groupBy(0).minBy(1, 2) 
    val output_max = input.groupBy(0).maxBy(1, 2) 
 
    output_min.print() 
    println("=====================================") 
    output_max.print() 
 
    // Group, sort within each group, then take the first N records 
    val output_firstN=input.groupBy(0) 
      .sortGroup(1, Order.DESCENDING) 
      .first(2) 
    println("=====================================") 
    output_firstN.print() 
  } 
 
} 

Output:

(Zhangsan,50,500,5000) 
(Liming,10,99,1100) 
===================================== 
(Zhangsan,51,520,5100) 
(Liming,11,110,1100) 
===================================== 
(Zhangsan,51,510,5100) 
(Zhangsan,51,520,5100) 
(Liming,11,110,1100) 
(Liming,10,99,1100) 
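
minBy/maxBy and first(n) also work directly on an ungrouped DataSet; a hedged sketch over the same input:

// Hedged sketch: global variants without groupBy 
val globalMin = input.minBy(1, 2)   // the single record with the overall minimum 
val anyTwo = input.first(2)         // an arbitrary two records 
globalMin.print() 
anyTwo.print() 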

4. Join Transformation Operators

4.1 join, leftOuterJoin, rightOuterJoin, fullOuterJoin, cross, coGroup

package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.util.Collector 
 
import scala.collection.mutable 
 
object TransformationOperatorTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val join1 = env.fromElements( 
      (1, "orange"),(1, "apple"),(2, "fruit2") 
    ) 
    val join2 = env.fromElements( 
      (1, 10),(1, 20),(3, 30) 
    ) 
 
    val result_join = join1.join(join2).where(0).equalTo(0) 
    println("===============join====================") 
    result_join.print() 
 
    val result_left = join1.leftOuterJoin(join2).where(0).equalTo(0) { 
      // The missing right side must be handled 
      (left, right) => { 
        // The filled-in field values must not be null 
        val right_data = if(right == null) (0, 0) else right 
        (left, right_data) 
      } 
    } 
    println("===============leftOuterJoin====================") 
    result_left.print() 
 
 
    val result_right = join1.rightOuterJoin(join2).where(0).equalTo(0) { 
      // The missing left side must be handled 
      (left, right) => { 
        // The filled-in field values must not be null 
        val left_data = if(left == null) (0, "") else left 
        (left_data, right) 
      } 
    } 
    println("===============rightOuterJoin====================") 
    result_right.print() 
 
    val result_full = join1.fullOuterJoin(join2).where(0).equalTo(0) { 
      // Missing values on either side must be handled 
      (left, right) => { 
        // The filled-in field values must not be null 
        val left_data = if(left == null) (0, "") else left 
        val right_data = if(right == null) (0, 0) else right 
        (left_data, right_data) 
      } 
    } 
    println("===============fullOuterJoin====================") 
    result_full.print() 
 
    val result_cross = join1.cross(join2) 
    println("===============cross====================") 
    result_cross.print() 
 
    // coGroup first matches records by key, then puts the left records into one iterator 
    // and the right records into another; a side with no match for a key gets an empty iterator 
    val result_coGroup = join1.coGroup(join2).where(0).equalTo(0) 
      .apply((left_iterator:Iterator[(Int,String)], right_iterator:Iterator[(Int,Int)], out:Collector[(mutable.Buffer[(Int,String)],mutable.Buffer[(Int,Int)])]) => { 
        val left_buffer = left_iterator.toBuffer 
        val right_buffer = right_iterator.toBuffer 
        out.collect((left_buffer, right_buffer)) 
      }) 
    println("===============coGroup====================") 
    result_coGroup.print() 
 
  } 
 
} 

Output:

===============join==================== 
((1,orange),(1,10)) 
((1,apple),(1,10)) 
((1,orange),(1,20)) 
((1,apple),(1,20)) 
===============leftOuterJoin==================== 
((1,orange),(1,10)) 
((1,orange),(1,20)) 
((1,apple),(1,10)) 
((1,apple),(1,20)) 
((2,fruit2),(0,0)) 
===============rightOuterJoin==================== 
((0,),(3,30)) 
((1,orange),(1,10)) 
((1,apple),(1,10)) 
((1,orange),(1,20)) 
((1,apple),(1,20)) 
===============fullOuterJoin==================== 
((0,),(3,30)) 
((1,orange),(1,10)) 
((1,orange),(1,20)) 
((1,apple),(1,10)) 
((1,apple),(1,20)) 
((2,fruit2),(0,0)) 
===============cross==================== 
((1,orange),(1,10)) 
((1,orange),(1,20)) 
((1,orange),(3,30)) 
((1,apple),(1,10)) 
((1,apple),(1,20)) 
((1,apple),(3,30)) 
((2,fruit2),(1,10)) 
((2,fruit2),(1,20)) 
((2,fruit2),(3,30)) 
===============coGroup==================== 
(ArrayBuffer(),ArrayBuffer((3,30))) 
(ArrayBuffer((1,orange), (1,apple)),ArrayBuffer((1,10), (1,20))) 
(ArrayBuffer((2,fruit2)),ArrayBuffer()) 
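
Because coGroup hands the function all records for a key from both sides at once, it can also emulate an outer join; a hedged sketch that reproduces the leftOuterJoin result above:

// Hedged sketch: left outer join expressed with coGroup 
val leftViaCoGroup = join1.coGroup(join2).where(0).equalTo(0) 
  .apply((l: Iterator[(Int, String)], r: Iterator[(Int, Int)], out: Collector[((Int, String), (Int, Int))]) => { 
    val rights = r.toBuffer 
    l.foreach(lv => 
      if (rights.isEmpty) out.collect((lv, (0, 0)))   // fill the missing right side 
      else rights.foreach(rv => out.collect((lv, rv))) 
    ) 
  }) 
leftViaCoGroup.print() 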
