Files
rikako-note/netty/netty-j.md
2024-08-31 22:26:26 +08:00

20 KiB
Raw Blame History

netty

netty解决的问题

当前http协议被广泛使用于client和server之间的通信。但是部分场景下http协议并不适用例如传输大文件、e-mail消息、事实的经济或游戏数据。对于该类场景http协议并无法很好的满足此时需要一个高度优化的自定义协议实现而通过netty可以及进行快速的协议实现。

netty提供了一个异步的事件驱动的网络应用框架用于快速开发一个拥有高性能、高可拓展性的协议的server和client。

netty example

Discard Server Example

如下是一个Discard Protocol的样例其会对所有接收到的数据进行丢弃并且不返回任何结果。handler方法会处理由netty产生的io事件

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

DiscardServerHandler继承了ChannelInboundHandlerAdapter,而ChannelInboundHandlerAdapter则是实现了ChannelInboundHandlerChannelInboundHandler提供了各种不同的io事件处理方法可以对这些方法进行覆盖。

在上述示例中对chnnelRead方法进行了重写channelRead方法将会在从客户端接收到数据时被调用被调用时会把从客户端接收到的消息作为参数。在上述示例中接收到的消息是ByteBuf类型。

ByteBuf是一个引用计数对象必须要通过调用release方法来显式释放。并且,handler method有责任对传入的引用计数对象进行释放操作

通常channelRead方法的重写按照如下形式

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        // handler makes sure that msg passed has been released
        ReferenceCountUtil.release(msg);
    }
}

上述示例中除了重写channelRead方法外还重写了exceptionCaught方法。exceptionCaught方法会在 netty由于io error抛出异常 或是 handler method实现在处理事件时抛出异常 的场景下被调用。

在通常情况下被exceptionCaught方法捕获的异常其异常信息应该被打印到日志中且异常关联的channel应该被关闭。通过重写caughtException也可以自定义异常捕获后的实现例如在关闭channel之前向client发送error code.

netty应用程序实现示例

如下是netty实现的Discard Server示例

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

核心组件解析

NioEventLoopGroup

NioEventLoopGroup是一个多线程的event loop用于处理io操作。netty提供了不同的EventLoopGroup实现用于不同种类的传输。

在上述示例中使用了两个NioEventLoopGroup

  • bossboss event loop用于接收incoming connections
  • workerworker event loop用于处理accepted connection的通信

每当boss event loop接收到connection之后其会将新的连接注册到worker event loop。

event group loop中拥有的线程数以及线程如何与channel相关联可能是每个线程对应一个channel、或是一个线程管理多个channel由EventLoopGroup的实现来决定并且可以通过构造器来进行配置。

ServerBootstrap

ServerBoostrap是一个helper类用于方便的构建一个netty server。

NioServerSocketChannel

在上述使用中使用了NioServerSocketChannel来实例化一个channel改channel用于接收incoming connection。

handler注册

在上述示例中为accepted connection注册handler的示例如下所示

.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })

在上述注册示例中每个newly accepted channel都会调用DiscardServerHandler。

ChannelInitializer是一个特殊的handler用于帮助用户为新channel配置ahndler。其initChannel为新channel的ChannelPipeline配置添加自定义的handler从而实现自定义的消息处理逻辑。

为Channel实现设置参数

通过option和childOption方法可以为channel设置参数。由于netty server基于TCP/IP故而可以指定socket option例如tcpNoDelaykeepAlive参数。

  • optionoption设置的是NioServerSocketCahnel该channel用于接收incoming connection
  • childOptionchildOption用于设置被server channel接收的channel被接收的channel为SocketChannel

为Channel绑定port

在经历完上述配置之后就可以为server绑定监听端口并启动了。

TimeServer Example

如下示例展示了一个Time Server其在连接建立后就会向客户端发送一个32位的integer并在消息发送完之后关闭连接。该实例并不会接收任何请求。

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

上述示例中,重写了channelActive方法并在回写请求结束后立刻关闭了channel。

channelActive

channelActive方法会在连接被建立并且可以产生网络通信之后被调用。

在channelActive实现中上述示例创建了一个代表当前时间的integer并且创建了一个容量为4字节的ByteBuf。

ByteBuf并没有类似ByteBuffer的flip方法因为其是双指针的其中一个pointer属于read操作另一个属于write操作。

此外ChannelHandlerContext.writeAndFlush方法将会返回一个ChannelFuture类型的返回值。

而在对ChannelHandlerContext.close方法进行调用时其也不会马上对channel进行关闭而是会返回一个ChannelFuture

ChannelFutureListener

可以为ChannelFuture指定一个listener在上述示例中为writeAndFlush操作指定了一个操作完成后的监听

final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)

TimeClient Example

如下示例会实现一个基于上述Time协议的client和TimeServer不同的是time client将会使用不同的Bootstrap和Channel实现。

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

