向Flink传递自定义参数的3种方式(withParameters、setGlobalJobParameters、ParameterTool)
虾米姐
阅读:281
2022-06-06 14:17:50
评论:0
目录
1. devBase\WithParameters.scala
package devBase
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.{ConfigOptions, Configuration}
/**
 * RichMapFunction that reads the "bigdata" entry from the Configuration
 * supplied through withParameters() and replaces any matching input
 * element with the literal string "bigdata".
 */
class WithParametersMapFunction extends RichMapFunction[String, String] {

  // Value of the "bigdata" key; populated in open() before any map() call.
  private var bigdata: String = null

  /** Called once per task; `parameters` is the Configuration passed via withParameters(). */
  override def open(parameters: Configuration): Unit = {
    val option = ConfigOptions
      .key("bigdata")
      .stringType()
      .noDefaultValue()
    bigdata = parameters.getString(option)
  }

  /** Substitutes "bigdata" for elements equal to the configured value. */
  override def map(value: String): String =
    if (value == bigdata) "bigdata" else value
}
/**
 * Demonstrates passing a custom Configuration to a rich function via
 * withParameters() — supported by the batch (DataSet) API only.
 */
object WithParameters {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Configuration that will be handed to the rich function's open() method.
    val conf = new Configuration()
    conf.setString("bigdata", "flink")

    val words: DataSet[String] = env.fromElements("flink", "spark", "hadoop")
    val mapped = words
      .map(new WithParametersMapFunction())
      // Batch only: forwards conf to open() of the operator defined just above.
      .withParameters(conf)

    mapped.print()
  }
}
执行结果如下:
bigdata
spark
hadoop
2. devBase\GlobalJobParameters.scala
package devBase
import apiTest.WordSourceFunction
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.configuration.{ConfigOptions, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
 * RichMapFunction that fetches the job-wide parameters registered via
 * ExecutionConfig.setGlobalJobParameters and prefixes matching elements
 * with "bigdata-".
 */
class GlobalJobParametersMapFunction extends RichMapFunction[String, String] {

  // Configured value of the "bigdata" key; assigned in open().
  private var bigdata: String = null

  override def open(parameters: Configuration): Unit = {
    // The global job parameters were registered as a Configuration, so the
    // downcast mirrors the concrete type used at registration time.
    val globals = getRuntimeContext
      .getExecutionConfig
      .getGlobalJobParameters
      .asInstanceOf[Configuration]
    val option = ConfigOptions
      .key("bigdata")
      .stringType()
      .noDefaultValue()
    bigdata = globals.getString(option)
  }

  /** Prefixes elements equal to the configured value with "bigdata-". */
  override def map(value: String): String =
    if (value == bigdata) "bigdata-" + value else value
}
/**
 * Demonstrates sharing a Configuration with every rich function in the job
 * through ExecutionConfig.setGlobalJobParameters (streaming API shown here).
 */
object GlobalJobParameters {

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setString("bigdata", "flink")

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // Makes conf reachable from any rich function's runtime context;
    // a ParameterTool instance could be registered here the same way.
    senv.getConfig.setGlobalJobParameters(conf)

    val source: DataStream[String] = senv.addSource(new WordSourceFunction)
    source
      .map(new GlobalJobParametersMapFunction())
      .print()

    senv.execute()
  }
}
执行结果:
4> world
5> stream
6> table
7> sql
8> bigdata-flink
1> bigdata-flink
2> batch
......省略部分......
3. devBase\ParameterToolTest.scala
package devBase
import org.apache.flink.api.java.utils.ParameterTool
import java.io.{File, FileInputStream}
import scala.collection.JavaConversions.mapAsJavaMap
/**
 * Demonstrates the ways a ParameterTool can be built: from an in-memory
 * Map, from a properties file (by path, File, or InputStream), from the
 * program arguments, and from JVM system properties.
 */
object ParameterToolTest {

