当前位置: 首页 > wzjs >正文

建筑工程分包平台优化网站哪家好

建筑工程分包平台,优化网站哪家好,wordpress 数据插件,最近国家新闻夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event 文章目录 夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event1.前言2.需求2.1 描述2.2 使用方式2.2.1 事件 Event2.2.2 服务端 Publisher2.2.3 客户端 Listener 3.设计3.1 接口…

夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event

文章目录

  • 夯实 kafka 系列|第五章:基于 kafka 分布式事件框架 eval-event
    • 1.前言
    • 2.需求
      • 2.1 描述
      • 2.2 使用方式
        • 2.2.1 事件 Event
        • 2.2.2 服务端 Publisher
        • 2.2.3 客户端 Listener
    • 3.设计
      • 3.1 接口设计
        • 3.1.1 事件监听者
        • 3.1.2 事件发送者
        • 3.1.3 事件
      • 3.2 EvalEventPublisher kafka 实现
      • 3.3 EvalEventListener kafka 实现
        • 3.3.1 KafkaEvalEventDispatcher 事件处理者
        • 3.3.2 EvalEventListenerThread 事件监听线程
        • 3.3.3 其他细节
    • 4.测试
      • 4.1 发送事件测试
        • 4.1.1 Hello World
        • 4.1.2 发送 Bean 对象
      • 4.2 监听事件测试
        • 4.2.1 源码调试
        • 4.2.2 监听 MyEvent 自定义事件
        • 4.2.3 监听 UserCreatedEvent 自定义事件

1.前言

本文分享基于 kafka 的分布式事件框架,从 0 到 1 的实现过程

  • 需求->设计->开发->测试 一起来实现一个分布式事件框架;

  • 使用方式类似于 Spring Event,事件可以在微服务集群中传递。

源码已上传 github

  • https://github.com/huajiexiewenfeng/eval-event

2.需求

2.1 描述

1.事件机制在集群中传递

  • 微服务发送事件消息
  • 微服务订阅事件消息

2.事件通过kafka 来进行发送和监听

2.2 使用方式

2.2.1 事件 Event

每个自定义事件对应一个 kafka 中的 topic

  • 继承 EvalEvent

  • 重写 getTopic 方法

    • 设置事件存储的 topic 名称
public class MyEvent extends EvalEvent {private String message;...@Overridepublic String getTopic() {return "my-test-topic";}
}
2.2.2 服务端 Publisher

EvalEventPublisher.publishEvent 发送事件

@RestController
@RequestMapping(value = {"/api/eval/event/v1"})
public class HelloController {@Autowiredprivate EvalEventPublisher<MyEvent> evalEventPublisher;@RequestMapping("/hello")public String hello() {MyEvent myEvent = new MyEvent();myEvent.setMessage("Hello, World!");// 发送事件evalEventPublisher.publishEvent(myEvent);return "ok";}
}
2.2.3 客户端 Listener
  • @EvalEventListener 注解方式(本文未实现该方式,有机会在下一篇文章中实现)
@EvalEventListener 
public void onEvalEvent(MyEvent event){...
}
  • 实现 Listener 接口方式
@Component
public class TestEventListener implements EvalEventListener<MyEvent> {...@Overridepublic void onEvent(MyEvent event) {...}
}

3.设计

3.1 接口设计

3.1.1 事件监听者
public interface EvalEventListener<T extends EvalEvent> {/*** 事件处理方法** @param event 事件*/void onEvent(T event);
}
3.1.2 事件发送者
public interface EvalEventPublisher<T extends EvalEvent> {/*** 发送事件** @param event 事件*/void publishEvent(T event);
}
3.1.3 事件
public abstract class EvalEvent implements Serializable {private static final long serialVersionUID = 1L;/*** 事件ID 对应 Kafka message 的 key*/private String id = UUID.randomUUID().toString();public abstract String getTopic();public String getId() {return id;}public void setId(String id) {this.id = id;}
}

3.2 EvalEventPublisher kafka 实现

需要实现

