当前位置: 首页 > wzjs >正文

wordpress 媒体库空白给你一个网站seo如何做

wordpress 媒体库空白,给你一个网站seo如何做,logo素材网站,小程序开发官网文章目录 前言引言数据流API基于POJO的数据流基本源流配置示例基本流接收器数据管道与ETL(提取、转换、加载)一对一映射构建面向流映射的构建键控流进行分组运算RichFlatMapFunction对于流的状态管理连接流的使用流式分析水位的基本概念和示例侧道输入的基本概念和示例Process …

文章目录

    • 前言
    • 引言
    • 数据流API
      • 基于POJO的数据流
      • 基本源流配置示例
      • 基本流接收器
    • 数据管道与ETL(提取、转换、加载)
      • 一对一映射构建
      • 面向流映射的构建
      • 键控流进行分组运算
      • RichFlatMapFunction对于流的状态管理
      • 连接流的使用
    • 流式分析
      • 水位的基本概念和示例
      • 侧道输入的基本概念和示例
    • Process Function
      • 基本概念介绍
      • 使用示例
    • 参考

前言

引言

数据流API

基于POJO的数据流

一般来说flink中的源数据我们都会以简单java对象即pojo(Plain Ordinary Java Object )的形式进行传输或游走,只要满足以下条件,flink就会识别这些数据类型:

  1. 类中所有非静态、非transient修饰的字段,要么以public且非final修饰或者对外提供get和set方法
  2. 该类不存在非静态的内部类
  3. 提供无参构造函数

对应的我们给出日常比较常用的POJO 示例,即私有成员但是提供get、set符合上述的要求:

public class Person {private String name;private Integer age;//提供无参构造函数public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}//......//get set方法
}

以上述POJO作为源数据,可以看到笔者通过StreamExecutionEnvironment 构建流的执行环境,并通过fromData进行关联:

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用 fromData 关联源数据DataStreamSource<Person> source = env.fromData(new Person("Alice", 18),new Person("Bob", 28),new Person("Charlie", 32));

基于上述的源数据利用DataStream api尝试过滤出18岁以上的person数据并将过滤结果打印输出:

//基于  filter过滤出大于18岁的personSingleOutputStreamOperator<Person> filterRes = source.filter(person -> person.getAge() > 18);//输出打印filterRes.print();

flink中的流操作和lambda类似需要有一个终端操作才能启动运行,所以我们再完成上述的执行环境设置之后,需确保通过 env.execute();将当前job提交到JobManagerJobManager 切割为无数个子并行任务分发到指定的Task Managers 的slot槽中等待运行:

  //执行execute后,上述任务提交到JobManager中的taskmanager某个slot中等待执行,若没提交这个则不会execute执行,这一点和java lambda的终端流操作思想一致env.execute();

需要补充的是,flinkfromData方法提供了多种的重载,上面的示例我们也可以通过List的方式将源数据传入:

List<Person> list = Arrays.asList(new Person("Alice", 18),new Person("Bob", 28),new Person("Charlie", 32));//使用 fromData 关联源数据DataStreamSource<Person> source = env.fromData(list);

基本源流配置示例

上述的示例我们基于DataStreamfromData方法来构建一些简单源流,实际上flink支持在配置直接直接指明文件流或者socket流,因为socket流相对于物理文件流更常用,所以我们给出一个采集本地9999端口的socket流示例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStream = env.socketTextStream("localhost", 9999);

因为我们本案例发送的数据格式为hello,序列化的person对象的json字符串,所以收到数据流之后需要对数据进行提取转换,所以我们还是通过map和filter完成映射转换和过滤:

dataStream.map(s -> {String jsonStr = s.substring(s.indexOf(",")+1);Person person = JSONUtil.toBean(jsonStr, Person.class);return person;}).filter(p -> p.getAge() > 18).print();env.execute();

为了方便测试,笔者这里给出个人服务端socket代码使用示例,当然读者也可以在自己的系统上使用nc示例完成:

