当前位置: 首页 > wzjs >正文

网站广告文案千锋教育培训多少钱费用

网站广告文案,千锋教育培训多少钱费用,新闻网站建设源码,大连网站建设开发大纲 1.Pipeline和Handler的作用和构成 2.ChannelHandler的分类 3.几个特殊的ChannelHandler 4.ChannelHandler的生命周期 5.ChannelPipeline的事件处理 6.关于ChannelPipeline的问题整理 7.ChannelPipeline主要包括三部分内容 8.ChannelPipeline的初始化 9.ChannelPi…

大纲

1.Pipeline和Handler的作用和构成

2.ChannelHandler的分类

3.几个特殊的ChannelHandler

4.ChannelHandler的生命周期

5.ChannelPipeline的事件处理

6.关于ChannelPipeline的问题整理

7.ChannelPipeline主要包括三部分内容

8.ChannelPipeline的初始化

9.ChannelPipeline添加ChannelHandler

10.ChannelPipeline删除ChannelHandler

11.Inbound事件的传播

12.Outbound事件的传播

13.ChannelPipeline中异常的传播

14.ChannelPipeline总结

1.Pipeline和Handler的作用和构成

(1)Pipeline和Handler的作用

(2)Pipeline和Handler的构成

(1)Pipeline和Handler的作用

可以在处理复杂的业务逻辑时避免if else的泛滥,可以实现对业务逻辑的模块化处理,不同的逻辑放置到单独的类中进行处理。最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

Netty通过责任链模式来组织代码逻辑,能够支持逻辑的动态添加和删除,能够支持各类协议的扩展。

(2)Pipeline和Handler的构成

在Netty里,一个连接对应着一个Channel。这个Channel的所有处理逻辑都在一个叫ChannelPipeline的对象里,ChannelPipeline是双向链表结构,它和Channel之间是一对一的关系。

ChannelPipeline里的每个结点都是一个ChannelHandlerContext对象,这个ChannelHandlerContext对象能够获得和Channel相关的所有上下文信息。每个ChannelHandlerContext对象都包含一个逻辑处理器ChannelHandler,每个逻辑处理器ChannelHandler都处理一块独立的逻辑。

2.ChannelHandler的分类

ChannelHandler有两大子接口,分别为Inbound和Outbound类型:第一个子接口是ChannelInboundHandler,用于处理读数据逻辑,最重要的方法是channelRead()。第二个子接口是ChannelOutboundHandler,用于处理写数据逻辑,最重要的方法是write()。

这两个子接口默认的实现分别是:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。它们分别实现了两个子接口的所有功能,在默认情况下会把读写事件传播到下一个Handler。

InboundHandler的事件通常只会传播到下一个InboundHandler,OutboundHandler的事件通常只会传播到下一个OuboundHandler,InboundHandler的执行顺序与实际addLast的添加顺序相同,OutboundHandler的执行顺序与实际addLast的添加顺序相反。

Inbound事件通常由IO线程触发,如TCP链路的建立事件、关闭事件、读事件、异常通知事件等。其触发方法一般带有fire字眼,如下所示:

ctx.fireChannelRegister()、

ctx.fireChannelActive()、

ctx.fireChannelRead()、

ctx.fireChannelReadComplete()、

ctx.fireChannelInactive()。

Outbound事件通常由用户主动发起的网络IO操作触发,如用户发起的连接操作、绑定操作、消息发送等操作。其触发方法一般如:ctx.bind()、ctx.connect()、ctx.write()、ctx.flush()、ctx.read()、ctx.disconnect()、ctx.close()。

3.几个特殊的ChannelHandler

(1)ChannelInboundHandlerAdapter

(2)ChannelOutboundHandlerAdapter

(3)ByteToMessageDecoder

(4)SimpleChannelInboundHandler

(5)MessageToByteEncoder