  • 初始化 kafka producer
  • producer 发送 event
package com.csdn.event.kafka.publisher;import com.csdn.event.kafka.config.EvalKafkaProperties;
import com.csdn.event.sdk.EvalEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Properties;@Component
public class KafkaEvalEventPublisher<T extends EvalEvent> implements EvalEventPublisher<T> {private static final Logger log = LoggerFactory.getLogger(KafkaEvalEventPublisher.class);@Autowiredprivate EvalKafkaProperties kafkaProperties;private Producer<String, EvalEvent> producer;@PostConstructpublic void init() {try {// Initialize the Kafka producerProperties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getProducer().getAcks());props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProperties.getProducer().getBatchSize());props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getLingerMs());props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaProperties.getProducer().getBufferMemory());props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaProperties.getProducer().getMaxRequestSize());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.springframework.kafka.support.serializer.JsonSerializer");producer = new KafkaProducer<>(props);} catch (Exception e) {log.error(">>>> Kafka producer initialization failed", e);}}@Overridepublic void publishEvent(T event) {producer.send(buildRecord(event), (recordMetadata, e) -> {if (e != null) {log.error(String.format(">>>> 事件id:%s, topic:%s, 发送失败", event.getId(), event.getTopic()), e);} else {log.debug(">>>> 事件id:{}, topic:{}, 发送成功", event.getId(), event.getTopic());}});}private ProducerRecord<String, EvalEvent> buildRecord(EvalEvent t) {return new ProducerRecord<>(t.getTopic(), t.getId(), t);}}

3.3 EvalEventListener kafka 实现

3.3.1 KafkaEvalEventDispatcher 事件处理者
  • 反射方式找到 EvalEvent 和 listener 的映射关系
  • 循环 EvalEventListener
  • 启动 EvalEventListenerThread 事件监听处理线程
@Component
@ConditionalOnProperty(value = "event.kafka.listener.enabled", havingValue = "true")
public class KafkaEvalEventDispatcher<T extends EvalEvent> {private static final Logger log = LoggerFactory.getLogger(KafkaEvalEventDispatcher.class);@Autowiredprivate List<EvalEventListener<T>> listeners;@Autowiredprivate EventKafkaConsumerFactory eventKafkaConsumerFactory;@Value("${event.kafka.base-package:com}")private String basePackage;@PostConstructpublic void init() {log.info(">>>> KafkaEvalEventListener initialized");// 反射方式找到 EvalEvent 和 listener 的映射关系try {Map<String, EvalEventDefinition> evalEventDefinitionsMap = EvalEventSubclassScannerUtil.getEvalEventDefinitions(basePackage);log.info(">>>> KafkaEvalEventListener topicMap: {}", evalEventDefinitionsMap);for (EvalEventListener<T> listener : listeners) {// 获取监听器的类名String cla = listener.getClass().getName();if (evalEventDefinitionsMap.containsKey(cla)) {EvalEventDefinition evalEventDefinition = evalEventDefinitionsMap.get(cla);log.info(">>>> KafkaEvalEventListener found listener: {}", evalEventDefinition);EvalEventListenerThread<T> evalEventListenerThread = new EvalEventListenerThread<>(evalEventDefinition, listener, eventKafkaConsumerFactory);evalEventListenerThread.start();}}} catch (Exception e) {e.printStackTrace();}}}
3.3.2 EvalEventListenerThread 事件监听线程
  • 创建 KafkaConsumer
  • Consumer 拉取消息 records
  • 处理消息 records
  • ack 提交
public class EvalEventListenerThread<T extends EvalEvent> extends Thread {...此处省略N行代码@Overridepublic void run() {// 1. 创建KafkaConsumerKafkaConsumer<String, ?> consumer;try {consumer = eventKafkaConsumerFactory.buildKafkaConsumer(evalEventListener);List<String> topicList = new ArrayList<>();topicList.add(evalEventDefinition.getTopic());consumer.subscribe(topicList);} catch (Exception e) {log.error("KafkaConsumer构造失败", e);e.printStackTrace();return;}// 2. 消费消息try {while (true) {try {// 3. 拉取消息ConsumerRecords<String, ?> records = consumer.poll(Duration.ofMillis(500));if (records.isEmpty()) {continue;}// 4. 处理消息dispatch(records);// 5. 使用异步提交规避阻塞consumer.commitAsync();} catch (Exception e) {log.error("消息处理异常", e);}}} finally {try {// 6.最后一次提交使用同步阻塞式提交consumer.commitSync();} finally {consumer.close();}}}...此处省略N行代码
3.3.3 其他细节

还有很多细节就不在这里赘述了,比如

  • 序列化-自定义 Json 序列化
    • 这个需要自己来实现,因为默认提供的 json 序列化,会出现 Java Class 不匹配的情况
      • com.csdn.event.kafka.serialization.JsonDeserializer
      • com.csdn.event.kafka.serialization.JsonSerializer
  • 反射获取 EventListener 泛型工具类
    • com.csdn.event.kafka.utils.EvalEventSubclassScannerUtil
  • 提交优化
    • 同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据

4.测试

4.1 发送事件测试

4.1.1 Hello World
@RestController
@RequestMapping(value = {"/api/eval/event/v1"})
public class HelloController {@Autowiredprivate EvalEventPublisher<MyEvent> evalEventPublisher;@GetMapping("/hello")public String hello() {MyEvent myEvent = new MyEvent();myEvent.setMessage("Hello, World!");// 发送事件evalEventPublisher.publishEvent(myEvent);return "ok";}
}

postman 发送请求

请添加图片描述

断点1

请添加图片描述

断点2

请添加图片描述

kafka tools 查看 topic 消息

请添加图片描述

4.1.2 发送 Bean 对象

HelloController 新增接口

