IO和NIO的区别:
IO:阻塞的,面向流(Stream)的,流是单向的
NIO:非阻塞的,面向缓冲区(Buffer)的,通道(Channel)是双向的
NIO主要用于解决网络IO场景下的问题
IO组件(ServerSocket、Socket)的accept方法是阻塞的.
NIO组件(ServerSocketChannel、SocketChannel)的accept方法默认是阻塞的,但可以通过configureBlocking(false)方法设置为非阻塞的.
文件方面的:FileChannel,还有异步读取写入通道:AsynchronousFileChannel(对应的read和write方法都是非阻塞的)
NIO的三大组件:Buffer、Channel、Selector

server代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package pers.xiaotian.nio.socket;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Blocking chat-relay server built on {@link ServerSocketChannel}.
 *
 * <p>The main thread accepts connections on port 8012. Each accepted client is
 * served by its own pooled thread, which reads data from that client and
 * forwards the raw bytes to every other connected client.
 */
public class NIOServerSocket {

    /** TCP port the server listens on. */
    private static final int PORT = 8012;

    /** Capacity in bytes of each per-connection read buffer. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Accepts clients forever and hands each one to a handler thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One pool for the whole server; creating a pool per connection leaks executors.
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The list is appended to by this thread while handler threads iterate over it and
        // remove from it, so it must be safe for concurrent use. Fully qualified so that
        // the file's import list does not have to change.
        List<SocketChannel> clientChannels = new java.util.concurrent.CopyOnWriteArrayList<>();
        try (ServerSocketChannel socketChannel = ServerSocketChannel.open()) {
            socketChannel.bind(new InetSocketAddress(PORT));
            while (true) {
                System.out.println("等待获取客户端连接");
                SocketChannel accept = socketChannel.accept();
                clientChannels.add(accept);
                executorService.submit(() -> handle(accept, clientChannels));
            }
        } catch (IOException e) {
            System.out.println("主线程退出结束");
            e.printStackTrace();
        } finally {
            // Stop accepting new tasks; handlers that are already running keep serving.
            executorService.shutdown();
        }
    }

    /**
     * Serves one client until it disconnects or its channel fails, relaying everything it
     * sends to all other clients.
     *
     * @param accept         the channel of the client served by this call
     * @param clientChannels all currently connected clients; must tolerate concurrent
     *                       modification during iteration (main passes a copy-on-write list)
     */
    private static void handle(SocketChannel accept, List<SocketChannel> clientChannels) {
        String threadName = Thread.currentThread().getName();
        ByteBuffer readByteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            while (true) {
                System.out.println("线程" + threadName + "等待获取客户端发送数据:");
                readByteBuffer.clear();
                int limit = accept.read(readByteBuffer);
                if (limit == -1) {
                    // End of stream: the client closed its side of the connection.
                    System.out.println("线程" + threadName + "客户端已关闭");
                    break;
                }
                readByteBuffer.flip();
                byte[] bytes = new byte[limit];
                readByteBuffer.get(bytes);
                // Decoded with the platform default charset, for logging only;
                // the bytes themselves are forwarded untouched.
                String text = new String(bytes, 0, limit);
                System.out.println("线程" + threadName + "接收到客户端发送的数据为:" + text);

                for (SocketChannel otherClient : clientChannels) {
                    if (otherClient == accept) {
                        continue;
                    }
                    System.out.println("线程" + threadName + "转发到其他客户端发送的数据为:" + text);
                    try {
                        ByteBuffer out = ByteBuffer.wrap(bytes);
                        while (out.hasRemaining()) {
                            otherClient.write(out);
                        }
                    } catch (IOException e) {
                        // That peer is unusable; stop forwarding to it. Every failed peer is
                        // removed, not just the last one seen in this pass.
                        clientChannels.remove(otherClient);
                    }
                }
            }
        } catch (IOException e) {
            System.out.println("线程"+ threadName +"异常:");
            e.printStackTrace();
        } finally {
            // Whatever ended the loop, this client must no longer receive forwards
            // and its socket must be released.
            clientChannels.remove(accept);
            closeQuietly(accept);
        }
    }

    /** Closes the channel, ignoring failures because nothing useful can be done about them. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The channel is being discarded anyway.
        }
    }
}
|
client代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package pers.xiaotian.nio.socket;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner;
/**
 * Console chat client for the relay server on port 8012.
 *
 * <p>One thread prints everything received from the server while the main thread reads
 * whitespace-delimited tokens from standard input and sends them. Typing {@code quit}
 * ends the client.
 */
public class NIOClientSocket {
    static final String name = "client3";

    /** TCP port of the server to connect to. */
    private static final int PORT = 8012;

    /** Capacity in bytes of the receive buffer. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Connects to the server, starts the receiver thread and runs the send loop.
     *
     * @param args unused
     * @throws RuntimeException wrapping any {@link IOException} from connecting or sending
     */
    public static void main(String[] args) {
        // Leaving this block closes the channel, which also unblocks the receiver's
        // pending read so the JVM can exit after "quit".
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress(PORT));
            new Thread(() -> receiveLoop(sc)).start();
            sendLoop(sc);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads tokens from standard input and sends each one to the server until the user
     * types {@code quit}, standard input ends, or the channel is closed.
     *
     * @param sc connected channel to the server
     * @throws IOException if a write fails while the channel is still open
     */
    private static void sendLoop(SocketChannel sc) throws IOException {
        // A single Scanner for the whole session: a fresh Scanner per iteration can discard
        // input the previous one had already buffered. It is intentionally not closed,
        // because closing it would close System.in.
        Scanner scanner = new Scanner(System.in);
        while (sc.isOpen()) {
            System.out.println("等待"+name+"输入:");
            if (!scanner.hasNext()) {
                break; // standard input reached end of file
            }
            String next = scanner.next();
            if ("quit".equals(next)) {
                break;
            }
            try {
                System.out.println(name+"的"+next+"发送到服务器");
                // Wrapping the encoded bytes removes the fixed-size buffer limit on token length.
                ByteBuffer out = ByteBuffer.wrap(next.getBytes());
                while (out.hasRemaining()) {
                    sc.write(out);
                }
            } catch (IOException e) {
                if (!sc.isOpen()) {
                    break; // the receiver closed the channel after the server hung up
                }
                throw e;
            }
        }
        System.out.println(name + " 发送线程终止!");
    }

