1-java.io梳理

网络编程的本质是进程间通信,而进程间通信的基础为IO模型。

java.io包中提供了大量与IO操作相关的类和方法,如下图所示,可大致分为字符流(处理单位为字符,可包含多个字节)和字节流(处理单位为字节)两类。下图所示Reader / Writer,InputStream / OutputStream均为接口,可被各种不同类实现,以完成不同的功能。

1.1 字符流

下图为字符流的几个主要实现类

  • CharArrayReader / CharArrayWriter的数据源为字符数组,StringReader / StringWriter的数据源为字符串
  • BufferdReaderFilterReaderInputStreamReader以及对应的Writer实现类为更高级的字符流实现类,它们采用装饰器模式,可在类创建时传入的另一个Reader / Writer的基础上提供更高级的额外功能。
    • BufferedReader:额外提供一个缓冲区,可一次性从数据源中读取较多数据至缓冲区,减少对数据源的直接访问次数。缓冲区位于内存中,效率更高。
    • FilterReader:抽象类,有很多子类,可在传入的Reader的基础上提供更多灵活的功能
    • InputStreamReader:将字节流转换为字符流,常用子类为FileReader(File中存储的数据为字节,FileReader可实现字节到字符的转换,从File中读取字符)

1.2 字节流

下图为字节流的几个主要实现类

  • ByteArrayInputStream / ByteArrayOutputStream的数据源为字节数组,FileInputStream /FileOutputStream 的数据源为文件
  • FilterInputStream / FilterOutputStream为抽象类,可使用装饰器模式在传入的InputStream / OutStream 的基础上叠加更高级的功能,常见子类包括:。
    • BufferedInputStream:额外提供缓冲区
    • DataInputStream:从流中读取出指定的Java基本数据
    • DataOutputStream:将Java基本数据类型写到输出流中

2-同步/异步/阻塞/非阻塞

同步/异步 与 阻塞/非阻塞 可两两组合:同步阻塞同步非阻塞异步阻塞异步非阻塞

2.1 同步 & 异步

区别在于被调用方的行为(通信机制的区别):

同步:已知调用结果后才将调用返回
异步:不需要等待调用结果即可返回,有调用结果后,被调用方会“通知”调用方

2.2 阻塞 & 非阻塞

区别在于调用方的行为(调用状态的区别):

阻塞:调用方在收到调用结果前,不能处理其他任何事务(注意是收到调用结果前,而不是调用返回前)
非阻塞:调用方在收到调用结果前,可处理其他事务

3-内核IO模型

3.1 阻塞式I/O - BIO

应用程序发起系统调用,若此时数据还没有被准备好,则该系统调用不会被返回,它会一直等待,直到数据被准备好(内核缓冲区有了数据,并且被复制到了应用程序缓冲区),系统调用返回。

这样的内核IO模型就对应于BIO(Blocking IO),即BIO是该内核模型在应用层面的一种抽象。

3.2 非阻塞式I/O - NIO

应用程序进行系统调用,如果此时数据还没准备好,则调用返回(返回无数据,应用程序会了解到数据还未被准备好)。过一会儿应用程序会再次询问,如果仍没有数据,则调用会再次返回。假设再一次询问时,数据已经准备好,则数据会被复制到应用程序可以访问的缓冲区,调用成功返回。

这样的IO模型对应于应用层面的NIO(Non-blocking IO)。注意此时并没有使用Selector监听,而是通过应用程序的不断轮询来实现非阻塞。

3.3 I/O多路复用 - NIO + Selector

应用程序发起系统调用,如果此时数据没有准备好,那么我们会要求内核监听这个IO通道。数据准备好后,系统调用返回,通知应用程序此时数据已经准备好(此时并没有将数据返回,只是通知数据已就绪)。接下来应用程序再进行一次系统调用,复制数据并带着数据返回。由于内核可以监听多个IO,任一IO出现状态更新,内核都会返回给应用程序,因此该模型称为IO多路复用。

这样的IO模型对应于应用层面的NIO + Selector模型。Selector也称为IO多路复用器

3.4 异步I/O - AIO

以上三种IO模型均为同步IO模型,因为无论阻塞与否,如果我们不再次发起系统调用,我们将无法获得准备好的数据。这次系统调用是无法省略的,因为内核不会主动通知我们调用的结果。

异步IO的流程如下:应用程序发起系统调用,如果数据没准备好,则调用返回,不会阻塞。如果数据已经准备好,则内核会将数据复制,并向应用程序递交信号,告知应用程序此时数据已经准备好。

这样的IO模型对应于应用层面的AIO。

4-BIO编程模型

4.1 BIO流程

