当前位置: 首页 > wzjs >正文

达州住房和城乡建设厅网站php网站开发周期多长

达州住房和城乡建设厅网站,php网站开发周期多长,wordpress创建表格,石家庄网站建设联系方式在 Kafka 中,默认情况下消息是按分区进行顺序存储和读取的,但全局顺序消费(即所有分区的消息按顺序消费)较难实现。下面分别介绍 Kafka 按分区顺序消费以及实现全局顺序消费的相关内容。 按分区顺序消费 Kafka 本身可以保证单个…

在 Kafka 中,默认情况下消息是按分区进行顺序存储和读取的,但全局顺序消费(即所有分区的消息按顺序消费)较难实现。下面分别介绍 Kafka 按分区顺序消费以及实现全局顺序消费的相关内容

按分区顺序消费

Kafka 本身可以保证单个分区内的消息是顺序写入和顺序读取的,以下是其原理和实现要点:

原理

  • 消息写入:Kafka 生产者在发送消息时,如果指定了分区,消息会被顺序追加到该分区的日志文件末尾。Kafka 的分区日志是一个只允许追加写入的文件,这种设计保证了消息在分区内的顺序性。
  • 消息读取:Kafka 消费者从分区中按偏移量(offset)顺序读取消息,偏移量是消息在分区内的唯一标识,消费者按照偏移量从小到大的顺序读取消息,从而保证了消息消费的顺序性。

实现要点

  • 生产者配置:生产者在发送消息时,需要明确指定消息要发送到的分区。可以通过自定义分区器或者直接指定分区号来实现。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class OrderedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test-topic";int partition = 0; // 指定分区号for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, "key-" + i, "value-" + i);producer.send(record);}producer.close();}
}
  • 消费者配置:消费者需要确保按顺序处理消息,并且在处理完一条消息后再处理下一条消息。同时,要避免手动调整偏移量,以免破坏消息的顺序。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class OrderedConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 处理完一条消息后再处理下一条}consumer.commitSync(); // 同步提交偏移量}} finally {consumer.close();}}
}

全局顺序消费

要实现全局顺序消费,需要将所有消息发送到同一个分区,因为 Kafka 只能保证单个分区内的消息顺序性。但这种方式会带来性能瓶颈,因为单个分区的处理能力是有限的。

实现要点

  • 生产者配置:生产者需要将所有消息都发送到同一个分区,可以通过自定义分区器来实现。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class GlobalOrderedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "com.example.SinglePartitionPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test-topic";for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key-" + i, "value-" + i);producer.send(record);}producer.close();}
}// 自定义分区器,将所有消息发送到同一个分区
class SinglePartitionPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0; // 所有消息都发送到分区0}@Overridepublic void close() {}@Overridepublic void configure(java.util.Map<String, ?> configs) {}
}
  • 消费者配置:只需要一个消费者实例来消费该分区的消息,避免多个消费者同时消费同一个分区导致的顺序问题。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class GlobalOrderedConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 处理完一条消息后再处理下一条}consumer.commitSync(); // 同步提交偏移量}} finally {consumer.close();}}
}
http://www.dtcms.com/wzjs/567973.html

相关文章:

  • dede 网站版权信息网站备案花钱吗
  • 成都高端网站建设网站开发及维护合同范本
  • 做阿里巴巴网站费用吗网站建设福
  • 中国网站排名网官网上海营销咨询公司
  • 张家界简单的网站建设人才网最新招聘信息网
  • 有没有专门做京东天猫的人才网站北京智能网站建设平台
  • 临汾做网站网页的网站建设
  • 网站建设细化流程电工培训机构
  • 世界经理人网站手机版惠州做网站首选惠州邦
  • 咋把网站制作成软件哈尔滨做网站哪里好
  • 喊人做网站需要注意些什么上海做网站的公司多少钱
  • 如何在网上做自己的网站定制开发app价格
  • 网站改版合同广州安全教育平台账号是多少
  • 在线制作二维码网站携程旅行网站建设分析
  • 商汇通网站网络营销推广的方式方法有哪些
  • 天津做网站哪家好大量图片展示网站模板
  • 梅州网站建设wlwlwordpress分类目录链接
  • 建立个人网站怎么赚钱5118素材网站
  • 网站开发好做还是平面好做闸北区网站设计与制作
  • 口碑好网站建设哪家好顺德品牌网站建设价位
  • asp.net网站开发百科房产网站程序
  • 江西app网站建设手机画平面图软件
  • 网站关键词下降浙江新华建设有限公司网站
  • 宿迁大型三合一网站开发wordpress手机维护
  • 个人网站效果wordpress app 开发
  • 网站建设客户开发方法wordpress域名配置
  • 网站定位与功能分析地方门户系统源码
  • 网页设计与网站开发的区别六安网站建设招商
  • 哈尔滨市建设安全监察网站_首页做网站挣钱不
  • 在哪里做马可波罗网站网站运营培训班