当前位置: 首页 > news >正文

宜章泰鑫建设有限公司网站给村里做网站

宜章泰鑫建设有限公司网站,给村里做网站,想当淘客自己的网站怎么做,wordpress 图片缩小文章目录 1.如何自定义分区机制2.示例 1.如何自定义分区机制 若需要使用自定义分区机制,需要完成两件事: 1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中…

文章目录

  • 1.如何自定义分区机制
  • 2.示例


1.如何自定义分区机制

若需要使用自定义分区机制,需要完成两件事:
1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中实现。
2)在用于构造KafkaProducer的Properties对象中设置 partitioner.class 参数。

2.示例

假设我们的消息中有一些消息是用于审计功能的,这类消息的 key 会被固定地分配一个字符串“audit”。我们想要让这类消息发送到 topic 的最后一个分区上,便于后续统一处理,而对于相同 topic 下的其他消息则采用随机发送的策略发送到其他分区上。那么现在就可以这样来实现自定义的分区策略,如下列代码所示:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class AuditPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> map) {//该方法实现必要资源的初始化工作random= new Random();}@Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object valueObj, byte[] valueBytes, Cluster cluster) {String key=(String)keyObj;//从集群元数据中把属于该topic的所有分区信息都读取出供分区策略使用List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);int partitionCount =partitionInfoList.size();int auditPartition=partitionCount-1;return key == null|| key.isEmpty()|| !key.contains ("audit")?random.nextInt(partitionCount-1):auditPartition;}@Overridepublic void close() {//该方法实现必要资源的清理工作}
}

创建好自定义分区策略类后,在构建KafkaProducer 之前为Properties增加该属性;代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.custompartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord nonKeyRecord = new ProducerRecord("topic-test","non-key record");ProducerRecord auditRecord = new ProducerRecord("topic-test", "audit","audit record");ProducerRecord nonAuditRecord =new ProducerRecord("topic-test","other","non-sudit record");producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.send(auditRecord).get();producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.close();}
}
http://www.dtcms.com/a/504447.html

相关文章:

  • 【学习系列】SAP RAP 10:行为定义-Determinations和Validations
  • 织梦可以做导航网站网络营销的推广方式
  • 建设网站方法wordpress文章显示时间
  • 公司微信网站建设方案模板下载黑白色调网站
  • 中企视窗做网站怎么样今天哈尔滨最新通知
  • 网站域名的所有权网站建设资质备案
  • 网站建设目的主要包括哪些做网站要不要35类商标
  • MySQL 核心数据类型详解与实战案例
  • 郑州做网站公司电话php+mysql网站开发全程实例 pdf
  • 老薛主机做两个网站营销业务应用系统
  • SpringMVC初始
  • 哈尔滨房产信息网官网论坛seo网站
  • 离石网站建设16岁0元开网店赚钱软件
  • 建阳建设局网站建设网站企业运营
  • 基于 Zynq 的目标检测系统软硬协同架构设计(一)
  • 软件工程模块复盘:高频考点清单 + 5 道真题汇总
  • 网站js代码不显示wordpress 模板结构
  • 宁波网站推广厂家云浮网站设计
  • 做网站都需要什么东西网站开始怎么做
  • 网站开发子孙账号化妆品品牌推广方案
  • 前端埋点与监控实战指南
  • 通俗范畴论20 向量空间范畴与自然变换
  • ppt可以做网站桂城网站设计
  • 做pc端大型网站 前端用中国建设银行的网站设计
  • 楚雄市建设规划批前公示在那个网站网件路由器app 中文版
  • 【2026计算机毕业设计】基于Springboot的校园失物招领小程序
  • 第四章 IP地址规划
  • 12-头文件
  • 顺义做网站苏州专业做优化公司
  • 算法笔记 04