Java I_O 体系从原理到应用,这一篇全说清楚了 - 今日头条

本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com

本文介绍操作系统 I/O 工作原理,Java I/O 设计,基本使用,开源项目中实现高性能 I/O 常见方法和实现,彻底搞懂高性能 I/O 之道基础概念在介绍

https://p26.toutiaoimg.com/origin/pgc-image/271e91f4ac3c4412a36dd53d34950b9f?from=pc

本文介绍操作系统 I/O 工作原理,Java I/O 设计,基本使用,开源项目中实现高性能 I/O 常见方法和实现,彻底搞懂高性能 I/O 之道

在介绍 I/O 原理之前,先重温几个基础概念:

  • (1) 操作系统与内核

https://p26.toutiaoimg.com/origin/pgc-image/5cd7a329b6b24e3a92cf8eb98c874e9f?from=pc

操作系统:管理计算机硬件与软件资源的系统软件内核:操作系统的核心软件,负责管理系统的进程、内存、设备驱动程序、文件和网络系统等等,为应用程序提供对计算机硬件的安全访问服务

  • 2 内核空间和用户空间

为了避免用户进程直接操作内核,保证内核安全,操作系统将内存寻址空间划分为两部分:内核空间(Kernel-space) ,供内核程序使用用户空间(User-space) ,供用户进程使用为了安全,内核空间和用户空间是隔离的,即使用户的程序崩溃了,内核也不受影响

  • 3 数据流

https://p26.toutiaoimg.com/origin/pgc-image/3f26d025174344e2a9b7e43efb99fe4c?from=pc

计算机中的数据是基于随着时间变换高低电压信号传输的,这些数据信号连续不断,有着固定的传输方向,类似水管中水的流动,因此抽象数据流 (I/O 流) 的概念:指一组有顺序的、有起点和终点的字节集合

抽象出数据流的作用:实现程序逻辑与底层硬件解耦,通过引入数据流作为程序与硬件设备之间的抽象层,面向通用的数据流输入输出接口编程,而不是具体硬件特性,程序和底层硬件可以独立灵活替换和扩展

https://p26.toutiaoimg.com/origin/pgc-image/44324991a9d04cc5bbbcf2463b985828?from=pc

1 磁盘 I/O

典型 I/O 读写磁盘工作原理如下:

https://p26.toutiaoimg.com/origin/pgc-image/2dffe80251054b9db5925ea783d5b108?from=pc

tips: DMA:全称叫直接内存存取(Direct Memory Access),是一种允许外围设备(硬件子系统)直接访问系统主内存的机制。基于 DMA 访问方式,系统主内存与硬件设备的数据传输可以省去 CPU 的全程调度

值得注意的是:

  • 读写操作基于系统调用实现
  • 读写操作经过用户缓冲区,内核缓冲区,应用进程并不能直接操作磁盘
  • 应用进程读操作时需阻塞直到读取到数据

2 网络 I/O

这里先以最经典的阻塞式 I/O 模型介绍:

https://p26.toutiaoimg.com/origin/pgc-image/72949daf377a4cec904573605fe4a055?from=pc

https://p26.toutiaoimg.com/origin/pgc-image/8b0648f0ef484af18bc2d00d43ed7507?from=pc

tips:recvfrom,经 socket 接收数据的函数

值得注意的是:

  • 网络 I/O 读写操作经过用户缓冲区,Sokcet 缓冲区
  • 服务端线程在从调用 recvfrom 开始到它返回有数据报准备好这段时间是阻塞的,recvfrom 返回成功后,线程开始处理数据报

1 I/O 分类

Java 中对数据流进行具体化和实现,关于 Java 数据流一般关注以下几个点:

  • (1) 流的方向 从外部到程序,称为输入流;从程序到外部,称为输出流
  • (2) 流的数据单位 程序以字节作为最小读写数据单元,称为字节流,以字符作为最小读写数据单元,称为字符流
  • (3) 流的功能角色

https://p26.toutiaoimg.com/origin/pgc-image/b7f646eec7514a368e4c33c3e0ce347b?from=pc

从 / 向一个特定的 IO 设备(如磁盘,网络)或者存储对象 (如内存数组) 读 / 写数据的流,称为节点流;对一个已有流进行连接和封装,通过封装后的流来实现数据的读 / 写功能,称为处理流 (或称为过滤流);

2 I/O 操作接口