(1)ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter主要用于实现ChannelInboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline.
public interface ChannelHandler {//Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.void handlerAdded(ChannelHandlerContext ctx) throws Exception;//Gets called after the ChannelHandler was removed from the actual context and it doesn't handle events anymore.void handlerRemoved(ChannelHandlerContext ctx) throws Exception;//Gets called if a Throwable was thrown.@Deprecatedvoid exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;//Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.@Inherited@Documented@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@interface Sharable {// no value}
}//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {// Not using volatile because it's used only for a sanity check.boolean added;//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.public boolean isSharable() {Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}//Do nothing by default, sub-classes may override this method.@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {// NOOP}//Do nothing by default, sub-classes may override this method.@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// NOOP}//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}
}//ChannelHandler which adds callbacks for state changes. 
//This allows the user to hook in to state changes easily.
public interface ChannelInboundHandler extends ChannelHandler {//The Channel of the ChannelHandlerContext was registered with its EventLoopvoid channelRegistered(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext was unregistered from its EventLoopvoid channelUnregistered(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext is now activevoid channelActive(ChannelHandlerContext ctx) throws Exception;//The Channel of the ChannelHandlerContext was registered is now inactive and reached its end of lifetime.void channelInactive(ChannelHandlerContext ctx) throws Exception;//Invoked when the current Channel has read a message from the peer.void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;//Invoked when the last message read by the current read operation has been consumed by #channelRead(ChannelHandlerContext, Object).//If ChannelOption#AUTO_READ is off, no further attempt to read an inbound data from the current Channel will be made until ChannelHandlerContext#read() is called.void channelReadComplete(ChannelHandlerContext ctx) throws Exception;//Gets called if an user event was triggered.void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;//Gets called once the writable state of a Channel changed. //You can check the state with Channel#isWritable().void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;//Gets called if a Throwable was thrown.@Override@SuppressWarnings("deprecation")void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}//Abstract base class for ChannelInboundHandler implementations which provide implementations of all of their methods.
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {//Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}//Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}//Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}//Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}//Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}//Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}//Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);}//Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline.@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();}//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}
}

(2)ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter主要用于实现ChannelOutboundHandler接口的所有方法,这样我们在继承它编写自己的ChannelHandler时就不需要实现ChannelHandler里的每种方法了,从而避免了直接实现ChannelHandler时需要实现其所有方法而导致代码显得冗余和臃肿。

//ChannelHandler which will get notified for IO-outbound-operations.
public interface ChannelOutboundHandler extends ChannelHandler {//Called once a bind operation is made.void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;//Called once a connect operation is made.void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;//Called once a disconnect operation is made.void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Called once a close operation is made.void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Called once a deregister operation is made from the current registered EventLoop.void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;//Intercepts ChannelHandlerContext#read().void read(ChannelHandlerContext ctx) throws Exception;//Called once a write operation is made. The write operation will write the messages through the ChannelPipeline.//Those are then ready to be flushed to the actual Channel once Channel#flush() is called.void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;//Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending.void flush(ChannelHandlerContext ctx) throws Exception;
}//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline. @Overridepublic void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);}//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.close(promise);}//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}

(3)ByteToMessageDecoder

基于这个ChannelHandler可以实现自定义解码,而不用关心ByteBuf的强转和解码结果的传递。Netty里的ByteBuf默认下使用的是堆外内存,ByteToMessageDecoder会自动进行内存的释放,不用操心内存管理。我们自定义的ChannelHandler继承了ByteToMessageDecoder后,需要实现decode()方法。

(4)SimpleChannelInboundHandler

基于这个ChannelHandler可以实现每一种指令的处理,不再需要强转、不再有冗长的if else逻辑、不再需要手动传递对象。

同时还可以自动释放没有往下传播的ByteBuf,因为我们编写指令处理ChannelHandler时,可能会编写不用关心的if else判断,然后手动传递无法处理的对象至下一个指令处理器。

//xxxHandler.java
if (packet instanceof xxxPacket) {//进行处理
} else {ctx.fireChannelRead(packet);
}

(5)MessageToByteEncoder

基于这个ChannelHandler可以实现自定义编码,而不用关心ByteBuf的创建,不用把创建完的ByteBuf进行返回。

4.ChannelHandler的生命周期

(1)ChannelHandler回调方法的执行顺序

ChannelHandler回调方法的执行顺序可以称为ChannelHandler的生命周期。

新建连接时ChannelHandler回调方法的执行顺序是:handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()。

关闭连接时ChannelHandler回调方法的执行顺序是:channelInactive() -> channelUnregistered() -> handlerRemoved()。

接下来是ChannelHandler具体的回调方法说明,其中一二三的顺序可以参考AbstractChannel的内部类AbstractUnsafe的register0()方法。

一.handlerAdded()

检测到新连接后调用"ch.pipeline().addLast(...)"之后的回调,表示当前Channel已成功添加一个ChannelHandler。

二.channelRegistered()

表示当前Channel已和某个NioEventLoop线程建立了绑定关系,已经创建了一个Reactor线程来处理当前这个Channel的读写。

三.channelActive()

当Channel的Pipeline已经添加完所有的ChannelHandler以及绑定好一个NioEventLoop线程,这个Channel对应的连接才算真正被激活,接下来就会回调该方法。

四.channelRead()

服务端每次收到客户端发送的数据时都会回调该方法,表示有数据可读。

五.channelReadComplete()

服务端每次读完一条完整的数据都会回调该方法,表示数据读取完毕。

六.channelInactive()

表示这个连接已经被关闭,该连接在TCP层已经不再是ESTABLISH状态。

七.channelUnregister()

表示与这个连接对应的NioEventLoop线程移除了对这个连接的处理。

八.handlerRemoved()

表示给这个连接添加的所有的ChannelHandler都被移除了。

(2)ChannelHandler回调方法的应用场景

一.handlerAdded()方法与handlerRemoved()方法通常可用于一些资源的申请和释放。

二.channelActive()方法与channelInactive()方法表示的是TCP连接的建立与释放,可用于统计单机连接数或IP过滤。

三.channelRead()方法可用于根据自定义协议进行拆包。每次读到一定数据就累加到一个容器里,然后看看能否拆出完整的包。

四.channelReadComplete()方法可用于实现批量刷新。如果每次向客户端写数据都通过writeAndFlush()方法写数据并刷新到底层,其实并不高效。所以可以把调用writeAndFlush()方法的地方换成调用write()方法,然后再在channelReadComplete()方法里调用ctx.channel().flush()。

5.ChannelPipeline的事件处理

(1)消息读取和发送被Pipeline处理的过程

(2)ChannelPipeline的主要特征

(1)消息读取和发送被Pipeline处理的过程

消息的读取和发送被ChannelPipeline的ChannelHandler链拦截和处理的全过程:

一.首先AbstractNioChannel内部类NioUnsafe的read()方法读取ByteBuf时会触发ChannelRead事件,也就是由NioEventLoop线程调用ChannelPipeline的fireChannelRead()方法将ByteBuf消息传输到ChannelPipeline中。

二.然后ByteBuf消息会依次被HeadContext、xxxChannelHandler、...、TailContext拦截处理。在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递。

三.接着用户可能会调用ChannelHandlerContext的write()方法发送ByteBuf消息。此时ByteBuf消息会从TailContext开始,途径xxxChannelHandler、...、HeadContext,最终被添加到消息发送缓冲区中等待刷新和发送。在这个过程中,任何ChannelHandler都可以中断当前的流程,中断消息的传递。

(2)ChannelPipeline的主要特征

一.ChannelPipeline支持运行时动态地添加或者删除ChannelHandler

例如业务高峰时对系统做拥塞保护。处于业务高峰期时,则动态地向当前的ChannelPipeline添加ChannelHandler。高峰期过后,再移除ChannelHandler。

二.ChannelPipeline是线程安全的

多个业务线程可以并发操作ChannelPipeline,因为使用了synchronized关键字。但ChannelHandler却不一定是线程安全的,这由用户保证。

6.关于ChannelPipeline的问题整理

一.Netty是如何判断ChannelHandler类型的?

即如何判断一个ChannelHandler是Inbound类型还是Outbound类型?

答:当调用Pipeline去添加一个ChannelHandler结点时,旧版Netty会使用instanceof关键字来判断该结点是Inbound类型还是Outbound类型,并分别用一个布尔类型的变量来进行标识。新版Netty则使用一个整形的executionMask来具体区分详细的Inbound事件和Outbound事件。这个executionMask对应一个16位的二进制数,是哪一种事件就对应哪一个Mask。

//Inbound事件的Mask
MASK_EXEPTION_CAUGHT = 1;
MASK_CHANNEL_REGISTER = 1 << 1;
MASK_CHANNEL_UNREGISTER = 1 << 2;
MASK_CHANNEL_ACTIVE = 1 << 3;
MASK_CHANNEL_INACTIVE = 1 << 4;
MASK_CHANNEL_READ = 1 << 5;
MASK_CHANNEL_READ_COMPLETE = 1 << 6;
MASK_CHANNEL_USER_EVENT_TRIGGERED = 1 << 7;
MASK_CHANNEL_WRITABLITY_CHANGED = 1 << 8;//Outbound事件的Mask
MASK_BIND = 1 << 9;
MASK_CONNECT = 1 << 10;
MASK_DISCONNECT = 1 << 11;
MASK_CLOSE = 1 << 12;
MASK_DEREGISTER = 1 << 13;
MASK_READ = 1 << 14;
MASK_WRITE = 1 << 15;
MASK_FLUSH = 1 << 16;

二.添加ChannelHandler时应遵循什么样的顺序?

答:Inbound类型的事件传播跟添加ChannelHandler的顺序一样,Outbound类型的事件传播跟添加ChannelHandler的顺序相反。

三.用户手动触发事件传播的两种方式有什么区别?

这两种方式是分别是:ctx.writeAndFlush()和ctx.channel().writeAndFlush()。

答:当通过Channel去触发一个事件时,那么该事件会沿整个ChannelPipeline传播。如果是Inbound类型事件,则从HeadContext结点开始向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从TailContext结点开始向前传播到第一个Outbound类型的结点。当通过当前结点去触发一个事件时,那么该事件只会从当前结点开始传播。如果是Inbound类型事件,则从当前结点开始一直向后传播到最后一个Inbound类型的结点。如果是Outbound类型事件,则从当前结点开始一直向前传播到第一个Outbound类型的结点。

7.ChannelPipeline主要包括三部分内容

一.ChannelPipeline的初始化

服务端Channel和客户端Channel在何时初始化ChannelPipeline?在初始化时又做了什么事情?

二.添加和删除ChannelHandler

Netty是如何实现业务逻辑处理器动态编织的?

三.事件和异常的传播

读写事件和异常在ChannelPipeline中的传播。

8.ChannelPipeline的初始化

(1)ChannelPipeline的初始化时机

(2)ChannelPipeline的初始化内容

(3)ChannelPipeline的说明

(1)ChannelPipeline的初始化时机

在服务端启动和客户端连接接入的过程中,在创建NioServerSocketChannel和NioSocketChannel时,会逐层执行父类的构造方法,最后执行到AbstractChannel的构造方法。AbstractChannel的构造方法会将Netty的核心组件创建出来。而核心组件中就包含了DefaultChannelPipeline类型的ChannelPipeline组件。

//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent;private final ChannelId id;private final Unsafe unsafe;private final DefaultChannelPipeline pipeline;...//Creates a new instance.protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}//Returns a new DefaultChannelPipeline instance.protected DefaultChannelPipeline newChannelPipeline() {//创建ChannelPipeline组件return new DefaultChannelPipeline(this);}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;private final Channel channel;//保存了Channel的引用...protected DefaultChannelPipeline(Channel channel) {//保存Channel的引用到Pipeline组件的成员变量this.channel = ObjectUtil.checkNotNull(channel, "channel");...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}...
}

