当前位置: 首页 > wzjs >正文

长沙网上商城网站建设方案提供秦皇岛网站建设

长沙网上商城网站建设方案,提供秦皇岛网站建设,水果网络营销策划书,标识公司简介 EventBus,又称消息总线,类似我们常见的消息中间件。支持点对点、请求与响应、发布订阅模式,支持跨服务跨语言通讯,分布式消息系统。 常见用法 点对点send 消息发送到某个地址上,Vertx把消息分发到注册到这个地…

简介

EventBus,又称消息总线,类似我们常见的消息中间件。支持点对点、请求与响应、发布订阅模式,支持跨服务跨语言通讯,分布式消息系统。

常见用法

点对点send

消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的某个消费者上,若存在多个消费者,使用轮询算法选择一个消费者接收消息。

Producer

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.setPeriodic(5000L, h -> {System.out.println("***********************");vertx.eventBus().send("test", "你吃了吗");});super.start(startPromise);}
}

Consumer

public class Consumer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {Random random = new Random();int num = random.nextInt(10000);vertx.eventBus().<String>consumer("test", msg -> {System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body()));});super.start(startPromise);}
}

App

public class App {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3)).compose(res -> vertx.deployVerticle(Producer.class.getName()));}}

AppRunLog

***********************
消费者7414接收到消息:你吃了吗
***********************
消费者6421接收到消息:你吃了吗
***********************
消费者5802接收到消息:你吃了吗

请求响应request

请求与响应也是点对点模式的一种,区别在于消费者可以回复结果,两者可以进行会话交流。

Producer

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.setPeriodic(5000L, h -> {System.out.println("***********************");vertx.eventBus().<String>request("test", "你吃了吗").onComplete(res -> {if (res.succeeded()) {System.out.println("生产者收到回应:"+ res.result().body());}});});super.start(startPromise);}
}

Consumer

public class Consumer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {Random random = new Random();int num = random.nextInt(10000);vertx.eventBus().<String>consumer("test", msg -> {System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body()));msg.reply(num+"吃过啦");});super.start(startPromise);}
}

App

public class App {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(Consumer.class, new DeploymentOptions().setInstances(3)).compose(res -> vertx.deployVerticle(Producer.class.getName()));}}

AppRunLog

***********************
消费者8585接收到消息:你吃了吗
生产者收到回应:8585吃过啦
***********************
消费者9672接收到消息:你吃了吗
生产者收到回应:9672吃过啦
***********************
消费者3615接收到消息:你吃了吗
生产者收到回应:3615吃过啦

发布订阅publish

消息发送到某个地址上,Vertx把消息分发到注册到这个地址上的所有消费者上。

Producer

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.setPeriodic(5000L, h -> {System.out.println("***********************");vertx.eventBus().publish("test", "你吃了吗");});super.start(startPromise);}
}

Consumer

public class Consumer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {Random random = new Random();int num = random.nextInt(10000);vertx.eventBus().<String>consumer("test", msg -> {System.out.println(String.format("消费者%s接收到消息:%s", num, msg.body()));});super.start(startPromise);}
}

App

public class App {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(Consumer.class.getName(), new DeploymentOptions().setInstances(3)).compose(res -> vertx.deployVerticle(Producer.class.getName()));}}

AppRunLog

***********************
消费者5195接收到消息:你吃了吗
消费者9192接收到消息:你吃了吗
消费者2572接收到消息:你吃了吗
***********************
消费者2572接收到消息:你吃了吗
消费者5195接收到消息:你吃了吗
消费者9192接收到消息:你吃了吗

编码解码器

Vertx消息总线默认只对一些基础类型的消息提供编码解码,若想要发送一个实体对象消息,那么需要自定义消息编码解码器,不然会报错 No message codec for type.

自定义单个编解码器

User

@Data
public class User {private String id;private String name;
}

UserCodec

public class UserCodec  implements MessageCodec<User, User> {/*** 编码* @param buffer* @param user*/@Overridepublic void encodeToWire(Buffer buffer, User user) {Buffer encoded = Json.CODEC.toBuffer(user, false);buffer.appendInt(encoded.length());buffer.appendBuffer(encoded);}/*** 集群传输解码* @param pos* @param buffer* @return*/@Overridepublic User decodeFromWire(int pos, Buffer buffer) {int length = buffer.getInt(pos);pos += 4;Buffer slice = buffer.slice(pos, pos + length);String json = slice.toString();return JacksonUtil.jsonToBean(json, User.class);}/*** 本地传输解码* @param user* @return*/@Overridepublic User transform(User user) {return user;}@Overridepublic String name() {return "user";}@Overridepublic byte systemCodecID() {return -1;}
}

