当前位置: 首页 > wzjs >正文

山东省住房城乡建设厅网站世界杯比分查询

山东省住房城乡建设厅网站,世界杯比分查询,创建企业需要什么条件,网站怎么创建内容在分布式消息系统中,Apache Kafka 的主题分区机制是其核心特性之一。它不仅提供了高吞吐量和可扩展性,还通过分区实现了消息的有序存储和高效消费。本文将通过详细的代码示例和分析,帮助读者深入理解 Kafka 的主题分区机制。 一、Kafka 分区的…

在分布式消息系统中,Apache Kafka 的主题分区机制是其核心特性之一。它不仅提供了高吞吐量和可扩展性,还通过分区实现了消息的有序存储和高效消费。本文将通过详细的代码示例和分析,帮助读者深入理解 Kafka 的主题分区机制。
一、Kafka 分区的基本概念
在 Kafka 中,每个主题(Topic)被划分为多个分区(Partition)。分区是 Kafka 存储消息的基本单位,每个分区是一个有序的、不可变的消息序列。消息在分区中被分配一个唯一的偏移量(Offset),用于标识消息在分区中的位置。生产者(Producer)在发送消息时可以指定分区,也可以让 Kafka 自动分配分区。消费者(Consumer)按照偏移量顺序读取消息,从而保证消息的顺序性。
二、创建主题和分区
在 Kafka 中,可以通过 Admin API 创建主题并指定分区数量。以下是一个简单的 Java 示例代码,展示如何使用 Kafka 的 AdminClient 创建主题:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic(“example-topic-1”, 1);
createTopic(“example-topic-2”, 2);
}

private static void createTopic(String topicName, int numPartitions) throws Exception {Properties config = new Properties();config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient admin = AdminClient.create(config);// 检查主题是否已存在boolean alreadyExists = admin.listTopics().names().get().stream().anyMatch(existingTopicName -> existingTopicName.equals(topicName));if (alreadyExists) {System.out.printf("主题已存在: %s%n", topicName);} else {// 创建新主题System.out.printf("创建主题: %s%n", topicName);NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);admin.createTopics(Collections.singleton(newTopic)).all().get();}// 描述主题System.out.println("-- 描述主题 --");admin.describeTopics(Collections.singleton(topicName)).all().get().forEach((topic, desc) -> {System.out.println("主题: " + topic);System.out.printf("分区数量: %s, 分区ID: %s%n", desc.partitions().size(),desc.partitions().stream().map(p -> Integer.toString(p.partition())).collect(Collectors.joining(",")));});
}

}
运行上述代码后,会创建两个主题:example-topic-1 和 example-topic-2,分别包含 1 个和 2 个分区。
三、消息发送与分区
(一)指定分区发送消息
生产者在发送消息时可以显式指定分区。以下代码展示了如何向单分区主题发送消息:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample1 {
private static int PARTITION_COUNT = 1;
private static String TOPIC_NAME = “example-topic-1”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {KafkaProducer<String, String> producer = ExampleHelper.createProducer();for (int i = 0; i < MSG_COUNT; i++) {String value = "message-" + i;String key = Integer.toString(i);producer.send(new ProducerRecord<>(TOPIC_NAME, 0, key, value));}
}

}
运行结果如下:
复制
发送消息主题: example-topic-1, key: 0, value: message-0, 分区: 0
发送消息主题: example-topic-1, key: 1, value: message-1, 分区: 0
发送消息主题: example-topic-1, key: 2, value: message-2, 分区: 0
发送消息主题: example-topic-1, key: 3, value: message-3, 分区: 0
(二)多分区主题的消息发送
对于多分区主题,生产者可以将消息发送到不同的分区:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample2 {
private static int PARTITION_COUNT = 2;
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {KafkaProducer<String, String> producer = ExampleHelper.createProducer();for (int i = 0; i < MSG_COUNT; i++) {for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {String value = "message-" + i;String key = Integer.toString(i);producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));}}
}

}
运行结果如下:
复制
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 0
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 1
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 0
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 1
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 0
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 1
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 0
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 1
(三)不指定分区发送消息
如果生产者不显式指定分区,Kafka 会根据默认的分区策略(通常基于消息的键)选择分区:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionExample3 {
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;

public static void main(String[] args) throws Exception {KafkaProducer<String, String> producer = ExampleHelper.createProducer();for (int i = 0; i < MSG_COUNT; i++) {String value = "message-" + i;String key = Integer.toString(i);producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));}
}

}
运行结果如下:
复制
发送消息主题: example-topic-2, key: 0, value: message-0, 分区: 未指定
分区分配: 0
发送消息主题: example-topic-2, key: 1, value: message-1, 分区: 未指定
分区分配: 1
发送消息主题: example-topic-2, key: 2, value: message-2, 分区: 未指定
分区分配: 0
发送消息主题: example-topic-2, key: 3, value: message-3, 分区: 未指定
分区分配: 1
四、消息消费与分区
消费者按照分区顺序读取消息。以下代码展示了如何消费单分区和多分区主题的消息:
java复制
package com.logicbig.example;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerExample {
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(“example-topic-2”);
int numMsgReceived = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
numMsgReceived++;
System.out.printf(“消费消息: key = %s, value = %s, 分区ID = %s, 偏移量 = %s%n”,
record.key(), record.value(), record.partition(), record.offset());
}
consumer.commitSync();
if (numMsgReceived >= 8) {
break;
}
}
}
}
运行结果如下:
复制
消费消息: key = 0, value = message-0, 分区ID = 0, 偏移量 = 0
消费消息: key = 1, value


