当前位置: 首页 > news >正文

HTTP Server

HTTP Server

具体内容以官网文档为准

Reactor Netty 提供了易于使用和配置的HttpServer类。它隐藏了创建HTTP服务器所需的大部分Netty功能,并添加了Reactive Streams背压。

1. Starting and Stopping

要启动HTTP服务器,您必须创建并配置HttpServer实例。默认情况下,host 配置为任何本地地址,当调用bind(绑定)操作时,系统会拾取一个临时端口。以下示例显示了如何创建HttpServer实例:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create()   (1).bindNow(); (2)server.onDispose().block();}
}

(1) 创建一个准备配置的HttpServer实例。
(2) 以阻塞方式启动服务器并等待其完成初始化。

返回的DisposableServer提供了一个简单的服务器API,包括disposeNow(),它以阻塞的方式关闭服务器。

1.1. Host and Port

要在特定主机和端口上提供服务,您可以将以下配置应用于HTTP服务器:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().host("localhost") (1).port(8080)        (2).bindNow();server.onDispose().block();}
}

(1) 配置HTTP服务器主机
(2) 配置HTTP服务器端口

要在多个地址上提供服务,在配置完HttpServer后,您可以多次绑定它以获得单独的DisposableServer。所有创建的服务器都将共享资源,如LoopResources,因为它们在后台使用相同的配置实例。

MultiAddressApplication.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class MultiAddressApplication {public static void main(String[] args) {HttpServer httpServer = HttpServer.create();DisposableServer server1 = httpServer.host("localhost") (1).port(8080)        (2).bindNow();DisposableServer server2 = httpServer.host("0.0.0.0") (3).port(8081)      (4).bindNow();Mono.when(server1.onDispose(), server2.onDispose()).block();}
}

(1) 配置第一个HTTP服务器主机
(2) 配置第一个HTTP服务器端口
(3) 配置第二个HTTP服务器主机
(4) 配置第二个HTTP服务器端口

2. Eager Initialization

默认情况下,HttpServer资源的初始化按需进行。这意味着bind operation(绑定操作)会占用初始化和加载所需的额外时间:

  • 事件循环组
  • 本机传输库(使用本机传输时)
  • 用于安全性的本机库(在OpenSsl的情况下)

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {HttpServer httpServer =HttpServer.create().handle((request, response) -> request.receive().then());httpServer.warmup() (1).block();DisposableServer server = httpServer.bindNow();server.onDispose().block();}
}

(1) 初始化并加载事件循环组、本机传输库和本机安全库

3. Routing HTTP

为HTTP服务器定义路由需要配置提供的HttpServerRoutes构建器。以下示例显示了如何执行此操作:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().route(routes ->routes.get("/hello",        (1)(request, response) -> response.sendString(Mono.just("Hello World!"))).post("/echo",        (2)(request, response) -> response.send(request.receive().retain())).get("/path/{param}", (3)(request, response) -> response.sendString(Mono.just(request.param("param")))).ws("/ws",            (4)(wsInbound, wsOutbound) -> wsOutbound.send(wsInbound.receive().retain()))).bindNow();server.onDispose().block();}
}

(1) 向/hello发送GET请求并返回hello World!
(2) 向/echo发送POST请求,并将收到的请求body作为响应返回。
(3) 向/path/{param}发送GET请求,并返回path参数的值。
(4) 将websocket提供给/ws,并将接收到的传入数据作为传出数据返回。

NOTE

服务器路由是唯一的,并且只调用声明顺序中的第一个匹配。

3.1. SSE

以下代码显示了如何配置HTTP服务器以提供 Server-Sent Events:

Application.java

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.function.BiFunction;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().route(routes -> routes.get("/sse", serveSse())).bindNow();server.onDispose().block();}/*** Prepares SSE response. (准备 SSE 响应)* The "Content-Type" is "text/event-stream".* The flushing strategy is "flush after every element" emitted by the provided Publisher.(刷新策略是由提供的发布者发出的“每个元素后刷新”。)*/private static BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> serveSse() {Flux<Long> flux = Flux.interval(Duration.ofSeconds(10));return (request, response) ->response.sse().send(flux.map(Application::toByteBuf), b -> true);}/*** Transforms the Object to ByteBuf following the expected SSE format.(按照预期的SSE格式将对象转换为ByteBuf。)*/private static ByteBuf toByteBuf(Object any) {ByteArrayOutputStream out = new ByteArrayOutputStream();try {out.write("data: ".getBytes(Charset.defaultCharset()));MAPPER.writeValue(out, any);out.write("\n\n".getBytes(Charset.defaultCharset()));}catch (Exception e) {throw new RuntimeException(e);}return ByteBufAllocator.DEFAULT.buffer().writeBytes(out.toByteArray());}private static final ObjectMapper MAPPER = new ObjectMapper();
}

3.2. Static Resources

以下代码显示了如何配置HTTP服务器以提供静态资源:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;public class Application {public static void main(String[] args) throws URISyntaxException {Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());DisposableServer server =HttpServer.create().route(routes -> routes.file("/index.html", file)).bindNow();server.onDispose().block();}
}

4. Writing Data

要将数据发送到已连接的客户端,必须使用handle(…)或route(…)附加I/O处理程序。I/O处理程序可以访问HttpServerResponse,从而能够写入数据。下面的例子使用handle(…)方法:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().handle((request, response) -> response.sendString(Mono.just("hello"))) (1).bindNow();server.onDispose().block();}
}

(1) 向连接的客户端发送hello字符串

4.1. Adding Headers and Other Metadata

当您向连接的客户端发送数据时,您可能需要发送额外的headers、cookie、状态码和其他元数据。您可以通过使用HttpServerResponse来提供这些额外的元数据。下面的例子展示了如何这样做:

