Flink分布式缓存和广播变量的讲解

小虾米 阅读:256 2022-06-06 14:18:20 评论:0

1. 分布式缓存

readTextFile.txt文件内容:

hello 
world 

示例代码:

package devBase 
 
import org.apache.flink.api.common.functions.RichMapFunction 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.configuration.Configuration 
 
import java.io.{BufferedReader, FileReader} 
import scala.collection.mutable.ArrayBuffer 
 
object DatasetApiTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    // 1. 注册HDFS文件或本地文件,将注册的文件复制到其它的服务器 
    env.registerCachedFile( 
      // HDFS文件前缀为:hdfs://ip:port/... 
      s"file:///${System.getProperty("user.dir")}\\src\\main\\resources\\readTextFile.txt", 
      "my_local_executable_file", 
      true) // 文件是否为可执行文件 
 
    val input = env.fromElements("A", "B") 
    val output = input.map(new RichMapFunction[String, String] { 
      val lines: ArrayBuffer[String] = ArrayBuffer() 
 
      override def open(parameters: Configuration): Unit = { 
        super.open(parameters) 
        // 2. 在分布式运行的RichFunction中,通过getRuntimeContext访问文件 
        val file = getRuntimeContext.getDistributedCache.getFile("my_local_executable_file") 
        val bf = new BufferedReader(new FileReader(file)) 
 
        var line: String = bf.readLine 
        while (line != null) { 
          lines += line 
          line = bf.readLine 
        } 
 
      } 
 
      override def map(value: String): String = { 
 
        // 3. 使用获取到的文件的内容 
        lines.mkString("_") + "_" + value 
 
      } 
    }) 
 
    output.print() 
 
  } 
 
} 

执行结果:

hello_world_A 
hello_world_B 

2. 广播变量

package devBase 
 
import org.apache.flink.api.common.functions.RichMapFunction 
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.configuration.Configuration 
 
object DatasetApiTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    val input1 = env.fromElements("hello", "world") 
    val input2 = env.fromElements("A", "B", "C") 
 
    val output = input2.map(new RichMapFunction[String, String] { 
      var broadcast_values: Array[String] = null 
 
      override def open(parameters: Configuration): Unit = { 
        super.open(parameters) 
 
        broadcast_values = 
        // 2. 在分布式运行的RichFunction中,通过getRuntimeContext访问广播变量 
          getRuntimeContext.getBroadcastVariable("my_broadcast") 
            .toArray.map(_.asInstanceOf[String]) 
      } 
 
      override def map(value: String): String = { 
        // 使用获取到的广播变量数据 
        broadcast_values.mkString("_") + "_" + value 
 
      } 
      // 1. 将DataSet input1注册为广播变量 
    }).withBroadcastSet(input1, "my_broadcast") 
 
    output.print() 
 
  } 
 
} 

执行结果:

hello_world_A 
hello_world_B 
hello_world_C 

标签:程序员
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

搜索
排行榜
关注我们

一个IT知识分享的公众号