当前位置: 首页 > wzjs >正文

网站主办者是谁宝安区网络公司

网站主办者是谁,宝安区网络公司,flash优秀网站,做简单的网站上节成功实现了FlinkKafkaConsumer消费Kafka数据&#xff0c;并将数据写入到控制台&#xff0c;接下来将继续将计算的结果输入到redis中。 pom.xml 引入redis到pom包 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://mave…

上节成功实现了FlinkKafkaConsumer消费Kafka数据,并将数据写入到控制台,接下来将继续将计算的结果输入到redis中。

pom.xml

引入redis到pom包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><!--mybatis坐标--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.4.5</version></dependency><!--mysql驱动坐标--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.6</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

KafkaProducer.java 生产数据存入Kafka

同上一节,具体代码

package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始连接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 将 Java 对象序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循环for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 关闭生产者producer.close();}
}

启动服务类

Flink消费Kafka,并将结果存入redis。
设置FlinkRedisConfig

   // 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);

MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}

starApp的完整代码如下:

package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 将输入的字符串映射为 Tuple2 对象。** @param value 输入的字符串* @return 一个包含两个元素的 Tuple2 对象,第一个元素为 "l_words",第二个元素为输入的字符串* @throws Exception 如果发生异常,则抛出该异常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}}}

运行结果

请添加图片描述

存入redis结果
请添加图片描述

http://www.dtcms.com/wzjs/550115.html

相关文章:

  • 响应式网站设计建设制作南京网站设计建设
  • 网站建设规模广州网站优化软件
  • p2vr做的网站上传各种手艺培训班
  • 备案个人可以做视频网站吗建设电子商务网站的目的
  • 东莞优速网站建设推广罗裕电话营销
  • 400电话网站模板怎么申请小程序
  • 网站图片上传功能怎么做的动漫制作专业电脑配置要求
  • 做网站的属于什么wordpress轻量级插件
  • 网站各种按钮代码湖北黄石网站建设
  • 江苏汇算清缴在哪个网站做典型的四大综合门户网站
  • 手机网站源码wordpress+团购
  • 武夷山网站推广服务广东富盈建设有限公司企业网站
  • 做公司网站计入什么会计科目网站建设维护是做什么会计科目
  • 珠海建设工程备案网站网页制作模板保存
  • 遵义网站制作的网站深圳做网站设计公司
  • 深圳中瑞建设集团官方网站网站建设需求发布
  • 服装网站建设策划方案万维网如何建设网站
  • 网站维护费怎么做会计分录首页设计图
  • 建站的方式有哪些wordpress seo 百度
  • 网站开发分类南京做信息登记公司网站
  • ppt做书模板下载网站有哪些内容网站开发之前前后端不分离
  • 厦门人才网唯一官网招聘济南网络优化网站
  • 您的网站空间已过期绘制网站结构图
  • 个人网站建站目的网站费用预算
  • 国内网站在国外访问很慢有关做有机肥的企业网站
  • 网站制作方案和主要内容金华网站制作策划
  • 免费网站建设公司网页编辑如何添加图片
  • 电商网站做互联网金融网页设计动态效果怎么制作
  • 咸阳网站建设报价设计公司logo设计大全
  • qq邮件网站建设的模块商贸公司寮步网站建设