当前位置: 首页 > news >正文

物联网之使用Vertx实现HTTP/WebSocket最佳实践

小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server

实现Http/WebSocket【响应式】

Vertx-Web地址

实现过程

查看源码

代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
http/websocket【响应式】
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>5.0.0</version>
</dependency>

HttpServerProperties

/**
 * Configuration holder bound to the {@code spring.http-server} prefix.
 *
 * <p>Each field carries the default used when the corresponding property is not
 * configured. Most of these values are copied onto a Vert.x {@code HttpServerOptions}
 * instance by {@code VertxHttpServer}, one instance per entry in {@link #ports}.
 *
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.http-server")
public class HttpServerProperties {

    // ---- General ----

    /** Authentication switch; enabled by default. */
    private boolean auth = true;

    /** Host/interface the servers bind to. */
    private String host = "0.0.0.0";

    /** Ports to listen on; one server is created per port. Empty by default. */
    private Set<Integer> ports = new HashSet<>(0);

    // ---- HTTP compression / WebSocket frame limits ----

    private boolean compressionSupported = false;

    private int compressionLevel = 6;

    private int maxWebSocketFrameSize = 65536;

    /** Defaults to four times the default frame size. */
    private int maxWebSocketMessageSize = 65536 * 4;

    // ---- HTTP/1.x parsing limits ----

    private boolean handle100ContinueAutomatically = false;

    private int maxChunkSize = 8192;

    private int maxInitialLineLength = 4096;

    private int maxHeaderSize = 8192;

    private int maxFormAttributeSize = 8192;

    private int maxFormFields = 512;

    private int maxFormBufferedBytes = 2048;

    // ---- HTTP/2 ----

    private Http2Settings initialSettings = new Http2Settings()
        .setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);

    /** Mutable copy of {@code DEFAULT_ALPN_VERSIONS}. */
    private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);

    private boolean http2ClearTextEnabled = true;

    private int http2ConnectionWindowSize = -1;

    // ---- Decoding ----

    private boolean decompressionSupported = false;

    private boolean acceptUnmaskedFrames = false;

    private int decoderInitialBufferSize = 256;

    // ---- WebSocket compression / lifecycle ----

    private boolean perFrameWebSocketCompressionSupported = true;

    private boolean perMessageWebSocketCompressionSupported = true;

    private int webSocketCompressionLevel = 6;

    private boolean webSocketAllowServerNoContext = false;

    private boolean webSocketPreferredClientNoContext = false;

    private int webSocketClosingTimeout = 30;

    // ---- Tracing / misc ----

    private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;

    private boolean registerWebSocketWriteHandlers = false;

    // ---- HTTP/2 RST flood protection ----

    private int http2RstFloodMaxRstFramePerWindow = 400;

    private int http2RstFloodWindowDuration = 60;

    /** Unit in which {@link #http2RstFloodWindowDuration} is expressed. */
    private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;

}

VertxHttpServer