java.io 包下有一堆 I/O 操作类,初学时看了容易搞不懂,其实仔细观察其中还是有规律:这些 I/O 操作类都是在继承 4 个基本抽象流的基础上,要么是节点流,要么是处理流

2.1 四个基本抽象流

java.io 包中包含了流式 I/O 所需要的所有类,java.io 包中有四个基本抽象流,分别处理字节流和字符流:

  • InputStream
  • OutputStream
  • Reader
  • Writer

https://p26.toutiaoimg.com/origin/pgc-image/3cb1ebe660ac4409944b004233818604?from=pc

2.2 节点流

https://p26.toutiaoimg.com/origin/pgc-image/0a2c9d70ff264390bef3af4710f75d9b?from=pc

节点流 I/O 类名由节点流类型 + 抽象流类型组成,常见节点类型有:

  • File 文件
  • Piped 进程内线程通信管道
  • ByteArray / CharArray (字节数组 / 字符数组)
  • StringBuffer / String (字符串缓冲区 / 字符串)

节点流的创建通常是在构造函数传入数据源,例如:

1
2
FileReader reader = new FileReader(new File("file.txt"));
FileWriter writer = new FileWriter(new File("file.txt"));

2.3 处理流

https://p26.toutiaoimg.com/origin/pgc-image/a5748ea2dc6e412485e6cc942253f2fa?from=pc

处理流 I/O 类名由对已有流封装的功能 + 抽象流类型组成,常见功能有:

  • 缓冲:对节点流读写的数据提供了缓冲的功能,数据可以基于缓冲批量读写,提高效率。常见有 BufferedInputStream、BufferedOutputStream
  • 字节流转换为字符流:由 InputStreamReader、OutputStreamWriter 实现
  • 字节流与基本类型数据相互转换:这里基本数据类型数据如 int、long、short,由 DataInputStream、DataOutputStream 实现
  • 字节流与对象实例相互转换:用于实现对象序列化,由 ObjectInputStream、ObjectOutputStream 实现

处理流的应用了适配器 / 装饰模式,转换 / 扩展已有流,处理流的创建通常是在构造函数传入已有的节点流或处理流:

1
2
3
4
5
FileOutputStream fileOutputStream = new FileOutputStream("file.txt");
// 扩展提供缓冲写
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
 // 扩展提供提供基本数据类型写
DataOutputStream out = new DataOutputStream(bufferedOutputStream);

3 Java NIO

3.1 标准 I/O 存在问题

Java NIO(New I/O) 是一个可以替代标准 Java I/O API 的 IO API(从 Java 1.4 开始),Java NIO 提供了与标准 I/O 不同的 I/O 工作方式,目的是为了解决标准 I/O 存在的以下问题:

  • (1) 数据多次拷贝

标准 I/O 处理,完成一次完整的数据读写,至少需要从底层硬件读到内核空间,再读到用户文件,又从用户空间写入内核空间,再写入底层硬件

此外,底层通过 write、read 等函数进行 I/O 系统调用时,需要传入数据所在缓冲区起始地址和长度由于 JVM GC 的存在,导致对象在堆中的位置往往会发生移动,移动后传入系统函数的地址参数就不是真正的缓冲区地址了

可能导致读写出错,为了解决上面的问题,使用标准 I/O 进行系统调用时,还会额外导致一次数据拷贝:把数据从 JVM 的堆内拷贝到堆外的连续空间内存 (堆外内存)

所以总共经历 6 次数据拷贝,执行效率较低

https://p26.toutiaoimg.com/origin/pgc-image/e80d080550474d9f9ddf6e9e821ec4f3?from=pc

  • (2) 操作阻塞

传统的网络 I/O 处理中,由于请求建立连接 (connect),读取网络 I/O 数据(read),发送数据(send) 等操作是线程阻塞的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 等待连接
Socket socket = serverSocket.accept();

// 连接已建立,读取请求消息
StringBuilder req = new StringBuilder();
byte[] recvByteBuf = new byte[1024];
int len;
while ((len = socket.getInputStream().read(recvByteBuf)) != -1) {
 req.append(new String(recvByteBuf, 0, len, StandardCharsets.UTF_8));
}

// 写入返回消息
socket.getOutputStream().write(("server response msg".getBytes()));
socket.shutdownOutput();

以上面服务端程序为例,当请求连接已建立,读取请求消息,服务端调用 read 方法时,客户端数据可能还没就绪 (例如客户端数据还在写入中或者传输中),线程需要在 read 方法阻塞等待直到数据就绪

