当前位置: 首页 > wzjs >正文

做一个网站的流程天津市市建设与管理局网站

做一个网站的流程,天津市市建设与管理局网站,做美食网站的图片,网站建设多久可以学会上节成功实现了FlinkKafkaConsumer消费Kafka数据&#xff0c;并将数据写入到控制台&#xff0c;接下来将继续将计算的结果输入到redis中。 pom.xml 引入redis到pom包 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://mave…

上节成功实现了FlinkKafkaConsumer消费Kafka数据,并将数据写入到控制台,接下来将继续将计算的结果输入到redis中。

pom.xml

引入redis到pom包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><!--mybatis坐标--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.4.5</version></dependency><!--mysql驱动坐标--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.6</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

KafkaProducer.java 生产数据存入Kafka

同上一节,具体代码

package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始连接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 将 Java 对象序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循环for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 关闭生产者producer.close();}
}

启动服务类

Flink消费Kafka,并将结果存入redis。
设置FlinkRedisConfig

   // 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);

MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}

starApp的完整代码如下:

package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 将输入的字符串映射为 Tuple2 对象。** @param value 输入的字符串* @return 一个包含两个元素的 Tuple2 对象,第一个元素为 "l_words",第二个元素为输入的字符串* @throws Exception 如果发生异常,则抛出该异常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}}}

运行结果

请添加图片描述

存入redis结果
请添加图片描述

http://www.dtcms.com/wzjs/615613.html

相关文章:

  • 重庆网站关键词排名优化在川航网站购票后怎么做
  • 河南企业站seowordpress小工具放入用户中心
  • 网站建设玖金手指谷哥十2023新冠结束了吗
  • 广园路建设公司网站镇江市质监站网址
  • 网站建设预算计算方法建站素材网
  • 绿植行业做网站的wordpress get_post
  • 网站的打开速度人力资源服务外包
  • 响应式网站设计软件网页代码大全
  • 企业网站的管理系统网站开发及维护合同
  • 广州网站建设开发微信企业app下载安装
  • 做淘宝美工图片网站交通运输局网站建设方案
  • 上海微信小程序网站建设易讯网络网站建设
  • 做免费的视频网站可以赚钱吗免费的网站模版
  • 做系统去哪网站下载镜像域名申请而完成以后怎么做网站
  • 心理服务网站建设内容江门特色
  • apache添加多个网站长春网站设计平台
  • 华为云专业网站定制网站开发实训
  • 徐州网站免费的个人网站平台
  • 九江公司网站建设网站建设一条龙全包
  • 自己做的网站套dedecms教程广州游戏网站建设
  • dedecms网站tag标签静态化网站建设制作培训
  • 深圳网站制作公司兴田德润放心免费申请地图定位
  • 购物网站建设价格网站开发人员的工资
  • 自助网站上海做网站的公司是什么
  • 如何申请建设网站视频软件制作
  • 宁波网站优化价格wordpress中文对照
  • 企业网站开发费是固定资产吗时间轴网站代码
  • 同人那个小说网站做的最好交互式网站设计怎么做
  • 网站建设案例图片便民的网站app怎么做
  • 专业的网站开发公司如何做公司网站网页