1-java.io梳理 网络编程的本质是进程间通信,而进程间通信的基础为IO模型。
java.io包中提供了大量与IO操作相关的类和方法,如下图所示,可大致分为字符流(处理单位为字符,可包含多个字节)和字节流(处理单位为字节)两类。下图所示Reader / Writer,InputStream / OutputStream均为接口,可被各种不同类实现,以完成不同的功能。
1.1 字符流 下图为字符流的几个主要实现类
CharArrayReader
/ CharArrayWriter
的数据源为字符数组,StringReader
/ StringWriter
的数据源为字符串BufferdReader
、FilterReader
、InputStreamReader
以及对应的Writer实现类为更高级的字符流实现类,它们采用装饰器模式,可在类创建时传入的另一个Reader / Writer的基础上提供更高级的额外功能。BufferedReader
:额外提供一个缓冲区,可一次性从数据源中读取较多数据至缓冲区,减少对数据源的直接访问次数。缓冲区位于内存中,效率更高。FilterReader
:抽象类,有很多子类,可在传入的Reader的基础上提供更多灵活的功能InputStreamReader
:将字节流转换为字符流,常用子类为FileReader(File中存储的数据为字节,FileReader可实现字节到字符的转换,从File中读取字符)
1.2 字节流 下图为字节流的几个主要实现类
ByteArrayInputStream
/ ByteArrayOutputStream
的数据源为字节数组,FileInputStream
/FileOutputStream
的数据源为文件FilterInputStream
/ FilterOutputStream
为抽象类,可使用装饰器模式在传入的InputStream
/ OutStream
的基础上叠加更高级的功能,常见子类包括:。BufferedInputStream:额外提供缓冲区 DataInputStream:从流中读取出指定的Java基本数据 DataOutputStream:将Java基本数据类型写到输出流中
2-同步/异步/阻塞/非阻塞 同步/异步 与 阻塞/非阻塞 可两两组合:同步阻塞 、同步非阻塞 、异步阻塞 、异步非阻塞
2.1 同步 & 异步 区别在于被调用方的行为(通信机制的区别):
同步:已知调用结果后才将调用返回 异步:不需要等待调用结果即可返回,有调用结果后,被调用方会“通知”调用方
2.2 阻塞 & 非阻塞 区别在于调用方的行为(调用状态的区别):
阻塞:调用方在收到调用结果前,不能处理其他任何事务(注意是收到调用结果前,而不是调用返回前) 非阻塞:调用方在收到调用结果前,可处理其他事务
3-内核IO模型 3.1 阻塞式I/O - BIO 应用程序发起系统调用,若此时数据还没有被准备好,则该系统调用不会被返回,它会一直等待,直到数据被准备好(内核缓冲区有了数据,并且被复制到了应用程序缓冲区),系统调用返回。
这样的内核IO模型就对应于BIO(Blocking IO),即BIO是该内核模型在应用层面的一种抽象。
3.2 非阻塞式I/O - NIO 应用程序进行系统调用,如果此时数据还没准备好,则调用返回(返回无数据,应用程序会了解到数据还未被准备好)。过一会儿应用程序会再次询问,如果仍没有数据,则调用会再次返回。假设再一次询问时,数据已经准备好,则数据会被复制到应用程序可以访问的缓冲区,调用成功返回。
这样的IO模型对应于应用层面的NIO(Non-blocking IO)。注意此时并没有使用Selector监听,而是通过应用程序的不断轮询来实现非阻塞。
3.3 I/O多路复用 - NIO + Selector 应用程序发起系统调用,如果此时数据没有准备好,那么我们会要求内核监听这个IO通道。数据准备好后,系统调用返回,通知应用程序此时数据已经准备好(此时并没有将数据返回,只是通知数据已就绪)。接下来应用程序再进行一次系统调用,复制数据并带着数据返回。由于内核可以监听多个IO,任一IO出现状态更新,内核都会返回给应用程序,因此该模型称为IO多路复用。
这样的IO模型对应于应用层面的NIO + Selector模型。Selector也称为IO多路复用器
3.4 异步I/O - AIO 以上三种IO模型均为同步IO模型,因为无论阻塞与否,如果我们不再次发起系统调用,我们将无法获得准备好的数据。这次系统调用是无法省略的,因为内核不会主动通知我们调用的结果。
异步IO的流程如下:应用程序发起系统调用,如果数据没准备好,则调用返回,不会阻塞。如果数据已经准备好,则内核会将数据复制,并向应用程序递交信号,告知应用程序此时数据已经准备好。
这样的IO模型对应于应用层面的AIO。
4-BIO编程模型 4.1 BIO流程 BIO:阻塞式IO
线程 Acceptor 负责接收客户端发起的连接请求 每当有新客户端连接,服务器都为其创建一个新线程Handler,负责处理与该客户端相关的输入输出操作创建新线程的原因:如果直接在线程 Acceptor 中处理客户端的输入输出操作,则服务器无法再接收其他客户端的连接请求
4.2 基于BIO的多人聊天室设计 4.2.1 需求分析 基于BIO模型 支持多人同时在线 每个用户的发言都被转发给其他在线用户 4.2.2 聊天室设计 1. 服务器端 需要一个 Acceptor 线程,使用 ServerSocket 的 accept() 函数阻塞式地接受客户端连接 为每一个连接进来的客户端新建一个线程,该线程需要实现两个功能:1)接收该客户端发来的信息;2)将该信息转发给其他客户端 转发功能要求客户端能够存储当前在线的所有客户端列表 2. 客户端 可以和服务端建立连接 客户端需要完成两个功能:1)接收用户在控制台的输入;2)读取其他客户端发送来的消息 由于等待用户输入的过程是阻塞的,所以客户端也需要两条线程,分别完成上述两个功能 3. BIO的体现 ServerSocket.accept()
:该方法为阻塞式调用,在有新客户端连接进来之前一直阻塞InputStream.read() & OutputStream.read()
:这两个方法均为阻塞式调用,在用户输入消息前,read()方法会一直阻塞,因此需要为每个客户端都创建一个线程4.3 基于BIO的多人聊天室实现 代码结构如下:
serverChatServer.java:服务端的主线程,负责启动服务端、接收客户端请求、存储当前在线客户端、为客户端新建处理线程等 ChatHandler.java:处理和客户端之间的输入输出工作 clientChatClient.java:客户端的主线程,负责连接服务端,读取服务端转发来的其他客户端信息 UserInputHandler.java:等待和发送用户输入 4.3.1 服务端 1. ChatServer.java 服务端的主线程,负责启动服务端、接收客户端请求、存储当前在线客户端、为客户端新建处理线程等
package server;import java.io.BufferedWriter;import java.io.IOException;import java.io.OutputStreamWriter;import java.io.Writer;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;public class ChatServer { private int SERVER_PORT = 8888 ; private final String QUIT = "\\quit" ; private ServerSocket serverSocket; private Map<Integer, Writer> connectedClients; public ChatServer () { connectedClients = new HashMap<>(); } public synchronized void addClient (Socket socket) throws IOException { if (socket != null ) { int port = socket.getPort(); BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); connectedClients.put(port, writer); System.out.println("客户端[" + port + "]已连接到服务器" ); } } public synchronized void removeClient (Socket socket) throws IOException { if (socket != null ) { int port = socket.getPort(); if (connectedClients.containsKey(port)) { connectedClients.get(port).close(); } connectedClients.remove(port); System.out.println("客户端[" + port + "]已断开连接" ); } } public synchronized void forwardMessage (Socket socket, String fwdMsg) throws IOException { for (Integer id : connectedClients.keySet()) { if (!id.equals(socket.getPort())) { Writer writer = connectedClients.get(id); writer.write(fwdMsg); writer.flush(); } } } public void start () { try { serverSocket = new ServerSocket(SERVER_PORT); System.out.println("服务器启动,监听端口:" + SERVER_PORT + "..." ); while (true ) { Socket socket = serverSocket.accept(); new Thread(new ChatHandler(this , socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { close(); } } public boolean readyToQuit (String msg) { return QUIT.equals(msg); } public synchronized void close () { if (serverSocket != null ) { try { serverSocket.close(); System.out.println("服务器关闭" ); } catch (IOException e) { e.printStackTrace(); } } } public static void main (String[] args) { ChatServer chatServer = new ChatServer(); chatServer.start(); } }
可使用线程池对服务器端进行进一步的改进,限制可以进入聊天室的客户端个数,避免为用户开启的线程过多。改进后的部分代码如下:
public class ChatServer { ... private ExecutorService executorService; public ChatServer (int threadNum) { executorService = Executors.newFixedThreadPool(threadNum); connectedClient = new HashMap<>(); } ... public void start () { try { serverSocket = new ServerSocket(SERVER_PORT); System.out.println("服务器启动,监听端口" + SERVER_PORT + "..." ); while (true ) { Socket socket = serverSocket.accept(); executorService.execute(new ChatHandler(this , socket)); } } catch (IOException e) { e.printStackTrace(); } finally { close(); } } public static void main (String[] args) { ChatServer chatServer = new ChatServer(3 ); chatServer.start(); } }
2. ChatHandler.java 每当有新客户端连接进服务器时,服务器均会为其创建一个ChatHandler线程,处理服务器和客户端之间的输入输出工作
package server;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.Socket;public class ChatHandler implements Runnable { private ChatServer server; private Socket socket; public ChatHandler (ChatServer server, Socket socket) { this .server = server; this .socket = socket; } @Override public void run () { try { server.addClient(socket); BufferedReader reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) ); String msg = null ; while ((msg = reader.readLine()) != null ) { if (server.readyToQuit(msg)) { break ; } String fwdMsg = "客户端[" + socket.getPort() + "]:" + msg + "\n" ; System.out.print(fwdMsg); server.forwardMessage(socket, fwdMsg); } } catch (IOException e) { e.printStackTrace(); } finally { try { server.removeClient(socket); } catch (IOException e) { e.printStackTrace(); } } } }
4.3.2 客户端 1. ChatClient.java 客户端的主线程,负责连接服务端,读取服务端转发来的其他客户端信息
package client;import java.io.*;import java.net.Socket;public class ChatClient { private final String SERVER_HOST = "127.0.0.1" ; private final int SERVER_PORT = 8888 ; private final String QUIT = "\\quit" ; private Socket socket; private BufferedReader reader; private BufferedWriter writer; public void send (String msg) throws IOException { if (!socket.isOutputShutdown()) { writer.write(msg + "\n" ); writer.flush(); } } public boolean readyToQuit (String msg) { return QUIT.equals(msg); } public void close () { if (writer != null ) { try { writer.close(); System.out.println("关闭客户端" ); } catch (IOException e) { e.printStackTrace(); } } } public void start () { try { socket = new Socket(SERVER_HOST, SERVER_PORT); reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) ); writer = new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); new Thread(new UserInputHandler(this )).start(); String msg = null ; while ((msg = reader.readLine()) != null ) { System.out.println(msg); } } catch (IOException e) { e.printStackTrace(); } finally { close(); } } public static void main (String[] args) { ChatClient client = new ChatClient(); client.start(); } }
需要为每一个客户端进程都创建一个UserInputHandler线程,负责等待和发送用户输入
package client;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class UserInputHandler implements Runnable { private ChatClient chatClient; public UserInputHandler (ChatClient chatClient) { this .chatClient = chatClient; } @Override public void run () { try { BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in) ); while (true ) { String input = consoleReader.readLine(); chatClient.send(input); if (chatClient.readyToQuit(input)) { break ; } } } catch (IOException e) { e.printStackTrace(); } } }
5-NIO 5.1-NIO概述 NIO(Non-blocking IO 或 New IO),非阻塞式IO。与BIO不同,NIO使用Channel代替Stream,特征如下:
Stream具有方向性,分为输入流和输出流,而Channel无方向,一个Channel既可以写入数据也可以读取数据。 Stream的读写均为阻塞式(例如InputStream.read(),OutputStream.write()),而Channnel的读写具有两种模式,既可以阻塞式读写,也提供了非阻塞式读写的方法 NIO使用Selector监控多条Channel 可以在一个线程里处理多个Channel I/O 5.1.1 Buffer NIO向Channel中读写数据都需要通过一个Buffer类 来实现。
Buffer,顾名思义,是一个缓冲区,代表内存中我们可以进行读写的一个区域。Channel可以支持双向操作,因此Buffer也可进行读写双向操作。
1. 向Buffer中写入数据 写模式主要用到两个指针:
position :当前写入的位置capacity :最远可写入位置(缓冲区最大容量)
2. 由写模式转换为读模式 调用flip()
方法实现从写模式到读模式的转换,步骤如下:
将position 指针移至缓冲区头部 将limit 指针移至写入的最远位置,limit 指针表示的是读模式下最远所能读取的位置
3. 读模式 第一种读取模式是将写入的数据全部读取完,读取完后position指针将和limit指针指向相同的位置 调用clear()方法转换为写模式,步骤如下:
osition指针移回头部 imit指针移回capacity处 从图中可以看出,clear()方法其实并没有清除已读取的数据,而是通过指针操作,使得再次写入的数据覆盖之前的数据。
第二种读取模式是只读取部分已写入的数据,读取完后position指针的位置在limit指针之上。 此时若想转换为写模式,需要调用compact()方法,步骤如下:
将已写入但未读取的数据复制到缓冲区头部 将position指针移至未读取数据的下面一个区域 将limit指针移回capacity处 5.1.2 Channel 1. Channel的基本操作 通过Buffer写入/读取数据 Channel之间可以直接进行数据传输 :每个Channel可以向其他Channel数据传输数据,也可以接受从其他Channel传输过来的数据 2. 几个重要的Channel
5.1.3 Selector Channel可以进行非阻塞式的读写,而并不是每个时刻Channel上都有数据可供读写,所以我们需要不停地询问Channel是否处于可操作的状态。Jahannlelva提供了一个Selector类来实现这个功能,该类负责监听各个Channel的状态。
Selector也称为 I/O多路复用器 。
1. Channel的状态 Channel的状态并不是固定不变的,Channel的状态会随着不同事件的发生而在如下状态间发生变化:
CONNECT
:客户端与服务端建立连接后,客户端的SocketChannel会处于CONNECT状态ACCEPT
:服务端接受了客户端的连接建立请求后,服务端的ServerSocketChannel会处于ACCEPT状态READ
:Channel上有了可读取信息后WRITE
:可以向Channel写入数据的状态2. Channel的注册 要想使用Selector监听Channel,我们首先需要将该Channel注册到Selector上。
注册后,我们会得到一个SelectionKey对象,该对象可理解为每一个注册在Selector上的Channel的ID。通过SelectionKey,我们可以得到如下信息:
interestOps()
:得到一组Selector需要监听的该Channel的状态readyOps()
:在需要监听的状态中,哪些是处于准备好、可操作的channel()
:返回监听的Channel对象selector()
:返回被注册的Selectorattachment()
:根据具体业务需求,可以是需要加载在channel上的任意对象5.2 用NIO进行文件拷贝 使用FileChannel实现本地文件拷贝。如上所述,有通过Buffer传输和Channel之间直接传输两种方式
5.2.1 通过Buffer进行数据传输 几个需要注意的点:
通过文件输入输出流获得文件通道:FileChannel fin = new FileInputStream(source).getChannel();
注意Channel和Buffer的方向
:将数据从文件Channel中读取出来(fin.read(buffer)
),写入缓冲区(缓冲区为写模式);将缓冲区转换为读模式(buffer.flip()
);将数据从缓冲区中读出(缓冲区为读模式),写入文件Channel(fout.write(buffer)
);将缓冲区转换为写模式(buffer.clear()
)
import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class NioBufferCopy { private static void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void copyFile (File source, File target) { FileChannel fin = null ; FileChannel fout = null ; try { fin = new FileInputStream(source).getChannel(); fout = new FileOutputStream(target).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); while (fin.read(buffer) != -1 ) { buffer.flip(); while (buffer.hasRemaining()) { fout.write(buffer); } buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { close(fin); close(fout); } } public static void main (String[] args) { File file = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile.jpg" ); File fileCopy = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile-copy.jpg" ); System.out.println("--- Copying small file ---" ); copyFile(file, fileCopy); } }
5.2.2 直接在Channel间进行数据传输 调用通道的transferTo()
方法实现Channel间的数据传输 与write()
方法一样,transferTo()不能保证拷贝通道中的所有数据,因此需要使用一个while循环,确保文件被完整拷贝 import java.io.*;import java.nio.channels.FileChannel;public class NioTransferCopy { private static void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void copyFile (File source, File target) { FileChannel fin = null ; FileChannel fout = null ; try { fin = new FileInputStream(source).getChannel(); fout = new FileOutputStream(target).getChannel(); long transferred = 0L ; long size = fin.size(); while (transferred != size) { transferred += fin.transferTo(0 , size, fout); } } catch (IOException e) { e.printStackTrace(); } finally { close(fin); close(fout); } } public static void main (String[] args) { File file = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile.jpg" ); File fileCopy = new File("E:/JavaProject/web/nio-file-copy/tmp/smallFile-copy.jpg" ); System.out.println("--- Copying small file ---" ); copyFile(file, fileCopy); } }
5.3 NIO编程模型 在Selector上注册服务器Channel,监听ACCEPT事件; 当Client1连接进服务器,ACCEPT事件触发,调用handles对该事件进行处理:向Selector上注册Client1的Channel的READ事件; 当Client1向服务器发送数据,READ事件触发,调用handles对该事件进行处理:转发消息; 当Client2连接进服务器,再次触发ACCEPT事件,同样调用handles进行相同处理,以此类推。 几个需要注意的点:
与BIO不同,NIO编程模型中,accept操作与读写处理操作是在同一个线程中进行的。 虽然使用Selector可实现非阻塞式调用,但Selector的select()方法是阻塞式的:如果当前没有Selector监听事件出现,则该方法阻塞(返回值为出现事件的数量)。 同时可有多个事件被触发,调用Selector的selectedKeys()方法可以获得可操作Channel的SelectionKey集合。
5.4 基于NIO的多人聊天室实现 5.4.1 服务端 仅需要一个线程。Selector需要监听两种事件:服务端Channel的ACCEPT事件,客户端Channel的READ事件。两种事件的具体处理在handles()方法中实现。具体实现见代码注释。
package server;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.Set;public class ChatServer { private static final int DEFAULT_PORT = 8888 ; private int port; private ServerSocketChannel server; private Selector selector; private static final int BUFFER_SIZE = 1024 ; private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER_SIZE); private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER_SIZE); private static final String QUIT = "\\quit" ; private Charset charset = StandardCharsets.UTF_8; public ChatServer () { this (DEFAULT_PORT); } public ChatServer (int port) { this .port = port; } private void start () { try { server = ServerSocketChannel.open(); server.configureBlocking(false ); server.socket().bind(new InetSocketAddress(port)); selector = Selector.open(); server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("启动服务器,监听端口:" + port + "..." ); while (true ) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { handles(key); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { close(selector); } } private void handles (SelectionKey key) throws IOException { if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false ); clientChannel.register(selector, SelectionKey.OP_READ); System.out.println(getClientName(clientChannel) + "已连接" ); } else if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); String fwdMsg = receive(clientChannel); if (fwdMsg.isEmpty() || readyToQuit(fwdMsg)) { key.cancel(); selector.wakeup(); System.out.println(getClientName(clientChannel) + "已断开" ); } else { System.out.println(getClientName(clientChannel) + ":" + fwdMsg); forwardMessage(clientChannel, fwdMsg); } } } private String receive (SocketChannel clientChannel) throws IOException { rBuffer.clear(); while ((clientChannel.read(rBuffer)) > 0 ); rBuffer.flip(); return String.valueOf(charset.decode(rBuffer)); } private void forwardMessage (SocketChannel clientChannel, String fwdMsg) throws IOException { for (SelectionKey key : selector.keys()) { if (key.isValid() && key.channel() instanceof SocketChannel) { SocketChannel connectedClient = (SocketChannel) key.channel(); if (!connectedClient.equals(clientChannel)) { wBuffer.clear(); wBuffer.put(charset.encode(getClientName(clientChannel) + ":" + fwdMsg)); wBuffer.flip(); while (wBuffer.hasRemaining()) { connectedClient.write(wBuffer); } } } } } private String getClientName (SocketChannel client) { return "客户端[" + client.socket().getPort() + "]" ; } private boolean readyToQuit (String msg) { return QUIT.equals(msg); } private void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main (String[] args) { ChatServer chatServer = new ChatServer(); chatServer.start(); } }
5.4.2 客户端 ChatClient.java :客户端主线程,Selector监听客户端的CONNECT事件和READ事件
package client;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ClosedSelectorException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.Set;public class ChatClient { private String host; private static final String DEFAULT_SERVER_HOST = "127.0.0.1" ; private int port; private static final int DEFAULT_SERVER_PORT = 8888 ; private SocketChannel client; private Selector selector; private static final int BUFFER_SIZE = 1024 ; private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER_SIZE); private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER_SIZE); private static final String QUIT = "\\quit" ; private Charset charset = StandardCharsets.UTF_8; public ChatClient () { this (DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT); } public ChatClient (String host, int port) { this .host = host; this .port = port; } public boolean readyToQuit (String msg) { return QUIT.equals(msg); } private void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } private void start () { try { client = SocketChannel.open(); client.configureBlocking(false ); selector = Selector.open(); client.register(selector, SelectionKey.OP_CONNECT); client.connect(new InetSocketAddress(host, port)); while (true ) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { handles(key); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { } finally { close(selector); } } private void handles (SelectionKey key) throws IOException { if (key.isConnectable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); if (clientChannel.isConnectionPending()) { clientChannel.finishConnect(); new Thread(new UserInputHandler(this )).start(); } clientChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); String msg = receive(clientChannel); if (msg.isEmpty()) { close(selector); } else { System.out.println(msg); } } } public void send (String msg) throws IOException { if (msg.isEmpty()) { return ; } wBuffer.clear(); wBuffer.put(charset.encode(msg)); wBuffer.flip(); while (wBuffer.hasRemaining()) { client.write(wBuffer); } if (readyToQuit(msg)) { close(selector); } } private String receive (SocketChannel clientChannel) throws IOException { rBuffer.clear(); while (clientChannel.read(rBuffer) > 0 ); rBuffer.flip(); return String.valueOf(charset.decode(rBuffer)); } public static void main (String[] args) { ChatClient chatClient = new ChatClient(); chatClient.start(); } }
UserInputHandler.java :处理用户输入
package client;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class UserInputHandler implements Runnable { ChatClient client; public UserInputHandler (ChatClient client) { this .client = client; } @Override public void run () { BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in) ); while (true ) { try { String input = consoleReader.readLine(); client.send(input); if (client.readyToQuit(input)) { break ; } } catch (IOException e) { e.printStackTrace(); } } } }
6-AIO 6.1 异步调用机制 6.1.1 AIO中的异步操作 AsynchronousSocketChannel
:客户端Socket通道AsynchronousServerSocketChannel
:服务端Socket通道connect
/ accept
:建立连接(支持异步操作)read
/ write
:读写操作(支持异步操作)
6.1.2 异步调用机制 1. Future Channel异步调用connect / accept,read / write等方法,方法返回一个Future
对象。获得Future对象后,我们可以对它进行如下操作:
调用Future的get()
方法:该方法是阻塞式调用,它会一直阻塞,直到Future所对应的任务完成并返回。 调用Future的isDone()
方法:再一个循环中调用该方法,不停询问任务是否已完成,可设置超时时间。 2. CompletionHandler Channel异步调用connect / accept,read / write等方法时,将一个CompletionHandler
作为参数传入。
CompletionHandler是一个接口,它提供两个回调函数(callback),completed()
和failed()
,我们可以自己实现这两个方法。当异步操作完成后,系统会调用回调函数(操作成功则调用completed(),发生异常则调用failed()),完成相关业务功能。
6.2 AIO编程模型 6.2.1 AsynchronousChannelGroup 我们使用的异步通道属于一个AsynchronousChannelGroup,即异步通道组,是一组可以被多个异步通道共享的资源群组。Group可以自定义,如果不特别指定,系统会使用默认群组。
需要ChannelGroup的原因:AIO机制下,操作系统帮我们做了很多事,比如当调用完成时,操作系统会自动回调函数。操作系统完成这些功能需要一定的系统资源,比如线程池。Group中会包含线程池等资源,操作系统可以复用线程池中的线程来实现回调函数等功能。
6.2.2 异步实现 创建AsynchronousServerSocketChannel后,需要创建一个AcceptHandler,用于处理accept()的异步调用,Client1连接后,会触发该Handler; AcceptHandler会获得连接后返回的AsynrounousSocketChannel。服务端Channel需要进行读写操作,这些操作也是异步调用的,因此需要再创建一个ClientHandler用于处理客户端的读写调用; 每当有新客户端连接,都会再次触发AcceptHandler,完成相应操作(即为新的客户端Channel注册一个ClientHandler) 当某个客户端有IO操作,会触发对应的ClientHandler,完成特定的业务操作
6.3 基于AIO的多人聊天室实现 6.3.1 服务端 在服务端,我们使用CompletionHandler
来实现异步调用
package server;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ChatServer { private static final String LOCALHOST = "localhost" ; private static final int DEFAULT_PORT = 8888 ; private int port; private static final int BUFFER_SIZE = 1024 ; private static final int THREADPOOL_SIZE = 8 ; private static final String QUIT = "\\quit" ; private Charset charset = Charset.forName("UTF-8" ); private AsynchronousChannelGroup channelGroup; private AsynchronousServerSocketChannel serverChannel; private List<ClientHandler> connectedClients; public ChatServer () { this (DEFAULT_PORT); } public ChatServer (int port) { this .port = port; this .connectedClients = new ArrayList<>(); } private boolean readyToQuit (String msg) { return QUIT.equals(msg); } private void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } private void start () { try { ExecutorService executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE); channelGroup = AsynchronousChannelGroup.withThreadPool(executorService); serverChannel = AsynchronousServerSocketChannel.open(channelGroup); serverChannel.bind(new InetSocketAddress(LOCALHOST, port)); System.out.println("启动服务器,监听端口: " + port + "..." ); while (true ) { serverChannel.accept(null , new AcceptHandler()); System.in.read(); } } catch (IOException e) { e.printStackTrace(); } finally { close(serverChannel); } } private class AcceptHandler implements CompletionHandler <AsynchronousSocketChannel , Object > { @Override public void completed (AsynchronousSocketChannel clientChannel, Object attachment) { if (serverChannel.isOpen()) { serverChannel.accept(null , this ); } if (clientChannel != null && clientChannel.isOpen()) { ClientHandler handler = new ClientHandler(clientChannel); addClient(handler); ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); clientChannel.read(buffer, buffer, handler); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("连接失败: " + exc); } } private synchronized void addClient (ClientHandler handler) { connectedClients.add(handler); System.out.println(getClientName(handler.clientChannel) + "已连接" ); } private synchronized void removeClient (ClientHandler handler) { connectedClients.remove(handler); System.out.println(getClientName(handler.clientChannel) + "已断开" ); close(handler.clientChannel); } private class ClientHandler implements CompletionHandler <Integer , Object > { AsynchronousSocketChannel clientChannel; public ClientHandler (AsynchronousSocketChannel channel) { this .clientChannel = channel; } @Override public void completed (Integer result, Object attachment) { ByteBuffer buffer = (ByteBuffer) attachment; if (buffer != null ) { if (result <= 0 ) { removeClient(this ); } else { buffer.flip(); String fwdMsg = receive(buffer); if (readyToQuit(fwdMsg)) { removeClient(this ); return ; } System.out.println(getClientName(clientChannel) + ": " + fwdMsg); forwardMessage(clientChannel, fwdMsg); buffer.clear(); clientChannel.read(buffer, buffer, this ); } } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("读写失败:" + exc); } } private synchronized void forwardMessage (AsynchronousSocketChannel clientChannel, String fwdMsg) { for (ClientHandler handler : connectedClients) { if (!clientChannel.equals(handler.clientChannel)) { try { ByteBuffer buffer = charset.encode(getClientName(clientChannel) + ": " + fwdMsg); handler.clientChannel.write(buffer, null , handler); } catch (Exception e) { e.printStackTrace(); } } } } private String getClientName (AsynchronousSocketChannel clientChannel) { int clientPort = -1 ; try { InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress(); clientPort = address.getPort(); } catch (IOException e) { e.printStackTrace(); } return "客户端[" + clientPort + "]" ; } private String receive (ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return String.valueOf(charBuffer); } public static void main (String[] args) { ChatServer chatServer = new ChatServer(); chatServer.start(); } }
6.3.2 ## 客户端 在客户端,我们使用Future
来实现异步调用
ChatClient.java
package client;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class ChatClient { private static final String LOCALHOST = "localhost" ; private String host; private static final int DEFAULT_PORT = 8888 ; private int port; private static final int BUFFER_SIZE = 1024 ; private static final String QUIT = "\\quit" ; private Charset charset = Charset.forName("UTF-8" ); private AsynchronousSocketChannel clientChannel; public ChatClient () { this (LOCALHOST, DEFAULT_PORT); } public ChatClient (String host, int port) { this .host = host; this .port = port; } public boolean readyToQuit (String msg) { return QUIT.equals(msg); } public void close (Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } public void start () { try { clientChannel = AsynchronousSocketChannel.open(); Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port)); future.get(); new Thread(new UserInputHandler(this )).start(); ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); while (true ) { Future<Integer> readResult = clientChannel.read(buffer); int result = readResult.get(); if (result <= 0 ) { System.out.println("服务器断开" ); close(clientChannel); System.exit(1 ); } else { buffer.flip(); String msg = String.valueOf(charset.decode(buffer)); buffer.clear(); System.out.println(msg); } } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } public void send (String msg) { if (msg.isEmpty()) { return ; } ByteBuffer buffer = charset.encode(msg); Future<Integer> writeResult = clientChannel.write(buffer); try { writeResult.get(); } catch (InterruptedException | ExecutionException e) { System.out.println("发送消息失败" ); e.printStackTrace(); } } public static void main (String[] args) { ChatClient client = new ChatClient(); client.start(); } }
UserInputHandler.java
package client;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class UserInputHandler implements Runnable { private ChatClient chatClient; public UserInputHandler (ChatClient client) { this .chatClient = client; } @Override public void run () { try { BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in) ); String msg = null ; while ((msg = consoleReader.readLine()) != null ) { chatClient.send(msg); } } catch (IOException e) { e.printStackTrace(); } } }
参考链接(7条消息) 【Java网络编程】基于BIO/NIO/AIO的多人聊天室(一):java IO与内核IO_NoxUni的博客-CSDN博客 (7条消息) 一站式学习Java网络编程 全面理解BIO_NIO_AIO,学习手记(五)_方圆 Blog-CSDN博客