Application.java

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().route(routes ->routes.get("/hello",(request, response) ->response.status(HttpResponseStatus.OK).header(HttpHeaderNames.CONTENT_LENGTH, "12").sendString(Mono.just("Hello World!")))).bindNow();server.onDispose().block();}
}

4.2. Compression

根据请求头Accept-Encoding,可以配置HTTP服务器发送compressed(压缩)响应。

Reactor Netty提供了三种不同的策略来压缩传出数据:

  • compress(boolean):根据提供的布尔值,压缩是启用(true)还是禁用(false)。
  • compress(int):一旦响应大小超过给定值(以字节为单位),就执行压缩。
  • compress(BiPredicate<HttpServerRequest, HttpServerResponse>):如果predicate(谓词)返回true,则执行压缩。
  • compressOptions(HttpCompressionOption…compressionOptions):指定GZip, Deflate和ZSTD的压缩选项。

NOTE

GZip Compression Options1. compression level : 只允许取值范围为0到9。(默认值:6)2. window bits : 只允许取值范围为0到9。(默认值:15)3. memory level : 仅允许范围1至9。(默认值:8)Deflate Compression Options1. compression level : 只允许取值范围为0到9。(默认值:6)2. window bits : 只允许取值范围为0到9。(默认值:15)3. memory level : 仅允许范围1至9。(默认值:8)ZSTD Compression Options1.compression level : 只允许范围为-131072到9。(默认值是3)2.block size : 只允许为正数。(默认值:65536,即64KB)3.max encode size : 只允许正数。(默认值:33554432,即32MB)

以下示例使用compress方法(设置为true)启用压缩:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;public class Application {public static void main(String[] args) throws URISyntaxException {Path file = Paths.get(Application.class.getResource("/logback.xml").toURI());DisposableServer server =HttpServer.create().compress(true).route(routes -> routes.file("/index.html", file)).bindNow();server.onDispose().block();}
}

5. Consuming Data

要从连接的客户端接收数据,必须使用handle(…)或route(…)附加I/O处理程序。I/O处理程序可以访问HttpServerRequest,以便能够读取数据。

以下示例使用handle(…)方法:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().handle((request, response) -> request.receive().then()) (1).bindNow();server.onDispose().block();}
}

(1) 从连接的客户端接收数据

5.1. Reading Headers, URI Params, and other Metadata

从连接的客户机接收数据时,可能需要检查请求headers、参数和其他元数据。您可以通过使用HttpServerRequest获得这些额外的元数据。下面的例子展示了如何这样做:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().route(routes ->routes.get("/{param}",(request, response) -> {if (request.requestHeaders().contains("Some-Header")) {return response.sendString(Mono.just(request.param("param")));}return response.sendNotFound();})).bindNow();server.onDispose().block();}
}

5.2. Reading Post Form or Multipart Data

当您从连接的客户端接收数据时,您可能希望访问POST fromapplication/x-www-form-urlencoded)或multipartmultipart/form-data)数据。您可以通过使用HttpServerRequest获得这些数据。

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().route(routes ->routes.post("/multipart", (request, response) -> response.sendString(request.receiveForm() (1).flatMap(data -> Mono.just('[' + data.getName() + ']'))))).bindNow();server.onDispose().block();}
}

(1) 接受 POST multipart/form-data 数据

当你需要更改默认设置时,你可以配置HttpServer,也可以为每个请求提供一个配置:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().httpFormDecoder(builder -> builder.maxInMemorySize(0))                  (1).route(routes ->routes.post("/multipart", (request, response) -> response.sendString(request.receiveForm(builder -> builder.maxInMemorySize(256)) (2).flatMap(data -> Mono.just('[' + data.getName() + ']'))))).bindNow();server.onDispose().block();}
}

(1) HttpServer上的配置,指定数据仅存储在磁盘上。
(2) 每个请求的配置,指定如果数据大小超过指定的大小,则数据存储在磁盘上。

下面的清单显示了可用的配置:

Configuration nameDescription
baseDirectory配置磁盘上存储数据的目录。默认为生成的临时目录。
charset为数据配置字符集。默认为标准字符集#UTF_8。
maxInMemorySize配置每个数据的最大内存大小,如果大小大于maxInMemorySize,则将数据写入磁盘,否则将其写入内存。如果设置为-1,则整个内容将存储在内存中。如果设置为0,则整个内容将存储在磁盘上。默认为16kb。
maxSize配置每个数据的最大大小。当达到限制时,会引发异常。如果设置为-1,则表示没有限制。默认值为-1-无限制。
scheduler将 scheduler 配置为用于在解码阶段卸载磁盘操作。默认为Schedulers#boundedElastic()
streaming当设置为true时,数据将直接从解析后的输入缓冲流中传输,这意味着数据既不存储在内存中,也不存储在文件中。当为false时,部件由内存和(或)文件存储支持。默认为false。注意,启用流时,提供的数据可能不处于完整状态,即必须检查HttpData#isCompleted()。还要注意,启用此属性可以有效地忽略maxInMemorySize、baseDirectory和scheduler。
5.2.1. Obtaining the Remote (Client) Address

除了可以从请求中获得的元数据外,还可以接收host (server)地址、remote (client)地址和scheme。根据所选的工厂方法,您可以直接从 channel 中检索信息,也可以使用ForwardedX-Forwarded-* HTTP请求标头检索信息。以下示例显示了如何执行此操作:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().forwarded(true) (1).route(routes ->routes.get("/clientip",(request, response) ->response.sendString(Mono.just(request.remoteAddress() (2).getHostString())))).bindNow();server.onDispose().block();}
}

(1) 如果可能的话,指定从ForwardedX-Forwarded-* HTTP请求headers中获取有关连接的信息。
(2) 返回远程(客户端)对等端的地址。

还可以自定义ForwardedX-Forwarded-* header 处理程序的行为。下面的例子展示了如何这样做:

CustomForwardedHeaderHandlerApplication.java

