当前位置: 首页 > news >正文

Flink学习:Flink支持的数据类型

Flink

    • 一、原生数据类型
    • 二、Java Tuples类型
    • 三、Scala样例类
    • 四、POJOs类型
    • 五、特殊数据类型

Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo等。

一、原生数据类型

1、BasicTypeInfo

//创建Int类型的数据集
val env = StreamExecutionEnvironment.getExecutionEnvironment
val IntStream:DataStream[Int] = env.fromElements(4,51,2,7)
//创建String类型的数据集
val StringStream:DataStream[String] = env.fromElements("hello","flink")

2、BasicArrayTypeInfo

//通过从数组中创建数据集
val ArrayStream:DataStream[Int] = env.fromCollection(Array(8,5,2,31))
//通过List集合创建数据集
val ListStream:DataStream[Int] = env.fromCollection(List(6,23,63,9))

二、Java Tuples类型

//通过new Tuple2创建元组数据集
val TupleStream:DataStream[Tuple2[String,Int]] = env.fromElements(new Tuple2("唐太宗",1),new Tuple2("汉武帝",2))

三、Scala样例类

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持通过字段名称和位置索引获取指标,不支持存储空值

//scala样例类
case class bing(id:Int,name:String)
object TableTeat {
  def main(args: Array[String]): Unit = {
    //val senv = EnvironmentSettings.newInstance().inStreamingMode().build()
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    //val tenv = TableEnvironment.create(senv)
    val input = senv.fromElements(bing(1,"成吉思汗"),bing(2,"松赞干布"))
    input.print()
    senv.execute()
  }
}

四、POJOs类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java类和Scala类

  • POJOs类必须是Public修饰且必须独立定义,不能是内部类;
  • POJOs类中必须含有默认空构造器;
  • POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
  • POJOs类中的字段类型必须是Flink支持的;
public class Person{
	//字段具有public修饰符
	public String name;
	public int age;
	//具有默认空构造器
	public Person(){
	}
	public Person(String name,int age){
		this.name = name;
		this.age = age;
	}
}
class Person(var name:String,var age:Int){
	def this(){
		this(null,-1)
	}
}

定义好后,就可以在Flink环境中使用

val personStream = env.fromElements(new Person("刘病己",14),new Person("刘秀",25))
personStream.keyBy("name")

五、特殊数据类型

val mapStream = env.fromElements(Map("name" -> "朱元璋","age" -> "18"),Map("name"-> "朱棣","age" -> "24"))

相关文章:

  • 【论文复现】——FEC: Fast Euclidean Clustering for Point Cloud Segmentation
  • 第十三届蓝桥杯C++B组省赛 I 题——李白打酒加强版 (AC)
  • 队列的简单实现
  • java毕业设计家庭理财记账系统(附源码、数据库)
  • 【ASM】字节码操作 工具类与常用类 asm-utils 与 asm-commons
  • 用Python把附近的足浴店都给采集了一遍,好兄弟:针不戳~
  • 计算机毕业设计之java+javaweb的医院门诊挂号系统
  • 【Linux】基本指令(一)
  • 刷题记录 -- 面试题
  • 信息学奥赛一本通:1308:【例1.5】高精除
  • 【Linux】关于进程的理解、状态、优先级和进程切换
  • 一文吃透JavaScript中的DOM知识及用法
  • 中断和异常理论详解,Linux操作系统原理与应用
  • 对象的比较(上)PriorityQueue中的底层源码解析
  • 【面试官让我十分钟实现一个链表?一个双向带头循环链表甩给面试官】
  • 以太网交换机自学习、转发帧的流程
  • Django auth 应用模块
  • Cy3 PEG N-羟基琥珀酰亚胺,花菁染料CY3标记N-羟基琥珀酰亚胺,CY3-N-Hydroxy succinimide
  • nosql期末
  • C++多态_virtual
  • 国家卫健委对近日肖某引发舆情问题开展调查
  • 人民日报社论:坚定信心、奋发有为、创新创造——写在“五一”国际劳动节
  • 山西太原一小区发生爆炸,造成1人遇难21人受伤2人失联
  • 原国家有色金属工业局副局长黄春萼逝世,享年86岁
  • 中国银行副行长刘进任该行党委副书记
  • 王沪宁主持召开全国政协主席会议