  def main(args: Array[String]): Unit = {
    // 1. From an in-memory map (the imported mapAsJavaMap implicit converts
    //    the Scala Map into the java.util.Map that fromMap expects).
    val map = Map("id" -> "10", "name" -> "LiMing", "age" -> "10")
    val parameter_map = ParameterTool.fromMap(map)
    println("======map====== " + parameter_map.getRequired("name"))         // throws if the key is absent
    println("======map====== " + parameter_map.get("name", "default_name")) // falls back to the default
    println("======map====== " + parameter_map.getInt("id", 0))
    println("======map====== " + parameter_map.getNumberOfParameters)
    println("======map====== " + parameter_map.getProperties)

    // 2. From a properties file, referenced by path string ...
    val prop_path = "src/main/resources/parameterToolTest.properties"
    val parameter_prop_path = ParameterTool.fromPropertiesFile(prop_path)
    println("======path====== " + parameter_prop_path.get("name"))

    // ... by File ...
    val prop_file = new File(prop_path)
    val parameter_prop_file = ParameterTool.fromPropertiesFile(prop_file)
    println("======file====== " + parameter_prop_file.get("name"))

    // ... and by InputStream. A caller-supplied stream is owned by the
    // caller, so close it in a finally block (the original leaked it).
    val prop_stream = new FileInputStream(prop_file)
    try {
      val parameter_prop_stream = ParameterTool.fromPropertiesFile(prop_stream)
      println("======stream====== " + parameter_prop_stream.get("name"))
    } finally {
      prop_stream.close()
    }

    // 3. From command-line arguments, e.g. --input hdfs://xxx --output hdfs://xxx
    val parameter_args = ParameterTool.fromArgs(args)
    println("======args====== " + parameter_args.get("name"))

    // 4. From JVM system properties (-Dkey=value).
    val parameter_system = ParameterTool.fromSystemProperties()
    println("======system====== " + parameter_system.get("java.vm.version"))
  }
}
执行结果:
======map====== LiMing
======map====== LiMing
======map====== 10
======map====== 3
======map====== {age=10, name=LiMing, id=10}
======path====== LiMing
======file====== LiMing
======stream====== LiMing
======args====== null
======system====== 25.201-b09
4. ParameterTool解决中文乱码问题
使用ParameterTool读取properties配置文件时,如果文件中包含中文,读取出的内容会出现乱码。可以使用如下方法显式指定UTF-8编码读取
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.util.Properties
import org.apache.flink.api.java.utils.ParameterTool
object ParameterToolTest {

  /**
   * Loads a classpath properties file through an explicit UTF-8 decoder so
   * non-ASCII (e.g. Chinese) values are not garbled, then exposes the
   * entries through a ParameterTool.
   */
  def main(args: Array[String]): Unit = {
    val resourceProperties = new Properties()
    // NOTE: getResourceAsStream returns null when the resource is missing.
    val resourceInputStream: InputStream =
      this.getClass.getClassLoader.getResourceAsStream("parameterToolTest.properties")
    // Decode explicitly as UTF-8: Properties.load(InputStream) assumes
    // ISO-8859-1 and would corrupt multi-byte characters.
    val resourceBufferedReader: BufferedReader =
      new BufferedReader(new InputStreamReader(resourceInputStream, "UTF-8"))
    try {
      resourceProperties.load(resourceBufferedReader)
    } finally {
      // Closing the outermost reader also closes the wrapped stream
      // (the original leaked all three resources).
      resourceBufferedReader.close()
    }
    // Properties is a Hashtable[AnyRef, AnyRef]; the unchecked cast is
    // acceptable here because a loaded properties file only holds Strings.
    val parameterTool =
      ParameterTool.fromMap(resourceProperties.asInstanceOf[java.util.Map[String, String]])
    println("======chinese stream====== " + parameterTool.get("chinese_name"))
  }
}
执行结果:
======chinese stream====== 李明
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。