# Netty ## Netty适用场景 目前,http协议被广泛用于web服务器和客户端之间的交流,但是在一些场景下http协议不能够很好的拓展。 如在交换大文件、email信息或实时信息(如多人游戏数据和经济信息)等场景下,通常不使用通用http协议,而是需要为特定需求优化过使用特定场景的协议。 ## Netty介绍 Netty Project提供了异步事件驱动的网络应用框架,并为快速开发和维护高性能、高拓展的协议服务器和客户端提供工具。 Netty是开发协议服务器和客户端的NIO开发框架,提供了开发快速和简单开发协议服务器和客户端的方式。其大大简化了网络开发例如tcp和udp套接字编程过程。 ## Netty Demo ### Discard Demo 如果要通过netty实现一个丢弃接收数据的server,可以参照如下实现。如下的handler实现用于处理由netty产生的IO事件: ```java package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently. ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } } ``` - ChannelHandlerAdapter是ChannelHandler的实现类,ChannelHandler接口提供了不同的event handler method,可以通过实现ChannelHandler接口并override这些方法。在实现ChannelHandler接口时,可以继承ChannelHandlerAdaptrer,如此便无需手动实现ChannelHandler接口中的每个方法。 - 通过override channelRead方法,可以对收到的数据进行处理。每当收到新的消息时,channelRead方法都会被调用,在上述demo中,收到消息的类型是ByteBuf。 - 为了实现DISCARD协议,channelRead方法会丢弃所有接收到的数据。ByteBuf是一个引用计数对象,其必须通过release方法显式的进行释放。(**对于引用计数对象,其释放的责任应该由handler method来承担**) > 通常情况下,channelRead的实现都按照如下方式: > ```java > @Override > public void channelRead(ChannelHandlerContext ctx, Object msg) { > try { > // Do something with msg > } finally { > ReferenceCountUtil.release(msg); > } > ``` - exceptionCaught方法用于处理Throwable,该异常可能由netty IO error导致或是由handler method实现在处理event时抛出。通常情况下,exceptionCaught方法捕获的异常应该被logged,并且该异常关联的Channel应该被关闭。 ### Discard Demo Main Method 以上实现了Discard Demo的前半部分,可以按照以下方法来编写main方法启动DiscardServerHandler的Server: ```java package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } } ``` - NioEventLoopGroup是一个多线程的event loop,用于处理IO操作。netty提供了不同的EventLoopGroup实现用于不同的传输,在本例中编写Server-side的应用使用的是两个NioEventLoopGroup。第一个NioEventLoopGroup通常被称为boss,用于接受连接,第二个NioEventLoopGroup,通常称之为worker,用于处理boss接受到连接的traffic。一旦boss接收到连接,就把接受到的来连接注册到worker中。EventLoopGroup使用了多少线程,并且它们如何映射到被创建的channel取决于EventLoopGroup的实现,设置可以通过构造方法对其进行配置。 > bossGroup用于监听来自客户端的连接,专门负责与客户端创建连接,类似ServerSocket,并且把与客户端的连接注册到workGroup的Selector中。workGroup则用于处理与客户端连接中的读写事件。 - ServerBootstrap是一个工具类,用于帮助构建一个server。可以通过Channel来设置server,但是通常情况下无需这么做。 - 上述Demo当接受到新的请求时,通过NioServerSocketChannel.class来初始化一个新的Channel - 上述Demo中指定的handler会被新接受的Channel执行。ChannelInitializer是一个特定的handler,用于帮助用户配置新的Channel。在此处,为新Channel的ChannelPipeline添加了DiscardServerHandler来实现DiscardServer逻辑。当程序变得复杂时,你可能会对pipeline添加更多的handler,并且会将该匿名类提升为顶级类。 - 可以为Channel的实现指定参数,上述实例在写的是TCP/IP server,故而我们可以设置socket选项例如tcpNoDelay或keepAlive - 在上述实现中,option用于接受新连接的NioServerSocketChannel,childOption则是用于被父ServerChannel接受的channels,在这个demo中也是NioServerSocketChannel ### 打印DiscardServer接收到的数据到sout 可以通过如下方式改进channelRead方法,从而打印出接收到的数据: ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } } ``` ### 实现EchoServer 通过如下方式,可以对客户端的请求返回一个response: ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) } ``` - ChannelHandlerContext对象提供了不同的操作来触发不同的IO事件和操作,在本实现中,可以通过write方法来讲接收到的数据逐字写回。在本例中,对于msg没有进行释放操作,因为netty在写入操作时会对msg进行释放。 - ctx.write方法并不会实际的将数据写入到socket中,而是会将数据写入到缓存中,然后调用ctx.flush方法将缓存中数据刷新到socket中 ### 编写一个TimeServer 在本节中实现的协议是time协议,与先前实例不同的是,其并不会接收任何数据,在返回一个32bit的整数之后就会关闭连接。 因为在time server实现中我们会忽略掉任何请求message,并且在连接一建立时就返回一个整数,故而我们不能使用channelRead方法。相对的,我们override并使用channelActive方法,如下是该方法的实现: ```java package io.netty.example.time; public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } ``` - channelActive在连接已经成功建立并且处于就绪状态能够产生traffic时,会立即被调用。我们会写入一个int整数来代表当前时间。 - 为了发送新消息,我们需要分配一个新buffer,buffer中包含消息。由于需要写入32为的整数,故而buffer的大小是4字节。通过ChannelHandlerContext.alloc方法获取ByteBufAllocator并分配ByteBuf。 - 在ByteBuf中,拥有两个指针read pointer和write pointer,在进行写入操作时,write pointer会增加但是read pointer并不会变化,因而在执行write操作之前无需像NIO中的ByteBuffer一样调用flip方法 - ChannelHandlerContext.write方法和writeAndFlush方法会返回一个ChannelFuture对象。ChannelFuture对象代表一个目前尚未发生的IO操作,这意味着netty中任何请求的操作目前都可能尚未执行,netty中任何操作都是异步的 > 如下代码中,channel可能在数据写入之前被删除 > ```java > Channel ch = ...; > ch.writeAndFlush(message); > ch.close(); > ``` 因而,需要在ChannelFuture完成之后再调用close方法,再上例中再listener中对channel进行关闭。 > ctx.close并不会立马关闭连接,close方法返回的也是一个ChannelFuture对象 - 如果要监听write操作何时完成,可以为返回的ChannelFuture添加一个listener,如上例所示。也可以使用预先定义好的listener: ```java f.addListener(ChannelFutureListener.CLOSE); ``` ### 编写一个Time Client 如下通过netty编写了一个client。再通过netty编写server和client的过程中,唯一的区别是不同的Bootstrap和Channel实现类被使用。 netty client客户端的实现通过如下代码: ```java package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } } ``` - BootStrap和ServerBootstrap类似,但是其是为了非server channel如客户端或无连接channel。 - 当只指定了一个EventLoopGroup时,该EventLoopGroup会被同时用作boss group和worker group。 - 对于client-side应用,其并不会用NioServerSocketChannel,而是会用NioSocketChannel - 客户端程序会调用connect方法,而不是bind方法 客户端的ChannelHandler实现如下,其接收一个32bit的整数,并且将其转化为可读的日期格式 ```java package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } ``` ### 处理基于stream的传输 #### Socket Buffer的一个小漏洞 再基于流的传输中,例如tcp/ip,接收到的数据被存储在socket receive buffer中。不幸的是,基于流传输的socket buffer不是一个packet队列而是一个byte队列。因此,即使你将两个消息作为两个独立的packet发送,操作系统也不会将其作为两个独立的packet而是将其作为一些列byte。