物联网之使用Vertx实现HTTP/WebSocket最佳实践
小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server
实现HTTP/WebSocket【响应式】
Vertx-Web地址
实现过程
查看源码
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
HTTP/WebSocket【响应式】
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>5.0.0</version>
</dependency>
HttpServerProperties
/**
 * Settings for the Vert.x HTTP/WebSocket server, bound from configuration keys under
 * the {@code spring.http-server} prefix.
 *
 * <p>Lombok's {@code @Data} generates the accessors. Most fields share their name with
 * an {@code HttpServerOptions} setter, and the initial values below are used when the
 * configuration does not override them.
 *
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.http-server")
public class HttpServerProperties {

    // NOTE(review): not read by the server code shown in this article; presumably
    // toggles request authentication - confirm against the caller.
    private boolean auth = true;

    // Bind address and the set of ports to listen on (empty unless configured).
    private String host = "0.0.0.0";

    private Set<Integer> ports = new HashSet<>(0);

    // HTTP compression.
    private boolean compressionSupported = false;

    private int compressionLevel = 6;

    // WebSocket frame/message size limits.
    private int maxWebSocketFrameSize = 65536;

    private int maxWebSocketMessageSize = 65536 * 4;

    // HTTP parsing and form limits.
    private boolean handle100ContinueAutomatically = false;

    private int maxChunkSize = 8192;

    private int maxInitialLineLength = 4096;

    private int maxHeaderSize = 8192;

    private int maxFormAttributeSize = 8192;

    private int maxFormFields = 512;

    private int maxFormBufferedBytes = 2048;

    // HTTP/2 and ALPN settings.
    private Http2Settings initialSettings = new Http2Settings()
        .setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);

    private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);

    private boolean http2ClearTextEnabled = true;

    private int http2ConnectionWindowSize = -1;

    // Decoding.
    private boolean decompressionSupported = false;

    private boolean acceptUnmaskedFrames = false;

    private int decoderInitialBufferSize = 256;

    // WebSocket compression and shutdown behaviour.
    private boolean perFrameWebSocketCompressionSupported = true;

    private boolean perMessageWebSocketCompressionSupported = true;

    private int webSocketCompressionLevel = 6;

    private boolean webSocketAllowServerNoContext = false;

    private boolean webSocketPreferredClientNoContext = false;

    private int webSocketClosingTimeout = 30;

    // Tracing and write-handler registration.
    private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;

    private boolean registerWebSocketWriteHandlers = false;

    // HTTP/2 RST-flood settings: frame count per window, window length and its unit.
    private int http2RstFloodMaxRstFramePerWindow = 400;

    private int http2RstFloodWindowDuration = 60;

    private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;

}
VertxHttpServer
/*** @author laokou*/
@Slf4j
final class VertxHttpServer extends AbstractVerticle {private final HttpServerProperties properties;private final Vertx vertx;private final Router router;private volatile Flux<HttpServer> httpServer;private boolean isClosed = false;VertxHttpServer(Vertx vertx, HttpServerProperties properties) {this.vertx = vertx;this.properties = properties;this.router = getRouter();}@Overridepublic synchronized void start() {httpServer = getHttpServerOptions().map(vertx::createHttpServer).doOnNext(server -> server.webSocketHandler(serverWebSocket -> {if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {serverWebSocket.close();return;}serverWebSocket.textMessageHandler(message -> log.info("【Vertx-Websocket-Server】 => 收到消息:{}", message)).closeHandler(v -> log.error("【Vertx-Websocket-Server】 => 断开连接")).exceptionHandler(err -> log.error("【Vertx-Websocket-Server】 => 错误信息:{}", err.getMessage(), err)).endHandler(v -> log.error("【Vertx-Websocket-Server】 => 结束"));}).requestHandler(router).listen().onComplete(completionHandler -> {if (isClosed) {return;}if (completionHandler.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());}else {Throwable ex = completionHandler.cause();log.error("【Vertx-Http-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);}}));httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();}@Overridepublic synchronized void stop() {isClosed = true;httpServer.doOnNext(server -> server.close().onComplete(result -> {if (result.succeeded()) {log.info("【Vertx-Http-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());}else {Throwable ex = result.cause();log.error("【Vertx-Http-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);}})).subscribeOn(Schedulers.boundedElastic()).subscribe();}public void deploy() {// 部署服务vertx.deployVerticle(this);// 停止服务Runtime.getRuntime().addShutdownHook(new Thread(this::stop));}private Router getRouter() {Router router = 
Router.router(vertx);router.route().handler(BodyHandler.create());router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {String body = ctx.body().asString();Long deviceId = Long.valueOf(ctx.pathParam("deviceId"));Long productId = Long.valueOf(ctx.pathParam("productId"));log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);ctx.response().end();});return router;}private Flux<HttpServerOptions> getHttpServerOptions() {return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);}private HttpServerOptions getHttpServerOption(int port) {HttpServerOptions options = new HttpServerOptions();options.setHost(properties.getHost());options.setPort(port);options.setCompressionSupported(properties.isCompressionSupported());options.setDecompressionSupported(properties.isDecompressionSupported());options.setCompressionLevel(properties.getCompressionLevel());options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());options.setMaxChunkSize(properties.getMaxChunkSize());options.setMaxInitialLineLength(properties.getMaxInitialLineLength());options.setMaxHeaderSize(properties.getMaxHeaderSize());options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());options.setMaxFormFields(properties.getMaxFormFields());options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());options.setInitialSettings(properties.getInitialSettings());options.setAlpnVersions(properties.getAlpnVersions());options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());options.setPerMessageWebSocke
tCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());options.setTracingPolicy(properties.getTracingPolicy());options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());return options;}}
这只是一个 demo,实际项目中需要对 HTTP 请求进行鉴权,推荐使用 OAuth2。
我是老寇,我们下次再见啦!