当前位置: 首页 > wzjs >正文

网站源码生成百度导航最新版本免费下载

网站源码生成,百度导航最新版本免费下载,百度手机网址提交,班级网站建设心得体会范文上节成功实现了FlinkKafkaConsumer消费Kafka数据&#xff0c;并将数据写入到控制台&#xff0c;接下来将继续将计算的结果输入到redis中。 pom.xml 引入redis到pom包 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://mave…

上节成功实现了FlinkKafkaConsumer消费Kafka数据,并将数据写入到控制台,接下来将继续将计算的结果输入到redis中。

pom.xml

引入redis到pom包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><!--mybatis坐标--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.4.5</version></dependency><!--mysql驱动坐标--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.6</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

KafkaProducer.java 生产数据存入Kafka

同上一节,具体代码

package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始连接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 将 Java 对象序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循环for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 关闭生产者producer.close();}
}

启动服务类

Flink消费Kafka,并将结果存入redis。
设置FlinkRedisConfig

   // 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);

MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}

starApp的完整代码如下:

package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 将输入的字符串映射为 Tuple2 对象。** @param value 输入的字符串* @return 一个包含两个元素的 Tuple2 对象,第一个元素为 "l_words",第二个元素为输入的字符串* @throws Exception 如果发生异常,则抛出该异常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}}}

运行结果

请添加图片描述

存入redis结果
请添加图片描述

http://www.dtcms.com/wzjs/432140.html

相关文章:

  • 番禺网站建设优化推广东莞网站制作推广公司
  • 网站开发维护花费百度站长收录
  • 北海做网站公司搜索关键词排名提升
  • 众v创业营网站开发最近新闻摘抄
  • 天津西青区疫情最新消息今天零基础seo入门教学
  • 国内做交互网站seo排名优化哪家好
  • 重庆网站建设要点平台如何做推广
  • wordpress 本地写文章seo外包如何
  • 福州企业做网站哪里可以学企业管理培训
  • 佛山市南海区疫情惠州百度seo在哪
  • 网站制作-杭州恢复原来的百度
  • 厦门电子商务网站建设开发做一个网站需要多少钱
  • 哪些网站做批发广告投放的方式有哪些
  • 网站关键词优化技巧多地优化完善疫情防控措施
  • 怎么开店seo优化实训总结
  • 英语培训学校网站建设多少钱品牌网络营销策划
  • 怎样网站seo新媒体seo指的是什么
  • 商丘网签查询企业关键词优化推荐
  • 佛山做网站建设杭州seo网
  • 昆明网站建设排名seo网站优化培训怎么做
  • 专业酒店设计网站建设图们网络推广
  • 网上怎么接单做网站免费网站模板
  • 网站前台后台网站优化外包
  • 建设专业网站价格seo 资料包怎么获得
  • 有什么好的网站查做外贸出口的企业网络营销策略分析案例
  • 青岛 两学一做 网站临沂seo推广外包
  • .net做的网站怎么样萝卜建站
  • 网站流量分析报告搜索引擎优化的目的是
  • 个人如何注册公司流程seo算法入门教程
  • 中国做的好的房产网站网站设计费用明细