131 lines
7.5 KiB
Markdown
131 lines
7.5 KiB
Markdown
# netty document
|
||
## ChannelPipeline
|
||
I/O Request
|
||
via Channel or
|
||
ChannelHandlerContext
|
||
|
|
||
+---------------------------------------------------+---------------+
|
||
| ChannelPipeline | |
|
||
| \|/ |
|
||
| +---------------------+ +-----------+----------+ |
|
||
| | Inbound Handler N | | Outbound Handler 1 | |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| /|\ | |
|
||
| | \|/ |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| | Inbound Handler N-1 | | Outbound Handler 2 | |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| /|\ . |
|
||
| . . |
|
||
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|
||
| [ method call] [method call] |
|
||
| . . |
|
||
| . \|/ |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| | Inbound Handler 2 | | Outbound Handler M-1 | |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| /|\ | |
|
||
| | \|/ |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| | Inbound Handler 1 | | Outbound Handler M | |
|
||
| +----------+----------+ +-----------+----------+ |
|
||
| /|\ | |
|
||
+---------------+-----------------------------------+---------------+
|
||
| \|/
|
||
+---------------+-----------------------------------+---------------+
|
||
| | | |
|
||
| [ Socket.read() ] [ Socket.write() ] |
|
||
| |
|
||
| Netty Internal I/O Threads (Transport Implementation) |
|
||
+-------------------------------------------------------------------+
|
||
|
||
如上图所示,i/o event将会被`ChannelInboundHandler`或`ChannelOutboundHandler`处理,并且通过调用`ChannelHandlerContext`中的event propagation method(触发事件传播的方法)来进行转发。
|
||
|
||
常见的event propagation method如下,例如`ChannelHandlerContext.write(Object)`。
|
||
|
||
### inbound event
|
||
inbound event会通过inbound handler自底向上的进行处理,例如上图的左边部分所示。inbound handler通常处理由io thread产生的inbound data,inbound则是通常从remote peer读取。
|
||
|
||
> 如果inbound event传播到高于top inbound handler,那么该event将会被丢弃。
|
||
|
||
### outbound event
|
||
outbound event会按照自顶向下的顺序被outbound进行处理,outbound handler通常产生outbound traffic或对outbound traffic进行处理。
|
||
|
||
> 如果outbound event传播到低于bottom outbound handler, 其会直接被channel关联的io线程处理。
|
||
|
||
> 通常,io thread会执行实际的输出操作,例如`SocketChannel.write(ByteBuffer)`。
|
||
|
||
### pipeline处理顺序
|
||
例如,按照如下顺序项pipeline中添加handler时,
|
||
```java
|
||
ChannelPipeline p = ...;
|
||
p.addLast("1", new InboundHandlerA());
|
||
p.addLast("2", new InboundHandlerB());
|
||
p.addLast("3", new OutboundHandlerA());
|
||
p.addLast("4", new OutboundHandlerB());
|
||
p.addLast("5", new InboundOutboundHandlerX());
|
||
```
|
||
#### inbound
|
||
对于inbound event,handler的执行顺序为`1,2,3,4,5`
|
||
|
||
但由于`3,4`没有实现ChannelInboundHandler,故而Inbound event会跳过`3,4` handler,实际Inbound event的handler顺序为`1,2,5`
|
||
|
||
#### outbound
|
||
对于outbound event,handler的执行顺序为`5,4,3,2,1`
|
||
|
||
对于outbound evnet,由于`1,2`并没有实现ChannelOutboundHandler,故而outbound event的handler顺序为`5,4,3`。
|
||
|
||
> inbound顺序和pipeline handler的添加顺序相同,outbound顺序和pipeline handler添加顺序相反。
|
||
|
||
### 将event转发给下一个handler
|
||
如上图所示,再handler中必须调用`ChannelHandlerContext`中的event propagation method来将event转发给下一个handler。`event propagation event`包括的方法如下:
|
||
|
||
#### Inbound event propagation method
|
||
- ChannelHandlerContext.fireChannelRegistered()
|
||
- ChannelHandlerContext.fireChannelActive()
|
||
- ChannelHandlerContext.fireChannelRead(Object)
|
||
- ChannelHandlerContext.fireChannelReadComplete()
|
||
- ChannelHandlerContext.fireExceptionCaught(Throwable)
|
||
- ChannelHandlerContext.fireUserEventTriggered(Object)
|
||
- ChannelHandlerContext.fireChannelWritabilityChanged()
|
||
- ChannelHandlerContext.fireChannelInactive()
|
||
- ChannelHandlerContext.fireChannelUnregistered()
|
||
|
||
#### Outbound event propagation method
|
||
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
|
||
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
|
||
- ChannelHandlerContext.write(Object, ChannelPromise)
|
||
- ChannelHandlerContext.flush()
|
||
- ChannelHandlerContext.read()
|
||
- ChannelHandlerContext.disconnect(ChannelPromise)
|
||
- ChannelHandlerContext.close(ChannelPromise)
|
||
- ChannelHandlerContext.deregister(ChannelPromise)
|
||
|
||
### build pipeline
|
||
在一个pipeline中,应该包含一个或多个ChannelHandler用于接收IO事件(read)和请求IO操作(write and close)。例如,一个典型的server其channel的pipeline中应该包含如下handler:
|
||
- protocol decoder:将二进制数据转化为java object
|
||
- protocol encoder:将java object转化为二进制数据
|
||
- business logic handler:执行实际业务操作
|
||
|
||
构建pipeline示例如下:
|
||
```java
|
||
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
|
||
...
|
||
ChannelPipeline pipeline = ch.pipeline();
|
||
|
||
pipeline.addLast("decoder", new MyProtocolDecoder());
|
||
pipeline.addLast("encoder", new MyProtocolEncoder());
|
||
|
||
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
|
||
// in a different thread than an I/O thread so that the I/O thread is not blocked by
|
||
// a time-consuming task.
|
||
// If your business logic is fully asynchronous or finished very quickly, you don't
|
||
// need to specify a group.
|
||
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
|
||
```
|
||
当为BusinessLogicHandler指定DefaultEventExecutorGroup时,虽然会将操作从EventLoop中卸载,但是其针对每个ChannelHandlerContext仍然是串行进行处理的。故而,串行处理可能仍然会导致性能瓶颈。如果在用例场景下顺序不太重要时,可以考虑使用`UnorderedThreadPoolEventExecutor`来最大化任务的并行执行。
|
||
|
||
### ThreadSafety
|
||
channelhandler可以在任何时刻添加到pipeline或从pipeline中移除,ChannelPipeline是线程安全的。例如,可以在交换敏感信息前添加encryption handler并且在交换完信息后将encryption handler移除。
|
||
|