import java.net.InetSocketAddress;import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.AddressUtils;public class CustomForwardedHeaderHandlerApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().forwarded((connectionInfo, request) -> {  (1)String hostHeader = request.headers().get("X-Forwarded-Host");if (hostHeader != null) {String[] hosts = hostHeader.split(",", 2);InetSocketAddress hostAddress = AddressUtils.createUnresolved(hosts[hosts.length - 1].trim(),connectionInfo.getHostAddress().getPort());connectionInfo = connectionInfo.withHostAddress(hostAddress);}return connectionInfo;}).route(routes ->routes.get("/clientip",(request, response) ->response.sendString(Mono.just(request.remoteAddress() (2).getHostString())))).bindNow();server.onDispose().block();}
}

(1) 添加自定义 header 处理程序。
(2) 返回远程(客户端)对等体的地址。

5.3. HTTP Request Decoder

默认情况下,Netty为传入请求配置了一些限制,例如:

  • 初始行的最大长度。
  • 所有headers的最大长度。
  • 内容或每个块的最大长度。

有关更多信息,请参见HttpRequestDecoder和HttpServerUpgradeHandler

默认情况下,HTTP 服务器配置了以下设置:

HttpDecoderSpec.java

	public static final int DEFAULT_MAX_INITIAL_LINE_LENGTH             = 4096;public static final int DEFAULT_MAX_HEADER_SIZE                     = 8192;/*** Default max chunk size.** @deprecated as of 1.1.0. This will be removed in 2.0.0 as Netty 5 does not support this configuration.*/@Deprecatedpublic static final int DEFAULT_MAX_CHUNK_SIZE                      = 8192;public static final boolean DEFAULT_VALIDATE_HEADERS                = true;public static final int DEFAULT_INITIAL_BUFFER_SIZE                 = 128;public static final boolean DEFAULT_ALLOW_DUPLICATE_CONTENT_LENGTHS = false;public static final boolean DEFAULT_ALLOW_PARTIAL_CHUNKS            = true;

HttpRequestDecoderSpec.java

	/*** The maximum length of the content of the HTTP/2.0 clear-text upgrade request.* By default, the server will reject an upgrade request with non-empty content,* because the upgrade request is most likely a GET request.*/public static final int DEFAULT_H2C_MAX_CONTENT_LENGTH = 0;

当需要更改这些默认设置时,可以通过如下方式配置HTTP服务器:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().httpRequestDecoder(spec -> spec.maxHeaderSize(16384)) (1).handle((request, response) -> response.sendString(Mono.just("hello"))).bindNow();server.onDispose().block();}
}

(1) 所有 headers 的最大长度为16384。当超出此值时,将引发TooLongFrameException。

6. Lifecycle Callbacks

提供了以下生命周期回调函数来扩展HttpServer

CallbackDescription
doOnBind当服务器channel即将绑定时调用。
doOnBound在绑定服务器channel时调用。
doOnChannelInit在初始化channel时调用。
doOnConnection在连接远程客户端时调用
doOnUnbound在服务器channel未绑定时调用。

以下示例使用 doOnConnectiondoOnChannelInit 回调:

Application.java

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.util.concurrent.TimeUnit;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().doOnConnection(conn ->conn.addHandlerFirst(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) (1).doOnChannelInit((observer, channel, remoteAddress) ->channel.pipeline().addFirst(new LoggingHandler("reactor.netty.examples")))    (2).bindNow();server.onDispose().block();}
}

(1) 当连接远程客户端时,Netty管道通过ReadTimeoutHandler进行扩展。
(2) 初始化通道时,Netty管道使用LoggingHandler进行扩展。

7. TCP-level Configuration

当您需要更改TCP级别的配置时,可以使用以下代码段扩展默认的TCP服务器配置:

Application.java

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).bindNow();server.onDispose().block();}
}

有关TCP级别配置的详细信息,请参阅TCP服务器。

7.1. Wire Logger

当需要检查对等体之间的流量时,Reactor Netty提供了wire logging记录。缺省情况下,禁用wire logging记录。要启用它,你必须将日志记录器reactor.netty.http.server.HttpServer级别设置为DEBUG,并应用以下配置:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().wiretap(true) (1).bindNow();server.onDispose().block();}
}

(1) 启用 wire logging

7.1.1. Wire Logger formatters

Reactor Netty支持3种不同的格式化程序:

  • AdvancedByteBufFormat#HEX_DUMP - /默认值

AdvancedByteBufFormat.java

	/*** When wire logging is enabled with this format, both events and content will be logged.* The content will be in hex format.* <p>Examples:</p>* <pre>* {@code* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] REGISTERED* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] ACTIVE* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] READ: 145B*          +-------------------------------------------------+*          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |* +--------+-------------------------------------------------+----------------+* |00000000| 50 4f 53 54 20 2f 74 65 73 74 2f 57 6f 72 6c 64 |POST /test/World|* |00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 43 6f 6e 74 65 | HTTP/1.1..Conte|* |00000020| 6e 74 2d 54 79 70 65 3a 20 74 65 78 74 2f 70 6c |nt-Type: text/pl|* |00000030| 61 69 6e 0d 0a 75 73 65 72 2d 61 67 65 6e 74 3a |ain..user-agent:|* |00000040| 20 52 65 61 63 74 6f 72 4e 65 74 74 79 2f 64 65 | ReactorNetty/de|* ...* reactor.netty.http.HttpTests - [d5230a14, L:/0:0:0:0:0:0:0:1:60267 - R:/0:0:0:0:0:0:0:1:60269] WRITE: 38B*          +-------------------------------------------------+*          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |* +--------+-------------------------------------------------+----------------+* |00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|* |00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|* |00000020| 20 30 0d 0a 0d 0a                               | 0....          |* +--------+-------------------------------------------------+----------------+* }* </pre>*/
  • AdvancedByteBufFormat#SIMPLE