Producer

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.setTimer(2000L, h -> {System.out.println("***********************");User user = new User();user.setId("1");user.setName("张三");vertx.eventBus().<User>request("test", user).onComplete(res -> {if (res.succeeded()) {System.out.println("生产者收到回应:"+ res.result().body());}});});super.start(startPromise);}
}

Consumer

public class Consumer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.eventBus().<User>consumer("test", msg -> {System.out.println(String.format("消费者接收到消息:%s",msg.body()));User user = new User();user.setId("2");user.setName("李四");msg.reply(user);});super.start(startPromise);}
}

App

public class App {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.eventBus().registerDefaultCodec(User.class, new UserCodec());vertx.deployVerticle(Consumer.class.getName()).compose(res -> vertx.deployVerticle(Producer.class.getName()));}}

AppRunLog

***********************
消费者接收到消息:User(id=1, name=张三)
生产者收到回应:User(id=2, name=李四)

自定义公用编解码器

RestRequest

公用请求参数类

@Data
public class RestRequest<T> {private String reqTopic;private T reqBody;
}

ReqMessageCodec

公用请求参数消息编解码类

public class ReqMessageCodec<T> implements MessageCodec<RestRequest<T>, T> {@Overridepublic void encodeToWire(Buffer buffer, RestRequest<T> request) {Buffer encoded = Json.CODEC.toBuffer(request, false);buffer.appendInt(encoded.length());buffer.appendBuffer(encoded);}@Overridepublic T decodeFromWire(int pos, Buffer buffer) {int length = buffer.getInt(pos);pos += 4;Buffer slice = buffer.slice(pos, pos + length);String s = slice.toString();RestRequest<T> request = JacksonUtil.jsonToBean(s, new TypeReference<>() {});return request.getReqBody();}@Overridepublic T transform(RestRequest<T> request) {return request.getReqBody();}@Overridepublic String name() {return "req";}@Overridepublic byte systemCodecID() {return -1;}
}

RestResponse

公用响应消息类

@Data
public class RestResponse<T> {private int code;private T data;private String msg;
}

RespMessageCodec

公用响应消息编解码类

public class RespMessageCodec<T> implements MessageCodec<RestResponse<T>, RestResponse<T>> {@Overridepublic void encodeToWire(Buffer buffer, RestResponse<T> response) {Buffer encoded = Json.CODEC.toBuffer(response, false);buffer.appendInt(encoded.length());buffer.appendBuffer(encoded);}@Overridepublic RestResponse<T> decodeFromWire(int pos, Buffer buffer) {int length = buffer.getInt(pos);pos += 4;Buffer slice = buffer.slice(pos, pos + length);return JacksonUtil.jsonToBean(slice.toString(), new TypeReference<>() {});}@Overridepublic RestResponse<T> transform(RestResponse<T> response) {return response;}@Overridepublic String name() {return "resp";}@Overridepublic byte systemCodecID() {return -1;}
}

Producer

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.setTimer(2000L, h -> {System.out.println("***********************");User user = new User();user.setId("1");user.setName("张三");RestRequest<User> req = new RestRequest<>();req.setReqTopic("addUser");req.setReqBody(user);vertx.eventBus().<RestResponse>request(req.getReqTopic(), req).onComplete(res -> {if (res.succeeded()) {System.out.println("生产者收到回应:"+ res.result().body());}else {System.out.println("生产者发送消息失败:"+res.cause().getMessage());}});});super.start(startPromise);}
}

Consumer

public class Consumer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {vertx.eventBus().<User>consumer("addUser", msg -> {System.out.println(String.format("消费者接收到消息:%s",msg.body()));RestResponse<String> response = new RestResponse();response.setCode(200);response.setMsg("插入成功");msg.reply(response);});super.start(startPromise);}
}

SingleApp

本地传输数据

public class SingleApp {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec()).registerDefaultCodec(RestResponse.class, new RespMessageCodec());vertx.deployVerticle(Consumer.class.getName()).compose(res -> vertx.deployVerticle(Producer.class.getName()));}}

SingleAppRunLog

***********************
消费者接收到消息:User(id=1, name=张三)
生产者收到回应:RestResponse(code=200, data=null, msg=插入成功)

ClusterApp

集群模式传输数据

public class ClusterApp {static Vertx vertx;public static void initCluster(Verticle service) {ClusterManager mgr = new HazelcastClusterManager();VertxOptions options = new VertxOptions().setClusterManager(mgr);Vertx.clusteredVertx(options).compose(v -> {vertx = v;vertx.eventBus().registerDefaultCodec(RestRequest.class, new ReqMessageCodec()).registerDefaultCodec(RestResponse.class, new RespMessageCodec());return vertx.deployVerticle(service);}).onSuccess(h -> {System.out.println("App Start Complete!");}).onFailure(err -> {err.printStackTrace();System.err.println("App Start Failed "+ err.getMessage());});}
}

