Flink DataStream: word count, data sources and sinks, Side Outputs, and two-phase commit (2PC)
1. pom.xml dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
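The ${scala.binary.version} and ${flink.version} properties are referenced but not defined above; a minimal <properties> block might look like the following (the versions shown are illustrative assumptions, not taken from the original project):

<properties>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.13.6</flink.version>
</properties>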
2. Implementing word count with the DataStream API
2.1 Word count without a window
apiTest\WordSourceFunction.scala
Purpose: a data source that keeps emitting random words
package apiTest

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class WordSourceFunction extends SourceFunction[String] {

  // volatile, because cancel() is called from a different thread than run()
  @volatile private var is_running = true
  private val words = Array("hello", "world", "flink", "stream", "batch", "table", "sql")

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (is_running) {
      val index = Random.nextInt(words.size)
      sourceContext.collect(words(index))
      // emit one word per second
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    is_running = false
  }
}
apiTest\StreamWordCount.scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val ds: DataStream[(String, Int)] = senv.addSource(new WordSourceFunction())
      .map(word => (word, 1))
      .keyBy(_._1)
      .sum(1)
    ds.print()
    // the pipeline is lazy: nothing runs until execute() is called
    senv.execute()
  }
}
The count for each word keeps increasing; sample output:
8> (stream,1)
1> (table,1)
4> (sql,1)
3> (hello,1)
4> (sql,2)
1> (table,2)
7> (flink,1)
4> (sql,3)
7> (flink,2)
4> (sql,4)
7> (flink,3)
7> (flink,4)
...... (remaining output omitted) ......
2.2 Word count with a window
apiTest\WindowStreamWordCount.scala
package apiTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowStreamWordCount {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val ds: DataStream[(String, Int)] = senv.addSource(new WordSourceFunction())
      .map(word => (word, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)
    ds.print()
    // the pipeline is lazy: nothing runs until execute() is called
    senv.execute()
  }
}
Results of the first window: only 7 words were counted, because the job started partway through the first 10-second window. Handling time explicitly with timestamps and watermarks is left as a TODO (see the event-time sketch after the window outputs below):
8> (stream,2)
7> (batch,3)
3> (hello,1)
5> (world,1)
Results of the second window:
5> (world,2)
7> (flink,2)
8> (stream,4)
4> (sql,1)
3> (hello,1)
Results of the third window:
8> (stream,1)
5> (world,2)
4> (sql,1)
3> (hello,1)
7> (batch,3)
7> (flink,2)
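As a follow-up to the timestamps-and-watermarks TODO above, the sketch below shows what an event-time variant of the window word count might look like, assuming Flink 1.12+ with the WatermarkStrategy API. The object name EventTimeWindowWordCount, the 1-second bounded out-of-orderness, and attaching System.currentTimeMillis() as the event timestamp are illustrative assumptions, not part of the original code:

package apiTest

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object EventTimeWindowWordCount {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = senv.addSource(new WordSourceFunction())
      // attach a timestamp to each word as it enters the pipeline
      .map(word => (word, 1, System.currentTimeMillis()))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, Int, Long)](Duration.ofSeconds(1))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int, Long)] {
            // use the attached timestamp as the event time
            override def extractTimestamp(element: (String, Int, Long), recordTimestamp: Long): Long = element._3
          })
      )
      .map(e => (e._1, e._2))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .sum(1)
    ds.print()
    senv.execute()
  }
}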
3. DataStream API data sources
The following kinds of data sources are supported:
- Built-in sources, e.g. senv.readTextFile(filePath) and senv.addSource(FlinkKafkaConsumer[OUT])
- senv.addSource(SourceFunction[OUT]): runs with a parallelism of 1
- senv.addSource(ParallelSourceFunction[OUT]): runs with a parallelism of n; ParallelSourceFunction[OUT] extends SourceFunction[OUT]
- senv.addSource(RichParallelSourceFunction[OUT]): runs with a parallelism of n and can access the RuntimeContext; RichParallelSourceFunction[OUT] implements ParallelSourceFunction[OUT] (see the sketch after this list)
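A minimal sketch illustrating the last two bullets: a RichParallelSourceFunction whose subtasks read their own index from the RuntimeContext. The class name SubtaskIndexSource, the object name ParallelSourceTest, and the parallelism of 2 are illustrative choices, not code from the original article:

package datastreamApi

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

// each parallel subtask emits its own subtask index a few times, then stops
class SubtaskIndexSource extends RichParallelSourceFunction[String] {

  @volatile private var is_running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // the RuntimeContext is only available in the Rich* variants
    val subtask_index = getRuntimeContext.getIndexOfThisSubtask
    var i = 0
    while (is_running && i < 3) {
      ctx.collect(s"subtask-${subtask_index} -> record ${i}")
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    is_running = false
  }
}

object ParallelSourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // the source runs with parallelism n because it is a (Rich)ParallelSourceFunction
    senv.addSource(new SubtaskIndexSource()).setParallelism(2).print("parallel_source")
    senv.execute("ParallelSourceTest")
  }
}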
The built-in sources are described below.
3.1 File-based sources
Reading a file is performed by two subtasks:
- a monitoring subtask: runs with a parallelism of 1; it watches the file, creates the splits, and hands them to the reader subtasks
- reader subtasks: read the splits in parallel
Contents of readTextFile.txt:
hello
world
Example code:
package datastreamApi

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object DatasourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val text_filepath = "src/main/resources/readTextFile.txt"
    val text_input = senv.readTextFile(text_filepath, "UTF-8")
    text_input.print("text_input")
    /*
    text_input:8> hello
    text_input:3> world
    */

    val file_input = senv.readFile(
      new TextInputFormat(new Path(text_filepath)),
      text_filepath,
      // FileProcessingMode.PROCESS_ONCE,       // read the file once and then stop
      FileProcessingMode.PROCESS_CONTINUOUSLY,  // rescan every 5 seconds; if the file changes, re-read the whole content
      5000L
    )
    file_input.print("file_input")
    /*
    file_input:5> hello
    file_input:8> world
    */

    senv.execute()
  }
}
3.2 Socket-based sources
- Install nc (ncat)
[root@bigdata005 ~]#
[root@bigdata005 ~]# yum install -y nc
[root@bigdata005 ~]#
- Start nc listening on port 9998
[root@bigdata005 ~]#
[root@bigdata005 ~]# nc -lk 9998
hello
world
- Start the Flink program that reads from the socket
package datastreamApi

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DatasourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val socket_input = senv.socketTextStream(
      "192.168.xxx.xxx",
      9998,
      '\n',
      0L // maximum number of retries when the connection fails (0 = terminate on failure)
    )
    socket_input.print("socket_input")
    senv.execute("DatasourceTest")
  }
}
Output:
socket_input:3> hello
socket_input:4> world
3.3 Collection-based sources
package datastreamApi

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.NumberSequenceIterator

import scala.collection.mutable.ArrayBuffer

object DatasourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val input1 = senv.fromElements(("key1", 10), ("key2", 20))
    input1.print("input1")
    /*
    input1:7> (key2,20)
    input1:6> (key1,10)
    */

    val datas = ArrayBuffer(("key1", 10), ("key2", 20))
    val input2 = senv.fromCollection(datas)
    input2.print("input2")
    /*
    input2:6> (key2,20)
    input2:5> (key1,10)
    */

    // takes a SplittableIterator[T]; this example generates the sequence 0,1,2,3
    val input3 = senv.fromParallelCollection(new NumberSequenceIterator(0L, 3L))
    input3.print("input3")
    /*
    input3:3> 2
    input3:2> 1
    input3:1> 0
    input3:4> 3
    */

    // generates the sequence 0,1,2,3
    val input4 = senv.fromSequence(0L, 3L)
    input4.print("input4")
    /*
    input4:3> 0
    input4:8> 3
    input4:7> 2
    input4:5> 1
    */

    senv.execute("DatasourceTest")
  }
}
4. DataStream API data sinks
- The DataStream.write* methods are not integrated with checkpointing, so they cannot provide exactly-once semantics; flink-connector-filesystem can be used to achieve exactly-once (see the file-sink sketch at the end of this section).
package datastreamApi

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object DatasourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val input = senv.fromElements("hello", "world")

    input.writeUsingOutputFormat(
      new TextOutputFormat[String](
        new Path("src/main/resources/textOutputDir"),
        "UTF-8"
      )
    )

    input.writeToSocket(
      "192.168.xxx.xxx",
      9998,
      new SimpleStringSchema()
    )
    /*
    [root@bigdata005 ~]#
    [root@bigdata005 ~]# nc -lk 9998
    helloworld
    */

    input.print("print")
    /*
    print:2> world
    print:1> hello
    */

    input.printToErr("printToErr")
    /* printed to stderr, so the console shows it in red
    printToErr:7> world
    printToErr:6> hello
    */

    senv.execute("DatasourceTest")
  }
}
The resulting directory structure of textOutputDir is shown in the figure below:
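As a hedged sketch of the exactly-once file sink mentioned at the top of this section: newer Flink versions ship the StreamingFileSink, which integrates with checkpointing. The object name FileSinkTest, the output directory streamingFileSinkDir, and the 10-second checkpoint interval below are illustrative assumptions:

package datastreamApi

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object FileSinkTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // part files are finalized when a checkpoint completes, so checkpointing must be enabled
    senv.enableCheckpointing(10000L)

    val input = senv.fromElements("hello", "world")

    // row-encoded sink: each record is written as one UTF-8 line
    val file_sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(
        new Path("src/main/resources/streamingFileSinkDir"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    input.addSink(file_sink)
    senv.execute("FileSinkTest")
  }
}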
5. Side Outputs
Motivation: as shown below, splitting a stream with two filter calls traverses the input twice, which wastes resources; side outputs avoid the second pass.
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.fromElements(1, 2, 3, 4, 5, 6)
val output1 = input.filter(_ < 4)
val output2 = input.filter(_ >= 4)
Side outputs can be emitted from the following functions:
- ProcessFunction, used via DataStream.process(ProcessFunction<I, O>)
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
The use of side outputs is illustrated below with a ProcessFunction:
package datastreamApi

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val input = senv.fromElements(1, 2, 3, 4, 5, 6)

    val side_output_tag1 = new OutputTag[String]("my_side_output_tag1")
    val side_output_tag2 = new OutputTag[Int]("my_side_output_tag2")

    val output = input.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        // out.collect(value + 1) // do something with the main output
        // a single pass over the input is enough to split the stream
        if (value < 4) {
          ctx.output(side_output_tag1, s"side_output1>>>>>>${value}")
        } else {
          ctx.output(side_output_tag2, value)
        }
      }
    })

    val side_output1 = output.getSideOutput(side_output_tag1)
    val side_output2 = output.getSideOutput(side_output_tag2)
    side_output1.print("side_output1")
    side_output2.print("side_output2")

    senv.execute("SideOutputTest")
  }
}
Output:
side_output1:4> side_output1>>>>>>2
side_output1:5> side_output1>>>>>>3
side_output1:3> side_output1>>>>>>1
side_output2:8> 6
side_output2:6> 4
side_output2:7> 5
6. Two-phase commit (2PC)
2PC is a distributed atomic-commit protocol; the parties involved are a coordinator (similar to a master) and the participants (similar to slaves).
I. The 2PC commit flow
- Phase 1: voting phase
  - the coordinator sends a prepare request to all participants
  - each participant executes the prepare operation and writes a rollback log
  - each participant tells the coordinator whether its prepare succeeded (yes) or failed (no)
- Phase 2: commit phase
  - if all prepares succeeded (yes):
    1. the coordinator sends a commit request to all participants
    2. each participant executes the commit operation
    3. each participant sends the commit result back to the coordinator
  - if any prepare failed (no):
    - the coordinator sends a rollback request to all participants
    - each participant executes the rollback operation
    - each participant sends the rollback result back to the coordinator
II. Drawbacks of 2PC
1. The coordinator is a single point of failure.
2. A slow participant blocks all the other participants.
3. During the commit phase, failures such as network errors can leave some participants committed and others not, producing an inconsistent state.
III. 2PC in Flink
Flink provides the abstract class TwoPhaseCommitSinkFunction; a sink that needs to guarantee exactly-once semantics extends this class and implements these four abstract methods:
// start a new transaction
protected abstract TXN beginTransaction() throws Exception;
// phase one
protected abstract void preCommit(TXN transaction) throws Exception;
// phase two, success (yes) case
protected abstract void commit(TXN transaction);
// phase two, failure (no) case
protected abstract void abort(TXN transaction);
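A minimal sketch of such a sink, assuming Flink 1.13.x: it writes records to a per-transaction temp file and atomically renames the file on commit. The class name TwoPhaseFileSink and the temp-file naming scheme are illustrative assumptions; note that besides the four methods above, the abstract invoke(transaction, value, context) method must also be implemented to write records into the currently open transaction.

package datastreamApi

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}
import java.util.UUID

import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}

// IN = String (records), TXN = String (path of a per-transaction temp file), CONTEXT = Void
class TwoPhaseFileSink(targetDir: String)
  extends TwoPhaseCommitSinkFunction[String, String, Void](
    StringSerializer.INSTANCE, VoidSerializer.INSTANCE) {

  // called when a new transaction starts (on open and after each checkpoint)
  override def beginTransaction(): String = {
    Files.createDirectories(Paths.get(targetDir))
    Paths.get(targetDir, s".tmp-${UUID.randomUUID()}").toString
  }

  // write each record into the current transaction's temp file
  override def invoke(transaction: String, value: String, context: SinkFunction.Context): Unit = {
    Files.write(Paths.get(transaction), (value + "\n").getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
  }

  // phase one: flush pending data; nothing more may be written to this transaction afterwards
  override def preCommit(transaction: String): Unit = {
    // every record was already appended to the temp file, so there is nothing left to flush
  }

  // phase two, success: publish the temp file under its final name
  override def commit(transaction: String): Unit = {
    val src = Paths.get(transaction)
    // commit may be re-invoked during recovery, so tolerate an already-published file
    if (Files.exists(src)) {
      Files.move(src, Paths.get(targetDir, src.getFileName.toString.stripPrefix(".tmp-")))
    }
  }

  // phase two, failure: discard the temp file
  override def abort(transaction: String): Unit = {
    Files.deleteIfExists(Paths.get(transaction))
  }
}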
FlinkKafkaProducer extends TwoPhaseCommitSinkFunction. The delivery guarantees of Flink's built-in sources and sinks are as follows:
| Source | Guarantees |
|---|---|
| Apache Kafka | exactly once |
| Files | exactly once |
| Sockets | at most once |

| Sink | Guarantees |
|---|---|
| Elasticsearch | at least once |
| Kafka producer | at least once / exactly once |
| File sinks | exactly once |
| Socket sinks | at least once |
| Standard output | at least once |
| Redis sink | at least once |
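To get the exactly-once guarantee from the Kafka producer sink, the semantic has to be selected explicitly and checkpointing must be enabled, since the Kafka transaction is committed when a checkpoint completes. A minimal sketch, assuming the universal FlinkKafkaProducer connector; the topic name output_topic, the broker address, and the object name KafkaExactlyOnceSinkTest are illustrative assumptions:

package datastreamApi

import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaExactlyOnceSinkTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // the 2PC transaction is committed when a checkpoint completes, so checkpointing must be enabled
    senv.enableCheckpointing(10000L)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.xxx.xxx:9092")
    // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
    props.setProperty("transaction.timeout.ms", "600000")

    val serialization_schema = new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
        new ProducerRecord[Array[Byte], Array[Byte]]("output_topic", element.getBytes(StandardCharsets.UTF_8))
    }

    val kafka_sink = new FlinkKafkaProducer[String](
      "output_topic",                           // default topic
      serialization_schema,
      props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE  // use Kafka transactions via 2PC
    )

    senv.fromElements("hello", "world").addSink(kafka_sink)
    senv.execute("KafkaExactlyOnceSinkTest")
  }
}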