Flink的数据类型和序列化(Scala版)

java哥 阅读:258 2022-06-06 14:18:09 评论:0

1. 数据类型

1.1 Tuple和case class

package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation} 
 
case class Student(name:String, age:Int) 
 
object DataTypeTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
 
    val case_input= env.fromElements(Student("LiMing",16), Student("Zhangsan",18)) 
    val tuple_input = env.fromElements(("LiMing",16),("Zhangsan",18)) 
 
    case_input.print() 
    tuple_input.print() 
  } 
 
} 

执行结果:

Student(LiMing,16) 
Student(Zhangsan,18) 
(LiMing,16) 
(Zhangsan,18) 
  • Tuple用TupleTypeInfo进行表示,case class用CaseClassTypeInfo进行表示

1.2 POJOs类

Flink处理POJOs类比普通的类更高效和易用,满足以下条件的类即是POJOs类:

  1. 类是访问权限是public
  2. 类有一个无参的默认构造器
  3. 类的字段访问权限都是public,且字段类型被Flink注册的序列化所支持
  • POJOs类在Flink中用PojoTypeInfo所表示,并用PojoSerializer进行序列化(可以配置用Kryo进行序列化)
package devBase 
 
import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation} 
 
class Student(name:String, age:Int) { 
  def this() { 
    this("default_name", 0) 
  } 
 
  override def toString: String = { 
    s"name:${name}, age:${age}" 
  } 
} 
 
object DataTypeTest { 
 
  def main(args: Array[String]): Unit = { 
 
    val env = ExecutionEnvironment.getExecutionEnvironment 
 
    val class_iput= env.fromElements(new Student("LiMing",16), 
      new Student("Zhangsan",18)) 
 
    class_iput.print() 
  } 
 
} 
 

执行结果:

name:LiMing, age:16 
name:Zhangsan, age:18 

1.3 原生数据类型

Flink支持所有Scala的原生数据类型,比如Int、String、Double; 用BasicTypeInfo进行表示

1.4 普通class

  • Flink支持不是POJOs类型的普通class(除了字段不能被序列化的class,比如字段类型为file pointers、I/O streams、native resources)
  • 不能访问普通class的字段
  • 使用Kryo对普通class进行序列化

1.5 Values

todo

1.6 Hadoop Writables

todo

1.7 Special Types

todo

2. TypeInformation类

scala的所有数据类型在Flink中都有对应的TypeInformation类,TypeInformation类对Scala的数据类型进行描述并生成序列化器

2.1 创建TypeInformation和TypeSerializer

package devBase 
 
import org.apache.flink.api.common.typeinfo.TypeInformation 
import org.apache.flink.api.common.typeutils.TypeSerializer 
import org.apache.flink.api.scala.createTypeInformation 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
 
 
 
object DataTypeTest { 
 
  def main(args: Array[String]): Unit = { 
    val senv = StreamExecutionEnvironment.getExecutionEnvironment 
    // 或者在rich函数中通过getRuntimeContext.getExecutionConfig 
    val config = senv.getConfig 
 
    val stringInfo: TypeInformation[String] = createTypeInformation[String] 
    val stringSerializer:TypeSerializer[String] = stringInfo.createSerializer(config) 
 
    val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)] 
    val tupleSerializer:TypeSerializer[(String, Double)] = tupleInfo.createSerializer(config) 
 
 
  } 
 
} 

2.2 泛型参数用TypeInformation表示

对于泛型参数,Flink并不知道具体的数据类型,可以参考下面:

package devBase 
 
import org.apache.flink.api.common.typeinfo.TypeInformation 
import org.apache.flink.streaming.api.scala.DataStream 
 
object DataTypeTest { 
 
  def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = { 
    input.map(_._1) 
  } 
 
  def main(args: Array[String]): Unit = { 
 
 
  } 
 
} 
 

标签:程序员
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

搜索
排行榜
关注我们

一个IT知识分享的公众号