当前位置: 首页 > wzjs >正文

网站建设报告seo高手培训

网站建设报告,seo高手培训,视频点播网站建设,欧美做暧网站5.5 输出算子 5.5.1 概述 print也是一种输出类PrintSinkFunction 创建了一个PrintSinkFunction操作,然后调用addSink方法的作为传入参数 PrintSinkFunction这个类继承自RichSinkFunction富函数类 RichSourceFunction类 继承了AbstractRichFunction富函数类 …

5.5 输出算子

5.5.1 概述

  1. print也是一种输出类PrintSinkFunction
    image.png

创建了一个PrintSinkFunction操作,然后调用addSink方法的作为传入参数
image.png

PrintSinkFunction这个类继承自RichSinkFunction富函数类

  1. RichSourceFunction类

image.png

  • 继承了AbstractRichFunction富函数类

因此就可以调用富函数类的声明周期方法,例如open,close,以及获取运行时上下文,运行环境,定义状态等等

  • RichSourceFunction类又实现了SinkFunction这个接口,所以本质上也是SinkFunction

image.png

image.png

  • SinkFunction接口的抽象方法有invoke,传入是value,以及当前的上下文
  1. 如果需要自定义输出算子
    image.png

可以调用DataStream的addSink方法

image.png
然后传入自己实现的SinkFunction

  1. flink提供的第三方系统连接器

image.png

5.5.2 输出到文件

  1. StreamFileSink流失文件输出类
  • 来源

继承RichSinkFunction类,并实现CheckpointedFunction,CheckpointListener(检查点)

image.png

  • 底层

底层将数据写入bucket(桶),桶里面分大小存储分区文件,实现了分布式存储

  • 创建实例

使用Builder构建器构建

image.png

image.png)

RowFormatBuilder是行编码

image.png

BulkFormatBuilder是列存储编码格式

  1. 代码

image.png

public class SinkToFileTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.为了得到并传入SinkFunction,需要构建StreamingFileSink的一个对象//调用forRowFormat方法或者forBulkformat方法得到一个DefaultRowFormatBuilder//  其中forBulkformat方法前面还有类型参数,以及传参要求一个目录名称,一个编码器//写入文件需要序列化,需要定义序列化方法并进行编码转换,当成Stream写入文件//然后再使用builder创建实例StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(//指定滚动策略,根据事件或者文件大小新产生文件归档保存DefaultRollingPolicy.builder()//使用builder构建实例.withMaxPartSize(1024 * 1024 * 1024).withRolloverInterval(TimeUnit.MINUTES.toMinutes(15))//事件间隔毫秒数.withInactivityInterval(TimeUnit.MINUTES.toMinutes(15))//当前不活跃的间隔事件,隔多长事件没有数据到来.build()).build();//1.写入文件调用addSink()方法,并传入SinkFunctionstream.map(data -> data.toString())//把Event类型转换成String.addSink(streamingFileSink);env.execute();}
}
  • 结果

image.png

5.5.3 输出到kafka

image.png

  1. 代码
public class SinkToKafka {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.从kafka中读取数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop2:9092");properties.setProperty("group.id", "consumer-group");DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));//2.用flink进行简单的etl处理转换SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {String[] fields = value.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();}});//3.结果数据写入kafka//FlinkKafkaProducer传参borckList,topicid,序列化result.addSink(new FlinkKafkaProducer<String>("hadoop2:9092","events",new SimpleStringSchema()));env.execute();}
}
  1. kafka输出结果
    image.png

5.5.4 输出到redis

  1. 引入依赖
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
  1. 代码
    image.png
    继承自RichSinkFunction
    image.png

去调构造方法,换入redis集群的配置FlinkJedisConfigBase以及RedisMapper写入命令

image.png

FlinkJedisPoolConfig用这个没毛病,直接继承的FlinkJedisConfigBase

image.png

  1. 代码