AdvancedByteBufFormat.java

	/*** When wire logging is enabled with this format, only the events will be logged.* <p>Examples:</p>* <pre>* {@code* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] REGISTERED* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] ACTIVE* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] READ: 145B* reactor.netty.http.HttpTests - [230d3686, L:/0:0:0:0:0:0:0:1:60241 - R:/0:0:0:0:0:0:0:1:60245] WRITE: 38B* }* </pre>*/
  • AdvancedByteBufFormat#TEXTUAL

AdvancedByteBufFormat.java

	/*** When wire logging is enabled with this format, both events and content will be logged.* The content will be in plain text format.* <p>Examples:</p>* <pre>* {@code* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] REGISTERED* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] ACTIVE* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] READ: 145B POST /test/World HTTP/1.1* Content-Type: text/plain* user-agent: ReactorNetty/dev* ...* reactor.netty.http.HttpTests - [02c3db6c, L:/0:0:0:0:0:0:0:1:60317 - R:/0:0:0:0:0:0:0:1:60319] WRITE: 38B HTTP/1.1 200 OK* content-length: 0* }* </pre>*/

当你需要改变默认的格式化器时,你可以这样配置:

Application.java

import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) (1).bindNow();server.onDispose().block();}
}

(1) 启用有线日志记录,AdvancedByteBufFormat#TEXTUAL用于打印内容。

7.2. Event Loop Group

默认情况下,Reactor Netty 使用“Event Loop Group”,其中工作线程的数量等于初始化时运行时可用的处理器数量(但最小值为4)。这个“Event Loop Group”在一个JVM中的所有服务器和客户端之间共享。当您需要不同的配置时,可以使用LoopResources#create方法之一。

以下列表显示了Event Loop Group的默认配置:

ReactorNetty.java

	/*** Default worker thread count, fallback to available processor* (but with a minimum value of 4).*/public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";/*** Default selector thread count, fallback to -1 (no selector thread)* <p><strong>Note:</strong> In most use cases using a worker thread also as a selector thread works well.* A possible use case for specifying a separate selector thread might be when the worker threads are too busy* and connections cannot be accepted fast enough.* <p><strong>Note:</strong> Although more than 1 can be configured as a selector thread count, in reality* only 1 thread will be used as a selector thread.*/public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";/*** Default worker thread count for UDP, fallback to available processor* (but with a minimum value of 4).*/public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";/*** Default quiet period that guarantees that the disposal of the underlying LoopResources* will not happen, fallback to 2 seconds.*/public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";/*** Default maximum amount of time to wait until the disposal of the underlying LoopResources* regardless if a task was submitted during the quiet period, fallback to 15 seconds.*/public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";/*** Default value whether the native transport (epoll, kqueue) will be preferred,* fallback it will be preferred when available.*/public static final String NATIVE = "reactor.netty.native";

如果需要更改这些设置,可以应用以下配置:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;public class Application {public static void main(String[] args) {LoopResources loop = LoopResources.create("event-loop", 1, 4, true);DisposableServer server =HttpServer.create().runOn(loop).bindNow();server.onDispose().block();}
}
7.2.1. Disposing Event Loop Group
  • 如果使用Reactor Netty提供的默认Event Loop Group,请调用HttpResources#displaceLoopsAndConnections/#displaceloopsAndConnectionsLater方法。

NOTE

处理HttpResources意味着每个正在使用它的服务器/客户端将无法再使用它!
  • 如果您使用自定义的LoopResources,请调用LoopResours#dispose/#disposeLater方法。

NOTE

Disposing 自定义LoopResources意味着配置为使用它的每个服务器/客户端都不能再使用它了!

8. SSL and TLS

当您需要SSL或TLS时,可以应用下一个示例中显示的配置。默认情况下,如果OpenSSL可用,则使用SslProvider.OPENSSL提供程序用作提供程序。否则使用SslProvider.JDK。您可以使用SslContextBuilder或通过设置 -Dio.netty.handler.ssl.noOpenSsl=true 来切换提供程序。

以下示例使用SslContextBuilder:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.server.HttpServer;
import java.io.File;public class Application {public static void main(String[] args) {File cert = new File("certificate.crt");File key = new File("private.key");Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert, key);DisposableServer server =HttpServer.create().secure(spec -> spec.sslContext(http11SslContextSpec)).bindNow();server.onDispose().block();}
}

8.1. Server Name Indication

您可以使用多个 SslContext 配置 HTTP 服务器,并将其映射到特定的域。在配置 SNI 映射时,可以使用确切的域名或包含通配符的域名。

以下示例使用了包含通配符的域名:

Application.java

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;import java.io.File;public class Application {public static void main(String[] args) throws Exception {File defaultCert = new File("default_certificate.crt");File defaultKey = new File("default_private.key");File testDomainCert = new File("default_certificate.crt");File testDomainKey = new File("default_private.key");SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();DisposableServer server =HttpServer.create().secure(spec -> spec.sslContext(defaultSslContext).addSniMapping("*.test.com",testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext))).bindNow();server.onDispose().block();}
}

9. HTTP Log

您可以通过编程或配置启用HTTP access或error日志。默认情况下,它是禁用的。

9.1. Access Log

您可以使用-Dreactor.netty.http.server.accessLogEnabled=true通过配置启用http访问日志。

您可以使用以下配置(用于Logback或类似的日志框架)来创建单独的HTTP访问日志文件:

<appender name="accessLog" class="ch.qos.logback.core.FileAppender"><file>access_log.log</file><encoder><pattern>%msg%n</pattern></encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender"><appender-ref ref="accessLog" />
</appender><logger name="reactor.netty.http.server.AccessLog" level="INFO" additivity="false"><appender-ref ref="async"/>
</logger>