为了实现服务端并发响应,每个连接需要独立的线程单独处理,当并发请求量大时为了维护连接,内存、线程切换开销过大

https://p26.toutiaoimg.com/origin/pgc-image/539ca848ad6a47f199a923a1ab2537e8?from=pc

3.2 Buffer

Java NIO 核心三大核心组件是 Buffer(缓冲区)、Channel(通道)、Selector

Buffer 提供了常用于 I/O 操作的字节缓冲区,常见的缓存区有 ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型: byte, char, double, float, int, long, short,下面介绍主要以最常用的 ByteBuffer 为例,Buffer 底层支持 Java 堆内 (HeapByteBuffer) 或堆外内存(DirectByteBuffer)

堆外内存是指与堆内存相对应的,把内存对象分配在 JVM 堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机,相比堆内内存,I/O 操作中使用堆外内存的优势在于:

  • 不用被 JVM GC 线回收,减少 GC 线程资源占有
  • 在 I/O 系统调用时,直接操作堆外内存,可以节省一次堆外内存和堆内内存的复制

ByteBuffer 底层堆外内存的分配和释放基于 malloc 和 free 函数,对外 allocateDirect 方法可以申请分配堆外内存,并返回继承 ByteBuffer 类的 DirectByteBuffer 对象:

1
2
3
public static ByteBuffer allocateDirect(int capacity) {
 return new DirectByteBuffer(capacity);
}

堆外内存的回收基于 DirectByteBuffer 的成员变量 Cleaner 类,提供 clean 方法可以用于主动回收,Netty 中大部分堆外内存通过记录定位 Cleaner 的存在,主动调用 clean 方法来回收;另外,当 DirectByteBuffer 对象被 GC 时,关联的堆外内存也会被回收

tips: JVM 参数不建议设置 - XX:+DisableExplicitGC,因为部分依赖 Java NIO 的框架 (例如 Netty) 在内存异常耗尽时,会主动调用 System.gc(),触发 Full GC,回收 DirectByteBuffer 对象,作为回收堆外内存的最后保障机制,设置该参数之后会导致在该情况下堆外内存得不到清理

堆外内存基于基础 ByteBuffer 类的 DirectByteBuffer 类成员变量:Cleaner 对象,这个 Cleaner 对象会在合适的时候执行 unsafe.freeMemory(address),从而回收这块堆外内存

Buffer 可以见到理解为一组基本数据类型,存储地址连续的的数组,支持读写操作,对应读模式和写模式,通过几个变量来保存这个数据的当前位置状态:capacity、 position、 limit:

  • capacity 缓冲区数组的总长度
  • position 下一个要操作的数据元素的位置
  • limit 缓冲区数组中不可操作的下一个元素的位置:limit <= capacity

https://p26.toutiaoimg.com/origin/pgc-image/23fd338f6f5b4943b333d1eecfa8b7cc?from=pc

3.3 Channel

Channel(通道) 的概念可以类比 I/O 流对象,NIO 中 I/O 操作主要基于 Channel:从 Channel 进行数据读取 :创建一个缓冲区,然后请求 Channel 读取数据从 Channel 进行数据写入 :创建一个缓冲区,填充数据,请求 Channel 写入数据

Channel 和流非常相似,主要有以下几点区别:

  • Channel 可以读和写,而标准 I/O 流是单向的
  • Channel 可以异步读写,标准 I/O 流需要线程阻塞等待直到读写操作完成
  • Channel 总是基于缓冲区 Buffer 读写

Java NIO 中最重要的几个 Channel 的实现:

  • FileChannel: 用于文件的数据读写,基于 FileChannel 提供的方法能减少读写文件数据拷贝次数,后面会介绍
  • DatagramChannel: 用于 UDP 的数据读写
  • SocketChannel: 用于 TCP 的数据读写,代表客户端连接
  • ServerSocketChannel: 监听 TCP 连接请求,每个请求会创建会一个 SocketChannel,一般用于服务端

基于标准 I/O 中,我们第一步可能要像下面这样获取输入流,按字节把磁盘上的数据读取到程序中,再进行下一步操作,而在 NIO 编程中,需要先获取 Channel,再进行读写

1
2
FileInputStream fileInputStream = new FileInputStream("test.txt");
FileChannel channel = fileInputStream.channel();