(2)ChannelPipeline的初始化内容

ChannelPipeline的初始化主要涉及三部分内容:

一.Pipeline在创建Channel时被创建

二.Pipeline的结点是ChannelHandlerContext

三.Pipeline两大哨兵HeadContext和TailContext

(3)ChannelPipeline的说明

ChannelPipeline中保存了Channel的引用,ChannelPipeline中每个结点都是一个ChannelHandlerContext对象,每个ChannelHandlerContext结点都包裹着一个ChannelHandler执行器,每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipeline。由于ChannelPipeline又保存了Channel的引用,所以每个ChannelHandlerContext结点都可以拿到所有的上下文信息。

ChannelHandlerContext接口多继承自AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker。

ChannelHandlerContext的关键方法有:channel()、executor()、handler()、pipeline()、alloc()。ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能。

ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表。HeadContext结点会比TailContext结点多一个unsafe成员变量。

public class DefaultChannelPipeline implements ChannelPipeline {//ChannelPipeline中每个结点都是一个ChannelHandlerContext对象final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;private final Channel channel;//ChannelPipeline中保存了Channel的引用...protected DefaultChannelPipeline(Channel channel) {//保存Channel的引用到Pipeline组件的成员变量this.channel = ObjectUtil.checkNotNull(channel, "channel");...//ChannelPipeline初始化时会初始化两个结点:HeadContext和TailContext,并构成双向链表tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {//HeadContext结点会比TailContext结点多一个unsafe成员变量private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}...}final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, true, false);setAddComplete();}...}...
}//ChannelHandlerContext默认是由AbstractChannelHandlerContext去实现的,它实现了大部分功能
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { //每个ChannelHandlerContext结点都保存了它包裹的执行器ChannelHandler执行操作时所需要的上下文ChannelPipelineprivate final DefaultChannelPipeline pipeline;...AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;ordered = executor == null || executor instanceof OrderedEventExecutor;}
}public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {//Return the Channel which is bound to the ChannelHandlerContext.Channel channel();//Returns the EventExecutor which is used to execute an arbitrary task.EventExecutor executor();//The unique name of the ChannelHandlerContext.//The name was used when then ChannelHandler was added to the ChannelPipeline. //This name can also be used to access the registered ChannelHandler from the ChannelPipeline.String name();//The ChannelHandler that is bound this ChannelHandlerContext.ChannelHandler handler();//Return true if the ChannelHandler which belongs to this context was removed from the ChannelPipeline. //Note that this method is only meant to be called from with in the EventLoop.boolean isRemoved();ChannelHandlerContext fireChannelRegistered();ChannelHandlerContext fireChannelUnregistered();ChannelHandlerContext fireChannelActive();ChannelHandlerContext fireChannelInactive();ChannelHandlerContext fireExceptionCaught(Throwable cause);ChannelHandlerContext fireUserEventTriggered(Object evt);ChannelHandlerContext fireChannelRead(Object msg);ChannelHandlerContext fireChannelReadComplete();ChannelHandlerContext fireChannelWritabilityChanged();ChannelHandlerContext read();ChannelHandlerContext flush();//Return the assigned ChannelPipelineChannelPipeline pipeline();//Return the assigned ByteBufAllocator which will be used to allocate ByteBufs.ByteBufAllocator alloc();...
}