BIO:阻塞式IO

  • 线程 Acceptor 负责接收客户端发起的连接请求
  • 每当有新客户端连接,服务器都为其创建一个新线程Handler,负责处理与该客户端相关的输入输出操作
    • 创建新线程的原因:如果直接在线程 Acceptor 中处理客户端的输入输出操作,则服务器无法再接收其他客户端的连接请求

4.2 基于BIO的多人聊天室设计

4.2.1 需求分析

  • 基于BIO模型
  • 支持多人同时在线
  • 每个用户的发言都被转发给其他在线用户

4.2.2 聊天室设计

1. 服务器端

  1. 需要一个 Acceptor 线程,使用 ServerSocket 的 accept() 函数阻塞式地接受客户端连接
  2. 为每一个连接进来的客户端新建一个线程,该线程需要实现两个功能:1)接收该客户端发来的信息;2)将该信息转发给其他客户端
  3. 转发功能要求客户端能够存储当前在线的所有客户端列表

2. 客户端

  1. 可以和服务端建立连接
  2. 客户端需要完成两个功能:1)接收用户在控制台的输入;2)读取其他客户端发送来的消息
  3. 由于等待用户输入的过程是阻塞的,所以客户端也需要两条线程,分别完成上述两个功能

3. BIO的体现

  1. ServerSocket.accept():该方法为阻塞式调用,在有新客户端连接进来之前一直阻塞
  2. InputStream.read() & OutputStream.read():这两个方法均为阻塞式调用,在用户输入消息前,read()方法会一直阻塞,因此需要为每个客户端都创建一个线程

4.3 基于BIO的多人聊天室实现

代码结构如下:

  • server
    • ChatServer.java:服务端的主线程,负责启动服务端、接收客户端请求、存储当前在线客户端、为客户端新建处理线程等
    • ChatHandler.java:处理和客户端之间的输入输出工作
  • client
    • ChatClient.java:客户端的主线程,负责连接服务端,读取服务端转发来的其他客户端信息
    • UserInputHandler.java:等待和发送用户输入

4.3.1 服务端

1. ChatServer.java

服务端的主线程,负责启动服务端、接收客户端请求、存储当前在线客户端、为客户端新建处理线程等

package server;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

public class ChatServer {

/** 服务器监听端口 */
private int SERVER_PORT = 8888;

/** 客户端退出标志(客户端发送\quit表示退出聊天室 */
private final String QUIT = "\\quit";

/** 服务端 Socket */
private ServerSocket serverSocket;

/**
* 存储已连接的客户端
* key:客户端的端口号
* value:向该端口发信息所使用的 Writer
*/
private Map<Integer, Writer> connectedClients;

public ChatServer() {
connectedClients = new HashMap<>();
}

/**
* 添加新在线客户端
* @param socket 新增客户端的socket
* @throws IOException
*/
public synchronized void addClient(Socket socket) throws IOException {
if (socket != null) {
int port = socket.getPort();
BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);
connectedClients.put(port, writer);
System.out.println("客户端[" + port + "]已连接到服务器");
}
}

/**
* 移除已下线客户端
* @param socket 已下线的客户端socket
* @throws IOException
*/
public synchronized void removeClient(Socket socket) throws IOException {
if (socket != null) {
int port = socket.getPort();
if (connectedClients.containsKey(port)) {
connectedClients.get(port).close();
}
connectedClients.remove(port);
System.out.println("客户端[" + port + "]已断开连接");
}
}

/**
* 转发信息给其他所有在线客户端
* @param socket 发送信息的客户端
* @param fwdMsg 该客户端发送的信息
* @throws IOException
*/
public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException {
for (Integer id : connectedClients.keySet()) {
if (!id.equals(socket.getPort())) {
Writer writer = connectedClients.get(id);
writer.write(fwdMsg);
writer.flush();
}
}
}