tips: FileChannel 仅能运行在阻塞模式下,文件异步处理的 I/O 是在 JDK 1.7 才被加入的 java.nio.channels.AsynchronousFileChannel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// server socket channel:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9091));

while (true) {
 SocketChannel socketChannel = serverSocketChannel.accept();
 ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
 int readBytes = socketChannel.read(buffer);
 if (readBytes > 0) {
 // 从写数据到buffer翻转为从buffer读数据
 buffer.flip();
 byte[] bytes = new byte[buffer.remaining()];
 buffer.get(bytes);
 String body = new String(bytes, StandardCharsets.UTF_8);
 System.out.println("server 收到:" + body);
 }
}

3.4 Selector

Selector(选择器) ,它是 Java NIO 核心组件中的一个,用于检查一个或多个 NIO Channel(通道)的状态是否处于可读、可写。实现单线程管理多个 Channel,也就是可以管理多个网络连接

Selector 核心在于基于操作系统提供的 I/O 复用功能,单个线程可以同时监视多个连接描述符,一旦某个连接就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作,常见有 select、poll、epoll 等不同实现

https://p26.toutiaoimg.com/origin/pgc-image/aa064278c5af4d1897dcd797eb6f8f24?from=pc

https://p26.toutiaoimg.com/origin/pgc-image/abd872f34a414a738829491b86b97912?from=pc

Java NIO Selector 基本工作原理如下:

  • (1) 初始化 Selector 对象,服务端 ServerSocketChannel 对象
  • (2) 向 Selector 注册 ServerSocketChannel 的 socket-accept 事件
  • (3) 线程阻塞于 selector.select(),当有客户端请求服务端,线程退出阻塞
  • (4) 基于 selector 获取所有就绪事件,此时先获取到 socket-accept 事件,向 Selector 注册客户端 SocketChannel 的数据就绪可读事件事件
  • (5) 线程再次阻塞于 selector.select(),当有客户端连接数据就绪,可读
  • (6) 基于 ByteBuffer 读取客户端请求数据,然后写入响应数据,关闭 channel

示例如下,完整可运行代码已经上传 github( https://github.com/caison/caison-blog-demo):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9091));
// 配置通道为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 注册服务端的socket-accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
 // selector.select()会一直阻塞,直到有channel相关操作就绪
 selector.select();
 // SelectionKey关联的channel都有就绪事件
 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

 while (keyIterator.hasNext()) {
 SelectionKey key = keyIterator.next();
 // 服务端socket-accept
 if (key.isAcceptable()) {
 // 获取客户端连接的channel
 SocketChannel clientSocketChannel = serverSocketChannel.accept();
 // 设置为非阻塞模式
 clientSocketChannel.configureBlocking(false);
 // 注册监听该客户端channel可读事件,并为channel关联新分配的buffer
 clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
 }

 // channel可读
 if (key.isReadable()) {
 SocketChannel socketChannel = (SocketChannel) key.channel();
 ByteBuffer buf = (ByteBuffer) key.attachment();

 int bytesRead;
 StringBuilder reqMsg = new StringBuilder();
 while ((bytesRead = socketChannel.read(buf)) > 0) {
 // 从buf写模式切换为读模式
 buf.flip();
 int bufRemain = buf.remaining();
 byte[] bytes = new byte[bufRemain];
 buf.get(bytes, 0, bytesRead);
 // 这里当数据包大于byteBuffer长度,有可能有粘包/拆包问题
 reqMsg.append(new String(bytes, StandardCharsets.UTF_8));
 buf.clear();
 }
 System.out.println("服务端收到报文:" + reqMsg.toString());
 if (bytesRead == -1) {
 byte[] bytes = "[这是服务回的报文的报文]".getBytes(StandardCharsets.UTF_8);

 int length;
 for (int offset = 0; offset < bytes.length; offset += length) {
 length = Math.min(buf.capacity(), bytes.length - offset);
 buf.clear();
 buf.put(bytes, offset, length);
 buf.flip();
 socketChannel.write(buf);
 }
 socketChannel.close();
 }
 }
 // Selector不会自己从已selectedKeys中移除SelectionKey实例
 // 必须在处理完通道时自己移除 下次该channel变成就绪时,Selector会再次将其放入selectedKeys中
 keyIterator.remove();
 }
}

tips: Java NIO 基于 Selector 实现高性能网络 I/O 这块使用起来比较繁琐,使用不友好,一般业界使用基于 Java NIO 进行封装优化,扩展丰富功能的 Netty 框架来优雅实现