/*** @author laokou*/
@Slf4j
final class VertxHttpServer extends AbstractVerticle {private final HttpServerProperties properties;private final Vertx vertx;private final Router router;private volatile Flux<HttpServer> httpServer;private boolean isClosed = false;VertxHttpServer(Vertx vertx, HttpServerProperties properties) {this.vertx = vertx;this.properties = properties;this.router = getRouter();}@Overridepublic synchronized void start() {httpServer = getHttpServerOptions().map(vertx::createHttpServer).doOnNext(server -> server.webSocketHandler(serverWebSocket -> {if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {serverWebSocket.close();return;}serverWebSocket.textMessageHandler(message -> log.info("【Vertx-Websocket-Server】 => 收到消息:{}", message)).closeHandler(v -> log.error("【Vertx-Websocket-Server】 => 断开连接")).exceptionHandler(err -> log.error("【Vertx-Websocket-Server】 => 错误信息:{}", err.getMessage(), err)).endHandler(v -> log.error("【Vertx-Websocket-Server】 => 结束"));}).requestHandler(router).listen().onComplete(completionHandler -> {if (isClosed) {return;}if (completionHandler.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());}else {Throwable ex = completionHandler.cause();log.error("【Vertx-Http-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);}}));httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();}@Overridepublic synchronized void stop() {isClosed = true;httpServer.doOnNext(server -> server.close().onComplete(result -> {if (result.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());}else {Throwable ex = result.cause();log.error("【Vertx-Http-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);}})).subscribeOn(Schedulers.boundedElastic()).subscribe();}public void deploy() {// 部署服务vertx.deployVerticle(this);// 停止服务Runtime.getRuntime().addShutdownHook(new Thread(this::stop));}private Router getRouter() {Router router = 
Router.router(vertx);router.route().handler(BodyHandler.create());router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {String body = ctx.body().asString();Long deviceId = Long.valueOf(ctx.pathParam("deviceId"));Long productId = Long.valueOf(ctx.pathParam("productId"));log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);ctx.response().end();});return router;}private Flux<HttpServerOptions> getHttpServerOptions() {return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);}private HttpServerOptions getHttpServerOption(int port) {HttpServerOptions options = new HttpServerOptions();options.setHost(properties.getHost());options.setPort(port);options.setCompressionSupported(properties.isCompressionSupported());options.setDecompressionSupported(properties.isDecompressionSupported());options.setCompressionLevel(properties.getCompressionLevel());options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());options.setMaxChunkSize(properties.getMaxChunkSize());options.setMaxInitialLineLength(properties.getMaxInitialLineLength());options.setMaxHeaderSize(properties.getMaxHeaderSize());options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());options.setMaxFormFields(properties.getMaxFormFields());options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());options.setInitialSettings(properties.getInitialSettings());options.setAlpnVersions(properties.getAlpnVersions());options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());options.setPerMessageWebSocke
tCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());options.setTracingPolicy(properties.getTracingPolicy());options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());return options;}}

这只是一个demo;在实际项目中,需要对HTTP请求进行鉴权,推荐使用OAuth2。

我是老寇,我们下次再见啦!

相关文章:

  • WordPress搜索引擎优化的最佳重定向插件:入门指南
  • 146. LRU 缓存
  • C++字符串处理:`std::string`和`std::string_view`的区别与使用
  • R 语言科研绘图第 49 期 --- 热力图-相关性
  • Geotools中关于坐标转换纬度超限问题
  • vue2、vue3项目打包生成txt文件-自动记录打包日期:git版本、当前分支、提交人姓名、提交日期、提交描述等信息 和 前端项目的版本号json文件
  • 物联网数据湖架构
  • 【C++】异常解析
  • YouTube视频字幕转成文章算重复内容吗?
  • 五分钟完成PolarDB替换postgresql
  • paddle ocr本地化部署进行文字识别
  • 基于Elasticsearch的搜索引擎简介
  • 为 Windows 和 Ubuntu 中设定代理服务器的详细方法
  • 区块链blog2_中心化与效率
  • 解决软件连接RabbitMQ突发System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接异常
  • VR 互动实训的显著优势​
  • 一文了解VR拍摄制作
  • 江协科技EXTI外部中断hal库实现
  • HarmonyOS开发样式布局
  • Ubuntu ping网络没有问题,但是浏览器无法访问到网络
  • 大语言模型在线辩论说服力比人类辩手高出64%
  • 国家统计局:4月社会消费品零售总额同比增长5.1%
  • 多图|多款先进预警机亮相雷达展,专家:中国预警机已达世界先进水平
  • 上海这个咖啡文化节首次“走出去”,率本土品牌亮相英国伦敦
  • 媒体评欧阳娜娜遭民进党当局威胁:艺人表达国家认同是民族大义
  • 国家统计局向多省份反馈统计督察意见