    /**
     * Prints every chunk of data received from the server until the connection ends.
     *
     * @param sc connected channel to the server
     * @throws RuntimeException wrapping an {@link IOException} raised while the channel is
     *                          still open
     */
    private static void receiveLoop(SocketChannel sc) {
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            while (true) {
                readBuffer.clear();
                int limit = sc.read(readBuffer);
                if (limit == -1) {
                    // The server closed the connection; close our side so that the
                    // send loop notices and stops as well.
                    sc.close();
                    break;
                }
                readBuffer.flip();
                byte[] bytes = new byte[limit];
                readBuffer.get(bytes);
                System.out.println(name+" 接收到信息:"+ new String(bytes, 0, limit));
            }
        } catch (IOException e) {
            // A read interrupted by the send side closing the channel is the normal
            // shutdown path; anything else is a real error.
            if (sc.isOpen()) {
                throw new RuntimeException(e);
            }
        }
        System.out.println(name + " 接收线程终止!");
    }
}
|
使用selector选择器,对server端多线程进行改造
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| package pers.xiaotian.nio.selector;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set;
/**
 * Single-threaded chat-relay server driven by a {@link Selector}.
 *
 * <p>Listens on port 8013, accepts clients in non-blocking mode and forwards whatever one
 * client sends to every other connected client.
 */
public class NIOSelectorServer {

    /** TCP port the server listens on. */
    private static final int PORT = 8013;

    /** Capacity in bytes of the buffer used for each read. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Runs the select loop forever.
     *
     * @param args unused
     * @throws RuntimeException wrapping an {@link IOException} from the server channel or
     *                          the selector itself
     */
    public static void main(String[] args) {
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssc.bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            List<SocketChannel> scList = new ArrayList<>();

            while (true) {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                // Drain every selected key: keys left behind stay in the selected-key set
                // and are not reported again by later select() calls.
                while (iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    if (!sk.isValid()) {
                        // The channel was closed earlier in this pass
                        // (for example as a failed forward target).
                        continue;
                    }
                    if (sk.isAcceptable()) {
                        acceptClient(sk, selector, scList);
                    } else if (sk.isReadable()) {
                        readAndForward(sk, scList);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reads.
     *
     * @param sk       the acceptable key of the server channel
     * @param selector selector the new client is registered with
     * @param scList   list of connected clients; the new client is appended
     * @throws IOException if accepting on the server channel fails
     */
    private static void acceptClient(SelectionKey sk, Selector selector, List<SocketChannel> scList)
            throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) sk.channel();
        SocketChannel sc = channel.accept();
        if (sc == null) {
            return; // non-blocking accept: no connection was actually pending
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
            scList.add(sc);
        } catch (IOException e) {
            // A client that cannot be set up must not take the whole server down.
            closeQuietly(sc);
        }
    }

    /**
     * Reads one chunk from a readable client and forwards it to all other clients.
     * A client whose stream has ended or whose read fails is disconnected.
     *
     * @param sk     the readable key of a client channel
     * @param scList list of connected clients
     */
    private static void readAndForward(SelectionKey sk, List<SocketChannel> scList) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        int limit;
        try {
            limit = channel.read(byteBuffer);
        } catch (IOException e) {
            // For example a connection reset: treat it like a normal disconnect
            // instead of terminating the server.
            limit = -1;
        }
        if (limit == -1) {
            System.out.println("客户端已关闭");
            scList.remove(channel);
            closeQuietly(channel); // closing the channel also cancels its selection key
            return;
        }
        if (limit == 0) {
            return; // nothing was available after all
        }
        byteBuffer.flip();
        byte[] bytes = new byte[limit];
        byteBuffer.get(bytes);
        // Decoded with the platform default charset, for logging only.
        System.out.println("客户端传入的数据为:" + new String(bytes, 0, limit));

        forwardMessage(channel, scList, bytes);
    }

    /**
     * Sends the payload to every connected client except the sender. Clients that cannot
     * take the complete payload are disconnected and removed from the list.
     *
     * @param channel the sender, which is skipped
     * @param scList  list of connected clients; failed clients are removed from it
     * @param payload raw bytes to forward, exactly as received
     */
    private static void forwardMessage(SocketChannel channel, List<SocketChannel> scList, byte[] payload) {
        Iterator<SocketChannel> iterator = scList.iterator();
        while (iterator.hasNext()) {
            SocketChannel sc = iterator.next();
            if (sc == channel) {
                continue;
            }
            ByteBuffer out = ByteBuffer.wrap(payload);
            try {
                while (out.hasRemaining()) {
                    // A non-blocking write may accept only part of the data. Zero progress
                    // means the peer's send buffer is full; rather than spin or deliver a
                    // truncated message, the slow client is dropped. A production server
                    // would queue the remainder and wait for OP_WRITE instead.
                    if (sc.write(out) == 0) {
                        throw new IOException("send buffer full, dropping slow client");
                    }
                }
            } catch (IOException e) {
                iterator.remove();
                closeQuietly(sc);
            }
        }
    }

    /** Closes the channel, ignoring failures because nothing useful can be done about them. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The channel is being discarded anyway.
        }
    }
}
|