rikako-note/netty/netty-doc.md
2025-03-08 19:20:18 +08:00
netty document

ChannelPipeline

                                               I/O Request
                                          via Channel or
                                      ChannelHandlerContext
                                                    |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
                |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

As shown in the diagram above, an I/O event is handled by a ChannelInboundHandler or a ChannelOutboundHandler, and is forwarded to the next handler by calling one of the event propagation methods defined on ChannelHandlerContext.

A typical event propagation method is ChannelHandlerContext.write(Object).

inbound event

An inbound event is handled by the inbound handlers in bottom-up order, as shown on the left side of the diagram above. An inbound handler usually processes inbound data generated by the I/O thread at the bottom of the diagram; the inbound data is usually read from the remote peer.

If an inbound event goes beyond the top inbound handler, the event is discarded.

outbound event

An outbound event is handled by the outbound handlers in top-down order. An outbound handler usually generates outbound traffic or transforms it on the way out.

If an outbound event goes beyond the bottom outbound handler, it is handled by the I/O thread associated with the Channel.

The I/O thread usually performs the actual output operation, such as SocketChannel.write(ByteBuffer).
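The two traversal directions and the fall-through behavior described above can be modeled in a few lines of plain Java. This is a sketch, not Netty's implementation: the class name EventFlow and the string labels are made up, and real propagation is driven by ChannelHandlerContext calls rather than a loop.

```java
import java.util.List;

// Plain-Java sketch (no Netty) of how events traverse the pipeline.
public class EventFlow {
    // Inbound: bottom-up, i.e. the order handlers were added. If no handler
    // consumes the event before the top, it falls off and is discarded.
    static String inboundResult(List<String> handlers, String consumer) {
        for (String h : handlers) {                       // bottom -> top
            if (h.equals(consumer)) return "consumed by " + h;
        }
        return "discarded past top handler";
    }

    // Outbound: top-down, i.e. reverse order. Past the bottom handler, the
    // event is handed to the channel's I/O thread, e.g. Socket.write().
    static String outboundResult(List<String> handlers, String consumer) {
        for (int i = handlers.size() - 1; i >= 0; i--) {  // top -> bottom
            if (handlers.get(i).equals(consumer)) {
                return "consumed by " + handlers.get(i);
            }
        }
        return "handled by I/O thread (Socket.write)";
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("H1", "H2", "H3");
        System.out.println(inboundResult(pipeline, "H2"));    // consumed by H2
        System.out.println(inboundResult(pipeline, "none"));  // discarded past top handler
        System.out.println(outboundResult(pipeline, "none")); // handled by I/O thread (Socket.write)
    }
}
```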

pipeline processing order

For example, suppose handlers are added to a pipeline in the following order:

   ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

inbound

For an inbound event, the handler evaluation order is 1, 2, 3, 4, 5.

However, since handlers 3 and 4 do not implement ChannelInboundHandler, the inbound event skips them, so the actual inbound handler order is 1, 2, 5.

outbound

For an outbound event, the handler evaluation order is 5, 4, 3, 2, 1.

Since handlers 1 and 2 do not implement ChannelOutboundHandler, the actual outbound handler order is 5, 4, 3.

The inbound order matches the order in which handlers were added to the pipeline, while the outbound order is the reverse of it.
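The skipping rule above can be sketched as a plain-Java model (no Netty). The class SkipModel and the boolean flags are assumptions standing in for `instanceof ChannelInboundHandler` / `instanceof ChannelOutboundHandler` checks; the entries mirror the addLast() example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of which handlers an event actually visits.
public class SkipModel {
    // name plus whether the handler implements the inbound/outbound interface
    record H(String name, boolean inbound, boolean outbound) {}

    static List<String> inboundOrder(List<H> pipeline) {
        List<String> order = new ArrayList<>();
        for (H h : pipeline) {                  // added order = bottom-up
            if (h.inbound()) order.add(h.name());
        }
        return order;
    }

    static List<String> outboundOrder(List<H> pipeline) {
        List<String> order = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {   // reverse order
            if (pipeline.get(i).outbound()) order.add(pipeline.get(i).name());
        }
        return order;
    }

    public static void main(String[] args) {
        List<H> p = List.of(
            new H("1", true, false),   // InboundHandlerA
            new H("2", true, false),   // InboundHandlerB
            new H("3", false, true),   // OutboundHandlerA
            new H("4", false, true),   // OutboundHandlerB
            new H("5", true, true));   // InboundOutboundHandlerX
        System.out.println(inboundOrder(p));   // [1, 2, 5]
        System.out.println(outboundOrder(p));  // [5, 4, 3]
    }
}
```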

forwarding an event to the next handler

As shown in the diagram above, a handler has to call one of the event propagation methods on ChannelHandlerContext to forward an event to the next handler. These methods include:

Inbound event propagation methods

  • ChannelHandlerContext.fireChannelRegistered()
  • ChannelHandlerContext.fireChannelActive()
  • ChannelHandlerContext.fireChannelRead(Object)
  • ChannelHandlerContext.fireChannelReadComplete()
  • ChannelHandlerContext.fireExceptionCaught(Throwable)
  • ChannelHandlerContext.fireUserEventTriggered(Object)
  • ChannelHandlerContext.fireChannelWritabilityChanged()
  • ChannelHandlerContext.fireChannelInactive()
  • ChannelHandlerContext.fireChannelUnregistered()

Outbound event propagation methods

  • ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
  • ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
  • ChannelHandlerContext.write(Object, ChannelPromise)
  • ChannelHandlerContext.flush()
  • ChannelHandlerContext.read()
  • ChannelHandlerContext.disconnect(ChannelPromise)
  • ChannelHandlerContext.close(ChannelPromise)
  • ChannelHandlerContext.deregister(ChannelPromise)

build pipeline

A pipeline should contain one or more ChannelHandlers to receive I/O events (e.g. read) and to request I/O operations (e.g. write and close). For example, a typical server's channel pipeline would contain the following handlers:

  • protocol decoder: translates binary data into a Java object
  • protocol encoder: translates a Java object into binary data
  • business logic handler: performs the actual business logic
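The decoder and encoder roles can be sketched as plain transformations. This is a toy sketch under an assumed "protocol" of UTF-8 strings; the name ToyCodec is made up, and a real Netty codec would extend ByteToMessageDecoder / MessageToByteEncoder and handle partial frames.

```java
import java.nio.charset.StandardCharsets;

// Toy codec sketch: decoder turns wire bytes into a Java object (here a
// String), encoder does the reverse.
public class ToyCodec {
    static String decode(byte[] wire) {          // protocol decoder
        return new String(wire, StandardCharsets.UTF_8);
    }

    static byte[] encode(String message) {       // protocol encoder
        return message.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("PING");
        System.out.println(decode(wire));        // PING
    }
}
```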

An example of building such a pipeline:

   static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
   ...
   ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("decoder", new MyProtocolDecoder());
   pipeline.addLast("encoder", new MyProtocolEncoder());
  
   // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
   // in a different thread than an I/O thread so that the I/O thread is not blocked by
   // a time-consuming task.
   // If your business logic is fully asynchronous or finished very quickly, you don't
   // need to specify a group.
   pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

When a DefaultEventExecutorGroup is specified for the business logic handler, the work is offloaded from the EventLoop, but tasks are still processed serially per ChannelHandlerContext (tasks submitted first are executed first). Serial processing can therefore still become a performance bottleneck. If ordering does not matter for your use case, consider UnorderedThreadPoolEventExecutor to maximize the parallelism of task execution.

DefaultEventExecutorGroup

When a DefaultEventExecutorGroup is specified for a handler, the executor exposed by its ChannelHandlerContext is pinned to one member of the group, so all of that handler's tasks are processed by the same executor.

Every executor in a DefaultEventExecutorGroup is a SingleThreadEventExecutor, so each handler's tasks are processed in order: a later task does not start until the earlier ones have completed.

UnorderedThreadPoolEventExecutor

UnorderedThreadPoolEventExecutor implements EventExecutorGroup, but its next() method simply returns itself (this).

It also extends ScheduledThreadPoolExecutor and can be configured with multiple threads, so when it is specified as the group for a handler, tasks submitted for the same handler may be executed by different threads, and ordering is not guaranteed.

As a result, later tasks for the same handler do not have to wait for earlier tasks to complete, which can improve throughput and parallelism.
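The ordered-versus-unordered contrast can be demonstrated with plain java.util.concurrent executors rather than Netty's classes: a single-threaded executor preserves submission order, like the SingleThreadEventExecutor chosen from a DefaultEventExecutorGroup, while a multi-threaded pool gives no ordering guarantee, like UnorderedThreadPoolEventExecutor. The class and method names here are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderingDemo {
    // Submit `count` tasks and record the order in which they complete.
    static List<Integer> runTasks(ExecutorService executor, int count) {
        List<Integer> completionOrder = new CopyOnWriteArrayList<>();
        for (int i = 0; i < count; i++) {
            final int id = i;
            executor.submit(() -> completionOrder.add(id));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completionOrder;
    }

    public static void main(String[] args) {
        List<Integer> expected = new ArrayList<>();
        for (int i = 0; i < 100; i++) expected.add(i);

        // One thread: completion order always equals submission order.
        List<Integer> ordered = runTasks(Executors.newSingleThreadExecutor(), 100);
        System.out.println("single thread in order: " + ordered.equals(expected));

        // Eight threads: all tasks run, but possibly out of order.
        List<Integer> pooled = runTasks(Executors.newFixedThreadPool(8), 100);
        System.out.println("pool ran all tasks: " + (pooled.size() == 100));
    }
}
```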

Thread safety

A ChannelHandler can be added to or removed from a ChannelPipeline at any time because ChannelPipeline is thread-safe. For example, you can insert an encryption handler right before sensitive information is about to be exchanged, and remove it after the exchange.
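The "add/remove at any time" property can be modeled with a stdlib thread-safe list. Note that this only illustrates the idea: Netty's DefaultChannelPipeline uses its own internal synchronization, not CopyOnWriteArrayList, and the handler names here are made up.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MutablePipelineModel {
    // Remove the "encryption" handler while the pipeline is being walked.
    static List<String> afterExchange(List<String> handlers) {
        List<String> pipeline = new CopyOnWriteArrayList<>(handlers);
        // Iteration sees a stable snapshot even while the list is mutated,
        // so no ConcurrentModificationException is thrown.
        for (String h : pipeline) {
            if (h.equals("encryption")) {
                pipeline.remove("encryption");  // sensitive exchange finished
            }
        }
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println(afterExchange(
                List.of("decoder", "encryption", "businessLogic")));
        // [decoder, businessLogic]
    }
}
```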

Heartbeat mechanism

In Netty, a heartbeat mechanism can be implemented with IdleStateHandler: add an IdleStateHandler to the pipeline as the idle-detection processor, and add a custom handler that overrides userEventTriggered to handle the resulting timeout events.

server-side heartbeat implementation

A server-side example:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new HeartBeatServerHandler());
            }
        });

The custom timeout handler's logic is shown below. userEventTriggered is invoked every time a timeout event fires; timeout events include read idle and write idle timeouts.

class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    private int lossConnectCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("No message from the client has been received for 5 seconds!");
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    System.out.println("Close this inactive channel!");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lossConnectCount = 0;
        System.out.println("client says: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

client-side heartbeat implementation

The client likewise adds an IdleStateHandler to the pipeline as the idle-detection processor, along with a custom handler for the timeout events.

The following example configures the IdleStateHandler to fire a write-idle event every 4 seconds, and the custom handler's userEventTriggered method handles the write-idle case:

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new HeartBeatClientHandler());
            }
        });

class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
    // curTime/beatTime were referenced but not defined in the original
    // snippet; they are declared here (values assumed for illustration)
    // so the handler compiles: send at most beatTime heartbeats.
    private int curTime = 0;
    private static final int beatTime = 3;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("Client cyclic heartbeat monitoring sending: " + new Date());
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                if (curTime < beatTime) {
                    curTime++;
                    ctx.writeAndFlush("biubiu");
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}