向Flink传递自定义参数的3种方式(withParameters、setGlobalJobParameters、ParameterTool)

虾米姐 阅读:281 2022-06-06 14:17:50 评论:0

1. devBase\WithParameters.scala

package devBase 
 
import org.apache.flink.api.common.functions.RichMapFunction 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.configuration.{ConfigOptions, Configuration} 
 
 
 
class WithParametersMapFunction extends RichMapFunction[String, String] {

  // Marker word injected via withParameters(conf); resolved once in open().
  private var bigdata: String = null

  override def open(parameters: Configuration): Unit = {
    // Describe the "bigdata" key as an optional string option and read it
    // from the Configuration that withParameters() handed to this operator.
    val bigdataOption = ConfigOptions.key("bigdata")
      .stringType()
      .noDefaultValue()
    bigdata = parameters.getString(bigdataOption)
  }

  // Replace any element equal to the configured word with the literal "bigdata".
  override def map(value: String): String =
    if (bigdata == value) "bigdata" else value
}
 
object WithParameters {

  /** Demonstrates handing a Configuration to a rich function via withParameters. */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Parameters destined for the map operator below.
    val conf = new Configuration()
    conf.setString("bigdata", "flink")

    val words: DataSet[String] = env.fromElements("flink", "spark", "hadoop")

    // withParameters is batch-only; it applies to the preceding map operator
    // and delivers `conf` to the rich function's open() method.
    val result = words
      .map(new WithParametersMapFunction())
      .withParameters(conf)

    result.print()
  }
}

执行结果如下:

bigdata 
spark 
hadoop 

2. devBase\GlobalJobParameters.scala

package devBase 
 
import apiTest.WordSourceFunction 
import org.apache.flink.api.common.functions.RichMapFunction 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation} 
import org.apache.flink.configuration.{ConfigOptions, Configuration} 
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 
 
 
class GlobalJobParametersMapFunction extends RichMapFunction[String, String] {

  // Configured marker word, resolved from the global job parameters in open().
  private var bigdata: String = null

  override def open(parameters: Configuration): Unit = {
    // The global job parameters were registered as a Configuration instance
    // (Flink's Configuration extends ExecutionConfig.GlobalJobParameters),
    // so the downcast recovers the typed accessor API.
    val globalConf = getRuntimeContext
      .getExecutionConfig
      .getGlobalJobParameters
      .asInstanceOf[Configuration]

    bigdata = globalConf.getString(
      ConfigOptions.key("bigdata").stringType().noDefaultValue())
  }

  // Prefix matching elements with "bigdata-"; pass everything else through.
  override def map(value: String): String =
    if (bigdata == value) "bigdata-" + value else value
}
 
object GlobalJobParameters {

  /** Shows how setGlobalJobParameters makes a Configuration visible to every rich function. */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setString("bigdata", "flink")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // Register globally; any ExecutionConfig.GlobalJobParameters subclass
    // works here (e.g. a ParameterTool instance instead of Configuration).
    senv.getConfig.setGlobalJobParameters(conf)

    val words: DataStream[String] = senv.addSource(new WordSourceFunction)
    val tagged = words.map(new GlobalJobParametersMapFunction())

    tagged.print()
    senv.execute()
  }
}

执行结果:

4> world 
5> stream 
6> table 
7> sql 
8> bigdata-flink 
1> bigdata-flink 
2> batch 
......省略部分...... 

3. devBase\ParameterToolTest.scala

package devBase 
 
import org.apache.flink.api.java.utils.ParameterTool 
 
import java.io.{File, FileInputStream} 
import scala.collection.JavaConversions.mapAsJavaMap 
 
 
object ParameterToolTest {

  /** Exercises the ParameterTool factories: map, properties file (path/File/stream), args, system properties. */
  def main(args: Array[String]): Unit = {

    // --- from an in-memory Scala map (implicitly converted to java.util.Map) ---
    val params = Map("id" -> "10", "name" -> "LiMing", "age" -> "10")
    val fromMap = ParameterTool.fromMap(params)
    println("======map====== " + fromMap.getRequired("name"))
    println("======map====== " + fromMap.get("name", "default_name"))
    println("======map====== " + fromMap.getInt("id", 0))
    println("======map====== " + fromMap.getNumberOfParameters)
    println("======map====== " + fromMap.getProperties)

    // --- from a properties file, supplied as a path, a File, or an InputStream ---
    val propPath = "src/main/resources/parameterToolTest.properties"
    val fromPath = ParameterTool.fromPropertiesFile(propPath)
    println("======path====== " + fromPath.get("name"))

    val propFile = new File(propPath)
    val fromFile = ParameterTool.fromPropertiesFile(propFile)
    println("======file====== " + fromFile.get("name"))

    val propStream = new FileInputStream(propFile)
    val fromStream = ParameterTool.fromPropertiesFile(propStream)
    println("======stream====== " + fromStream.get("name"))

    // --- from command-line args, e.g. --input hdfs://xxx --output hdfs://xxx ---
    val fromArgs = ParameterTool.fromArgs(args)
    println("======args====== " + fromArgs.get("name"))

    // --- from JVM system properties ---
    val fromSystem = ParameterTool.fromSystemProperties()
    println("======system====== " + fromSystem.get("java.vm.version"))
  }
}

执行结果:

======map====== LiMing 
======map====== LiMing 
======map====== 10 
======map====== 3 
======map====== {age=10, name=LiMing, id=10} 
======path====== LiMing 
======file====== LiMing 
======stream====== LiMing 
======args====== null 
======system====== 25.201-b09 

4. ParameterTool解决中文乱码问题

使用ParameterTool读取properties配置文件时,如果文件中包含中文,读取出来的内容会出现乱码。可以使用如下方法指定UTF-8编码读取:

import java.io.{BufferedReader, InputStream, InputStreamReader} 
import java.util.Properties 
import org.apache.flink.api.java.utils.ParameterTool 
 
object ParameterToolTest {

  /**
   * Loads a UTF-8 properties file from the classpath and wraps it in a
   * ParameterTool so non-ASCII (e.g. Chinese) values are decoded correctly.
   *
   * Properties.load(InputStream) assumes ISO-8859-1, which garbles Chinese
   * text; routing the stream through an explicit UTF-8 Reader avoids that.
   */
  def main(args: Array[String]): Unit = {

    val resourceName = "parameterToolTest.properties"
    val resourceInputStream: InputStream =
      this.getClass.getClassLoader.getResourceAsStream(resourceName)
    // getResourceAsStream returns null when the resource is absent; fail fast
    // with a clear message instead of an opaque NullPointerException later.
    if (resourceInputStream == null)
      throw new IllegalStateException(s"resource not found on classpath: $resourceName")

    val resourceProperties = new Properties()
    val resourceBufferedReader: BufferedReader =
      new BufferedReader(new InputStreamReader(resourceInputStream, "UTF-8"))
    try {
      resourceProperties.load(resourceBufferedReader)
    } finally {
      // Closing the reader also closes the wrapped classpath InputStream,
      // fixing the resource leak in the original listing.
      resourceBufferedReader.close()
    }

    // Properties is a java.util.Map[Object, Object]; the cast is safe because
    // properties files only ever yield String keys and values.
    val parameterTool =
      ParameterTool.fromMap(resourceProperties.asInstanceOf[java.util.Map[String, String]])
    println("======chinese stream====== " + parameterTool.get("chinese_name"))
  }
}

执行结果:

======chinese stream====== 李明 

标签:程序员
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

搜索
排行榜
关注我们

一个IT知识分享的公众号