Flink的数据类型和序列化(Scala版)
java哥
阅读:258
2022-06-06 14:18:09
评论:0
目录
1. 数据类型
1.1 Tuple和case class
package devBase
import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}
case class Student(name:String, age:Int)
object DataTypeTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val case_input= env.fromElements(Student("LiMing",16), Student("Zhangsan",18))
val tuple_input = env.fromElements(("LiMing",16),("Zhangsan",18))
case_input.print()
tuple_input.print()
}
}
执行结果:
Student(LiMing,16)
Student(Zhangsan,18)
(LiMing,16)
(Zhangsan,18)
- Tuple用TupleTypeInfo进行表示,case class用CaseClassTypeInfo进行表示
1.2 POJOs类
Flink处理POJOs类比普通的类更高效和易用,满足以下条件的类即是POJOs类:
- 类是访问权限是public
- 类有一个无参的默认构造器
- 类的字段访问权限都是public,且字段类型被Flink注册的序列化所支持
- POJOs类在Flink中用PojoTypeInfo所表示,并用PojoSerializer进行序列化(可以配置用Kryo进行序列化)
package devBase
import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}
class Student(name:String, age:Int) {
def this() {
this("default_name", 0)
}
override def toString: String = {
s"name:${name}, age:${age}"
}
}
object DataTypeTest {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val class_iput= env.fromElements(new Student("LiMing",16),
new Student("Zhangsan",18))
class_iput.print()
}
}
执行结果:
name:LiMing, age:16
name:Zhangsan, age:18
1.3 原生数据类型
Flink支持所有Scala的原生数据类型,比如Int、String、Double; 用BasicTypeInfo进行表示
1.4 普通class
- Flink支持不是POJOs类型的普通class(除了字段不能被序列化的class,比如字段类型为file pointers、I/O streams、native resources)
- 不能访问普通class的字段
- 使用Kryo对普通class进行序列化
1.5 Values
todo
1.6 Hadoop Writables
todo
1.7 Special Types
todo
2. TypeInformation类
scala的所有数据类型在Flink中都有对应的TypeInformation类,TypeInformation类对Scala的数据类型进行描述并生成序列化器
2.1 创建TypeInformation和TypeSerializer
package devBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object DataTypeTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
// 或者在rich函数中通过getRuntimeContext.getExecutionConfig
val config = senv.getConfig
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val stringSerializer:TypeSerializer[String] = stringInfo.createSerializer(config)
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
val tupleSerializer:TypeSerializer[(String, Double)] = tupleInfo.createSerializer(config)
}
}
2.2 泛型参数用TypeInformation表示
对于泛型参数,Flink并不知道具体的数据类型,可以参考下面:
package devBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream
object DataTypeTest {
def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = {
input.map(_._1)
}
def main(args: Array[String]): Unit = {
}
}
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。