以下示例以编程方式启用它:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().accessLog(true).bindNow();server.onDispose().block();}
}

调用该方法优先于系统属性配置。

默认情况下,日志记录格式为 Common Log Format,但您可以指定自定义格式作为参数,如以下示例所示:

CustomLogAccessFormatApplication.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLog;public class CustomLogAccessFormatApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().accessLog(true, x -> AccessLog.create("method={}, uri={}", x.method(), x.uri())).bindNow();server.onDispose().block();}
}

您还可以使用AccessLogFactory#createFilter方法过滤HTTP访问日志,如下例所示:

FilterLogAccessApplication.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLogFactory;public class FilterLogAccessApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/"))).bindNow();server.onDispose().block();}
}

请注意,此方法也可以采用自定义格式参数,如下例所示:

CustomFormatAndFilterAccessLogApplication.java.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.AccessLog;
import reactor.netty.http.server.logging.AccessLogFactory;public class CustomFormatAndFilterAccessLogApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().accessLog(true, AccessLogFactory.createFilter(p -> !String.valueOf(p.uri()).startsWith("/health/"), (1)x -> AccessLog.create("method={}, uri={}", x.method(), x.uri()))) (2).bindNow();server.onDispose().block();}
}

(1) 指定要使用的筛选条件谓词
(2) 指定要应用的自定义格式

9.2. Error Log

您可以使用-Dreactor.netty.http.server.errorLogEnabled=true通过配置启用http错误日志。

您可以使用以下配置(用于 Logback 或类似的日志记录框架)来拥有单独的 HTTP 错误日志文件:

<appender name="errorLog" class="ch.qos.logback.core.FileAppender"><file>error_log.log</file><encoder><pattern>%msg%n</pattern></encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender"><appender-ref ref="errorLog" />
</appender><logger name="reactor.netty.http.server.ErrorLog" level="ERROR" additivity="false"><appender-ref ref="async"/>
</logger>

以下示例以编程方式启用它:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().errorLog(true).bindNow();server.onDispose().block();}
}

调用该方法优先于系统属性配置。

默认情况下,日志记录格式为 [{datetime}] [pid {PID}] [client {remote address}] {exception message},但您可以指定自定义格式作为参数,如以下示例所示:

CustomLogErrorFormatApplication.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.error.ErrorLog;public class CustomLogErrorFormatApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().errorLog(true, x -> ErrorLog.create("method={}, uri={}", x.httpServerInfos().method(), x.httpServerInfos().uri())).bindNow();server.onDispose().block();}
}

您还可以使用ErrorLogFactory#createFilter方法过滤HTTP错误日志,如以下示例所示:

FilterLogErrorApplication.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.error.ErrorLogFactory;public class FilterLogErrorApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().errorLog(true, ErrorLogFactory.createFilter(p -> p.cause() instanceof RuntimeException)).bindNow();server.onDispose().block();}
}

请注意,此方法也可以采用自定义格式参数,如下例所示:

CustomFormatAndFilterErrorLogApplication.java.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.logging.error.ErrorLog;
import reactor.netty.http.server.logging.error.ErrorLogFactory;public class CustomFormatAndFilterErrorLogApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().errorLog(true, ErrorLogFactory.createFilter(p -> p.cause() instanceof RuntimeException, (1)x -> ErrorLog.create("method={}, uri={}", x.httpServerInfos().method(), x.httpServerInfos().uri()))) (2).bindNow();server.onDispose().block();}
}

(1) 指定要使用的筛选条件谓词
(2) 指定要应用的自定义格式

10. HTTP/2

默认情况下,HTTP服务器支持HTTP/1.1。如果需要 HTTP/2,可以通过配置获取。除了协议配置之外,如果需要 H2 而不是 H2C (cleartext),还必须配置 SSL。

NOTE

由于 JDK8 不支持“开箱即用”的应用层协议协商 (ALPN)(尽管一些供应商将 ALPN 向后移植到 JDK8),因此您需要对支持它的本机库提供额外的依赖项,例如 netty-tcnative-boringssl-static]。

下面的清单显示了一个简单的 H2 示例:

H2Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import java.io.File;public class H2Application {public static void main(String[] args) {File cert = new File("certificate.crt");File key = new File("private.key");Http2SslContextSpec http2SslContextSpec = Http2SslContextSpec.forServer(cert, key);DisposableServer server =HttpServer.create().port(8080).protocol(HttpProtocol.H2)                            (1).secure(spec -> spec.sslContext(http2SslContextSpec)) (2).handle((request, response) -> response.sendString(Mono.just("hello"))).bindNow();server.onDispose().block();}
}

(1) 将服务器配置为仅支持 HTTP/2
(2) 配置 SSL

应用程序现在的行为应如下所示:

$ curl --http2 https://localhost:8080 -i
HTTP/2 200hello

下面的清单给出了一个简单的 H2C 示例:

http2/H2CApplication.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;public class H2CApplication {public static void main(String[] args) {DisposableServer server =HttpServer.create().port(8080).protocol(HttpProtocol.H2C).handle((request, response) -> response.sendString(Mono.just("hello"))).bindNow();server.onDispose().block();}
}

应用程序现在的行为应如下所示:

$ curl --http2-prior-knowledge http://localhost:8080 -i
HTTP/2 200hello

10.1. Protocol Selection

HttpProtocol.java