public class SinkToRedis {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入ClickSource是自定义输入DataStreamSource<Event> stream = env.addSource(new ClickSource());//2.创建一个jedis连接配置//FlinkJedisPoolConfig直接继承的FlinkJedisConfigBaseFlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop2").build();//3.写入redisstream.addSink(new RedisSink<>(config,new MyRedisMapper()));env.execute();}//3.自定义类实现 redisMapper接口public static class MyRedisMapper implements RedisMapper<Event>{@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"clicks");//写入哈希表}@Overridepublic String getKeyFromData(Event data) {return data.user;}@Overridepublic String getValueFromData(Event data) {return data.url;}}
}
  1. 结果

image.png

5.5.5 输出到ElasticSearch

  1. 引入依赖
<dependency><groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifact
Id>
<version>${flink.version}</version>
</dependency>
  1. 代码
    image.png

image.png

传入参数是List和ElasticsearchSinkFunction


image.png

image.png

public class SinToES {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//2.定义hosts的列表ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("hadoop",9200));//3.定义ElasticsearchSinkFunction<T>,是个接口,重写process方法//向es发送请求,并插入数据ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {@Override//输入,运行上下文,发送任务请求public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {HashMap<String, String> map = new HashMap<>();map.put(element.user, element.url);//构建一个indexrequestIndexRequest request = Requests.indexRequest().index("clicks").type("types").source(map);indexer.add(request);}};//4.写入es//传入参数是List<HttpHost>和ElasticsearchSinkFunction<T>stream.addSink(new ElasticsearchSink.Builder<>(httpHosts,elasticsearchSinkFunction).build());env.execute();}
}
  1. 结果

image.png

image.png

5.5.6 输入到Mysql

  1. 引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>
  1. 代码

image.png

三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置
image.png

image.png

单一抽象方法,lambda使用

public class SinkToMysql {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));//三个参数,sql,JdbcStatementBuilder构造,JdbcConnectionOptions等sql的连接配置stream.addSink(JdbcSink.sink("INSERT INTO clicks (user,url) VALUES(?,?)",((statement,event)->{statement.setString(1,event.user);statement.setString(2,event.url);}),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test2").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("123456").build()));env.execute();}
}
  1. mysql前期准备
  • 创建mysql的test2
  • 创建clicks表
mysql> create table clicks(-> user varchar(20) not null,-> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
  1. 结果
    image.png

5.5.7 自定义Sink输出

  1. 分析

调用DataStream的addSink()方法,并传入自定义好的SinkFunction(采用富函数类),重写关键方法invoke(),并且重写富函数类的生命周期相关方法open和close

  1. 导入依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version>
</dependency>
  1. 代码

http://www.dtcms.com/wzjs/381105.html

相关文章:

  • 贵州水利建设官方网站站长工具seo综合查询怎么用
  • 建设行业的门户网站外贸网站都有哪些
  • 2003 iis网站发布网站网络营销专业培训学校
  • 微信网站建设价格广告联盟怎么加入
  • 广州建设网站网络营销整合推广
  • 网站怎么弄外贸互联网推广的
  • 做网站业务员提成几个点湖南优化推广
  • 网站伪静态seo关键词优化举例
  • 智能网站建设设计媒体资源网官网
  • 有名做网站公司免费域名注册永久
  • 高端网站建设的市场广东网站se0优化公司
  • 网站做用户记录软文营销的技巧
  • 任丘网站优化搜索引擎营销案例分析
  • 做中介卖房子开哪个网站理发培训专业学校
  • 桂林旅游网官方网站google ads
  • 如何降低网站的权重上海seo顾问
  • wordpress 多少张表seo查询爱站网
  • 做网站的公司广州太原网站推广排名
  • 西安网站建设xazxcy国内专业seo公司
  • 淘宝放单网站怎么做的店铺推广方法
  • 网站建设高级专员营销网站有哪些
  • 做竞价的网站还用做seo台州seo排名优化
  • 视频相亲网站开发成本太原搜索引擎优化招聘信息
  • 合肥网站制作需海底捞口碑营销案例
  • 菠菜网站怎么做推广爱站网关键词长尾挖掘工具
  • 安徽做公司网站哪家好互联网推广渠道
  • 红色色系做网站的配色百度搜索app
  • 百度推广还要求做网站西安高端网站建设
  • 网站中的销量排序用Axure怎样做谷歌浏览器手机版免费官方下载
  • 高水平的徐州网站建设常用的网络推广的方法有哪些