9.ChannelPipeline添加ChannelHandler

(1)常见的客户端代码

(2)ChannelPipeline添加ChannelHandler入口

(3)DefaultChannelPipeline的addLast()方法

(4)检查是否重复添加ChannelHandler结点

(5)创建ChannelHandlerContext结点

(6)添加ChannelHandlerContext结点

(7)回调handerAdded()方法

(8)ChannelPipeline添加ChannelHandler总结

(1)常见的客户端代码

首先用一个拆包器Spliter对二进制数据流进行拆包,然后解码器Decoder会将拆出来的包进行解码,接着业务处理器BusinessHandler会处理解码出来的Java对象,最后编码器Encoder会将业务处理完的结果编码成二进制数据进行输出。

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();   p.addLast(newSpliter());p.addLast(new Decoder());p.addLast(new BusinessHandler());p.addLast(new Encoder());          }
});

整个ChannelPipeline的结构如下所示:

图片

这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是ChannelInboundHandler,用来处理Inbound事件,比如读取数据流进行加工处理。一种是ChannelOutboundHandler,用来处理Outbound事件,比如当调用writeAndFlush()方法时就会经过这种类型的Handler。

(2)ChannelPipeline添加ChannelHandler入口

当服务端Channel的Reactor线程轮询到新连接接入的事件时,就会调用AbstractNioChannel的内部类NioUnsafe的read()方法,也就是调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read()方法。

