Flink DataStream: WordCount, Data Sources and Sinks, Side Outputs, and Two-Phase Commit (2PC)


1. pom.xml dependencies

        <dependency> 
            <groupId>org.apache.flink</groupId> 
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> 
            <version>${flink.version}</version> 
            <scope>provided</scope> 
        </dependency> 
 
        <dependency> 
            <groupId>org.apache.flink</groupId> 
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> 
            <version>${flink.version}</version> 
        </dependency> 

2. Word count with the DataStream API

2.1 Word count without a window

apiTest\WordSourceFunction.scala

Purpose: a data source that continuously emits random words.

package apiTest 
 
import org.apache.flink.streaming.api.functions.source.SourceFunction 
import scala.util.Random 
 
 
class WordSourceFunction extends SourceFunction[String] { 
 
  // marked @volatile because cancel() is called from a different thread than run()
  @volatile private var is_running = true 
  private val words=Array("hello", "world", "flink", "stream", "batch", "table", "sql") 
 
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { 
 
    while(is_running) { 
      val index = Random.nextInt(words.size) 
      sourceContext.collect(words(index)) 
 
      // sleep for 1 second 
      Thread.sleep(1000) 
    } 
 
  } 
 
 
  override def cancel(): Unit = { 
    is_running = false 
  } 
 
} 

apiTest\StreamWordCount.scala

package apiTest 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} 
 
 
 
object StreamWordCount { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val ds:DataStream[(String,Int)] = senv.addSource(new WordSourceFunction()) 
      .map(word => (word,1)) 
      .keyBy(_._1) 
      .sum(1) 
 
    ds.print() 
 
    // lazy execution: the job only runs when execute() is called 
    senv.execute() 
 
  } 
 
} 

The word counts keep increasing; sample output (the number before ">" is the index of the parallel subtask that printed the record):

8> (stream,1) 
1> (table,1) 
4> (sql,1) 
3> (hello,1) 
4> (sql,2) 
1> (table,2) 
7> (flink,1) 
4> (sql,3) 
7> (flink,2) 
4> (sql,4) 
7> (flink,3) 
7> (flink,4) 
...... (remaining output omitted) ...... 

2.2 Word count with a window

apiTest\WindowStreamWordCount.scala

package apiTest 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
 
object WindowStreamWordCount { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val ds:DataStream[(String,Int)] = senv.addSource(new WordSourceFunction()) 
      .map(word => (word,1)) 
      .keyBy(_._1) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 
      .sum(1) 
 
    ds.print() 
 
    // lazy execution: the job only runs when execute() is called 
    senv.execute() 
 
  } 
 
} 

Result of the first window. It contains only 7 words because the job started partway through the 10-second window; this can be addressed with timestamps and watermarks (TODO, see the sketch after the window results below):

8> (stream,2) 
7> (batch,3) 
3> (hello,1) 
5> (world,1) 

Result of the second window:

5> (world,2) 
7> (flink,2) 
8> (stream,4) 
4> (sql,1) 
3> (hello,1) 

Result of the third window:

8> (stream,1) 
5> (world,2) 
4> (sql,1) 
3> (hello,1) 
7> (batch,3) 
7> (flink,2) 
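
As noted above, the window example can be switched to event time with timestamps and watermarks. The following is a minimal sketch (not from the original post); the event timestamp attached in the first map step is an assumption, since WordSourceFunction emits plain strings without timestamps:

package apiTest 
 
import java.time.Duration 
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows 
import org.apache.flink.streaming.api.windowing.time.Time 
 
object EventTimeWindowWordCount { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
 
    senv.addSource(new WordSourceFunction()) 
      // attach an event timestamp; here processing time stands in for a real event time 
      .map(word => (word, 1, System.currentTimeMillis())) 
      .assignTimestampsAndWatermarks( 
        WatermarkStrategy 
          .forBoundedOutOfOrderness[(String, Int, Long)](Duration.ofSeconds(2)) 
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int, Long)] { 
            override def extractTimestamp(element: (String, Int, Long), recordTimestamp: Long): Long = element._3 
          }) 
      ) 
      .map(t => (t._1, t._2)) 
      .keyBy(_._1) 
      // event-time tumbling windows instead of processing-time windows 
      .window(TumblingEventTimeWindows.of(Time.seconds(10))) 
      .sum(1) 
      .print() 
 
    senv.execute("EventTimeWindowWordCount") 
  } 
 
} 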

3. DataStream API data sources

The following kinds of data sources are supported:

  1. Built-in sources, e.g. senv.readTextFile(filePath) and senv.addSource(FlinkKafkaConsumer[OUT])
  2. senv.addSource(SourceFunction[OUT]), parallelism 1
  3. senv.addSource(ParallelSourceFunction[OUT]), parallelism n; ParallelSourceFunction[OUT] extends SourceFunction[OUT]
  4. senv.addSource(RichParallelSourceFunction[OUT]), parallelism n, with access to the RuntimeContext; RichParallelSourceFunction[OUT] implements ParallelSourceFunction[OUT] (see the sketch below)
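
A minimal sketch (not from the original post) of item 4: a RichParallelSourceFunction whose subtasks can read their own index from the RuntimeContext. Class and field names are illustrative:

package apiTest 
 
import org.apache.flink.configuration.Configuration 
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction} 
 
import scala.util.Random 
 
class ParallelWordSourceFunction extends RichParallelSourceFunction[String] { 
 
  @volatile private var is_running = true 
  private val words = Array("hello", "world", "flink") 
 
  // open() and getRuntimeContext are only available on Rich functions 
  override def open(parameters: Configuration): Unit = { 
    println(s"subtask ${getRuntimeContext.getIndexOfThisSubtask} started") 
  } 
 
  // run() is executed once per parallel subtask 
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { 
    val subtask = getRuntimeContext.getIndexOfThisSubtask 
    while (is_running) { 
      sourceContext.collect(s"$subtask-${words(Random.nextInt(words.length))}") 
      Thread.sleep(1000) 
    } 
  } 
 
  override def cancel(): Unit = { 
    is_running = false 
  } 
 
} 
 
// usage: senv.addSource(new ParallelWordSourceFunction()).setParallelism(2) 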

The built-in sources are described below.

3.1 File-based sources

File reading is carried out by two kinds of subtasks:

  1. A file-monitoring task: parallelism 1; it watches the path, splits the files, and hands the splits to the reader tasks
  2. Data-reading tasks: read the file splits in parallel

Contents of readTextFile.txt:

hello 
world 

Example code:

package datastreamApi 
 
import org.apache.flink.api.java.io.TextInputFormat 
import org.apache.flink.core.fs.Path 
import org.apache.flink.streaming.api.functions.source.FileProcessingMode 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
object DatasourceTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val text_filepath = "src/main/resources/readTextFile.txt" 
 
    val text_input = senv.readTextFile(text_filepath, "UTF-8") 
    text_input.print("text_input") 
    /* 
    text_input:8> hello 
    text_input:3> world 
     */ 
 
    val file_input = senv.readFile( 
      new TextInputFormat(new Path(text_filepath)), 
      text_filepath, 
      // FileProcessingMode.PROCESS_ONCE,    // read the file once, then exit 
      FileProcessingMode.PROCESS_CONTINUOUSLY, // rescan the path every 5 seconds; if a file is modified, its entire contents are re-read 
      5000L 
    ) 
    file_input.print("file_input") 
    /* 
    file_input:5> hello 
    file_input:8> world 
     */ 
 
    senv.execute() 
  } 
 
} 

3.2 Socket-based sources

  1. Install ncat

[root@bigdata005 ~]# 
[root@bigdata005 ~]# yum install -y nc 
[root@bigdata005 ~]# 

  2. Start nc listening on a port

[root@bigdata005 ~]# 
[root@bigdata005 ~]# nc -lk 9998 
hello 
world 

  3. Start the Flink program that reads from the socket

package datastreamApi 
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
 
object DatasourceTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val socket_input = senv.socketTextStream( 
      "192.168.xxx.xxx", 
      9998, 
      '\n', 
      0L        // maximum number of retries on failure (0 = no retry) 
    ) 
    socket_input.print("socket_input") 
 
    senv.execute("DatasourceTest") 
 
  } 
 
} 

Output:

socket_input:3> hello 
socket_input:4> world 
 

3.3 Collection-based sources

package datastreamApi 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.util.NumberSequenceIterator 
 
import scala.collection.mutable.ArrayBuffer 
 
object DatasourceTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input1 = senv.fromElements(("key1", 10), ("key2", 20)) 
    input1.print("input1") 
    /* 
    input1:7> (key2,20) 
    input1:6> (key1,10) 
     */ 
 
    val datas = ArrayBuffer(("key1", 10), ("key2", 20)) 
    val input2 = senv.fromCollection(datas) 
    input2.print("input2") 
    /* 
    input2:6> (key2,20) 
    input2:5> (key1,10) 
     */ 
 
    // the parameter is a SplittableIterator[T]; this example generates the sequence 0,1,2,3 
    val input3 = senv.fromParallelCollection(new NumberSequenceIterator(0L, 3L)) 
    input3.print("input3") 
    /* 
    input3:3> 2 
    input3:2> 1 
    input3:1> 0 
    input3:4> 3 
     */ 
 
    // generates the sequence 0,1,2,3 
    val input4 = senv.fromSequence(0L, 3L) 
    input4.print("input4") 
    /* 
    input4:3> 0 
    input4:8> 3 
    input4:7> 2 
    input4:5> 1 
     */ 
 
    senv.execute("DatasourceTest") 
 
  } 
 
} 
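
Item 1 of the source list above also mentions the Kafka consumer, which the examples so far do not demonstrate. A minimal sketch follows, assuming an illustrative broker address, topic, and consumer group:

package datastreamApi 
 
import java.util.Properties 
 
import org.apache.flink.api.common.serialization.SimpleStringSchema 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer 
 
object KafkaSourceTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
 
    val props = new Properties() 
    props.setProperty("bootstrap.servers", "192.168.xxx.xxx:9092") 
    props.setProperty("group.id", "flink_consumer_group") 
 
    val kafkaConsumer = new FlinkKafkaConsumer[String]( 
      "my_topic",                  // topic to read 
      new SimpleStringSchema(),    // deserialize each record as a UTF-8 string 
      props 
    ) 
    kafkaConsumer.setStartFromLatest()   // start from the latest offsets (alternatives: earliest, committed group offsets) 
 
    val kafka_input = senv.addSource(kafkaConsumer) 
    kafka_input.print("kafka_input") 
 
    senv.execute("KafkaSourceTest") 
  } 
 
} 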

4. DataStream API data sinks

  • The DataStream.write* methods do not take part in checkpointing, so they cannot provide exactly-once semantics; flink-connector-filesystem (the streaming file sink) can be used to achieve exactly-once (see the sketch at the end of this section)
package datastreamApi 
 
import org.apache.flink.api.common.serialization.SimpleStringSchema 
import org.apache.flink.api.java.io.TextOutputFormat 
import org.apache.flink.core.fs.Path 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
object DatasourceTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements("hello", "world") 
 
    input.writeUsingOutputFormat( 
      new TextOutputFormat[String]( 
        new Path("src/main/resources/textOutputDir"), 
        "UTF-8" 
      ) 
    ) 
 
    input.writeToSocket( 
      "192.168.xxx.xxx", 
      9998, 
      new SimpleStringSchema() 
    ) 
    /* 
    [root@bigdata005 ~]# 
    [root@bigdata005 ~]# nc -lk 9998 
    helloworld 
     
     */ 
 
    input.print("print") 
    /* 
    print:2> world 
    print:1> hello 
     */ 
     
    input.printToErr("printToErr") 
    /* printed to stderr, shown in red in the IDE console 
    printToErr:7> world 
    printToErr:6> hello 
     */ 
 
    senv.execute("DatasourceTest") 
  } 
 
} 

The resulting directory structure of textOutputDir is shown in the figure below:
(Figure: textOutputDir directory structure)
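
As mentioned at the top of this section, a checkpoint-aware file sink is needed for exactly-once writes. Below is a minimal sketch, assuming the filesystem connector (StreamingFileSink) is on the classpath; the output path and checkpoint interval are illustrative:

package datastreamApi 
 
import org.apache.flink.api.common.serialization.SimpleStringEncoder 
import org.apache.flink.core.fs.Path 
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
 
object FileSinkTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    // the sink only finalizes (commits) part files when a checkpoint completes 
    senv.enableCheckpointing(10000L) 
 
    val input = senv.fromElements("hello", "world") 
 
    val fileSink: StreamingFileSink[String] = StreamingFileSink 
      .forRowFormat( 
        new Path("src/main/resources/streamingFileSinkDir"), 
        new SimpleStringEncoder[String]("UTF-8") 
      ) 
      .build() 
 
    input.addSink(fileSink) 
 
    senv.execute("FileSinkTest") 
  } 
 
} 

Note that with a bounded input like fromElements the job may finish before any checkpoint completes, leaving only in-progress part files; in practice this sink is paired with a continuous source.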

5. Side Outputs

Motivation: as shown below, splitting a stream with two filter operations traverses the input twice and wastes resources, so side outputs can be used instead.

val senv = StreamExecutionEnvironment.getExecutionEnvironment 
val input = senv.fromElements(1, 2, 3, 4, 5, 6) 
val output1 = input.filter(_ < 4) 
val output2 = input.filter(_ >= 4) 

  • Side outputs can be emitted from the following functions:
    1. ProcessFunction:DataStream.process(ProcessFunction<I, O>)
    2. KeyedProcessFunction
    3. CoProcessFunction
    4. KeyedCoProcessFunction
    5. ProcessWindowFunction
    6. ProcessAllWindowFunction

The following example demonstrates side outputs using a ProcessFunction:

package datastreamApi 
 
import org.apache.flink.streaming.api.functions.ProcessFunction 
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.util.Collector 
 
object SideOutputTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    val input = senv.fromElements(1, 2, 3, 4, 5, 6) 
 
    val side_output_tag1 = new OutputTag[String]("my_side_output_tag1") 
    val side_output_tag2 = new OutputTag[Int]("my_side_output_tag2") 
    val output = input.process(new ProcessFunction[Int, Int] { 
 
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { 
 
        // out.collect(value + 1)   // do something 
 
 
        // the stream is split into two side outputs in a single pass 
        if (value < 4) { 
          ctx.output(side_output_tag1, s"side_output1>>>>>>${value}") 
        } else { 
          ctx.output(side_output_tag2, value) 
        } 
 
      } 
    }) 
 
    val side_output1 = output.getSideOutput(side_output_tag1) 
    val side_output2 = output.getSideOutput(side_output_tag2) 
    side_output1.print("side_output1") 
    side_output2.print("side_output2") 
 
 
    senv.execute("SideOutputTest") 
 
 
  } 
 
} 

Output:

side_output1:4> side_output1>>>>>>2 
side_output1:5> side_output1>>>>>>3 
side_output1:3> side_output1>>>>>>1 
side_output2:8> 6 
side_output2:6> 4 
side_output2:7> 5 

6. Two-phase commit (2PC)

2PC is a distributed consistency protocol whose members are a coordinator (similar to a master) and participants (similar to slaves).

I. The 2PC commit process

  1. Phase 1: the voting phase
    1. The coordinator sends a prepare request to every participant
    2. Each participant performs the prepare operation and writes a rollback log
    3. Each participant notifies the coordinator whether its prepare succeeded (yes) or failed (no)
  2. Phase 2: the commit phase
    1. All prepares succeeded (yes):
      1. The coordinator sends a commit request to every participant
      2. Each participant performs the commit operation
      3. Each participant sends its commit result to the coordinator

The transaction is committed successfully.

    2. Some prepare failed (no):
      1. The coordinator sends a rollback request to every participant
      2. Each participant performs the rollback operation
      3. Each participant sends its rollback result to the coordinator

The transaction fails and is rolled back.
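
To make the decision rule concrete, here is a toy sketch (not Flink code; all names are illustrative) of the flow described above: the coordinator commits only when every participant votes yes in the prepare phase, and otherwise rolls everything back.

object TwoPhaseCommitDemo { 
 
  sealed trait Vote 
  case object Yes extends Vote 
  case object No extends Vote 
 
  trait Participant { 
    def prepare(): Vote      // phase 1: do the prepare work, write a rollback log, vote yes/no 
    def commit(): Unit       // phase 2: make the prepared work permanent 
    def rollback(): Unit     // phase 2: undo the prepared work using the rollback log 
  } 
 
  // the coordinator's decision rule 
  def coordinate(participants: Seq[Participant]): Boolean = { 
    val votes = participants.map(_.prepare())          // phase 1: collect votes 
    if (votes.forall(_ == Yes)) { 
      participants.foreach(_.commit())                 // phase 2: commit everywhere 
      true 
    } else { 
      participants.foreach(_.rollback())               // phase 2: roll back everywhere 
      false 
    } 
  } 
 
} 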
II. Drawbacks of 2PC

  1. The coordinator is a single point of failure
  2. A slow participant blocks the other participants
  3. During the commit phase, network failures and similar problems can leave some participants committed and others not

III. 2PC in Flink

Flink provides the abstract class TwoPhaseCommitSinkFunction; a sink that needs to guarantee exactly-once semantics extends this class and implements four abstract methods:

protected abstract TXN beginTransaction() throws Exception; 
 
// phase 1 
protected abstract void preCommit(TXN transaction) throws Exception; 
 
// phase 2, success (yes) case 
protected abstract void commit(TXN transaction); 
 
// phase 2, failure (no) case 
protected abstract void abort(TXN transaction); 

FlinkKafkaProducer extends TwoPhaseCommitSinkFunction. The delivery guarantees of Flink's built-in sources and sinks are summarized below:

Source            Guarantees
Apache Kafka      exactly once
Files             exactly once
Sockets           at most once

Sink              Guarantees
Elasticsearch     at least once
Kafka producer    at least once / exactly once
File sinks        exactly once
Socket sinks      at least once
Standard output   at least once
Redis sink        at least once
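
As the table indicates, the Kafka producer reaches exactly-once when its 2PC-based transactional mode is enabled. A minimal sketch follows, assuming an illustrative broker address, topic name, and checkpoint interval:

package datastreamApi 
 
import java.nio.charset.StandardCharsets 
import java.util.Properties 
 
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} 
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema} 
import org.apache.kafka.clients.producer.ProducerRecord 
 
object ExactlyOnceKafkaSinkTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    // pre-commit happens on each checkpoint; commit happens when the checkpoint completes 
    senv.enableCheckpointing(10000L) 
 
    val input = senv.fromElements("hello", "world") 
 
    val props = new Properties() 
    props.setProperty("bootstrap.servers", "192.168.xxx.xxx:9092") 
    // must not exceed the broker's transaction.max.timeout.ms 
    props.setProperty("transaction.timeout.ms", "600000") 
 
    val serializationSchema = new KafkaSerializationSchema[String] { 
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = 
        new ProducerRecord[Array[Byte], Array[Byte]]("my_topic", element.getBytes(StandardCharsets.UTF_8)) 
    } 
 
    val kafkaSink = new FlinkKafkaProducer[String]( 
      "my_topic",                                  // default topic 
      serializationSchema, 
      props, 
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE     // enables the TwoPhaseCommitSinkFunction-based 2PC path 
    ) 
 
    input.addSink(kafkaSink) 
 
    senv.execute("ExactlyOnceKafkaSinkTest") 
  } 
 
} 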