Bootstrap

Bootstrap和ServerBoostrap类似但是Bootstrap是为了创建non-server channel。

并且如果只指定一个EventLoopGroup那么其将会被同时用作boss和worker

NioSocketChannel

相较于time servertime client指定了NioSocketChannel用于创建client side channel。

在client side并不需要指定childOption

connect

上完成上述配置之后只需要调用connect即可

在time client实现中解析time server返回的integer其逻辑如下所示

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

处理stream-based transport

在TCP/IP类似的stream-based transport中接收到的数据被存储在socket receive buffer中。该buffer中存储的数据是按字节存储的而不是按packet存储的。故而即使发送方发送了两个独立的packet操作系统也不会把他们看作是两条消息而是将其看作一系列的字节数据。故而client端读取到的数据并不一定是server端写入的数据。

例如TCP/IP协议栈接收到了三个packet

ABC DEF GHI

但是由于stream-based protocol的特性对方有可能按照如下顺序读取到

AB CDEFG H I

TCP协议虽然保证了数据的可靠性和有序性但是其是stream-based协议传输的基本单位为字节并不具有packet的概念无法区分哪些位置的字节属于哪一个packet

故而作为数据的接收方无论是server端还是client端都需要将接收到的消息转化为一个或多个拥有实际意义的frame。

接收到的数据应该被分为有意义的frame

ABC DEF GHI

解决方案1

对于stream-based transport所产生的问题可以按照如下方案来解决只有当传输过来的数据达到4字节或以上时才对接收数据进行处理在数据量不足以解析一个frame时则不进行处理

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

handlerAdded和handlerRemoved

ChannelHandler拥有两个生命周期handler methodhandlerAddedhandlerRemoved。可以执行任意任意的初始化动作。

在上述实现中channelRead被调用时会将数据积累到buf中如果buf中字节数不足一个frame则不进行后续处理等待下一次channelRead调用来积累更多字节数据直到积累的字节数达到一个frame

解决方案2

上述解决方案针对可变长度frame的处理可能不是很理想。

由于可以向ChannelPipeline中添加多个ChannelHandler故而可将单个复杂的ChannelHandler拆分为多个模块化的ChannelHandler来降低程序的复杂性。

故而可以将TimeClientHandler拆分为两个handler

  • TimeDecoder处理fragment任务
  • 解析fragement

对于数据碎片的处理。netty提供了一个开箱即用的类

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}

ByteToMessageDecoder

ByteToMessageDecoder是ChannelInBoundHandler的一个实现可以将数据碎片问题的处理变得更加简单。

在接收到新消息后ByteToMessageDecoder会调用decode方法调用时会包含一个内部维护的积累缓冲区。

当内部积累缓冲区中的数据尚未达到一个frame时ByteToMessageDecoder#decode方法可以决定什么都不做当又有新数据到达时会重新调用decode方法判断内部积累缓冲区中是否有足够的数据。

如果decode方法向out中追加了一个out那么其代表decoder成功解析了一条数据ByteToMessageDecoder会丢弃此次读取的内容尚未读取的部分继续保留

注意在decode方法的实现中并不需要decode多条消息因为decode会被反复调用直到其在本次调用中未向out写入任何内容

根据上述介绍故而可以将ChannelHandler的注册改为如下方式

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

除此之外netty还提供了许多开箱即用的decoder实现故而可以简单的实现大多数协议decoder实现类的路径如下

  • io.netty.example.factorial针对二进制协议
  • io.netty.example.telnet针对text line-based协议

POJO

上述的所有实例其消息的数据结构都是ByteBuf在下述示例中会将消息数据结构改为POJO

在handler method中使用POJO的优势是明显的这可以将解析ByteBuf中数据的逻辑从handler method中剥离这将增加程序的可重用性和可维护性。

POJO定义如下

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

之后就可以更改TimeDecoder的逻辑来向out中添加一个UnixTime类型的数据而不是ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

TimeClientHandler实现也可以改为如下方式

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

Encoder

除了在对消息进行decode时可以将二进制数据转化为pojo在server端向channel中发送数据时也可以写入POJO然后再由encoder转化为二进制字节数据。

发送数据时写入POJO如下

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

在对消息进行encode时需要实现ChannelOutboundHandler其会将POJO转化为ByteBuf

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

在上述实现中将ChannelPromise原样传递给了ctx.write方法故而在encoded data被实际写入时会更新ChannelPromise的状态。

为了简化上述的流程,可以使用MessageToByteEncoder

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

关闭server

关闭netty应用只需要关闭所有创建的EventLoopGroup即可。在关闭EventLoopGroup时调用shutdownGracefully即可其会返回一个Future当eventLoopGroup被关闭并且所有关联的channel也被关闭时future会得到提醒。