然后会触发执行代码pipeline.fireChannelRead()传播ChannelRead事件,从而最终触发调用ServerBootstrapAcceptor接入器的channelRead()方法。

在ServerBootstrapAcceptor的channelRead()方法中,便会通过执行代码channel.pipeline().addLast()添加ChannelHandler,也就是通过调用DefaultChannelPipeline的addLast()方法添加ChannelHandler。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {Selector selector;private SelectedSelectionKeySet selectedKeys;private boolean needsToSelectAgain;private int cancelledKeys;...@Overrideprotected void run() {for (;;) {...//1.调用select()方法执行一次事件轮询select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}...//2.处理产生IO事件的ChannelneedsToSelectAgain = false;processSelectedKeys();...//3.执行外部线程放入TaskQueue的任务runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}private void processSelectedKeys() {if (selectedKeys != null) {//selectedKeys.flip()会返回一个数组processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {//1.首先取出IO事件final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//Help GC//2.然后获取对应的Channel和处理该Channel//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {//网络事件的处理processSelectedKey(k, (AbstractNioChannel) a);} else {//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}//3.最后判断是否应该再进行一次轮询if (needsToSelectAgain) {for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();//selectedKeys.flip()会返回一个数组selectedKeys = this.selectedKeys.flip();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();...try {int readyOps = k.readyOps();...//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入//此时将调用Channel的unsafe变量来进行实际操作if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法//进行新连接接入处理unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}...
}//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {//临时存放读到的连接NioSocketChannelprivate final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//断言确保该read()方法必须来自Reactor线程调用assert eventLoop().inEventLoop();//获得Channel对应的Pipelinefinal ChannelPipeline pipeline = pipeline();//获得Channel对应的RecvByteBufAllocator.Handlefinal RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();do {//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channelint localRead = doReadMessages(readBuf);if (localRead == 0) {break;}} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接//2.设置并绑定NioSocketChannelint size = readBuf.size();for (int i = 0; i < size; i ++) {//调用DefaultChannelPipeline的fireChannelRead()方法pipeline.fireChannelRead(readBuf.get(i));}//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法readBuf.clear();pipeline.fireChannelReadComplete();}}...
}//The default ChannelPipeline implementation.  
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;...protected DefaultChannelPipeline(Channel channel) {...tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {//从Pipeline的第一个HeadContext处理器开始调用AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//调用AbstractChannelHandlerContext的fireChannelRead()方法ctx.fireChannelRead(msg);}@Overridepublic ChannelHandler handler() {return this;}...}...
}public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...//初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器@Overridevoid init(Channel channel) throws Exception {//1.设置服务端Channel的Option与Attrfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {channel.config().setOptions(options);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//2.设置客户端Channel的Option与Attrfinal EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//3.配置服务端启动逻辑ChannelPipeline p = channel.pipeline();//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {//一.添加用户自定义的Handler,注意这是handler,而不是childHandlerfinal ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) pipeline.addLast(handler);//二.添加一个特殊的Handler用于接收新连接//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptorch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//调用DefaultChannelPipeline的addLast()方法pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;...//channelRead()方法在新连接接入时被调用@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;//1.给新连接的Channel添加用户自定义的Handler处理器//这里的childHandler其实是一个特殊的Handler: ChannelInitializerchild.pipeline().addLast(childHandler);//2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关for (Entry<ChannelOption<?>, Object> e: childOptions) {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}}//3.设置新连接Channel的属性for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}//4.绑定Reactor线程//childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法childGroup.register(child);}...}...
}

(3)DefaultChannelPipeline的addLast()方法

使用synchronized关键字是为了防止多线程并发操作ChannelPipeline底层的双向链表,添加ChannelHandler结点的过程主要分为4个步骤:

步骤一:判断ChannelHandler是否重复添加

步骤二:创建结点

步骤三:添加结点到链表

步骤四:回调添加完成事件

这个结点便是ChannelHandlerContext,Pipeline里每个结点都是一个ChannelHandlerContext。addLast()方法便是把ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表。

public class DefaultChannelPipeline implements ChannelPipeline {...@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers == null) throw new NullPointerException("handlers");for (ChannelHandler h: handlers) {if (h == null) break;addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//1.检查是否有重复的ChannelHandler结点checkMultiplicity(handler);//2.创建ChannelHandlerContext结点newCtx = newContext(group, filterName(name, handler), handler);//3.添加ChannelHandlerContext结点addLast0(newCtx);...}//4.回调用户方法//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了callHandlerAdded0(newCtx);return this;}...
}

(4)检查是否重复添加ChannelHandler结点

Netty使用了一个成员变量added来表示一个ChannelHandler是否已经添加。如果当前要添加的ChannelHandler是非共享的并且已经添加过,那么抛出异常,否则标识该ChannelHandler已添加。

如果一个ChannelHandler支持共享,那么它就可以无限次被添加到ChannelPipeline中。如果要让一个ChannelHandler支持共享,只需要加一个@Sharable注解即可。而ChannelHandlerAdapter的isSharable()方法正是通过判断该ChannelHandler对应的类是否标有@Sharable注解来实现的。

Netty为了性能优化,还使用了ThreadLocal来缓存ChannelHandler是否共享的情况。在高并发海量连接下,每次有新连接添加ChannelHandler都会调用isSharable()方法,从而优化性能。

public class DefaultChannelPipeline implements ChannelPipeline {...private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}...
}//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {//Not using volatile because it's used only for a sanity check.boolean added;//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.public boolean isSharable() {//Cache the result of Sharable annotation detection to workaround a condition. //We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads. //Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway.Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}...
}