public enum HttpProtocol {/*** The default supported HTTP protocol by HttpServer and HttpClient.*/HTTP11,/*** HTTP/2.0 support with TLS* <p>If used along with HTTP/1.1 protocol, HTTP/2.0 will be the preferred protocol.* While negotiating the application level protocol, HTTP/2.0 or HTTP/1.1 can be chosen.* <p>If used without HTTP/1.1 protocol, HTTP/2.0 will always be offered as a protocol* for communication with no fallback to HTTP/1.1.*/H2,/*** HTTP/2.0 support with clear-text.* <p>If used along with HTTP/1.1 protocol, will support H2C "upgrade":* Request or consume requests as HTTP/1.1 first, looking for HTTP/2.0 headers* and {@literal Connection: Upgrade}. A server will typically reply a successful* 101 status if upgrade is successful or a fallback HTTP/1.1 response. When* successful the client will start sending HTTP/2.0 traffic.* <p>If used without HTTP/1.1 protocol, will support H2C "prior-knowledge": Doesn't* require {@literal Connection: Upgrade} handshake between a client and server but* fallback to HTTP/1.1 will not be supported.*/H2C,/*** HTTP/3.0 support.* @since 1.2.0*/@IncubatingHTTP3
}

11. HTTP/3

默认情况下,HTTP 服务器支持 HTTP/1.1。如果需要 HTTP/3,可以通过配置获取。除了协议配置外,还需要在 io.netty.incubator:netty-incubator-codec-http3 中添加依赖。

下面的清单显示了一个简单的 HTTP3 示例:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.Http3SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;import java.io.File;
import java.time.Duration;public class Application {public static void main(String[] args) throws Exception {File certChainFile = new File("certificate chain file");File keyFile = new File("private key file");Http3SslContextSpec serverCtx = Http3SslContextSpec.forServer(keyFile, null, certChainFile);DisposableServer server =HttpServer.create().port(8080).protocol(HttpProtocol.HTTP3)                 (1).secure(spec -> spec.sslContext(serverCtx))   (2).http3Settings(spec -> spec.idleTimeout(Duration.ofSeconds(5)) (3).maxData(10000000).maxStreamDataBidirectionalLocal(1000000).maxStreamDataBidirectionalRemote(1000000).maxStreamsBidirectional(100)).handle((request, response) -> response.header("server", "reactor-netty").sendString(Mono.just("hello"))).bindNow();server.onDispose().block();}
}

(1) 将服务器配置为仅支持 HTTP/3
(2) 配置 SSL
(3) 配置 HTTP/3 设置

应用程序现在的行为应如下所示:

$ curl --http3 https://localhost:8080 -i
HTTP/3 200
server: reactor-netty
content-length: 5hello

12. Metrics

HTTP 服务器支持与 Micrometer 的内置集成。它公开了所有带有前缀 reactor.netty.http.server 的metrics(量度)。

下表提供了 HTTP 服务器metrics的信息:

metric nametypedescription
reactor.netty.http.server.streams.activeGauge活动 HTTP/2 流的数量。请参阅 Streams Active
reactor.netty.http.server.connections.activeGauge当前处理请求的 http 连接数。请参阅Connections Active
reactor.netty.http.server.connections.totalGauge所有打开的连接数。查看Connections Total
reactor.netty.http.server.data.receivedDistributionSummary接收的数据量(以字节为单位)。查看Data Received
reactor.netty.http.server.data.sentDistributionSummary发送的数据量(以字节为单位)。查看Data Sent
reactor.netty.http.server.errorsCounter发生的错误数。查看Errors Count
reactor.netty.http.server.data.received.timeTimer使用传入数据所花费的时间。参见 Http 服务器数据接收时间
reactor.netty.http.server.data.sent.timeTimer发送传出数据所花费的时间。请参阅 Http 服务器数据发送时间
reactor.netty.http.server.response.timeTimer请求/响应的总时间 请参阅 Http 服务器响应时间

这些其他metrics 也可用:

ByteBufAllocator metrics

metric nametypedescription
reactor.netty.bytebuf.allocator.used.heap.memoryGauge堆缓冲区分配器保留的字节数。请参阅已使用的堆内存
reactor.netty.bytebuf.allocator.used.direct.memoryGauge直接缓冲区分配器保留的字节数。请参阅已使用的直接内存
reactor.netty.bytebuf.allocator.heap.arenasGauge堆 arena 的数量(当 PooledByteBufAllocator 时)。参见 Heap Arenas
reactor.netty.bytebuf.allocator.direct.arenasGauge直接 arena 的数量(当 PooledByteBufAllocator 时)。参见 Direct Arenas
reactor.netty.bytebuf.allocator.threadlocal.cachesGauge线程本地缓存的数量(当 PooledByteBufAllocator 时)。请参见线程本地缓存
reactor.netty.bytebuf.allocator.small.cache.sizeGauge小缓存的大小(当 PooledByteBufAllocator 时)。请参阅小缓存大小
reactor.netty.bytebuf.allocator.normal.cache.sizeGauge正常缓存的大小(当 PooledByteBufAllocator 时)。请参阅正常缓存大小
reactor.netty.bytebuf.allocator.chunk.sizeGaugearena 的块大小(当 PooledByteBufAllocator 时)。参见 Chunk Size
reactor.netty.bytebuf.allocator.active.heap.memoryGauge从堆缓冲池分配的正在使用的缓冲区消耗的实际字节数(当 PooledByteBufAllocator 时)。请参阅 Active Heap Memory
reactor.netty.bytebuf.allocator.active.direct.memoryGauge从直接缓冲池分配的正在使用的缓冲区消耗的实际字节数(当 PooledByteBufAllocator 时)。请参阅 Active Direct Memory

EventLoop metrics

metric nametypedescription
reactor.netty.eventloop.pending.tasksGauge事件循环中等待处理的任务数。请参阅 待处理任务

以下示例启用该集成:

Application.java

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;public class Application {public static void main(String[] args) {Metrics.globalRegistry (1).config().meterFilter(MeterFilter.maximumAllowableTags("reactor.netty.http.server", "URI", 100, MeterFilter.deny()));DisposableServer server =HttpServer.create().metrics(true, s -> {if (s.startsWith("/stream/")) { (2)return "/stream/{n}";}else if (s.startsWith("/bytes/")) {return "/bytes/{n}";}return s;}) (3).route(r ->r.get("/stream/{n}",(req, res) -> res.sendString(Mono.just(req.param("n")))).get("/bytes/{n}",(req, res) -> res.sendString(Mono.just(req.param("n"))))).bindNow();server.onDispose().block();}
}

