From 3ce8f7f062394eac6e07339551374552f8fd7cff Mon Sep 17 00:00:00 2001 From: asahi Date: Sat, 8 Mar 2025 19:20:18 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBnetty=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- netty/netty-doc.md | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/netty/netty-doc.md b/netty/netty-doc.md index d74ec14..a6bf5af 100644 --- a/netty/netty-doc.md +++ b/netty/netty-doc.md @@ -141,3 +141,87 @@ outbound event会按照自顶向下的顺序被outbound进行处理,outbound h ### ThreadSafety channelhandler可以在任何时刻添加到pipeline或从pipeline中移除,ChannelPipeline是线程安全的。例如,可以在交换敏感信息前添加encryption handler并且在交换完信息后将encryption handler移除。 +## Heartbeat机制 +在netty中,可以通过使用`IdleStateHandler`来实现心跳机制,可以向pipeline中添加`IdleStateHandler`作为心跳检测processor,并且可以添加一个自定义handler并实现`userEventTriggered`接口作为对超时事件的处理。 + +### 服务端heartbeat实现 +服务端实现示例如下: +```java +ServerBootstrap b= new ServerBootstrap(); +b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG,1024) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { +  socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); + socketChannel.pipeline().addLast(new StringDecoder()); + socketChannel.pipeline().addLast(new HeartBeatServerHandler()); + } + }); +``` +自定义的心跳超时处理handler,其逻辑则如下所示。当每当timeout event被触发时,都会调用`userEventTriggered`事件,`timeout event`包含`read idle timeout`或`write idle timeout`。 +```java +class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { + private int lossConnectCount = 0; + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + System.out.println("No message from the client has been received for 5 seconds!"); + if (evt instanceof IdleStateEvent){ + IdleStateEvent event = (IdleStateEvent)evt; + if (event.state()== IdleState.READER_IDLE){ + lossConnectCount++; + if (lossConnectCount>2){ + System.out.println("Close this inactive channel!"); + ctx.channel().close(); + } + } + }else { + super.userEventTriggered(ctx,evt); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + lossConnectCount = 0; + System.out.println("client says: "+msg.toString()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} +``` + +### 客户端heartbeat实现 +客户端也会将IdleStateHandler作为心跳检测processor提娜佳到pipeline中,并且添加自定义handler作为超时事件处理。 + +如下示例将IdleStateHandler的心跳检测设置为了每4s一次,并且自定义handler的`userEventTriggered`方法用于处理write idle的场景,代码示例如下: +```java +Bootstrap b = new Bootstrap(); +b.group(group).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS)); + socketChannel.pipeline().addLast(new StringEncoder()); + socketChannel.pipeline().addLast(new HeartBeatClientHandler()); + } + }); +``` + +```java +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + System.out.println("Client cyclic heartbeat monitoring sending: "+new Date()); + if (evt instanceof IdleStateEvent){ + IdleStateEvent event = (IdleStateEvent)evt; + if (event.state()== IdleState.WRITER_IDLE){ + if (curTime