(5)创建ChannelHandlerContext结点

根据ChannelHandler创建ChannelHandlerContext类型的结点时,会将该ChannelHandler的引用保存到结点的成员变量中。

public class DefaultChannelPipeline implements ChannelPipeline {...    @Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//1.检查是否有重复的ChannelHandler结点checkMultiplicity(handler);//2.创建ChannelHandlerContext结点newCtx = newContext(group, filterName(name, handler), handler);//3.添加ChannelHandlerContext结点addLast0(newCtx);...}//4.回调用户方法//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了callHandlerAdded0(newCtx);return this;}//给ChannelHandler创建一个唯一性的名字private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;}//根据ChannelHandler创建一个ChannelHandlerContext结点private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}...
}final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {...AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;ordered = executor == null || executor instanceof OrderedEventExecutor;}...
}

(6)添加ChannelHandlerContext结点

使用尾插法向双向链表添加结点。

public class DefaultChannelPipeline implements ChannelPipeline {...private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}...
}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {volatile AbstractChannelHandlerContext next;volatile AbstractChannelHandlerContext prev;...
}

(7)回调handerAdded()方法

向ChannelPipeline添加完新结点后,会使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成,然后执行ctx.handler().handlerAdded(ctx),回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。

public class DefaultChannelPipeline implements ChannelPipeline {...private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {//使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成ctx.setAddComplete();//回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法ctx.handler().handlerAdded(ctx);}...
}//DemoHandler是用户定义的ChannelHandler
public class DemoHandler extends SimpleChannelInboundHandler<...> {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法}...
}