(1) 对带有 URI 标记的meters 应用上限
(2) 模板化 URI 将尽可能用作 URI 标记值
(3) 支持与 Micrometer 的内置集成

NOTE

为了避免启用的metrics产生内存和 CPU 开销,请务必尽可能将实际 URI 转换为模板化 URI。
如果不转换为类似模板的形式,则每个不同的 URI 都会导致创建不同的标签,这会为metrics占用大量内存。

NOTE

始终对带有 URI 标签的metrics应用上限。在无法对实际 URI 进行模板化的情况下,配置metrics数量的上限会有所帮助。您可以在 maximumAllowableTags 中找到更多信息。

当需要 HTTP 服务器meters来与 Micrometer 以外的系统集成时,或者您希望提供自己的 Micrometer 集成时,您可以提供自己的meters记录器,如下所示:

Application.java

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerMetricsRecorder;import java.net.SocketAddress;
import java.time.Duration;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().metrics(true, CustomHttpServerMetricsRecorder::new) (1).route(r ->r.get("/stream/{n}",(req, res) -> res.sendString(Mono.just(req.param("n")))).get("/bytes/{n}",(req, res) -> res.sendString(Mono.just(req.param("n"))))).bindNow();server.onDispose().block();}

(1) 启用 HTTP 服务器指标并提供 HttpServerMetricsRecorder 实现。

13. Tracing

HTTP 服务器支持与 Micrometer Tracing 的内置集成。

下表提供了 HTTP 服务器跨度的信息:

contextual namedescription
<HTTP METHOD>_<URI>请求的信息和总时间。请参阅 Http Server Response Span。

以下示例启用该集成。这个具体示例使用 Brave 并将信息报告给 Zipkin。请参阅 Micrometer Tracing 文档,了解 OpenTelemetry 设置。

Application.java

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.observability.ReactorNettyPropagatingReceiverTracingObservationHandler;
import reactor.netty.http.server.HttpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;import static reactor.netty.Metrics.OBSERVATION_REGISTRY;public class Application {public static void main(String[] args) {init(); (1)DisposableServer server =HttpServer.create().metrics(true, s -> {if (s.startsWith("/stream/")) { (2)return "/stream/{n}";}return s;}) (3).route(r -> r.get("/stream/{n}",(req, res) -> res.sendString(Mono.just(req.param("n"))))).bindNow();server.onDispose().block();}/*** This setup is based on* <a href="https://micrometer.io/docs/tracing#_micrometer_tracing_brave_setup">Micrometer Tracing Brave Setup</a>.*/static void init() {AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));StrictCurrentTraceContext braveCurrentTraceContext = StrictCurrentTraceContext.create();CurrentTraceContext bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);Tracing tracing =Tracing.newBuilder().currentTraceContext(braveCurrentTraceContext).supportsJoin(false).traceId128Bit(true).sampler(Sampler.ALWAYS_SAMPLE).addSpanHandler(spanHandler).localServiceName("reactor-netty-examples").build();brave.Tracer braveTracer = tracing.tracer();Tracer tracer = new BraveTracer(braveTracer, bridgeContext, new BraveBaggageManager());Propagator propagator = new BravePropagator(tracing);OBSERVATION_REGISTRY.observationConfig().observationHandler(new ReactorNettyPropagatingReceiverTracingObservationHandler(tracer, propagator));}
}

(1) 初始化 Brave、Zipkin 和 Observation 注册表。
(2) 模板化 URI 尽可能用作 URI 标记值。
(3) 启用与 Micrometer 的内置集成。

Zipkin 中的结果如下所示:

Icon

13.1. Access Current Observation

Project Micrometer 提供了一个库,该库有助于跨不同类型的上下文机制(如 ThreadLocal、Reactor Context 等)进行上下文传播。

下面的示例展示了如何在自定义 ChannelHandler 中使用此库:

Application.java