下面结合业界热门开源项目介绍高性能 I/O 的优化

1 零拷贝

零拷贝 (zero copy) 技术,用于在数据读写中减少甚至完全避免不必要的 CPU 拷贝,减少内存带宽的占用,提高执行效率,零拷贝有几种不同的实现原理,下面介绍常见开源项目中零拷贝实现

1.1 Kafka 零拷贝

Kafka 基于 Linux 2.1 内核提供,并在 2.4 内核改进的的 sendfile 函数 + 硬件提供的 DMA Gather Copy 实现零拷贝,将文件通过 socket 传送

函数通过一次系统调用完成了文件的传送,减少了原来 read/write 方式的模式切换。同时减少了数据的 copy, sendfile 的详细过程如下:

https://p26.toutiaoimg.com/origin/pgc-image/c0a7006cb7324760afc00c41d4c0c2e8?from=pc

基本流程如下:

  • (1) 用户进程发起 sendfile 系统调用
  • (2) 内核基于 DMA Copy 将文件数据从磁盘拷贝到内核缓冲区
  • (3) 内核将内核缓冲区中的文件描述信息 (文件描述符,数据长度) 拷贝到 Socket 缓冲区
  • (4) 内核基于 Socket 缓冲区中的文件描述信息和 DMA 硬件提供的 Gather Copy 功能将内核缓冲区数据复制到网卡
  • (5) 用户进程 sendfile 系统调用完成并返回

相比传统的 I/O 方式,sendfile + DMA Gather Copy 方式实现的零拷贝,数据拷贝次数从 4 次降为 2 次,系统调用从 2 次降为 1 次,用户进程上下文切换次数从 4 次变成 2 次 DMA Copy,大大提高处理效率

Kafka 底层基于 java.nio 包下的 FileChannel 的 transferTo:

1
public abstract long transferTo(long position, long count, WritableByteChannel target)

transferTo 将 FileChannel 关联的文件发送到指定 channel,当 Comsumer 消费数据,Kafka Server 基于 FileChannel 将文件中的消息数据发送到 SocketChannel

1.2 RocketMQ 零拷贝

RocketMQ 基于 mmap + write 的方式实现零拷贝:mmap() 可以将内核中缓冲区的地址与用户空间的缓冲区进行映射,实现数据共享,省去了将数据从内核缓冲区拷贝到用户缓冲区

1
2
tmp_buf = mmap(file, len); 
write(socket, tmp_buf, len);

https://p26.toutiaoimg.com/origin/pgc-image/0fdba4a44ec64b56b59ca2856f9ce3dd?from=pc

mmap + write 实现零拷贝的基本流程如下:

  • (1) 用户进程向内核发起系统 mmap 调用
  • (2) 将用户进程的内核空间的读缓冲区与用户空间的缓存区进行内存地址映射
  • (3) 内核基于 DMA Copy 将文件数据从磁盘复制到内核缓冲区
  • (4) 用户进程 mmap 系统调用完成并返回
  • (5) 用户进程向内核发起 write 系统调用
  • (6) 内核基于 CPU Copy 将数据从内核缓冲区拷贝到 Socket 缓冲区
  • (7) 内核基于 DMA Copy 将数据从 Socket 缓冲区拷贝到网卡
  • (8) 用户进程 write 系统调用完成并返回

RocketMQ 中消息基于 mmap 实现存储和加载的逻辑写在 org.apache.rocketmq.store.MappedFile 中,内部实现基于 nio 提供的 java.nio.MappedByteBuffer,基于 FileChannel 的 map 方法得到 mmap 的缓冲区:

1
2
3
// 初始化
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

查询 CommitLog 的消息时,基于 mappedByteBuffer 偏移量 pos,数据大小 size 查询:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
 int readPosition = getReadPosition();
 // ...各种安全校验
 
 // 返回mappedByteBuffer视图
 ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
 byteBuffer.position(pos);
 ByteBuffer byteBufferNew = byteBuffer.slice();
 byteBufferNew.limit(size);
 return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}

tips: transientStorePoolEnable 机制 Java NIO mmap 的部分内存并不是常驻内存,可以被置换到交换内存 (虚拟内存),RocketMQ 为了提高消息发送的性能,引入了内存锁定机制,即将最近需要操作的 CommitLog 文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是 transientStorePoolEnable