最典型的一个回调就是用户代码的ChannelInitializer被添加完成后,会先调用其initChannel()方法将用户自定义的ChannelHandler添加到ChannelPipeline,然后再调用pipeline.remove()方法将自身结点进行删除。

public class NettyServer {private int port;public NettyServer(int port) {this.port = port;}public void start() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true)//设置一个ChannelInitializer类型的childHandler//新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)"//也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中//添加完这个结点后会回调ChannelInitializer的handlerAdded()方法//其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点//执行完initChannel()方法后,就会移除ChannelInitializer这个结点.childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new NettyServerHandler());//针对网络请求的处理逻辑}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果} catch (Exception e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception{System.out.println("Starting Netty Server...");int port = 8998;if (args.length > 0) {port = Integer.parseInt(args[0]);}new NettyServer(port).start();}
}@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {...@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {initChannel(ctx);}}@SuppressWarnings("unchecked")private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {remove(ctx);}return true;}return false;}private void remove(ChannelHandlerContext ctx) {try {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {//ChannelPipeline删除ChannelHandler结点(ChannelInitializer)pipeline.remove(this);}} finally {initMap.remove(ctx);}}...
}

(8)ChannelPipeline添加ChannelHandler总结

一.判断ChannelHandler是否重复添加的依据是:如果该ChannelHandler不是共享的且已被添加过,则拒绝添加。

二.否则就创建一个ChannelHandlerContext结点(ctx),并把这个ChannelHandler包装进去,也就是保存ChannelHandler的引用到ChannelHandlerContext的成员变量中。由于创建ctx时保存了ChannelHandler的引用、ChannelPipeline的引用到成员变量,ChannelPipeline又保存了Channel的引用,所以每个ctx都拥有一个Channel的所有信息。

三.接着通过双向链表的尾插法,将这个ChannelHandlerContext结点添加到ChannelPipeline中。

四.最后回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法。

http://www.dtcms.com/wzjs/251640.html

相关文章:

  • wordpress中文注册插件专业seo站长工具全面查询网站
  • centos网站开发制作网站要花多少钱
  • 坪山商城网站建设哪家公司靠谱百度app下载安装普通下载
  • 精选微信网站建设seo网络排名优化哪家好
  • 网站首页的布局青岛百度关键词优化
  • 珠海网站设计公司b站引流推广
  • 有没有免费做网站的站长工具网站推广
  • 有做赌博网站的么今天重要新闻
  • 布吉网站建设哪家便宜网店代运营
  • 家具网站模版企业营销策略分析论文
  • 网站建设报价新鸿儒如何使用免费b站推广网站
  • 成都网站建设yingrihe比较火的推广软件
  • 网页与网站的区别搜索大全浏览器
  • 宜昌视频网站建设近三天的国内外大事
  • 阿里云怎么建设网站模板网站哪个好
  • 中山网络公司网站百度广告点击一次多少钱
  • 江西安福县建设局网站百度搜索热度查询
  • 网站后期长沙seo网站排名
  • 漯河装修公司网站建设免费发广告的网站
  • 大网站的建设重点长沙网络营销顾问
  • 网站雪花代码互联网的推广
  • 淘宝客做网站需要那些条件seo网站推广经理招聘
  • 网站域名查主机关键词排名优化工具
  • 教育机构网站制作模板今日时政新闻
  • 家用电器销售的网站开发seo优化方案总结
  • 企业品牌网站建设应该怎么做网页制作三大软件
  • 做牙的网站叫什么百度优化插件
  • 做快照网站和推广 哪个效果好免费发布产品的网站
  • 自己做坑人网站的软件nba最新排行
  • 什么网站可以做字体效果好免费推广平台排行榜