doc: 阅读netty心跳机制
This commit is contained in:
@@ -141,3 +141,87 @@ outbound event会按照自顶向下的顺序被outbound进行处理,outbound h
|
|||||||
### ThreadSafety
|
### ThreadSafety
|
||||||
channelhandler可以在任何时刻添加到pipeline或从pipeline中移除,ChannelPipeline是线程安全的。例如,可以在交换敏感信息前添加encryption handler并且在交换完信息后将encryption handler移除。
|
channelhandler可以在任何时刻添加到pipeline或从pipeline中移除,ChannelPipeline是线程安全的。例如,可以在交换敏感信息前添加encryption handler并且在交换完信息后将encryption handler移除。
|
||||||
|
|
||||||
|
## Heartbeat机制
|
||||||
|
在netty中,可以通过使用`IdleStateHandler`来实现心跳机制,可以向pipeline中添加`IdleStateHandler`作为心跳检测processor,并且可以添加一个自定义handler并实现`userEventTriggered`接口作为对超时事件的处理。
|
||||||
|
|
||||||
|
### 服务端heartbeat实现
|
||||||
|
服务端实现示例如下:
|
||||||
|
```java
|
||||||
|
ServerBootstrap b= new ServerBootstrap();
|
||||||
|
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
|
||||||
|
.option(ChannelOption.SO_BACKLOG,1024)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||||
|
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
|
||||||
|
socketChannel.pipeline().addLast(new StringDecoder());
|
||||||
|
socketChannel.pipeline().addLast(new HeartBeatServerHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
```
|
||||||
|
自定义的心跳超时处理handler,其逻辑则如下所示。当每当timeout event被触发时,都会调用`userEventTriggered`事件,`timeout event`包含`read idle timeout`或`write idle timeout`。
|
||||||
|
```java
|
||||||
|
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
private int lossConnectCount = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
System.out.println("No message from the client has been received for 5 seconds!");
|
||||||
|
if (evt instanceof IdleStateEvent){
|
||||||
|
IdleStateEvent event = (IdleStateEvent)evt;
|
||||||
|
if (event.state()== IdleState.READER_IDLE){
|
||||||
|
lossConnectCount++;
|
||||||
|
if (lossConnectCount>2){
|
||||||
|
System.out.println("Close this inactive channel!");
|
||||||
|
ctx.channel().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
super.userEventTriggered(ctx,evt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
lossConnectCount = 0;
|
||||||
|
System.out.println("client says: "+msg.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 客户端heartbeat实现
|
||||||
|
客户端也会将IdleStateHandler作为心跳检测processor提娜佳到pipeline中,并且添加自定义handler作为超时事件处理。
|
||||||
|
|
||||||
|
如下示例将IdleStateHandler的心跳检测设置为了每4s一次,并且自定义handler的`userEventTriggered`方法用于处理write idle的场景,代码示例如下:
|
||||||
|
```java
|
||||||
|
Bootstrap b = new Bootstrap();
|
||||||
|
b.group(group).channel(NioSocketChannel.class)
|
||||||
|
.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||||
|
socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
|
||||||
|
socketChannel.pipeline().addLast(new StringEncoder());
|
||||||
|
socketChannel.pipeline().addLast(new HeartBeatClientHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
```java
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
System.out.println("Client cyclic heartbeat monitoring sending: "+new Date());
|
||||||
|
if (evt instanceof IdleStateEvent){
|
||||||
|
IdleStateEvent event = (IdleStateEvent)evt;
|
||||||
|
if (event.state()== IdleState.WRITER_IDLE){
|
||||||
|
if (curTime<beatTime){
|
||||||
|
curTime++;
|
||||||
|
ctx.writeAndFlush("biubiu");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|||||||
Reference in New Issue
Block a user