    @Autowiredprivate EvalEventPublisher<UserCreatedEvent> userEventPublisher;  @GetMapping("/user")public String user() {UserCreatedEvent userCreatedEvent = new UserCreatedEvent();User user = new User("xwf", 18);userCreatedEvent.setUser(user);// 发送事件userEventPublisher.publishEvent(userCreatedEvent);return "ok";}

UserCreatedEvent 用户创建事件

public class UserCreatedEvent extends EvalEvent {private User user;public UserCreatedEvent() {}public UserCreatedEvent(User user) {this.user = user;}public User getUser() {return user;}public void setUser(User user) {this.user = user;}@Overridepublic String getTopic() {return "user-create-topic";}

测试结果
kafka 中的消息如下:
请添加图片描述

4.2 监听事件测试

4.2.1 源码调试

断点1

  • listeners -》MyEventEventListener
  • 同时获取到对应的 EvalEventDefinition
    • topic -》my-test-topic 用于 consumer 拉取 topic 消息
    • eventClass-》com.csdn.example.listener.model.MyEvent(自定事件)用于消息反序列化

请添加图片描述

断点2,EvalEventListenerThread 拉取 kafka 消息

请添加图片描述

最终调用方法

  • UserEventEventListener#onEvent

请添加图片描述

4.2.2 监听 MyEvent 自定义事件

请添加图片描述

4.2.3 监听 UserCreatedEvent 自定义事件

请添加图片描述


文章转载自:

http://ZZFfUS61.yqyhr.cn
http://Bce5s6uV.yqyhr.cn
http://7lDqcndk.yqyhr.cn
http://xRlchoLd.yqyhr.cn
http://nyT4xlnY.yqyhr.cn
http://7xUB1Y0S.yqyhr.cn
http://ZIuyy1lL.yqyhr.cn
http://BPnSdi4P.yqyhr.cn
http://YlU3k3uG.yqyhr.cn
http://JqqiONR1.yqyhr.cn
http://nWOqHhtH.yqyhr.cn
http://AgD5OB2L.yqyhr.cn
http://ioAj8Tzl.yqyhr.cn
http://MGwzUpem.yqyhr.cn
http://yUvgQtvr.yqyhr.cn
http://ws36K3b3.yqyhr.cn
http://de2vLJqc.yqyhr.cn
http://893JuIY4.yqyhr.cn
http://rpMcTTLm.yqyhr.cn
http://tx67RliU.yqyhr.cn
http://UmLBlL9o.yqyhr.cn
http://oBJ03wOG.yqyhr.cn
http://mIuHXLuh.yqyhr.cn
http://xipvo7ji.yqyhr.cn
http://a3rXjC6d.yqyhr.cn
http://Dih2MEip.yqyhr.cn
http://FPkkgaOv.yqyhr.cn
http://cApQGTsW.yqyhr.cn
http://31fWmCWn.yqyhr.cn
http://PizRVbEv.yqyhr.cn
http://www.dtcms.com/wzjs/647960.html

相关文章:

  • 邯郸做移动网站价格厦门品牌网站设计
  • 学校信息门户网站建设江苏建设外贸公司网站
  • 那种系统做网站比较好上海做网站的公司
  • 支持微信支付的网站开发室内空间设计
  • 电商网站有哪些特色qq空间如何发布wordpress
  • 上杭网站设计公司找别人建网站去哪里
  • 山东省建设局网站首页网站模块制作
  • 本地化网站建设网站建设零基础教材免费下载
  • 制作网站学什么专业开源cms建站
  • 网站改版优化果洛wap网站建设
  • 淄博市建设业协会网站万网 网站超市
  • 万户网站做的怎样网站建设服务器有哪些
  • yp77731域名查询福州网站建设方案优化
  • 建网站找兴田德润网上商城网站建设规划
  • 燕郊建设局网站国外做兼职网站设计
  • 国外有哪些网站做推广的比较好网站设计公司哪家好如何选择呀
  • 南宁营销型网站建设公司哪家好生鲜网站建设背景
  • 建瓯网站制作如何做企业产品推广
  • 小江网站建设公司统一门户网站建设规范
  • 谷歌收录网站网站自己怎么做直播
  • 珠海网站艰涩和合肥网站建设渠道
  • 临沂网站建设兼职优秀的网站建设解决方案
  • 做网站界面尺寸是多少答辩ppt模板下载免费完整版
  • 好看大方的企业网站源码.net石家庄新闻最新
  • 做网站的说3年3年包括什么如何把网站上传到网上
  • aspcms中引文 网站修改配置wordpress博客分类
  • 如何建设一个收费的影视图文网站广西响应式网站哪家好
  • 个人电影网站建设wordpress评论ajax加载
  • 网站下载系统网站建设优化公司招聘
  • 宁波自主建站模板做网站的价格贵吗