ConsumerApp

消费服务

public class ConsumerApp extends ClusterApp{public static void main(String[] args) {initCluster(new Consumer());}
}

ConsumerAppRunLog

Members {size:2, ver:2} [Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041e thisMember [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189
]
消费者接收到消息:{id=1, name=张三}

ProducerApp

生产服务

public class ProducerApp extends ClusterApp{public static void main(String[] args) {initCluster(new Producer());}
}

ProducerAppRunLog

Members {size:2, ver:2} [Member [127.0.0.1]:5703 - 29afc653-a4ef-4933-ab6a-ed0bbbb7041eMember [127.0.0.1]:5704 - 289a8535-cb6f-4fef-8501-04c40c56f189 this
]***********************
生产者收到回应:RestResponse(code=200, data=null, msg=插入成功)

DeliveryOptions

设置请求响应超时时间

public class Producer extends AbstractVerticle {@Overridepublic void start(Promise<Void> startPromise) throws Exception {System.out.println(new Date()+"***********************");vertx.eventBus().<String>request("test", "你吃了吗",new DeliveryOptions().setSendTimeout(5000)).onComplete(res -> {if (res.succeeded()) {System.out.println(new Date()+"生产者接收应答消息:"+res.result().body());} else {System.out.println(new Date()+"生产者发送消息失败"+res.cause().getMessage());}});super.start(startPromise);}
}

文章转载自:

http://9u5SIpUI.dcccL.cn
http://m5iq8eXU.dcccL.cn
http://aV26OJp0.dcccL.cn
http://iqjd78Ue.dcccL.cn
http://zKoWPmSc.dcccL.cn
http://SKbDwZcc.dcccL.cn
http://bxPt4uC5.dcccL.cn
http://KxaD3U8a.dcccL.cn
http://FSrICrDh.dcccL.cn
http://zddkgjIH.dcccL.cn
http://B3KUeZDc.dcccL.cn
http://WfCjLexf.dcccL.cn
http://m7oRDeVZ.dcccL.cn
http://lhW4ISgh.dcccL.cn
http://w7uro6a1.dcccL.cn
http://GzlI9NzU.dcccL.cn
http://Nb0r9w0o.dcccL.cn
http://xvxOPoZj.dcccL.cn
http://Rk7MVJZU.dcccL.cn
http://OQlinGUj.dcccL.cn
http://6AOiXrjn.dcccL.cn
http://OMTeaAGL.dcccL.cn
http://D6idFnqg.dcccL.cn
http://LtUSAZo6.dcccL.cn
http://okOpVmul.dcccL.cn
http://bA32egpo.dcccL.cn
http://5ofPxQxi.dcccL.cn
http://rEhgmJCH.dcccL.cn
http://xmgKaFbK.dcccL.cn
http://fkUQJciw.dcccL.cn
http://www.dtcms.com/wzjs/625452.html

相关文章:

  • 自己做的网站响应速度慢帝国做网站的步骤
  • 建设网站需要什么内容金华开发区人才网
  • 做喷绘可以在那个网站找外贸新手怎样用谷歌找客户
  • 网站的设计方法有哪些内容网站定制合同
  • 网站做影集安全吗网站建设公司利润
  • asp.net网站开发试题网站之间如何交换友情链接
  • 华大 网站建设郑东新区建设局网站
  • 万网发布网站做小程序商城
  • 还能用的wap网站后台查看网站容量
  • 昆山专业的网站建设哪些网站是做货源的
  • 网站建设与营销有没有做的很炫的科技型网站
  • 企业开展网站建设网站定制设计制作公司
  • 江西省城乡建设培训网官方网站百度网站名称
  • 购物网站怎么做项目简介wordpress为什么被
  • 网站刷流量会怎么样开发网站的基本流程五个阶段
  • 公司门户网站模板网站建立多少钱
  • 深圳网站建设哪家最好小程序登录网址
  • 大一学生做的网站哪个网站可以做代练
  • 一个网站大概多少页面网站建设的竞争对手的分析
  • 网站建设宣传方案建站优化公司
  • 站内优化网站怎么做兰州市网站建设公司
  • 装饰网站建设套餐报价营销型网站应用
  • 网站优化检测珠海网站定制
  • 搜狗网站推广网站弹出公告代码
  • 网站首页被k还有救吗上海外贸营销网站建设地址
  • 网站制作的服务怎么样鞍山做网站排名
  • 广东工程建设咨询有限公司网站门户网站的建设公司
  • 成都建设工程交易中心网站凡科女装
  • 西安网站制作哪家便宜又好宣传推广
  • 网站访问速度 云主机有网站后台模板如何做数据库