Flink分布式缓存和广播变量的讲解
小虾米
阅读:256
2022-06-06 14:18:20
评论:0
1. 分布式缓存
readTextFile.txt文件内容:
hello
world
示例代码:
package devBase
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.Configuration
import java.io.{BufferedReader, FileReader}
import scala.collection.mutable.ArrayBuffer
object DatasetApiTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
// 1. 注册HDFS文件或本地文件,将注册的文件复制到其它的服务器
env.registerCachedFile(
// HDFS文件前缀为:hdfs://ip:port/...
s"file:///${System.getProperty("user.dir")}\\src\\main\\resources\\readTextFile.txt",
"my_local_executable_file",
true) // 文件是否为可执行文件
val input = env.fromElements("A", "B")
val output = input.map(new RichMapFunction[String, String] {
val lines: ArrayBuffer[String] = ArrayBuffer()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 2. 在分布式运行的RichFunction中,通过getRuntimeContext访问文件
val file = getRuntimeContext.getDistributedCache.getFile("my_local_executable_file")
val bf = new BufferedReader(new FileReader(file))
var line: String = bf.readLine
while (line != null) {
lines += line
line = bf.readLine
}
}
override def map(value: String): String = {
// 3. 使用获取到的文件的内容
lines.mkString("_") + "_" + value
}
})
output.print()
}
}
执行结果:
hello_world_A
hello_world_B
2. 广播变量
package devBase
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.Configuration
object DatasetApiTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val input1 = env.fromElements("hello", "world")
val input2 = env.fromElements("A", "B", "C")
val output = input2.map(new RichMapFunction[String, String] {
var broadcast_values: Array[String] = null
override def open(parameters: Configuration): Unit = {
super.open(parameters)
broadcast_values =
// 2. 在分布式运行的RichFunction中,通过getRuntimeContext访问广播变量
getRuntimeContext.getBroadcastVariable("my_broadcast")
.toArray.map(_.asInstanceOf[String])
}
override def map(value: String): String = {
// 使用获取到的广播变量数据
broadcast_values.mkString("_") + "_" + value
}
// 1. 将DataSet input1注册为广播变量
}).withBroadcastSet(input1, "my_broadcast")
output.print()
}
}
执行结果:
hello_world_A
hello_world_B
hello_world_C
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。