因此,MappedFile 数据保存 CommitLog 刷盘有 2 种方式:

  • 1 开启 transientStorePoolEnable:写入内存字节缓冲区 (writeBuffer) -> 从内存字节缓冲区(writeBuffer) 提交 (commit) 到文件通道(fileChannel) -> 文件通道(fileChannel) -> flush 到磁盘
  • 2 未开启 transientStorePoolEnable:写入映射文件字节缓冲区 (mappedByteBuffer) -> 映射文件字节缓冲区 (mappedByteBuffer) -> flush 到磁盘

RocketMQ 基于 mmap+write 实现零拷贝,适用于业务级消息这种小块文件的数据持久化和传输 Kafka 基于 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输

tips: Kafka 的索引文件使用的是 mmap+write 方式,数据文件发送网络使用的是 sendfile 方式

1.3 Netty 零拷贝

Netty 的零拷贝分为两种:

  • 1 基于操作系统实现的零拷贝,底层基于 FileChannel 的 transferTo 方法
  • 2 基于 Java 层操作优化,对数组缓存对象 (ByteBuf) 进行封装优化,通过对 ByteBuf 数据建立数据视图,支持 ByteBuf 对象合并,切分,当底层仅保留一份数据存储,减少不必要拷贝

2 多路复用

Netty 中对 Java NIO 功能封装优化之后,实现 I/O 多路复用代码优雅了很多:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 创建mainReactor
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
// 创建工作线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap 
 // 组装NioEventLoopGroup 
 .group(boosGroup, workerGroup)
 // 设置channel类型为NIO类型
 .channel(NioServerSocketChannel.class)
 // 设置连接配置参数
 .option(ChannelOption.SO_BACKLOG, 1024)
 .childOption(ChannelOption.SO_KEEPALIVE, true)
 .childOption(ChannelOption.TCP_NODELAY, true)
 // 配置入站、出站事件handler
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel ch) {
 // 配置入站、出站事件channel
 ch.pipeline().addLast(...);
 ch.pipeline().addLast(...);
 }
 });

// 绑定端口
int port = 8080;
serverBootstrap.bind(port).addListener(future -> {
 if (future.isSuccess()) {
 System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
 } else {
 System.err.println("端口[" + port + "]绑定失败!");
 }
});

3 页缓存 (PageCache)

页缓存(PageCache) 是操作系统对文件的缓存,用来减少对磁盘的 I/O 操作,以页为单位的,内容就是磁盘上的物理块,页缓存能帮助程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 PageCache 机制对读写访问操作进行了性能优化:

页缓存读取策略:当进程发起一个读操作 (比如,进程发起一个 read() 系统调用),它首先会检查需要的数据是否在页缓存中:

  • 如果在,则放弃访问磁盘,而直接从页缓存中读取
  • 如果不在,则内核调度块 I/O 操作从磁盘去读取数据,并读入紧随其后的少数几个页面(不少于一个页面,通常是三个页面),然后将数据放入页缓存中

https://p26.toutiaoimg.com/origin/pgc-image/e27a4b88cfc54651bb826a61f99260c5?from=pc

页缓存写策略:当进程发起 write 系统调用写数据到文件中,先写到页缓存,然后方法返回。此时数据还没有真正的保存到文件中去,Linux 仅仅将页缓存中的这一页数据标记为 “脏”,并且被加入到脏页链表中

然后,由 flusher 回写线程周期性将脏页链表中的页写到磁盘,让磁盘中的数据和内存中保持一致,最后清理 “脏” 标识。在以下三种情况下,脏页会被写回磁盘:

  • 空闲内存低于一个特定阈值
  • 脏页在内存中驻留超过一个特定的阈值时
  • 当用户进程调用 sync() 和 fsync() 系统调用时

RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page cache 机制的预读取作用下,Consume Queue 文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能,提供了 2 种消息刷盘策略:

  • 同步刷盘:在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应
  • 异步刷盘,能充分利用操作系统的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量

Kafka 实现消息高性能读写也利用了页缓存,这里不再展开

《深入理解 Linux 内核 —— Daniel P.Bovet》

http://calvin1978.blogcn.com/articles/directbytebuffer.html

https://mp.weixin.qq.com/s/c9tkrokcDQR375kiwCeV9w

https://www.kunzhao.org/blog/2018/03/12/rocketmq-message-store-flow

https://www.jianshu.com/p/6681bfa36c4f

更多精彩,欢迎关注公众号【分布式系统架构】