阅读netty文档

This commit is contained in:
asahi
2024-08-31 22:26:26 +08:00
parent 293252d913
commit 316e3c56cd
2 changed files with 348 additions and 269 deletions

View File

@@ -164,3 +164,351 @@ ServerBoostrap是一个helper类用于方便的构建一个netty server。
#### 为Channel绑定port
在经历完上述配置之后就可以为server绑定监听端口并启动了。
## TimeServer Example
如下示例展示了一个Time Server其在连接建立后就会向客户端发送一个32位的integer并在消息发送完之后关闭连接。该实例并不会接收任何请求。
```java
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
```
上述示例中,重写了`channelActive`方法并在回写请求结束后立刻关闭了channel。
### channelActive
channelActive方法会在连接被建立并且可以产生网络通信之后被调用。
在channelActive实现中上述示例创建了一个代表当前时间的integer并且创建了一个容量为4字节的ByteBuf。
> ByteBuf并没有类似ByteBuffer的flip方法因为其是双指针的其中一个pointer属于read操作另一个属于write操作。
此外`ChannelHandlerContext.writeAndFlush`方法将会返回一个ChannelFuture类型的返回值。
而在对`ChannelHandlerContext.close`方法进行调用时其也不会马上对channel进行关闭而是会返回一个`ChannelFuture`
### ChannelFutureListener
可以为ChannelFuture指定一个listener在上述示例中为writeAndFlush操作指定了一个操作完成后的监听
```java
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
```
## TimeClient Example
如下示例会实现一个基于上述Time协议的client和TimeServer不同的是time client将会使用不同的Bootstrap和Channel实现。
```java
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
```
### Bootstrap
Bootstrap和ServerBoostrap类似但是Bootstrap是为了创建non-server channel。
并且如果只指定一个EventLoopGroup那么其将会被同时用作boss和worker
### NioSocketChannel
相较于time servertime client指定了NioSocketChannel用于创建client side channel。
在client side并不需要指定childOption
### connect
上完成上述配置之后只需要调用connect即可
在time client实现中解析time server返回的integer其逻辑如下所示
```java
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
```
## 处理stream-based transport
在TCP/IP类似的stream-based transport中接收到的数据被存储在socket receive buffer中。该buffer中存储的数据是按字节存储的而不是按packet存储的。故而即使发送方发送了两个独立的packet操作系统也不会把他们看作是两条消息而是将其看作一系列的字节数据。故而client端读取到的数据并不一定是server端写入的数据。
例如TCP/IP协议栈接收到了三个packet
```
ABC DEF GHI
```
但是由于stream-based protocol的特性对方有可能按照如下顺序读取到
```
AB CDEFG H I
```
> TCP协议虽然保证了数据的可靠性和有序性但是其是stream-based协议传输的基本单位为字节并不具有packet的概念无法区分哪些位置的字节属于哪一个packet
故而作为数据的接收方无论是server端还是client端都需要将接收到的消息转化为一个或多个拥有实际意义的frame。
接收到的数据应该被分为有意义的frame
```
ABC DEF GHI
```
### 解决方案1
对于stream-based transport所产生的问题可以按照如下方案来解决只有当传输过来的数据达到4字节或以上时才对接收数据进行处理在数据量不足以解析一个frame时则不进行处理
```java
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
```
#### handlerAdded和handlerRemoved
ChannelHandler拥有两个生命周期handler method`handlerAdded``handlerRemoved`。可以执行任意任意的初始化动作。
在上述实现中channelRead被调用时会将数据积累到buf中如果buf中字节数不足一个frame则不进行后续处理等待下一次channelRead调用来积累更多字节数据直到积累的字节数达到一个frame
### 解决方案2
上述解决方案针对可变长度frame的处理可能不是很理想。
由于可以向ChannelPipeline中添加多个ChannelHandler故而可将单个复杂的ChannelHandler拆分为多个模块化的ChannelHandler来降低程序的复杂性。
故而可以将TimeClientHandler拆分为两个handler
- TimeDecoder处理fragment任务
- 解析fragement
对于数据碎片的处理。netty提供了一个开箱即用的类
```java
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
```
#### ByteToMessageDecoder
ByteToMessageDecoder是ChannelInBoundHandler的一个实现可以将数据碎片问题的处理变得更加简单。
在接收到新消息后ByteToMessageDecoder会调用decode方法调用时会包含一个内部维护的积累缓冲区。
当内部积累缓冲区中的数据尚未达到一个frame时`ByteToMessageDecoder#decode`方法可以决定什么都不做当又有新数据到达时会重新调用decode方法判断内部积累缓冲区中是否有足够的数据。
如果decode方法向out中追加了一个out那么其代表decoder成功解析了一条数据ByteToMessageDecoder会丢弃此次读取的内容尚未读取的部分继续保留
> 注意在decode方法的实现中并不需要decode多条消息因为decode会被反复调用直到其在本次调用中未向out写入任何内容
根据上述介绍故而可以将ChannelHandler的注册改为如下方式
```java
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
```
除此之外netty还提供了许多开箱即用的decoder实现故而可以简单的实现大多数协议decoder实现类的路径如下
- io.netty.example.factorial针对二进制协议
- io.netty.example.telnet针对text line-based协议
## POJO
上述的所有实例其消息的数据结构都是ByteBuf在下述示例中会将消息数据结构改为POJO
在handler method中使用POJO的优势是明显的这可以将解析ByteBuf中数据的逻辑从handler method中剥离这将增加程序的可重用性和可维护性。
POJO定义如下
```java
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
```
之后就可以更改TimeDecoder的逻辑来向out中添加一个UnixTime类型的数据而不是ByteBuf
```java
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
```
TimeClientHandler实现也可以改为如下方式
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
```
### Encoder
除了在对消息进行decode时可以将二进制数据转化为pojo在server端向channel中发送数据时也可以写入POJO然后再由encoder转化为二进制字节数据。
发送数据时写入POJO如下
```java
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
```
在对消息进行encode时需要实现ChannelOutboundHandler其会将POJO转化为ByteBuf
```java
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
```
在上述实现中将ChannelPromise原样传递给了ctx.write方法故而在encoded data被实际写入时会更新ChannelPromise的状态。
为了简化上述的流程,可以使用`MessageToByteEncoder`
```java
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
```
## 关闭server
关闭netty应用只需要关闭所有创建的EventLoopGroup即可。在关闭EventLoopGroup时调用`shutdownGracefully`即可其会返回一个Future当eventLoopGroup被关闭并且所有关联的channel也被关闭时future会得到提醒。