4 Java I/O:AIO和NIO中的Selector( 二 )

之前說過,同步指的按順序一次完成一個任務,直到前一個任務完成并有了結果以后 , 才能再執行后面的任務 。而異步指的是前一個任務結束后,并不等待任務結果,而是繼續執行后一個任務 , 在所有任務都「執行」完后,通過任務的回調函數去獲得結果 。所以異步使得應用性能有了極大的提高 。為了更加生動地說明什么是異步,可以來做個實驗:

4 Java I/O:AIO和NIO中的Selector

文章插圖
通過調用CompletableFuture.supplyAsync()方法可以很明顯地觀察到,處于位置2的「這一步先執行」會最先顯示,然后才執行位置1的代碼 。而這就是異步的具體實現 。
NIO為了支持異步 , 升級到了NIO2,也就是AIO 。而AIO引入了新的異步Channel的概念,并提供了異步FileChannel和異步SocketChannel的實現 。AIO的異步SocketChannel是真正的異步非阻塞I/O 。通過代碼可以更好地說明:
/** * AIO客戶端 * * @author xiangwang */public class AioClient {public void start() throws IOException, InterruptedException {AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();if (channel.isOpen()) {// socket接收緩沖區recbuf大小channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);// socket發送緩沖區recbuf大小channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);// 保持長連接狀態channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);// 連接到服務端channel.connect(new InetSocketAddress(8080), null,new AioClientHandler(channel));// 阻塞主進程for(;;) {TimeUnit.SECONDS.sleep(1);}} else {throw new RuntimeException("Channel not opened!");}}public static void main(String[] args) throws IOException, InterruptedException {new AioClient().start();}}/** * AIO客戶端CompletionHandler * * @author xiangwang */public class AioClientHandler implements CompletionHandler<Void, AioClient> {private final AsynchronousSocketChannel channel;private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));public AioClientHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void failed(Throwable exc, AioClient attachment) {throw new RuntimeException("channel not opened!");}@Overridepublic void completed(Void result, AioClient attachment) {System.out.println("send message to server: ");try {// 將輸入內容寫到bufferString line = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));// 在操作系統中的Java本地方法native已經把數據寫到了buffer中// 這里只需要一個緩沖區能接收就行了ByteBuffer buffer = ByteBuffer.allocate(1024);while (channel.read(buffer).get() != -1) {buffer.flip();System.out.println("from server: " + decoder.decode(buffer).toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}// 將輸入內容寫到bufferline = input.readLine();channel.write(ByteBuffer.wrap(line.getBytes()));}} catch (IOException | InterruptedException | ExecutionException e) {e.printStackTrace();}}}/** * AIO服務端 * * @author xiangwang */public class AioServer {public void start() throws InterruptedException, IOException {AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();if (channel.isOpen()) {// socket接受緩沖區recbuf大小channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);// 端口重用,防止進程意外終止,未釋放端口 , 重啟時失敗// 因為直接殺進程,沒有顯式關閉套接字來釋放端口,會等待一段時間后才可以重新use這個關口// 解決辦法就是用SO_REUSEADDRchannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);channel.bind(new InetSocketAddress(8080));} else {throw new RuntimeException("channel not opened!");}// 處理client連接channel.accept(null, new AioServerHandler(channel));System.out.println("server started");// 阻塞主進程for(;;) {TimeUnit.SECONDS.sleep(1);}}public static void main(String[] args) throws IOException, InterruptedException {AioServer server = new AioServer();server.start();}}/** * AIO服務端CompletionHandler * * @author xiangwang */public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {private final AsynchronousServerSocketChannel serverChannel;private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));public AioServerHandler(AsynchronousServerSocketChannel serverChannel) {this.serverChannel = serverChannel;}@Overridepublic void failed(Throwable exc, Void attachment) {// 處理下一次的client連接serverChannel.accept(null, this);}@Overridepublic void completed(AsynchronousSocketChannel result, Void attachment) {// 處理下一次的client連接,類似鏈式調用serverChannel.accept(null, this);try {// 將輸入內容寫到bufferString line = input.readLine();result.write(ByteBuffer.wrap(line.getBytes()));// 在操作系統中的Java本地方法native已經把數據寫到了buffer中// 這里只需要一個緩沖區能接收就行了ByteBuffer buffer = ByteBuffer.allocate(1024);while (result.read(buffer).get() != -1) {buffer.flip();System.out.println("from client: " + decoder.decode(buffer).toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}// 將輸入內容寫到bufferline = input.readLine();result.write(ByteBuffer.wrap(line.getBytes()));}} catch (InterruptedException | ExecutionException | IOException e) {e.printStackTrace();}}}

推薦閱讀