Netty源码—Pipeline和Handler(二)
9.ChannelPipeline添加ChannelHandler
(1)常见的客户端代码
首先用一个拆包器Spliter对二进制数据流进行拆包,然后解码器Decoder会将拆出来的包进行解码,接着业务处理器BusinessHandler会处理解码出来的Java对象,最后编码器Encoder会将业务处理完的结果编码成二进制数据进行输出。
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(newSpliter());
p.addLast(new Decoder());
p.addLast(new BusinessHandler());
p.addLast(new Encoder());
}
});
整个ChannelPipeline的结构如下所示:
这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是ChannelInboundHandler,用来处理Inbound事件,比如读取数据流进行加工处理。一种是ChannelOutboundHandler,用来处理Outbound事件,比如当调用writeAndFlush()方法时就会经过这种类型的Handler。
(2)ChannelPipeline添加ChannelHandler入口
当服务端Channel的Reactor线程轮询到新连接接入的事件时,就会调用AbstractNioChannel的内部类NioUnsafe的read()方法,也就是调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read()方法。
然后会触发执行代码pipeline.fireChannelRead()传播ChannelRead事件,从而最终触发调用ServerBootstrapAcceptor接入器的channelRead()方法。
在ServerBootstrapAcceptor的channelRead()方法中,便会通过执行代码channel.pipeline().addLast()添加ChannelHandler,也就是通过调用DefaultChannelPipeline的addLast()方法添加ChannelHandler。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
needsToSelectAgain = false;
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//1.首先取出IO事件
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;//Help GC
//2.然后获取对应的Channel和处理该Channel
//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//网络事件的处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3.最后判断是否应该再进行一次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
//selectedKeys.flip()会返回一个数组
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
...
//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入
//此时将调用Channel的unsafe变量来进行实际操作
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法
//进行新连接接入处理
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
...
}
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//临时存放读到的连接NioSocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//断言确保该read()方法必须来自Reactor线程调用
assert eventLoop().inEventLoop();
//获得Channel对应的Pipeline
final ChannelPipeline pipeline = pipeline();
//获得Channel对应的RecvByteBufAllocator.Handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel
//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接
//2.设置并绑定NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
//调用DefaultChannelPipeline的fireChannelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法
readBuf.clear();
pipeline.fireChannelReadComplete();
}
}
...
}
//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
...
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
//从Pipeline的第一个HeadContext处理器开始调用
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandler handler() {
return this;
}
...
}
...
}
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
//初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器
@Override
void init(Channel channel) throws Exception {
//1.设置服务端Channel的Option与Attr
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//2.设置客户端Channel的Option与Attr
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//3.配置服务端启动逻辑
ChannelPipeline p = channel.pipeline();
//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
//一.添加用户自定义的Handler,注意这是handler,而不是childHandler
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) pipeline.addLast(handler);
//二.添加一个特殊的Handler用于接收新连接
//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//调用DefaultChannelPipeline的addLast()方法
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)
);
}
});
}
});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
...
//channelRead()方法在新连接接入时被调用
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//1.给新连接的Channel添加用户自定义的Handler处理器
//这里的childHandler其实是一个特殊的Handler: ChannelInitializer
child.pipeline().addLast(childHandler);
//2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关
for (Entry<ChannelOption<?>, Object> e: childOptions) {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
//3.设置新连接Channel的属性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//4.绑定Reactor线程
//childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法
childGroup.register(child);
}
...
}
...
}
(3)DefaultChannelPipeline的addLast()方法
使用synchronized关键字是为了防止多线程并发操作ChannelPipeline底层的双向链表,添加ChannelHandler结点的过程主要分为4个步骤:
步骤一:判断ChannelHandler是否重复添加
步骤二:创建结点
步骤三:添加结点到链表
步骤四:回调添加完成事件
这个结点便是ChannelHandlerContext,Pipeline里每个结点都是一个ChannelHandlerContext。addLast()方法便是把ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) throw new NullPointerException("handlers");
for (ChannelHandler h: handlers) {
if (h == null) break;
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.检查是否有重复的ChannelHandler结点
checkMultiplicity(handler);
//2.创建ChannelHandlerContext结点
newCtx = newContext(group, filterName(name, handler), handler);
//3.添加ChannelHandlerContext结点
addLast0(newCtx);
...
}
//4.回调用户方法
//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
callHandlerAdded0(newCtx);
return this;
}
...
}
(4)检查是否重复添加ChannelHandler结点
Netty使用了一个成员变量added来表示一个ChannelHandler是否已经添加。如果当前要添加的ChannelHandler是非共享的并且已经添加过,那么抛出异常,否则标识该ChannelHandler已添加。
如果一个ChannelHandler支持共享,那么它就可以无限次被添加到ChannelPipeline中。如果要让一个ChannelHandler支持共享,只需要加一个@Sharable注解即可。而ChannelHandlerAdapter的isSharable()方法正是通过判断该ChannelHandler对应的类是否标有@Sharable注解来实现的。
Netty为了性能优化,还使用了ThreadLocal来缓存ChannelHandler是否共享的情况。在高并发海量连接下,每次有新连接添加ChannelHandler都会调用isSharable()方法,从而优化性能。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
...
}
//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
//Not using volatile because it's used only for a sanity check.
boolean added;
//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.
public boolean isSharable() {
//Cache the result of Sharable annotation detection to workaround a condition.
//We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads.
//Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway.
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
...
}
(5)创建ChannelHandlerContext结点
根据ChannelHandler创建ChannelHandlerContext类型的结点时,会将该ChannelHandler的引用保存到结点的成员变量中。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.检查是否有重复的ChannelHandler结点
checkMultiplicity(handler);
//2.创建ChannelHandlerContext结点
newCtx = newContext(group, filterName(name, handler), handler);
//3.添加ChannelHandlerContext结点
addLast0(newCtx);
...
}
//4.回调用户方法
//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
callHandlerAdded0(newCtx);
return this;
}
//给ChannelHandler创建一个唯一性的名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
//根据ChannelHandler创建一个ChannelHandlerContext结点
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
...
}
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
...
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
...
}
(6)添加ChannelHandlerContext结点
使用尾插法向双向链表添加结点。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
}
(7)回调handerAdded()方法
向ChannelPipeline添加完新结点后,会使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成,然后执行ctx.handler().handlerAdded(ctx),回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
//使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成
ctx.setAddComplete();
//回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法
ctx.handler().handlerAdded(ctx);
}
...
}
//DemoHandler是用户定义的ChannelHandler
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法
}
...
}
最典型的一个回调就是用户代码的ChannelInitializer被添加完成后,会先调用其initChannel()方法将用户自定义的ChannelHandler添加到ChannelPipeline,然后再调用pipeline.remove()方法将自身结点进行删除。
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置一个ChannelInitializer类型的childHandler
//新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)"
//也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中
//添加完这个结点后会回调ChannelInitializer的handlerAdded()方法
//其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点
//执行完initChannel()方法后,就会移除ChannelInitializer这个结点
.childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new NettyServerHandler());//针对网络请求的处理逻辑
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口
channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
System.out.println("Starting Netty Server...");
int port = 8998;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new NettyServer(port).start();
}
}
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//ChannelPipeline删除ChannelHandler结点(ChannelInitializer)
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
...
}
(8)ChannelPipeline添加ChannelHandler总结
一.判断ChannelHandler是否重复添加的依据是:如果该ChannelHandler不是共享的且已被添加过,则拒绝添加。
二.否则就创建一个ChannelHandlerContext结点(ctx),并把这个ChannelHandler包装进去,也就是保存ChannelHandler的引用到ChannelHandlerContext的成员变量中。由于创建ctx时保存了ChannelHandler的引用、ChannelPipeline的引用到成员变量,ChannelPipeline又保存了Channel的引用,所以每个ctx都拥有一个Channel的所有信息。
三.接着通过双向链表的尾插法,将这个ChannelHandlerContext结点添加到ChannelPipeline中。
四.最后回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。
10.ChannelPipeline删除ChannelHandler
Netty最大的特征之一就是ChannelHandler是可插拔的,可以动态编织ChannelPipeline。比如在客户端首次连接服务端时,需要进行权限认证,认证通过后就可以不用再认证了。下面的AuthHandler便实现了只对第一个传来的数据包进行认证校验。如果通过验证则删除此AuthHandler,这样后续传来的数据包便不会再校验了。
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throw Exception {
if (verify(data)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
}
DefaultChannelPipeline的remove()方法如下:
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
...
AbstractChannelHandlerContext ctx = head.next;
//遍历双向链表
for (;;) {
...
if (ctx.handler() == handler) return ctx;
ctx = ctx.next;
}
}
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
//Pipeline中的head和tail结点不能被删除
assert ctx != head && ctx != tail;
synchronized (this) {
//调整链表指针并删除
remove0(ctx);
...
}
//回调用户在这个要删除的ChannelHandler实现的handlerRemoved()方法
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
...
ctx.handler().handlerRemoved(ctx);
...
}
...
}
ChannelPipeline删除ChannelHandler的步骤:
一.遍历双向链表,根据ChannelHandler找到对应的ChannelHandlerContext结点。
二.通过调整ChannelPipeline中双向链表的指针来删除对应的ChannelHandlerContext结点。
三.回调用户在这个要删除的ChannelHandler实现的handlerRemoved()方法,比如进行资源清理。
11.Inbound事件的传播
(1)Unsafe的介绍
Unsafe和ChannelPipeline密切相关,ChannelPipeline中有关IO的操作最终都会落地到Unsafe的。Unsafe是不安全的意思,即不要在应用程序里直接使用Unsafe及它的衍生类对象。Unsafe是在Channel中定义的,是属于Channel的内部类。Unsafe中的接口操作都和JDK底层相关,包括:分配内存、Socket四元组信息、注册事件循环、绑定端口、Socket的连接和关闭、Socket的读写。
//A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
//Returns the globally unique identifier of this Channel.
ChannelId id();
//Return the EventLoop this Channel was registered to.
EventLoop eventLoop();
//Returns the parent of this channel.
Channel parent();
//Returns the configuration of this channel.
ChannelConfig config();
//Returns true if the Channel is open and may get active later
boolean isOpen();
//Returns true if the Channel is registered with an EventLoop.
boolean isRegistered();
//Return true if the Channel is active and so connected.
boolean isActive();
//Return the ChannelMetadata of the Channel which describe the nature of the Channel.
ChannelMetadata metadata();
//Returns the local address where this channel is bound to.
//The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information.
SocketAddress localAddress();
//Returns the remote address where this channel is connected to.
//The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information.
SocketAddress remoteAddress();
//Returns the ChannelFuture which will be notified when this channel is closed.
//This method always returns the same future instance.
ChannelFuture closeFuture();
//Returns true if and only if the I/O thread will perform the requested write operation immediately.
//Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.
boolean isWritable();
//Get how many bytes can be written until #isWritable() returns false.
//This quantity will always be non-negative. If #isWritable() is false then 0.
long bytesBeforeUnwritable();
//Get how many bytes must be drained from underlying buffers until #isWritable() returns true.
//This quantity will always be non-negative. If #isWritable() is true then 0.
long bytesBeforeWritable();
//Returns an <em>internal-use-only</em> object that provides unsafe operations.
Unsafe unsafe();
//Return the assigned ChannelPipeline.
ChannelPipeline pipeline();
//Return the assigned ByteBufAllocator which will be used to allocate ByteBufs.
ByteBufAllocator alloc();
@Override
Channel read();
@Override
Channel flush();
//Unsafe operations that should never be called from user-code.
//These methods are only provided to implement the actual transport, and must be invoked from an I/O thread except for the following methods:
//#invoker()
//#localAddress()
//#remoteAddress()
//#closeForcibly()
//#register(EventLoop, ChannelPromise)
//#deregister(ChannelPromise)
//#voidPromise()
interface Unsafe {
//Return the assigned RecvByteBufAllocator.Handle which will be used to allocate ByteBuf's when receiving data.
RecvByteBufAllocator.Handle recvBufAllocHandle();
//Return the SocketAddress to which is bound local or null if none.
SocketAddress localAddress();
//Return the SocketAddress to which is bound remote or null if none is bound yet.
SocketAddress remoteAddress();
//Register the Channel of the ChannelPromise and notify the ChannelFuture once the registration was complete.
void register(EventLoop eventLoop, ChannelPromise promise);
//Bind the SocketAddress to the Channel of the ChannelPromise and notify it once its done.
void bind(SocketAddress localAddress, ChannelPromise promise);
//Connect the Channel of the given ChannelFuture with the given remote SocketAddress.
//If a specific local SocketAddress should be used it need to be given as argument. Otherwise just pass null to it.
//he ChannelPromise will get notified once the connect operation was complete.
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//Disconnect the Channel of the ChannelFuture and notify the ChannelPromise once the operation was complete.
void disconnect(ChannelPromise promise);
//Close the Channel of the ChannelPromise and notify the ChannelPromise once the operation was complete.
void close(ChannelPromise promise);
//Closes the Channel immediately without firing any events. Probably only useful when registration attempt failed.
void closeForcibly();
//Deregister the Channel of the ChannelPromise from EventLoop and notify the ChannelPromise once the operation was complete.
void deregister(ChannelPromise promise);
//Schedules a read operation that fills the inbound buffer of the first ChannelInboundHandler in the ChannelPipeline.
//If there's already a pending read operation, this method does nothing.
void beginRead();
//Schedules a write operation.
void write(Object msg, ChannelPromise promise);
//Flush out all write operations scheduled via #write(Object, ChannelPromise).
void flush();
//Return a special ChannelPromise which can be reused and passed to the operations in Unsafe.
//It will never be notified of a success or error and so is only a placeholder for operations
//that take a ChannelPromise as argument but for which you not want to get notified.
ChannelPromise voidPromise();
//Returns the ChannelOutboundBuffer of the Channel where the pending write requests are stored.
ChannelOutboundBuffer outboundBuffer();
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
...
public interface NioUnsafe extends Unsafe {
//Return underlying SelectableChannel
SelectableChannel ch();
//Finish connect
void finishConnect();
//Read from underlying SelectableChannel
void read();
void forceFlush();
}
...
}
(2)Unsafe的继承结构
一.NioUnsafe增加了可以访问底层JDK的SelectableChannel的功能,定义了从SelectableChannel读取数据的read()方法。
二.AbstractUnsafe实现了大部分Unsafe的功能。
三.AbstractNioUnsafe主要是通过代理到其外部类AbstractNioChannel获得与JDK NIO相关的一些信息,比如SelectableChannel、SelectionKey等。
四.NioMessageUnsafe和NioByteUnsafe是处在同一层次的抽象,Netty将一个新连接的建立也当作一个IO操作来处理,这里Message的含义可以当作一个SelectableChannel,读的意思就是接收一个SelectableChannel。
(3)Unsafe的分类
有两种类型的Unsafe:一种是与连接的字节数据读写相关的NioByteUnsafe,另一种是与新连接建立操作相关的NioMessageUnsafe。
一.NioByteUnsafe的读和写
NioByteUnsafe的读会被委托到NioByteChannel的doReadBytes()方法进行读取处理,doReadBytes()方法会将JDK的SelectableChannel的字节数据读取到Netty的ByteBuf中。
NioByteUnsafe中的写有两个方法,一个是write()方法,一个是flush()方法。write()方法是将数据添加到Netty的缓冲区,flush()方法是将Netty缓冲区的字节流写到TCP缓冲区,并最终委托到NioSocketChannel的doWrite()方法通过JDK底层Channel的write()方法写数据。
//AbstractNioChannel base class for Channels that operate on bytes.
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected class NioByteUnsafe extends AbstractNioUnsafe {
...
//NioByteUnsafe的读
@Override
public final void read() {
...
doReadBytes(byteBuf);
...
}
}
}
public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
...
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
...
ByteBuffer[] nioBuffers = in.nioBuffers();
SocketChannel ch = javaChannel();
...
ByteBuffer nioBuffer = nioBuffers[0];
...
ch.write(nioBuffer)
...
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
...
//NioByteUnsafe的写
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addMessage(msg, size, promise);
}
//NioByteUnsafe的写
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addFlush();
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
...
doWrite(outboundBuffer);
...
}
}
}
二.NioMessageUnsafe的读
NioMessageUnsafe的读会委托到NioServerSocketChannel的doReadMessages()方法进行处理。doReadMessages()方法会调用JDK的accept()方法新建立一个连接,并将这个连接放到一个List里以方便后续进行批量处理。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
//NioMessageUnsafe的读
@Override
public void read() {
...
doReadMessages(readBuf)
...
}
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
...
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
}
(4)ChannelPipeline中Inbound事件传播
当新连接已准备接入或者已经存在的连接有数据可读时,会在NioEventLoop的processSelectedKey()方法中执行unsafe.read()。
如果是新连接已准备接入,执行的是NioMessageUnsafe的read()方法。如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法。
最后都会执行pipeline.fireChannelRead()引发ChannelPipeline的读事件传播。首先会从HeadContext结点开始,也就是调用HeadContext的channelRead()方法。然后触发调用AbstractChannelHandlerContext的fireChannelRead()方法,接着通过findContextInbound()方法找到HeadContext的下一个结点,然后通过invokeChannelRead()方法继续调用该结点的channelRead()方法,直到最后一个结点TailContext。
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
//新连接已准备接入或者已经存在的连接有数据可读
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法
//如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
...
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//创建ByteBuf分配器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
...
do {
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
//调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
pipeline.fireChannelReadComplete();
...
}
...
}
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected class NioByteUnsafe extends AbstractNioUnsafe {
...
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//创建ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
do {
//1.分配一个ByteBuf
byteBuf = allocHandle.allocate(allocator);
//2.将数据读取到分配的ByteBuf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
...
//3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
pipeline.fireChannelReadComplete();
...
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
//ChannelPipeline的头结点
final AbstractChannelHandlerContext head;
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//调用AbstractChannelHandlerContext的invokeChannelRead()方法
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//比如调用HeadContext的channelRead()方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
//寻找下一个结点
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
}
(5)ChannelPipeline中的头结点和尾结点
HeadContext是一个同时属于Inbound类型和Outbound类型的ChannelHandler,TailContext则只是一个属于Inbound类型的ChannelHandler。
HeadContext结点的作用就是作为头结点开始传递读写事件并调用unsafe进行实际的读写操作。比如Channel读完一次数据后,HeadContext的channelReadComplete()方法会被调用。然后继续执行如下的调用流程:readIfAutoRead() -> channel.read() -> pipeline.read() -> HeadContext.read() -> unsafe.beginRead() -> 再次注册读事件。所以Channel读完一次数据后,会继续向Selector注册读事件。这样只要Channel活跃就可以连续不断地读取数据,然后数据又会通过ChannelPipeline传递到HeadContext结点。
TailContext结点的作用是通过让方法体为空来终止大部分事件的传播,它的exceptionCaugh()方法和channelRead()方法分别会发出告警日志以及释放到达该结点的对象。
public class DefaultChannelPipeline implements ChannelPipeline {
//ChannelPipeline的头结点
final AbstractChannelHandlerContext head;
//ChannelPipeline的尾结点
final AbstractChannelHandlerContext tail;
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
...
}
//Called once a message hit the end of the ChannelPipeline without been handled by the user in ChannelInboundHandler#channelRead(ChannelHandlerContext, Object).
//This method is responsible to call ReferenceCountUtil#release(Object) on the given msg at some point.
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
(6)Inbound事件的传播总结
一般用户自定义的ChannelInboundHandler都继承自ChannelInboundHandlerAdapter。如果用户代码没有覆盖ChannelInboundHandlerAdapter的channelXXX()方法,那么Inbound事件会从HeadContext开始遍历ChannelPipeline的双向链表进行传播,并默认情况下传播到TailContext结点。
如果用户代码覆盖了ChannelInboundHandlerAdapter的channelXXX()方法,那么事件传播就会在当前结点结束。所以如果此时这个ChannelHandler又忘记了手动释放业务对象ByteBuf,则可能会造成内存泄露,而SimpleChannelInboundHandler则可以帮用户自动释放业务对象。
如果用户代码调用了ChannelHandlerContext的fireXXX()方法来传播事件,那么该事件就从当前结点开始往下传播。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
//Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
//Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
//Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
//Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
//Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
//Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
//Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
//Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
12.Outbound事件的传播
(1)触发Outbound事件传播的入口
在消息推送系统中,可能会有如下代码,意思是根据用户ID获得对应的Channel,然后向用户推送消息。
Channel channel = ChannelManager.getChannel(userId);
channel.writeAndFlush(response);
(2)Outbound事件传播的源码
如果通过Channel来传播Outbound事件,则是从TailContext开始传播的。
和Inbound事件一样,Netty为了保证程序的高效执行,所有核心操作都要在Reactor线程中处理。如果业务线程调用了Channel的方法,那么Netty会将该操作封装成一个Task任务添加到任务队列中,随后在Reactor线程的事件循环中执行。
findContextOutbound()方法找Outbound结点的过程和findContextInbound()方法找Inbound结点类似,需要反向遍历ChannelPipeline中的双向链表,一直遍历到第一个Outbound结点HeadCountext。
如果用户的ChannelHandler覆盖了Outbound类型的方法,但没有把事件在方法中继续传播下去,那么会导致该事件的传播中断。
最后一个Inbound结点是TailContext,最后一个Outbound结点是HeadContext,而数据最终会落到HeadContext的write()方法上。
下面是channel.writeAndFlush()方法的源码:
public interface ChannelOutboundInvoker {
...
//Shortcut for call #write(Object) and #flush().
ChannelFuture writeAndFlush(Object msg);
...
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
...
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
//从TailContext开始传播
//但TailContext没有重写writeAndFlush()方法
//所以会调用AbstractChannelHandlerContext的writeAndFlush()方法
return tail.writeAndFlush(msg);
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) throw new NullPointerException("msg");
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//反向遍历链表进行查找
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//最终都会由Reactor线程处理Channel的数据读写
if (executor.inEventLoop()) {
if (flush) {
//调用结点的invokeWriteAndFlush()方法
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传
//即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播
invokeWrite0(msg, promise);
//逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传
//即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//逐个调用,最终回到HeadContext的write()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
//逐个调用,最终回到HeadContext的flush()方法
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
...
}
//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
(3)总结
Outbound事件的传播机制和Inbound事件的传播机制类似。但Outbound事件是从链表尾部开始向前传播,而Inbound事件是从链表头部开始向后传播。Outbound事件传播中的写数据,最终都会落到HeadContext结点中的unsafe进行处理。
13.ChannelPipeline中异常的传播
Inbound事件和Outbound事件在传播时发生异常都会调用notifyHandlerExecption()方法,该方法会按Inbound事件的传播顺序找每个结点的异常处理方法exceptionCaught()进行处理。
我们通常在自定义的ChannelHandler中实现一个处理异常的方法exceptionCaught(),统一处理ChannelPipeline过程中的所有异常。这个自定义ChannelHandler一般继承自ChannelDuplexHandler,表示该结点既是一个Inbound结点,又是一个Outbound结点。
如果我们在自定义的ChannelHandler中没有处理异常,由于ChannelHandler通常都继承了ChannelInboundHandlerAdapter,通过其默认实现的exceptionCaught()方法可知异常会一直往下传递,直到最后一个结点的异常处理方法exceptionCaught()中结束。因此如果异常处理方法exceptionCaught()在ChannelPipeline中间的结点实现,则该结点后面的ChannelHandler抛出的异常就没法处理了。所以一般会在ChannelHandler链表的末尾结点实现处理异常的方法exceptionCaught()。
需要注意的是:在任何结点中发生的异常都会向下一个结点进行传递。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
...
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("...", cause);
}
return;
}
invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
//调用ChannelHandler的exceptionCaught()
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug("...",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn("...", error, cause);
}
}
} else {
fireExceptionCaught(cause);
}
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
//调用下一个结点next的exceptionCaught()方法
invokeExceptionCaught(next, cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
...
}
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
...
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
14.ChannelPipeline总结
(1)ChannelPipeline的初始化
ChannelPipeline在服务端Channel和客户端Channel被创建时创建,创建ChannelPipeline的类是服务端Channel和客户端Channel的共同父类AbstractChannel。
(2)ChannelPipeline的数据结构
ChannelPipeline中的数据结构是双向链表结构,每一个结点都是一个ChannelHandlerContext对象。ChannelHandlerContext里包装了用户自定义的ChannelHandler,即前者会保存后者的引用到其成员变量handler中。ChannelHandlerContext中拥有ChannelPipeline和Channel的所有上下文信息。添加和删除ChannelHandler最终都是在ChannelPipeline的链表结构中添加和删除对应的ChannelHandlerContext结点。
(3)ChannelHandler类型的判断
在旧版Netty中,会使用instanceof关键字来判断ChannelHandler的类型,并使用两个成员变量inbound和outbound来标识。在新版Netty中,会使用一个16位的二进制数executionMask来表示ChannelHandler具体实现的事件类型,若实现则给对应的位标1。
(4)ChannelPipeline的头尾结点
创建ChannelPipeline时会默认添加两个结点:HeadContext结点和TailContext结点。HeadContext结点的作用是作为头结点,开始传播读写事件,并且通过它的unsafe变量实现具体的读写操作。TailContext结点的作用是起到终止事件传播(方法体为空)以及异常和对象未处理的告警。
(5)Channel与Unsafe
一个Channel对应一个Unsafe,Unsafe用于处理底层IO操作。NioServerSocketChannel对应NioMessageUnsafe,NioSocketChannel对应NioByteUnsafe。
(6)ChannelPipeline的事件传播机制
ChannelPipeline中的事件传播机制分为3种:Inbound事件的传播、Outbound事件的传播、异常事件的传播。
一.Inbound事件的传播
如果通过Channel的Pipeline触发这类事件(默认情况下),那么触发的规则是从head结点开始不断寻找下一个InboundHandler,最终落到tail结点。如果在当前ChannelHandlerContext上触发这类事件,那么事件只会从当前结点开始向下传播。
二.Outbound事件的传播
如果通过Channel的Pipeline触发这类事件(默认情况下),那么触发的规则是从tail结点开始不断寻找上一个InboundHandler,最终落到head结点。如果在当前ChannelHandlerContext上触发这类事件,那么事件只会从当前结点开始向上传播。
三.异常事件的传播
异常在ChannelPipeline中的双向链表传播时,无论Inbound结点还是Outbound结点,都是向下一个结点传播,直到tail结点为止。TailContext结点会打印这些异常信息,最佳实践是在ChannelPipeline的最后实现异常处理方法exceptionCaught()。
文章转载自:东阳马生架构
原文链接:Netty源码—5.Pipeline和Handler - 东阳马生架构 - 博客园
体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构