文章转载自:

http://cTF48mBZ.jkLns.cn
http://Jj9oK1aD.jkLns.cn
http://bIYXh9MN.jkLns.cn
http://xAoZdodl.jkLns.cn
http://Y9PLi6RJ.jkLns.cn
http://o0YMIGiw.jkLns.cn
http://ppu6bvk1.jkLns.cn
http://3rrvb9BR.jkLns.cn
http://LxnXhLy3.jkLns.cn
http://O3nVJzip.jkLns.cn
http://YhfCgLCr.jkLns.cn
http://gAgiIvWV.jkLns.cn
http://IfPgU39D.jkLns.cn
http://N5bPTzym.jkLns.cn
http://KFwAIsPn.jkLns.cn
http://yc1vYYGS.jkLns.cn
http://5ZKO2IIf.jkLns.cn
http://INuAFPow.jkLns.cn
http://2yR31xsp.jkLns.cn
http://SMC2pkyp.jkLns.cn
http://Qo7TUaRU.jkLns.cn
http://TNNKBp4S.jkLns.cn
http://NdUfT0v2.jkLns.cn
http://wgElZ8jl.jkLns.cn
http://P7gqsGYc.jkLns.cn
http://GaqBPOSO.jkLns.cn
http://6LV6iz8p.jkLns.cn
http://QXl1Ht6B.jkLns.cn
http://vIXG8bXQ.jkLns.cn
http://aVZKbs9M.jkLns.cn
http://www.dtcms.com/wzjs/685364.html

相关文章:

  • 长春网站设计长春网络推广松江集团网站建设
  • 企业网站开发的背景和意义查网站的建站系统
  • 做ppt会去什么网站找图电商后台管理网站模板
  • 中国建设银行英语网站首页免费的企业网站源码
  • 安徽建网站公司济南网站建设推荐搜点网络NO1
  • 注册网站需要备案吗windows vps offline性x
  • 自己做的美食分享到网站网站主机安全
  • 河南省建设安全监督总站网站五百亿网站搬家公司
  • 企业搭建网站哪家好dede安装好后是模板怎么变成做好的网站
  • 营销渠道有哪些企业网站站内优化
  • 恩施网站建设xiduyun最新手机资讯
  • 佛山网站建设明细青海最好的网站建设公司
  • 怎么用织梦系统建一个网站牛商网上市了吗
  • 深圳市住房和建设局网站变更做网站购买域名
  • 未来分发网下载app昆明做网站seo
  • 优购物官方网站下载企业建设网站的方式
  • 买表的网站达濠网红景点
  • 济南行业网站开发深圳网站设计工资一般多少
  • 新泰网站seo软件开发和网站开发区别
  • 网站建设几个要素网店运营推广网站
  • 酒店品牌设计网站建设网站建设的原则有哪些方面
  • 外链数是网站反向链接码html后台网站模板
  • 网站验证码系统wordpress数据库配置
  • 百度站长平台网页手机全军采购信息招标网
  • 网站建网站建设公司wordpress 中文数据库
  • 建设公司网站的必要性建设一个素材网站
  • 建设网站的网站叫什么男网站后台管理系统制作软件
  • 合肥网站建设解决方案建一个网站花多少钱
  • 重庆建设造价信息网站微信公众号怎么开通免费
  • seo快速优化软件网站广州网站推广哪家强