# netty document ## ChannelPipeline I/O Request via Channel or ChannelHandlerContext | +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+ 如上图所示,i/o event将会被`ChannelInboundHandler`或`ChannelOutboundHandler`处理,并且通过调用`ChannelHandlerContext`中的event propagation method(触发事件传播的方法)来进行转发。 常见的event propagation method如下,例如`ChannelHandlerContext.write(Object)`。 ### inbound event inbound event会通过inbound handler自底向上的进行处理,例如上图的左边部分所示。inbound handler通常处理由io thread产生的inbound data,inbound则是通常从remote peer读取。 > 如果inbound event传播到高于top inbound handler,那么该event将会被丢弃。 ### outbound event outbound event会按照自顶向下的顺序被outbound进行处理,outbound handler通常产生outbound traffic或对outbound traffic进行处理。 > 如果outbound event传播到低于bottom outbound handler, 其会直接被channel关联的io线程处理。 > 通常,io thread会执行实际的输出操作,例如`SocketChannel.write(ByteBuffer)`。 ### pipeline处理顺序 例如,按照如下顺序项pipeline中添加handler时, ```java ChannelPipeline p = ...; p.addLast("1", new InboundHandlerA()); p.addLast("2", new InboundHandlerB()); p.addLast("3", new OutboundHandlerA()); p.addLast("4", new OutboundHandlerB()); p.addLast("5", new InboundOutboundHandlerX()); ``` #### inbound 对于inbound event,handler的执行顺序为`1,2,3,4,5` 但由于`3,4`没有实现ChannelInboundHandler,故而Inbound event会跳过`3,4` handler,实际Inbound event的handler顺序为`1,2,5` #### outbound 对于outbound event,handler的执行顺序为`5,4,3,2,1` 对于outbound evnet,由于`1,2`并没有实现ChannelOutboundHandler,故而outbound event的handler顺序为`5,4,3`。 > inbound顺序和pipeline handler的添加顺序相同,outbound顺序和pipeline handler添加顺序相反。 ### 将event转发给下一个handler 如上图所示,再handler中必须调用`ChannelHandlerContext`中的event propagation method来将event转发给下一个handler。`event propagation event`包括的方法如下: #### Inbound event propagation method - ChannelHandlerContext.fireChannelRegistered() - ChannelHandlerContext.fireChannelActive() - ChannelHandlerContext.fireChannelRead(Object) - ChannelHandlerContext.fireChannelReadComplete() - ChannelHandlerContext.fireExceptionCaught(Throwable) - ChannelHandlerContext.fireUserEventTriggered(Object) - ChannelHandlerContext.fireChannelWritabilityChanged() - ChannelHandlerContext.fireChannelInactive() - ChannelHandlerContext.fireChannelUnregistered() #### Outbound event propagation method - ChannelHandlerContext.bind(SocketAddress, ChannelPromise) - ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) - ChannelHandlerContext.write(Object, ChannelPromise) - ChannelHandlerContext.flush() - ChannelHandlerContext.read() - ChannelHandlerContext.disconnect(ChannelPromise) - ChannelHandlerContext.close(ChannelPromise) - ChannelHandlerContext.deregister(ChannelPromise) ### build pipeline 在一个pipeline中,应该包含一个或多个ChannelHandler用于接收IO事件(read)和请求IO操作(write and close)。例如,一个典型的server其channel的pipeline中应该包含如下handler: - protocol decoder:将二进制数据转化为java object - protocol encoder:将java object转化为二进制数据 - business logic handler:执行实际业务操作 构建pipeline示例如下: ```java static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); ... ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler()); ``` 当为BusinessLogicHandler指定DefaultEventExecutorGroup时,虽然会将操作从EventLoop中卸载,但是其针对每个ChannelHandlerContext仍然是串行进行处理的(即先提交的task先执行)。故而,串行处理可能仍然会导致性能瓶颈。如果在用例场景下顺序不太重要时,可以考虑使用`UnorderedThreadPoolEventExecutor`来最大化任务的并行执行。 > ### DefaultEventExecutorGroup > 当为handler指定DefaultEventExecutorGroup时,其ChannelHandlerContext中executor对应的是 > DefaultEventExecutorGroup中的其中一个,故而handler对所有任务的处理都通过同一个executor进行。 > > DefaultEventExecutorGroup中,所有executor其类型都为`SingleThreadEventExecutor`,故而handler对所有任务的处理都是有序的。在前面的exectuor处理完成之前,后续任务都被阻塞。 > ### UnorderedThreadPoolEventExecutor > UnorderedThreadPoolEventExecutor其实现了EventExecutorGroup,但是其`next`方法用于只返回其本身`(this)`. > > 并且,UnorderedThreadPoolEventExecutor继承了ScheduledThreadPoolExecutor,并可以指定线程数,故而,当为handler指定该类型为group时,提交给相同handler的tasks,可能会被不同的线程执行,不保证有序。 > > 故而,针对同一handler,其后续任务无需等待前面任务执行完成之后再执行,这样能够提高吞吐量和并发度。 ### ThreadSafety channelhandler可以在任何时刻添加到pipeline或从pipeline中移除,ChannelPipeline是线程安全的。例如,可以在交换敏感信息前添加encryption handler并且在交换完信息后将encryption handler移除。 ## Heartbeat机制 在netty中,可以通过使用`IdleStateHandler`来实现心跳机制,可以向pipeline中添加`IdleStateHandler`作为心跳检测processor,并且可以添加一个自定义handler并实现`userEventTriggered`接口作为对超时事件的处理。 ### 服务端heartbeat实现 服务端实现示例如下: ```java ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception {  socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new HeartBeatServerHandler()); } }); ``` 自定义的心跳超时处理handler,其逻辑则如下所示。当每当timeout event被触发时,都会调用`userEventTriggered`事件,`timeout event`包含`read idle timeout`或`write idle timeout`。 ```java class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { private int lossConnectCount = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("No message from the client has been received for 5 seconds!"); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.READER_IDLE){ lossConnectCount++; if (lossConnectCount>2){ System.out.println("Close this inactive channel!"); ctx.channel().close(); } } }else { super.userEventTriggered(ctx,evt); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lossConnectCount = 0; System.out.println("client says: "+msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } ``` ### 客户端heartbeat实现 客户端也会将IdleStateHandler作为心跳检测processor提娜佳到pipeline中,并且添加自定义handler作为超时事件处理。 如下示例将IdleStateHandler的心跳检测设置为了每4s一次,并且自定义handler的`userEventTriggered`方法用于处理write idle的场景,代码示例如下: ```java Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new HeartBeatClientHandler()); } }); ``` ```java public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("Client cyclic heartbeat monitoring sending: "+new Date()); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.WRITER_IDLE){ if (curTime