public static void main(String[] args) {try {// 1. 创建ServerSocket,监听9999端口ServerSocket serverSocket = new ServerSocket(9999);System.out.println("服务器启动,等待客户端连接...");// 2. 接受客户端连接Socket clientSocket = serverSocket.accept();System.out.println("客户端已连接: " + clientSocket.getInetAddress());// 3. 获取输出流,用于向客户端发送数据PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);// 4. 每隔1秒发送"hello"while (true) {Person person = new Person(RandomUtil.randomString(3), RandomUtil.randomInt(35));out.println("hello," + JSONUtil.toJsonStr(person));System.out.println("服务器发送: hello " + JSONUtil.toJsonStr(person));Thread.sleep(1000); // 暂停1秒}// 注意:这里为了简化代码,没有关闭资源,实际应用应该添加try-catch-finally} catch (Exception e) {e.printStackTrace();}}

可以看到转换和实际收到的数据流结果如下:


2> {"name":"Qex","age":27}
7> {"name":"nH7","age":25}
14> {"name":"zmN","age":34}

在实际的应用中这种配置方式常用于那些高吞吐、低延迟的数据源,例如Kafka这样的消息中间件,这一点flink也提供和上述一样方便的操作API。

基本流接收器

上文过滤出成年person的例子中我们在完成filter过滤后调用print方法进行打印输出,实际上其原理本质上就是为这个源流添加一个以打印输出的sink,这一点我们可以查看DataStream的print方法源码知晓:

@PublicEvolvingpublic DataStreamSink<T> print() {PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();return addSink(printFunction).name("Print to Std. Out");}

同时我们也需要说明在输出结果前面类似于14>7>代表当前输出是由哪个并行流线程(子任务)执行。

当然关于接收器我们也可以基于源数据类型进行自定义,例如下面这段代码,笔者指明源数据为person希望按照我们预期的方式打印,可通过创建一个SinkFunction指明person泛型重写invoke实现自定义输出逻辑:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Person> list = Arrays.asList(new Person("Alice", 18),new Person("Bob", 28),new Person("Charlie", 32));//使用 fromData 关联源数据DataStreamSource<Person> source = env.fromData(list);//添加一个person的s
http://www.dtcms.com/wzjs/284196.html

相关文章:

  • 网站设计协议武汉seo网络优化公司
  • 最专业的网站建设seo优化服务公司seo综合查询中的具体内容有哪些
  • 通用网站建设需求分析做营销怎样才能吸引客户
  • 公司做网站的步骤能去百度上班意味着什么
  • 漳州市建设局网站足球排行榜前十名
  • 简易的网站制作企业员工培训课程有哪些
  • 信誉好的天津网站建设公司宣传网站制作
  • 免费ai设计logo网站汉中网站seo
  • 初学者想学网站建设浅议网络营销论文
  • 哪个网站专门做游戏脚本网站优化平台
  • 免费网页设计成品网站关键词优化是什么意思
  • 做一个人网站需要注意什么阿里巴巴数据分析官网
  • 网站开发流程规范百度竞价排名是什么
  • 移动端网站推广哪里有学电脑培训班
  • 网站建设是永久使用吗全网营销整合营销
  • 建设工程质量网站营销型网站和普通网站
  • 事业单位网站建设算固定资产吗电脑突然多了windows优化大师
  • 公司做的网站如何开启伪静态企业策划推广公司
  • 做ppt的模板的网站百度外推排名
  • 岳阳市网站建设淘特app推广代理
  • 公司网站开发人员离职后修改公司网站制作网站的步骤是什么
  • 建设培训学校网站seo课程心得体会
  • 建设一个下载网站免费数据统计网站
  • 网站服务器怎么选择宁波核心关键词seo收费
  • 洛阳制作网站哪家好百度指数关键词未收录怎么办
  • 吉林省最新疫情最新消息湖北网站seo策划
  • 石家庄做淘宝网站google谷歌搜索
  • 盾思途旅游网站建设网页设计与网站开发
  • wordpress网站建设seo工作流程图
  • 微信创建小程序网站文章优化技巧