import brave.Tracing;
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BravePropagator;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.propagation.Propagator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.observability.ReactorNettyPropagatingReceiverTracingObservationHandler;
import reactor.netty.http.server.HttpServer;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.urlconnection.URLConnectionSender;import static reactor.netty.Metrics.OBSERVATION_REGISTRY;public class Application {public static void main(String[] args) {init(); (1)DisposableServer server =HttpServer.create().metrics(true, s -> {if (s.startsWith("/stream/")) { (2)return "/stream/{n}";}return s;}) (3).doOnConnection(conn -> conn.addHandlerLast(CustomChannelOutboundHandler.INSTANCE)) (4).route(r -> r.get("/stream/{n}",(req, res) -> res.sendString(Mono.just(req.param("n"))))).bindNow();server.onDispose().block();}static final class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {static final ChannelHandler INSTANCE = new CustomChannelOutboundHandler();@Overridepublic boolean isSharable() {return true;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());//"FutureReturnValueIgnored" this is deliberatectx.write(msg, promise);}System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());}}

(1) 初始化 Brave、Zipkin 和 Observation 注册表。
(2) 模板化 URI 尽可能用作 URI 标记值。
(3) 启用与 Micrometer 的内置集成。
(4) 使用上下文传播库的自定义 ChannelHandler。这个具体的例子只覆盖了 ChannelOutboundHandlerAdapter#write,如果需要,其余的方法可以使用相同的逻辑。此外,这个具体示例设置了在给定 Channel 中有值的所有 ThreadLocal 值,如果需要其他行为,请检查上下文传播库 API。例如,你可能只想设置一些 ThreadLocal 值。

NOTE

在框架中启用 Reactor Netty 跟踪时,可能需要让 Reactor Netty 使用此框架创建的 ObservationRegistry 。为此,您需要调用 reactor.netty.Metrics#observationRegistry。您可能还需要使用框架提供的 API 配置 Reactor Netty ObservationHandlers。

14. Unix Domain Sockets

当本机传输用于所有 java 版本时,以及当 NIO 传输用于 java 17 及更高版本时,HTTP 服务器支持 Unix 域套接字 (UDS)。

以下示例说明如何使用 UDS 支持:

Application.java

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;//import java.net.UnixDomainSocketAddress;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create()// The configuration below is available only when Epoll/KQueue transport is used.bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) (1)// The configuration below is available only when NIO transport is used with Java 17+//.bindAddress(() -> UnixDomainSocketAddress.of("/tmp/test.sock")).bindNow();server.onDispose().block();}
}

(1) 指定将使用的 DomainSocketAddress

15. Timeout Configuration

本节描述了可以在 HttpServer 中使用的各种 timeout 配置选项。配置适当的超时可以改进或解决通信过程中的问题。配置选项可以按如下方式分组:

  • Request Timeout
  • Connection Timeout
  • SSL/TLS Timeout

15.1. Request Timeout

以下清单显示了所有可用的请求超时配置选项。

  • readTimeout - 读取给定请求内容时,每个网络级读取操作之间的最长时间(分辨率:ms)
  • requestTimeout - 读取给定请求内容的最长时间(分辨率:毫秒)。

NOTE

配置读取/请求超时始终是一种很好的做法。

要自定义默认设置,您可以按如下方式配置 HttpServer:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;import java.time.Duration;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().readTimeout(Duration.ofSeconds(5))     (1).requestTimeout(Duration.ofSeconds(30)) (2).handle((request, response) -> request.receive().then()).bindNow();server.onDispose().block();}
}

(1) 将读取超时配置为 5 秒。
(2) 将请求超时配置为 30 秒。

15.2. Connection Timeout

以下清单显示了所有可用的连接超时配置选项。

  • idleTimeout - 此连接保持打开状态并等待 HTTP 请求的最长时间(分辨率:毫秒)。达到超时后,连接将关闭。默认情况下,未指定
    idleTimeout,这表示没有超时(即无限),这意味着只有当其中一个对等点决定显式关闭连接时,连接才会关闭。

NOTE

配置 idle timeout 始终是一种很好的做法。

要自定义默认设置,您可以按如下方式配置 HttpServer:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;import java.time.Duration;public class Application {public static void main(String[] args) {DisposableServer server =HttpServer.create().idleTimeout(Duration.ofSeconds(1)) (1).bindNow();server.onDispose().block();}
}

(1) 将默认空闲超时配置为 1 秒。

15.3. SSL/TLS Timeout

HttpServer 支持 Netty 提供的 SSL/TLS 功能。

以下列表描述了可用的超时配置选项:

  • handshakeTimeout - 使用此选项可配置 SSL 握手超时(分辨率:ms)。默认值:10 秒。

NOTE

当预计网络连接速度较慢时,应考虑增加 SSL 握手超时。
  • closeNotifyFlushTimeout - 使用此选项可配置 SSL close_notify刷新超时(分辨率:ms)。默认值:3 秒。
  • closeNotifyReadTimeout - 使用此选项可配置 SSL close_notify读取超时(分辨率:ms)。默认值:0s。

要自定义默认设置,您可以按如下方式配置 HttpServer:

Application.java

import reactor.netty.DisposableServer;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.server.HttpServer;import java.io.File;
import java.time.Duration;public class Application {public static void main(String[] args) {File cert = new File("certificate.crt");File key = new File("private.key");Http11SslContextSpec http11SslContextSpec = Http11SslContextSpec.forServer(cert, key);DisposableServer server =HttpServer.create().secure(spec -> spec.sslContext(http11SslContextSpec).handshakeTimeout(Duration.ofSeconds(30))         (1).closeNotifyFlushTimeout(Duration.ofSeconds(10))  (2).closeNotifyReadTimeout(Duration.ofSeconds(10)))  (3).bindNow();server.onDispose().block();}
}

(1) 将 SSL 握手超时配置为 30 秒。
(2) 将 SSL close_notify 刷新超时配置为 10 秒。
(3) 将 SSL close_notify读取超时配置为 10 秒。

相关文章:

  • SM3算法C语言实现(无第三方库,带测试)
  • Openlayers面试题198道
  • vue3 reactive重新赋值
  • 【React】React CSS 样式设置全攻略
  • maven之scope
  • 3DS中文游戏全集下载 任天堂3DS简介3DS第一方独占游戏推荐
  • 期货反向跟单-终止盘手合作原则(二)
  • 传输层协议UDP/TCP
  • 【Leetcode】字符串之二进制求和、字符串相乘
  • 数据结构-顺序表-数值统计
  • 设计模式之单例模式-----实现单例模式的五种方式
  • 多模态大语言模型arxiv论文略读(130)
  • 人力资源战略重构,AI驱动高质量发展论坛顺利召开
  • @annotation:Spring AOP 的“精准定位器“
  • Qt5.15.2 可执行程序发布
  • Rust 学习笔记:关于 Unsafe Rust 的练习题
  • Java八股文——消息队列「场景篇」
  • VSCode1.101.1Win多语言语言编辑器便携版安装教程
  • 【工具教程】识别PDF中文字内容,根据文字内容对PDF批量重命名,提取识别PDF内容给图片重新命名的操作步骤和注意事项
  • uniapp评价组件
  • 成都手机wap网站制作/站长素材音效网
  • 搜索引擎作弊网站有哪些/国外网站开发
  • 对网站建设的考核机制/网站推广文章
  • 龙泉市住房和城乡建设局网站/windows优化大师收费吗
  • 专业团队下一句/移动端关键词排名优化
  • 八年级上册信息书怎么做网站/营销策略的思路