380 lines
15 KiB
Markdown
380 lines
15 KiB
Markdown
# NIO
|
||
## 简介
|
||
nio库在jdk 1.4时被引入,和传统i/o不同的是,nio性能高且基于block。在nio中,定义了类来持有数据,并以block为单位对数据进行处理。
|
||
|
||
### nio和传统io区别
|
||
nio和传统io最大的区别是数据打包和传输的方式。在传统io中,会以stream的形式来处理数据;而在nio中,数据则是以block的形式被处理。
|
||
|
||
面向stream的io系统在同一时刻只能处理1字节的数据,通常较慢。
|
||
|
||
面向block的io系统则是以block为单位处理数据,处理速度较传统io快。
|
||
|
||
## Channel/Buffer
|
||
channel和buffer是nio中的核心对象,几乎在每个io操作中被使用。
|
||
|
||
channel类似于传统io中的stream,所有数据的写入或读取都需要通过channel。而buffer本质上则是数据的容器,`所有被发送到channel中的数据都要先被放置到buffer中`,`所有从channel中读取的数据都要先被读取到buffer中`。
|
||
|
||
### Buffer
|
||
Buffer是一个类,用于存储数据,buffer中的数据要么将要被写入到Channel中,要么刚从Channel中读取出来。
|
||
|
||
> Buffer是nio和传统io的重要区别,在传统io中,数据直接从stream中被读取出来,也被直接写入到stream中。
|
||
>
|
||
> 在nio中,数据的读取和写入都需要经过Buffer
|
||
|
||
buffer的本质是一个字节数组,buffer提供了对数据的结构化访问,并且,buffer还追踪了系统的读/写操作。
|
||
|
||
#### Buffer类型
|
||
最常用的Buffer类型是ByteBuffer,其支持对底层的字节数据进行set/get操作,初次之外,Buffer还有其他类型。
|
||
|
||
- ByteBuffer
|
||
- CharBuffer
|
||
- ShortBuffer
|
||
- IntBuffer
|
||
- LongBuffer
|
||
- FloatBuffer
|
||
- DoubleBuffer
|
||
|
||
上述的每个Buffer类都实现了Buffer接口,除了ByteBuffer之外,其他每个Buffer类型都拥有相同的操作。由于ByteBuffer被绝大多数标准io操作锁使用,故而ByteBuffer除了拥有上述操作外,还拥有一些额外的操作。
|
||
|
||
#### Buffer设计
|
||
Buffer是存储指定primitive type数据的容器(Long,Byte,Int,Float,Char,Double,Short),buffer是线性并且有序的。
|
||
|
||
buffer的主要属性包括capacity、limit和position:
|
||
- capacity:buffer的capacity是其可以包含的元素数量,capacity不可以为负数并且不可以被改变
|
||
- limit:buffer的limit代表buffer中第一个不应该被读/写的元素下表,`(例如buffer的capacity为5,但是其中只包含3个元素,那么limit则为3)`,limit不能为负数,并且limit不可以比capacity大
|
||
- position:position代表下一个被读/写元素的下标,position部为负数也不应该比limit大
|
||
|
||
#### 传输数据
|
||
buffer的每个子类都定义了get/put类似的操作,相应的read或write操作会从position转移一个或多个元素的数据,如果请求转移的数据比limit大,那么相应的get操作会抛出`BufferUnderflowException`,set操作则是会抛出`BufferOverflowException`;在上述两种情况下,没有数据会被传输。
|
||
|
||
#### mark & reset
|
||
buffer的mark操作标识对buffer执行reset操作后position会被重置到的位置。mark并非所有情况下都被定义,当mark被定义时,其应该不为负数并且不应该比position大。如果position或limit被调整为比mark更小的值之后,mark值将会被丢弃。如果在mark没有被定义时调用reset操作,那么会抛出`InvalidMarkException`异常。
|
||
|
||
> #### 属性之间的大小关系
|
||
>
|
||
> `0<=mark<=position<=limit<=capacity`
|
||
|
||
#### clear/flip/rewind
|
||
除了对buffer定义mark/reset操作之外,buffer还定义了如下操作:
|
||
- `clear()`:使buffer可以执行新的sequence channel-read操作或relative put操作
|
||
- clear操作会将limit设置为capacity的值并且将position设置为0
|
||
- `flip()`:使buffer可以执行sequence channel-wirte操作和relative get操作:
|
||
- flip操作会将limit设置为position,并且将position设置为0
|
||
- `rewind()`:使buffer中的数据可以重新被读取:
|
||
- rewind操作会将position设置为0,limit保持不变
|
||
|
||
> 在向buffer写入数据之前,应该调用`clear()`方法,将limit设置为capacity并将position设置为0,从而可以向[0, capacity-1]的范围内写入数据
|
||
>
|
||
> 当写入完成之后,想要从buffer中读取数据时,需要先调用`flip()`方法,将limit设置为position并且将position设置为0,从而可以读取[0, position-1]范围内的数据
|
||
>
|
||
> 如果想要对buffer中的数据进行重新读取,可以调用`rewind()`方法,其将position设置为0,从而可以对[0, limit-1]范围内的数据重新进行读取。
|
||
|
||
#### Buffer get/put方法
|
||
##### get
|
||
在ByteBuffer中,拥有如下`get`方法:
|
||
```java
|
||
byte get();
|
||
ByteBuffer get( byte dst[] );
|
||
ByteBuffer get( byte dst[], int offset, int length );
|
||
byte get( int index );
|
||
```
|
||
其中,前3个get方法都是relative方法,其基于当前position和limit来获取数据。第4个get方法则是absolute方法,其不基于position和limit,而是基于index绝对偏移量来获取数据。
|
||
|
||
> absolute get方法调用不基于position和limit,也不会对position和limit造成影响。
|
||
|
||
##### put
|
||
ByteBuffer拥有如下`put`方法:
|
||
```java
|
||
ByteBuffer put( byte b );
|
||
ByteBuffer put( byte src[] );
|
||
ByteBuffer put( byte src[], int offset, int length );
|
||
ByteBuffer put( ByteBuffer src );
|
||
ByteBuffer put( int index, byte b );
|
||
```
|
||
|
||
#### Buffer allocation and warpping
|
||
在创建buffer之前,必须对buffer进行分配,
|
||
```java
|
||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||
```
|
||
`Buffer.allocate`会创建一个指定大小的底层数组,并将数组封装到一个buffer对象中。
|
||
|
||
除了调用`allocate`之外,还可以将一个已经存在的数组分配给buffer:
|
||
```java
|
||
byte array[] = new byte[1024];
|
||
ByteBuffer buffer = ByteBuffer.wrap(array);
|
||
```
|
||
在调用`wrap`方法之后,会创建一个buffer对象,buffer对象对array进行的包装。此后,既可以通过array直接访问数组,也可以通过buffer访问数组。
|
||
|
||
#### Buffer slice
|
||
buffer slice会基于buffer对象创建一个sub buffer,新建的sub buffer会和原来的buffer共享部分的底层数组数据。
|
||
|
||
```java
|
||
ByteBuffer buffer = ByteBuffer.allocate(10);
|
||
|
||
for (int i = 0; i < buffer.capacity(); ++i) {
|
||
buffer.put((byte) i);
|
||
}
|
||
|
||
buffer.position(3);
|
||
buffer.limit(7);
|
||
|
||
ByteBuffer slice = buffer.slice();
|
||
```
|
||
通过上述方法,可以创建一个包含原始buffer[3,6]范围内元素的sub buffer。但是,sub buffer和buffer共享[3,6]范围内的数据,实例如下所示:
|
||
```java
|
||
// 将[3,6]范围内的元素都 * 11
|
||
for (int i = 0; i < slice.capacity(); ++i) {
|
||
byte b = slice.get(i);
|
||
b *= 11;
|
||
slice.put(i, b);
|
||
}
|
||
|
||
// 重置original buffer的position和limit并读取数据
|
||
buffer.position(0);
|
||
buffer.limit(buffer.capacity());
|
||
while (buffer.remaining() > 0) {
|
||
System.out.println(buffer.get());
|
||
}
|
||
```
|
||
修改sub buffer中的元素内容后,访问original buffer中的内容,输出结果如下所示:
|
||
```shell
|
||
$ java SliceBuffer
|
||
0
|
||
1
|
||
2
|
||
33
|
||
44
|
||
55
|
||
66
|
||
7
|
||
8
|
||
9
|
||
```
|
||
易得知在sub buffer修改内容后,内容修改对buffer也可见。
|
||
|
||
#### ReadOnly Buffer
|
||
对于readonly buffer,可以从其中读取值,但是无法向其写入值。对于任何常规buffer,可以对其调用`buffer.asReadOnlyBuffer()`来获取一个readonly buffer。
|
||
|
||
#### Direct & Indirect Buffer
|
||
direct buffer的内存通过特定方式来分配,从而增加io速度。
|
||
|
||
对于direct buffer,jvm在将尽量避免在调用本地io操作前(后),将buffer中内容写入/读取到中间buffer。
|
||
|
||
|
||
|
||
|
||
### Channel
|
||
Channel为一个对象,可以从channel中读取数据或向channel写入数据。将传统io和nio相比较,channel类似于stream。
|
||
|
||
Channel和Stream不同的是,channel为双向的,而Stream则仅支持单项(InputStream或OutputStream)。一个开启的channel可以被用于读取、写入或同时读取或写入。
|
||
|
||
## Channel读写
|
||
向channel读取或写入数据十分简单:
|
||
- 读:仅需创建一个buffer对象,并且访问channel将数据读取到buffer对象中
|
||
- 写:创建一个buffer对象,并将数据填充到buffer对象,之后访问channel并将bufffer中的数据写入到channel中
|
||
|
||
### 读取文件
|
||
通过nio读取文件分为如下步骤:
|
||
1. 从FileInputStream中获取Channel
|
||
2. 创建buffer对象
|
||
3. 将channel中的数据读取到buffer
|
||
|
||
示例如下所示:
|
||
```java
|
||
FileInputStream fin = new FileInputStream( "readandshow.txt" );
|
||
// 1. 获取channel
|
||
FileChannel fc = fin.getChannel();
|
||
// 2. 创建buffer
|
||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||
// 3. 将数据从channel中读取到buffer中
|
||
fc.read( buffer );
|
||
```
|
||
|
||
> 如上述示例所示,在从channel读取数据时,无需指定向buffer中读取多少字节的数据,每个buffer内部中都有一个复杂的统计系统,会跟踪已经读取了多少字节的数据、buffer中还有多少空间用于读取更多数据。
|
||
|
||
### 写入文件
|
||
通过nio写入文件步骤和读取类似,示例如下所示:
|
||
```java
|
||
FileOutputStream fout = new FileOutputStream( "writesomebytes.txt" );
|
||
// 1. 获取channel
|
||
FileChannel fc = fout.getChannel();
|
||
// 2. 创建buffer对象
|
||
ByteBuffer buffer = ByteBuffer.allocate( 1024 );
|
||
for (int i = 0; i < message.length; ++i) {
|
||
buffer.put(message[i]);
|
||
}
|
||
|
||
buffer.flip();
|
||
|
||
// 3. 将buffer中的数据写入到channel中
|
||
fc.write(buffer);
|
||
```
|
||
|
||
在向channel写入数据时,同样无需指定需要向channel写入多少字节的数据,buffer内部的统计系统同样会追踪buffer中含有多少字节的数据、还有多少数据待写入。
|
||
|
||
### 同时读取和写入文件
|
||
同时读取和写入文件的示例如下所示:
|
||
```java
|
||
public class CopyFile {
|
||
static public void main(String args[]) throws Exception {
|
||
if (args.length < 2) {
|
||
System.err.println("Usage: java CopyFile infile outfile");
|
||
System.exit(1);
|
||
}
|
||
|
||
String infile = args[0];
|
||
String outfile = args[1];
|
||
|
||
try (FileInputStream fin = new FileInputStream(infile);
|
||
FileOutputStream fout = new FileOutputStream(outfile)) {
|
||
|
||
FileChannel fcin = fin.getChannel();
|
||
FileChannel fcout = fout.getChannel();
|
||
|
||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||
|
||
while (true) {
|
||
buffer.clear();
|
||
int r = fcin.read(buffer);
|
||
|
||
if (r == -1) {
|
||
break;
|
||
}
|
||
|
||
buffer.flip();
|
||
fcout.write(buffer);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
在上述示例中,通过`channel.read(buffer)`的返回值来判断是否已经读取到文件末尾,如果返回值为-1,那么代表文件读取已经完成。
|
||
|
||
## File Lock
|
||
可以针对整个文件或文件的部分进行加锁。文件锁分为独占锁和共享锁,如果获取了独占锁,那么其他进程将无法获取同一文件加锁;如果获取了共享锁,那么其他进程也可以获取同一文件的共享锁,但是无法获取到独占锁。
|
||
|
||
### 对文件进行上锁
|
||
在获取锁时,需要在FileChannel上调用lock方法:
|
||
```java
|
||
RandomAccessFile raf = new RandomAccessFile("usefilelocks.txt", "rw");
|
||
FileChannel fc = raf.getChannel();
|
||
FileLock lock = fc.lock(start, end, false);
|
||
```
|
||
释放锁方法如下所示:
|
||
```java
|
||
lock.release();
|
||
```
|
||
## Network和异步IO
|
||
通过异步io,可以在没有blocking的情况下读取和写入数据,在通常情况下,调用read方法会一直阻塞到数据可以被读取,而write方法会一直阻塞到数据可以被写入。
|
||
|
||
异步IO通过注册io事件来实现,当例如新的可读数据、新的网络连接到来时,系统会根据注册的io时间监听来发送消息提醒。
|
||
|
||
异步io的好处是,可以在同一线程内处理大量的io操作。在传统的io操作中,如果要处理大量io操作,通常需要轮询、创建大量线程来针对io操作进行处理。
|
||
- 轮询:对io请求进行排队,处理完一个io请求后再处理别的请求
|
||
- 创建大量线程:针对每个io请求,为其创建一个线程进行处理
|
||
|
||
通过nio,可以通过单个线程来监听多个channel的io事件,无需轮询也无需额外的事件。
|
||
|
||
### Selector
|
||
nio的核心对象为selector,将channel及其感兴趣的io事件类型注册到selector后,如果io事件触发,那么`selector.select`方法将会返回对应的SelectionKey。
|
||
|
||
Selector创建如下:
|
||
|
||
```java
|
||
Selector selector = Selector.open();
|
||
```
|
||
|
||
### 开启ServerSocketChannel
|
||
为了接收外部连接,需要一个ServerSocketChannel,创建ServerSocketChannel并配置对应ServerSocket的示例如下所示:
|
||
```java
|
||
ServerSocketChannel ssc = ServerSocketChannel.open();
|
||
ssc.configureBlocking(false);
|
||
ServerSocket ss = ssc.socket();
|
||
|
||
InetSocketAddress address = new InetSocketAddress(ports[i]);
|
||
ss.bind(address);
|
||
```
|
||
|
||
### Selection keys
|
||
在创建完SelectableChannel之后,需要将channel注册到selector,可以调用`channel.register`来执行注册操作:
|
||
```java
|
||
SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
|
||
```
|
||
|
||
上述方法为ServerSocketChannel注册了OP_ACCEPT的事件监听,OP_ACCEPT事件当新连接到来时将会被触发,OP_ACCEPT是适用于ServerSocketChannel的唯一io事件类型。
|
||
|
||
register方法的返回值SelectionKey代表channel和selector的注册关系,当io事件触发时,selector也会返回事件关联的SelectionKey。
|
||
|
||
除此之外,SelectionKey也可以被用于取消channel对selector的注册。
|
||
|
||
### inner loop
|
||
在向selector注册完channel之后,会进入到selector的循环
|
||
```java
|
||
while (true) {
|
||
selector.select();
|
||
|
||
Set<SelectionKey> selectedKeys = selector.selectedKeys();
|
||
Iterator<SelectionKey> it = selectedKeys.iterator();
|
||
|
||
while (it.hasNext()) {
|
||
SelectionKey key = it.next();
|
||
// ... deal with I/O event ...
|
||
}
|
||
}
|
||
```
|
||
在调用`selector.select`方法时,改方法会阻塞,直到有至少一个注册的io事件被触发。`selector.select`会返回触发事件的个数。
|
||
|
||
`select.selectedKeys()`方法则是会返回被触发事件关联的SelectionKey集合。
|
||
|
||
对于每个SelectionKey,需要通过`key.readyOps()`来判断事件的类型,并以此对其进行处理。
|
||
|
||
### 为新连接注册
|
||
当新连接到来后,需要将新连接注册到selector中
|
||
|
||
```java
|
||
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
|
||
SocketChannel sc = ssc.accept();
|
||
|
||
sc.configureBlocking(false);
|
||
// 监听新连接的READ事件类型
|
||
sc.register(selector, SelectionKey.OP_READ);
|
||
```
|
||
|
||
### remove SelectionKey
|
||
在完成对io事件的处理之后,必须要将SelectionKey从selectedKeys之中移除,否则selectionKey将会被一直作为活跃的io事件
|
||
```java
|
||
it.remove();
|
||
```
|
||
|
||
### 新连接有可读数据
|
||
当注册的新连接传来可读数据之后,可读数据处理方式如下:
|
||
```java
|
||
else if ((key.readyOps() & SelectionKey.OP_READ)
|
||
== SelectionKey.OP_READ) {
|
||
|
||
// Read the data
|
||
SocketChannel sc = (SocketChannel) key.channel();
|
||
```
|
||
## CharsetEncoder/CharsetDecoder
|
||
通过`CharsetEncoder/CharsetDecoder`,可以进行`CharBuffer`和`ByteBuffer`之间的转化。
|
||
|
||
```java
|
||
// 获取字符集
|
||
Charset latin1 = Charset.forName("ISO-8859-1");
|
||
|
||
// 通过字符集创建encoder/decoder
|
||
CharsetDecoder decoder = latin1.newDecoder();
|
||
CharsetEncoder encoder = latin1.newEncoder();
|
||
|
||
CharBuffer cb = decoder.decode(inputData);
|
||
|
||
ByteBuffer outputData = encoder.encode(cb);
|
||
```
|
||
|
||
|
||
|
||
|
||
|