/**
* 服务端主要逻辑
*/
public void start() {
try {
// 为服务端绑定端口
serverSocket = new ServerSocket(SERVER_PORT);
System.out.println("服务器启动,监听端口:" + SERVER_PORT + "...");

while (true) {
// accept()方法是阻塞式的
Socket socket = serverSocket.accept();

// 有客户端连接后,为它创建一个ChatHandler线程
new Thread(new ChatHandler(this, socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}

/**
* 判断客户端是否准备退出
* @param msg 客户端发送的消息
* @return true:准备退出
*/
public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

/**
* 关闭服务器
*/
public synchronized void close() {
if (serverSocket != null) {
try {
serverSocket.close();
System.out.println("服务器关闭");
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.start();
}
}

可使用线程池对服务器端进行进一步的改进,限制可以进入聊天室的客户端个数,避免为用户开启的线程过多。改进后的部分代码如下:

public class ChatServer {

...

/** 线程池 */
private ExecutorService executorService;

public ChatServer(int threadNum) {
// 创建线程池
executorService = Executors.newFixedThreadPool(threadNum);
connectedClient = new HashMap<>();
}

...

/**
* 服务端主要逻辑
*/
public void start() {
try {
// 绑定监听端口
serverSocket = new ServerSocket(SERVER_PORT);
System.out.println("服务器启动,监听端口" + SERVER_PORT + "...");

while (true) {
// 等待客户端连接
Socket socket = serverSocket.accept();

// 向线程池提交任务
executorService.execute(new ChatHandler(this, socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}

public static void main(String[] args) {
// 创建服务器类时传入允许的最大在线用户数
ChatServer chatServer = new ChatServer(3);
chatServer.start();
}
}

2. ChatHandler.java

每当有新客户端连接进服务器时,服务器均会为其创建一个ChatHandler线程,处理服务器和客户端之间的输入输出工作

package server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class ChatHandler implements Runnable{

/** 服务器类 */
private ChatServer server;

/** 当前客户端 Socket */
private Socket socket;

public ChatHandler(ChatServer server, Socket socket) {
this.server = server;
this.socket = socket;
}

@Override
public void run() {
try {
// 存储新上线用户
server.addClient(socket);

// 读取用户发送的消息
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);

String msg = null;
while ((msg = reader.readLine()) != null) {
// 检查用户是否退出
if (server.readyToQuit(msg)) {
break;
}

String fwdMsg = "客户端[" + socket.getPort() + "]:" + msg + "\n";
System.out.print(fwdMsg);

// 转发消息至其他在线用户
server.forwardMessage(socket, fwdMsg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
server.removeClient(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

4.3.2 客户端

1. ChatClient.java

客户端的主线程,负责连接服务端,读取服务端转发来的其他客户端信息

package client;

import java.io.*;
import java.net.Socket;

public class ChatClient {

/** 服务器 IP */
private final String SERVER_HOST = "127.0.0.1";

/** 服务器监听端口号 */
private final int SERVER_PORT = 8888;

/** 客户端退出命令 */
private final String QUIT = "\\quit";

/** 客户端 Socket */
private Socket socket;

/** 从服务端读取信息的 Reader */
private BufferedReader reader;

/** 向服务端发送消息的 Writer */
private BufferedWriter writer;

/**
* 发送消息给服务器
* @param msg
* @throws IOException
*/
public void send(String msg) throws IOException {
// 确定输出流没有被关闭
if (!socket.isOutputShutdown()) {
writer.write(msg + "\n");
writer.flush();
}
}

/**
* 检查用户是否准备退出
*/
public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

/**
* 关闭服务器
*/
public void close() {
if (writer != null) {
try {
writer.close();
System.out.println("关闭客户端");
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void start() {
try {
// 创建socket
socket = new Socket(SERVER_HOST, SERVER_PORT);

// 创建IO流
reader = new BufferedReader(
new InputStreamReader(socket.getInputStream())
);
writer = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())
);

// 处理用户输入
new Thread(new UserInputHandler(this)).start();

// 读取服务器转发的信息
String msg = null;
while ((msg = reader.readLine()) != null) {
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close();
}
}

public static void main(String[] args) {
ChatClient client = new ChatClient();
client.start();
}
}

2. UserInputHandler.java

需要为每一个客户端进程都创建一个UserInputHandler线程,负责等待和发送用户输入

package client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class UserInputHandler implements Runnable{

/** 客户端类 */
private ChatClient chatClient;

public UserInputHandler(ChatClient chatClient) {
this.chatClient = chatClient;
}

@Override
public void run() {
try {
BufferedReader consoleReader = new BufferedReader(
new InputStreamReader(System.in)
);
while (true) {
// 等待用户输入消息
String input = consoleReader.readLine();

// 向服务器发送消息
chatClient.send(input);

// 检查用户是否准备退出
if (chatClient.readyToQuit(input)) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

5-NIO

5.1-NIO概述

NIO(Non-blocking IO 或 New IO),非阻塞式IO。与BIO不同,NIO使用Channel代替Stream,特征如下:

  • Stream具有方向性,分为输入流和输出流,而Channel无方向,一个Channel既可以写入数据也可以读取数据。
  • Stream的读写均为阻塞式(例如InputStream.read(),OutputStream.write()),而Channnel的读写具有两种模式,既可以阻塞式读写,也提供了非阻塞式读写的方法
  • NIO使用Selector监控多条Channel
  • 可以在一个线程里处理多个Channel I/O

5.1.1 Buffer

NIO向Channel中读写数据都需要通过一个Buffer类来实现。

Buffer,顾名思义,是一个缓冲区,代表内存中我们可以进行读写的一个区域。Channel可以支持双向操作,因此Buffer也可进行读写双向操作。

1. 向Buffer中写入数据

写模式主要用到两个指针:

  • position:当前写入的位置
  • capacity:最远可写入位置(缓冲区最大容量)

2. 由写模式转换为读模式

调用flip()方法实现从写模式到读模式的转换,步骤如下:

  1. position指针移至缓冲区头部
  2. limit指针移至写入的最远位置,limit指针表示的是读模式下最远所能读取的位置

3. 读模式

  1. 第一种读取模式是将写入的数据全部读取完,读取完后position指针将和limit指针指向相同的位置

调用clear()方法转换为写模式,步骤如下:

  • osition指针移回头部
  • imit指针移回capacity处

从图中可以看出,clear()方法其实并没有清除已读取的数据,而是通过指针操作,使得再次写入的数据覆盖之前的数据。

  1. 第二种读取模式是只读取部分已写入的数据,读取完后position指针的位置在limit指针之上。

此时若想转换为写模式,需要调用compact()方法,步骤如下:

  • 将已写入但未读取的数据复制到缓冲区头部
  • 将position指针移至未读取数据的下面一个区域
  • 将limit指针移回capacity处

5.1.2 Channel

1. Channel的基本操作

  1. 通过Buffer写入/读取数据
  2. Channel之间可以直接进行数据传输:每个Channel可以向其他Channel数据传输数据,也可以接受从其他Channel传输过来的数据

2. 几个重要的Channel

5.1.3 Selector

Channel可以进行非阻塞式的读写,而并不是每个时刻Channel上都有数据可供读写,所以我们需要不停地询问Channel是否处于可操作的状态。Jahannlelva提供了一个Selector类来实现这个功能,该类负责监听各个Channel的状态。

Selector也称为 I/O多路复用器

1. Channel的状态

Channel的状态并不是固定不变的,Channel的状态会随着不同事件的发生而在如下状态间发生变化:

  • CONNECT:客户端与服务端建立连接后,客户端的SocketChannel会处于CONNECT状态
  • ACCEPT:服务端接受了客户端的连接建立请求后,服务端的ServerSocketChannel会处于ACCEPT状态
  • READ:Channel上有了可读取信息后
  • WRITE:可以向Channel写入数据的状态

2. Channel的注册

要想使用Selector监听Channel,我们首先需要将该Channel注册到Selector上。

注册后,我们会得到一个SelectionKey对象,该对象可理解为每一个注册在Selector上的Channel的ID。通过SelectionKey,我们可以得到如下信息:

  • interestOps():得到一组Selector需要监听的该Channel的状态
  • readyOps():在需要监听的状态中,哪些是处于准备好、可操作的
  • channel():返回监听的Channel对象
  • selector():返回被注册的Selector
  • attachment():根据具体业务需求,可以是需要加载在channel上的任意对象

5.2 用NIO进行文件拷贝

使用FileChannel实现本地文件拷贝。如上所述,有通过Buffer传输和Channel之间直接传输两种方式

5.2.1 通过Buffer进行数据传输

几个需要注意的点:

通过文件输入输出流获得文件通道:
FileChannel fin = new FileInputStream(source).getChannel();

注意Channel和Buffer的方向:将数据从文件Channel中读取出来(fin.read(buffer)),写入缓冲区(缓冲区为写模式);将缓冲区转换为读模式(buffer.flip());将数据从缓冲区中读出(缓冲区为读模式),写入文件Channel(fout.write(buffer));将缓冲区转换为写模式(buffer.clear()

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioBufferCopy {

private static void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void copyFile(File source, File target) {
// 声明文件通道
FileChannel fin = null;
FileChannel fout = null;

try {
// 通过文件输入输出流得到文件通道
fin = new FileInputStream(source).getChannel();
fout = new FileOutputStream(target).getChannel();

// 创建ByteBuffer类型的缓冲区(按字节读取)
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将数据从文件通道中读取出来,写进Buffer
while (fin.read(buffer) != -1) {
// 将Buffer从写模式转换为读模式
buffer.flip();
while (buffer.hasRemaining()) { // 确保Buffer中的内容被读完
// 将Buffer中的数据写入文件通道
fout.write(buffer);
}
// 将Buffer从读模式转换为写模式
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(fin);
close(fout);
}

}

public static void main(String[] args) {

File file = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile.jpg");
File fileCopy = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile-copy.jpg");

System.out.println("--- Copying small file ---");
copyFile(file, fileCopy);
}
}

5.2.2 直接在Channel间进行数据传输

  • 调用通道的transferTo()方法实现Channel间的数据传输
  • write()方法一样,transferTo()不能保证拷贝通道中的所有数据,因此需要使用一个while循环,确保文件被完整拷贝
import java.io.*;
import java.nio.channels.FileChannel;

public class NioTransferCopy {

private static void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void copyFile(File source, File target) {
// 声明文件通道
FileChannel fin = null;
FileChannel fout = null;

try {
// 通过输入输出流创建通道
fin = new FileInputStream(source).getChannel();
fout = new FileOutputStream(target).getChannel();

long transferred = 0L;
long size = fin.size();
while (transferred != size) {
// 从位置0开始,拷贝fin通道中size长度的数据,至fout通道,返回的是已拷贝长度
// transferTo不能保证拷贝通道中的所有数据,因此使用while循环
transferred += fin.transferTo(0, size, fout);
}

} catch (IOException e) {
e.printStackTrace();
} finally {
close(fin);
close(fout);
}
}

public static void main(String[] args) {

File file = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile.jpg");
File fileCopy = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile-copy.jpg");

System.out.println("--- Copying small file ---");
copyFile(file, fileCopy);
}
}

5.3 NIO编程模型

  1. 在Selector上注册服务器Channel,监听ACCEPT事件;
  2. 当Client1连接进服务器,ACCEPT事件触发,调用handles对该事件进行处理:向Selector上注册Client1的Channel的READ事件;
  3. 当Client1向服务器发送数据,READ事件触发,调用handles对该事件进行处理:转发消息;
  4. 当Client2连接进服务器,再次触发ACCEPT事件,同样调用handles进行相同处理,以此类推。

几个需要注意的点:

  • 与BIO不同,NIO编程模型中,accept操作与读写处理操作是在同一个线程中进行的。
  • 虽然使用Selector可实现非阻塞式调用,但Selector的select()方法是阻塞式的:如果当前没有Selector监听事件出现,则该方法阻塞(返回值为出现事件的数量)。
  • 同时可有多个事件被触发,调用Selector的selectedKeys()方法可以获得可操作Channel的SelectionKey集合。

5.4 基于NIO的多人聊天室实现

5.4.1 服务端

仅需要一个线程。Selector需要监听两种事件:服务端Channel的ACCEPT事件,客户端Channel的READ事件。两种事件的具体处理在handles()方法中实现。具体实现见代码注释。

package server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class ChatServer {

/** 默认监听端口 */
private static final int DEFAULT_PORT = 8888;
/** 用户自定义的监听端口 */
private int port;

/** 处理服务器端 IO 的通道 */
private ServerSocketChannel server;
/** 监听 channel 上发生的事件和 channel 状态的变化 */
private Selector selector;

/** 缓冲区大小 */
private static final int BUFFER_SIZE = 1024;
/** 用于从通道读取数据的 Buffer */
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER_SIZE);
/** 用于向通道写数据的 Buffer */
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER_SIZE);

/** 客户端退出命令 */
private static final String QUIT = "\\quit";
/** 指定编解码方式 */
private Charset charset = StandardCharsets.UTF_8;

public ChatServer() {
this(DEFAULT_PORT);
}

public ChatServer(int port) {
this.port = port;
}

/**
* 服务端主逻辑
*/
private void start() {
try {
// 创建一个新的通道,并设置为非阻塞式调用(open()方法产生的通道默认为阻塞式调用)
server = ServerSocketChannel.open();
server.configureBlocking(false);
// 绑定监听端口
server.socket().bind(new InetSocketAddress(port));

// 创建Selector
selector = Selector.open();
// 在selector上注册serverChannel的accept事件
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("启动服务器,监听端口:" + port + "...");

while (true) {
// select()方法为阻塞式调用,如果当前没有selector监听事件出现,则该方法阻塞(返回值为出现事件的数量)
selector.select();
// 获取所有被触发Channel的SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
// 处理被触发的事件
handles(key);
}
selectionKeys.clear();
}

} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭selector:解除注册,同时关闭对应的通道
close(selector);
}
}

/**
* 需要处理两个事件:ACCEPT & READ
*/
private void handles(SelectionKey key) throws IOException {
// ACCEPT事件 --- 和客户端建立了连接
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 获得连接进来的客户端的channel
SocketChannel clientChannel = serverChannel.accept();
// 转换为非阻塞式调用
clientChannel.configureBlocking(false);

// 注册该客户端channel的READ事件
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println(getClientName(clientChannel) + "已连接");
}

// READ事件 --- 客户端发送了消息
else if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
String fwdMsg = receive(clientChannel);
if (fwdMsg.isEmpty() || readyToQuit(fwdMsg)) { // 客户端异常 or 客户端准备退出
// 取消注册该通道上的该事件
key.cancel();
// 更改状态后,强制返回selector,令其重新检测
selector.wakeup();
System.out.println(getClientName(clientChannel) + "已断开");
} else {
System.out.println(getClientName(clientChannel) + ":" + fwdMsg);
forwardMessage(clientChannel, fwdMsg);
}
}
}

/**
* 读取客户端发来的消息
* @param clientChannel 客户端 channel
* @return 发来的消息
* @throws IOException
*/
private String receive(SocketChannel clientChannel) throws IOException {
// 将rBuffer转为写模式(起到清空的作用)
rBuffer.clear();
// 从clientChannel中读取数据,写入rBuffer,直至channel中没有数据可读
while ((clientChannel.read(rBuffer)) > 0);
// 将rBuffer从写模式转换为读模式
rBuffer.flip();
// 使用utf8编码解码rBuffer,并转为字符串类型
return String.valueOf(charset.decode(rBuffer));
}

/**
* 转发消息给其他客户端
* @param clientChannel 发来消息的客户端 channel
* @param fwdMsg 需要转发的消息
* @throws IOException
*/
private void forwardMessage(SocketChannel clientChannel, String fwdMsg) throws IOException {
// keys()返回所有注册过的SelectionKey
for (SelectionKey key : selector.keys()) {
// key有效并且是客户端socket
if (key.isValid() && key.channel() instanceof SocketChannel) {
SocketChannel connectedClient = (SocketChannel) key.channel();
if (!connectedClient.equals(clientChannel)) {
wBuffer.clear();
// 将需要转发的消息写进wBuffer,注意使用utf8编码
wBuffer.put(charset.encode(getClientName(clientChannel) + ":" + fwdMsg));
// 将wBuffer从写入模式转换为读取模式
wBuffer.flip();
while (wBuffer.hasRemaining()) {
connectedClient.write(wBuffer);
}
}
}
}
}

private String getClientName(SocketChannel client) {
return "客户端[" + client.socket().getPort() + "]";
}

private boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

private void close (Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.start();
}
}

5.4.2 客户端

ChatClient.java:客户端主线程,Selector监听客户端的CONNECT事件和READ事件

package client;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class ChatClient {

/** 服务器地址 */
private String host;
private static final String DEFAULT_SERVER_HOST = "127.0.0.1";

/** 服务器端口 */
private int port;
private static final int DEFAULT_SERVER_PORT = 8888;

/** 客户端 Channel */
private SocketChannel client;
/** 监听Channel的Selector */
private Selector selector;

/** 缓冲区大小 */
private static final int BUFFER_SIZE = 1024;
/** 用于从通道读取数据的 Buffer */
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER_SIZE);
/** 用于向通道写数据的 Buffer */
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER_SIZE);

/** 客户端退出命令 */
private static final String QUIT = "\\quit";
/** 指定编解码方式 */
private Charset charset = StandardCharsets.UTF_8;

public ChatClient() {
this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
}

public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}

public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

private void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 客户端主要逻辑
*/
private void start() {
try {
// 创建Channel,并设置为非阻塞式调用
client = SocketChannel.open();
client.configureBlocking(false);

// 创建Selector
selector = Selector.open();
// 注册 连接就绪CONNECT 事件
client.register(selector, SelectionKey.OP_CONNECT);
// 向服务器发送连接请求
client.connect(new InetSocketAddress(host, port));

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
handles(key);
}
selectionKeys.clear();
}

} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException e) {
// 用户正常退出
} finally {
close(selector);
}
}

/**
* 处理 CONNECT (连接就绪)和 READ (服务器转发消息)事件
*/
private void handles(SelectionKey key) throws IOException {
if (key.isConnectable()) { // 处理 CONNECT
SocketChannel clientChannel = (SocketChannel) key.channel();
if (clientChannel.isConnectionPending()) { // 返回true:连接已就绪
// 结束连接状态,完成连接
clientChannel.finishConnect();
new Thread(new UserInputHandler(this)).start();
}
// 注册READ事件,以接收服务端转发的消息
clientChannel.register(selector, SelectionKey.OP_READ);

} else if (key.isReadable()) { // 处理READ
SocketChannel clientChannel = (SocketChannel) key.channel();
String msg = receive(clientChannel);
if (msg.isEmpty()) {
// 服务器异常
close(selector);
} else {
System.out.println(msg);
}
}
}

/**
* 向服务端发送信息
* @param msg 用户输入的信息
* @throws IOException
*/
public void send(String msg) throws IOException {
if (msg.isEmpty()) {
return;
}

wBuffer.clear();
wBuffer.put(charset.encode(msg));
wBuffer.flip();
while (wBuffer.hasRemaining()) {
client.write(wBuffer);
}

if (readyToQuit(msg)) {
close(selector);
}
}

/**
* 读取服务端转发来的消息
* @param clientChannel 客户端channel
* @return 收到的消息
* @throws IOException
*/
private String receive(SocketChannel clientChannel) throws IOException {
rBuffer.clear();
while (clientChannel.read(rBuffer) > 0);
rBuffer.flip();
return String.valueOf(charset.decode(rBuffer));
}

public static void main(String[] args) {
ChatClient chatClient = new ChatClient();
chatClient.start();
}
}

UserInputHandler.java:处理用户输入

package client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class UserInputHandler implements Runnable {
ChatClient client;

public UserInputHandler(ChatClient client) {
this.client = client;
}
@Override
public void run() {
BufferedReader consoleReader = new BufferedReader(
new InputStreamReader(System.in)
);
while (true) {
try {
String input = consoleReader.readLine();
client.send(input);

if (client.readyToQuit(input)) {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

6-AIO

6.1 异步调用机制

6.1.1 AIO中的异步操作

  • AsynchronousSocketChannel:客户端Socket通道
  • AsynchronousServerSocketChannel:服务端Socket通道
  • connect / accept:建立连接(支持异步操作)
  • read / write:读写操作(支持异步操作)

6.1.2 异步调用机制

1. Future

Channel异步调用connect / accept,read / write等方法,方法返回一个Future对象。获得Future对象后,我们可以对它进行如下操作:

  • 调用Future的get()方法:该方法是阻塞式调用,它会一直阻塞,直到Future所对应的任务完成并返回。
  • 调用Future的isDone()方法:再一个循环中调用该方法,不停询问任务是否已完成,可设置超时时间。

2. CompletionHandler

Channel异步调用connect / accept,read / write等方法时,将一个CompletionHandler作为参数传入。

CompletionHandler是一个接口,它提供两个回调函数(callback),completed()failed(),我们可以自己实现这两个方法。当异步操作完成后,系统会调用回调函数(操作成功则调用completed(),发生异常则调用failed()),完成相关业务功能。

6.2 AIO编程模型

6.2.1 AsynchronousChannelGroup

我们使用的异步通道属于一个AsynchronousChannelGroup,即异步通道组,是一组可以被多个异步通道共享的资源群组。Group可以自定义,如果不特别指定,系统会使用默认群组。

需要ChannelGroup的原因:AIO机制下,操作系统帮我们做了很多事,比如当调用完成时,操作系统会自动回调函数。操作系统完成这些功能需要一定的系统资源,比如线程池。Group中会包含线程池等资源,操作系统可以复用线程池中的线程来实现回调函数等功能。

6.2.2 异步实现

  1. 创建AsynchronousServerSocketChannel后,需要创建一个AcceptHandler,用于处理accept()的异步调用,Client1连接后,会触发该Handler;
  2. AcceptHandler会获得连接后返回的AsynrounousSocketChannel。服务端Channel需要进行读写操作,这些操作也是异步调用的,因此需要再创建一个ClientHandler用于处理客户端的读写调用;
  3. 每当有新客户端连接,都会再次触发AcceptHandler,完成相应操作(即为新的客户端Channel注册一个ClientHandler)
  4. 当某个客户端有IO操作,会触发对应的ClientHandler,完成特定的业务操作

6.3 基于AIO的多人聊天室实现

6.3.1 服务端

在服务端,我们使用CompletionHandler来实现异步调用

package server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChatServer {

private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private int port;

private static final int BUFFER_SIZE = 1024;
private static final int THREADPOOL_SIZE = 8;

private static final String QUIT = "\\quit";
private Charset charset = Charset.forName("UTF-8");

private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverChannel;

private List<ClientHandler> connectedClients;

public ChatServer() {
this(DEFAULT_PORT);
}

public ChatServer(int port) {
this.port = port;
this.connectedClients = new ArrayList<>();
}

private boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

private void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 服务端主逻辑
*/
private void start() {
try {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
// 创建自定义线程池的ChannelGroup
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
// 创建自定义ChannelGroup的异步服务端Channel
serverChannel = AsynchronousServerSocketChannel.open(channelGroup);

serverChannel.bind(new InetSocketAddress(LOCALHOST, port));
System.out.println("启动服务器,监听端口: " + port + "...");

while (true) {
serverChannel.accept(null, new AcceptHandler());
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(serverChannel);
}
}

private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
// 服务端持续监听客户端请求
if (serverChannel.isOpen()) {
serverChannel.accept(null, this);
}

if (clientChannel != null && clientChannel.isOpen()) {
ClientHandler handler = new ClientHandler(clientChannel);

// 添加用户至在线列表
addClient(handler);

ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
clientChannel.read(buffer, buffer, handler);
}
}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败: " + exc);
}
}

private synchronized void addClient(ClientHandler handler) {
connectedClients.add(handler);
System.out.println(getClientName(handler.clientChannel) + "已连接");
}

private synchronized void removeClient(ClientHandler handler) {
connectedClients.remove(handler);
System.out.println(getClientName(handler.clientChannel) + "已断开");
close(handler.clientChannel);
}

private class ClientHandler implements CompletionHandler<Integer, Object> {

AsynchronousSocketChannel clientChannel;

public ClientHandler (AsynchronousSocketChannel channel) {
this.clientChannel = channel;
}
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer buffer = (ByteBuffer) attachment;
if (buffer != null) { // 说明此时需要处理的是读
if (result <= 0) {
// 客户端异常,移出在线列表
removeClient(this);
} else {
// 获取并打印客户端发送来的信息
buffer.flip();
String fwdMsg = receive(buffer);

// 用户准备退出
if (readyToQuit(fwdMsg)) {
removeClient(this);
return;
}

System.out.println(getClientName(clientChannel) + ": " + fwdMsg);

// 给其他客户端发送消息
forwardMessage(clientChannel, fwdMsg);
buffer.clear();

// 持续监听该客户端channel的输入
clientChannel.read(buffer, buffer, this);
}
}
}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读写失败:" + exc);
}
}

private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
for (ClientHandler handler : connectedClients) {
if (!clientChannel.equals(handler.clientChannel)) {
try {
ByteBuffer buffer = charset.encode(getClientName(clientChannel) + ": " + fwdMsg);
handler.clientChannel.write(buffer, null, handler);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

private String getClientName(AsynchronousSocketChannel clientChannel) {
int clientPort = -1;
try {
InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress();
clientPort = address.getPort();
} catch (IOException e) {
e.printStackTrace();
}

return "客户端[" + clientPort + "]";
}

private String receive(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return String.valueOf(charBuffer);
}

public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.start();
}
}

6.3.2 ## 客户端

在客户端,我们使用Future来实现异步调用

ChatClient.java

package client;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ChatClient {

private static final String LOCALHOST = "localhost";
private String host;
private static final int DEFAULT_PORT = 8888;
private int port;

private static final int BUFFER_SIZE = 1024;

private static final String QUIT = "\\quit";
private Charset charset = Charset.forName("UTF-8");

private AsynchronousSocketChannel clientChannel;

public ChatClient() {
this(LOCALHOST, DEFAULT_PORT);
}

public ChatClient (String host, int port) {
this.host = host;
this.port = port;
}

public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}

public void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void start() {
try {
// 创建客户端channel
clientChannel = AsynchronousSocketChannel.open();
// 连接服务端,异步调用
Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));
future.get();

// 处理用户输入
new Thread(new UserInputHandler(this)).start();

ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (true) {
Future<Integer> readResult = clientChannel.read(buffer);
int result = readResult.get();

if (result <= 0) {
// 说明出现异常,无法再从服务器得到有效信息
System.out.println("服务器断开");
close(clientChannel);
System.exit(1);

} else {
buffer.flip();
String msg = String.valueOf(charset.decode(buffer));
buffer.clear();
System.out.println(msg);
}
}

} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

public void send(String msg) {
if (msg.isEmpty()) {
return;
}

ByteBuffer buffer = charset.encode(msg);
Future<Integer> writeResult = clientChannel.write(buffer);
try {
writeResult.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("发送消息失败");
e.printStackTrace();
}
}

public static void main(String[] args) {
ChatClient client = new ChatClient();
client.start();
}
}

UserInputHandler.java

package client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class UserInputHandler implements Runnable {

private ChatClient chatClient;

public UserInputHandler (ChatClient client) {
this.chatClient = client;
}

@Override
public void run() {
try {
BufferedReader consoleReader = new BufferedReader(
new InputStreamReader(System.in)
);
String msg = null;
while ((msg = consoleReader.readLine()) != null) {
chatClient.send(msg);
}

} catch (IOException e) {
e.printStackTrace();
}
}
}

参考链接
(7条消息) 【Java网络编程】基于BIO/NIO/AIO的多人聊天室(一):java IO与内核IO_NoxUni的博客-CSDN博客
(7条消息) 一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(五)_方圆 Blog-CSDN博客