IO和NIO的区别:

IO:阻塞的,面向流(Stream)的,流是单向的

NIO:非阻塞的,面向缓冲区(Buffer)的,通道(Channel)是双向的

NIO主要解决场景在网络IO

IO组件(ServerSocket、Socket)的accept方法是阻塞的.

NIO组件(ServerSocketChannel、SocketChannel)的accept方法是阻塞的,但可以通过configureBlock方法设置为非阻塞的.

文件方面的:FileChannel,还有异步读取写入通道:AsynchronousFileChannel(对应的read和write方法都是非阻塞的)

NIO的三大组件:Buffer、Channel、Selector

server代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package pers.xiaotian.nio.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOServerSocket {

public static void main(String[] args) {
try {
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(8012));
List<SocketChannel> clientChannels = new ArrayList<>();

while(true) {
System.out.println("等待获取客户端连接");
SocketChannel accept = socketChannel.accept();
clientChannels.add(accept);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(()->{
handle(accept, clientChannels);
});
}

} catch (IOException e) {
System.out.println("主线程退出结束");
e.printStackTrace();
}
}

private static void handle(SocketChannel accept, List<SocketChannel> clientChannels) {
ByteBuffer readByteBuffer = ByteBuffer.allocate(1024);
ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024);
try {
while (true) {
System.out.println("线程" + Thread.currentThread().getName() + "等待获取客户端发送数据:");
int limit = accept.read(readByteBuffer);
readByteBuffer.flip();
byte[] bytes = new byte[1024];
readByteBuffer.get(bytes, 0, limit);
readByteBuffer.clear();
System.out.println("线程" + Thread.currentThread().getName() + "接收到客户端发送的数据为:" + new String(bytes, 0, limit));

SocketChannel closeChannel = null;
for(SocketChannel otherClient: clientChannels){
if(otherClient != accept) {
System.out.println("线程" + Thread.currentThread().getName() + "转发到其他客户端发送的数据为:" + new String(bytes, 0, limit));

try {
writeByteBuffer.clear();
writeByteBuffer.put(bytes, 0, limit);
writeByteBuffer.flip();
otherClient.write(writeByteBuffer);
} catch (IOException e) {
closeChannel = otherClient;
}
}
}
clientChannels.remove(closeChannel);
}
} catch (IOException e) {
System.out.println("线程"+ Thread.currentThread().getName() +"异常:");
e.printStackTrace();
}
}
}

client代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package pers.xiaotian.nio.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class NIOClientSocket {
static final String name = "client3";

public static void main(String[] args) {
try {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress(8012));
new Thread(()->{
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
Thread writeThread = Thread.currentThread();
new Thread(()->{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while(true){
try {
if(Thread.State.TERMINATED.equals(writeThread.getState())){
System.out.println(name + " 接收线程终止!");
break;
}
int limit = sc.read(readBuffer);
readBuffer.flip();
byte[] bytes = new byte[1024];
readBuffer.get(bytes, 0, limit);
System.out.println(name+" 接收到信息:"+ new String(bytes, 0, limit));
readBuffer.clear();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
while(true){
Scanner scanner = new Scanner(System.in);
System.out.println("等待"+name+"输入:");
String next = scanner.next();

if("quit".equals(next)){
System.out.println(name + " 发送线程终止!");
break;
}

try {
System.out.println(name+"的"+next+"发送到服务器");
writeBuffer.put(next.getBytes());
writeBuffer.flip();
sc.write(writeBuffer);
writeBuffer.clear();

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

使用selector选择器,对server端多线程进行改造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package pers.xiaotian.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class NIOSelectorServer {

public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8013));
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

List<SocketChannel> scList = new ArrayList<>();

while(true){
if(selector.select() >0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
if(iterator.hasNext()){
SelectionKey sk = iterator.next();
if (sk.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) sk.channel();
SocketChannel sc = channel.accept();
scList.add(sc);
sc.configureBlocking(false);

sc.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) sk.channel();
int limit = channel.read(byteBuffer);
if(limit == -1){
System.out.println("客户端已关闭");
scList.remove(channel);
channel.close();
}else {
byte[] bytes = new byte[1024];
byteBuffer.flip();
byteBuffer.get(bytes, 0, limit);
String message = new String(bytes, 0, limit);
System.out.println("客户端传入的数据为:" + message);

//转发到其他通道
forwardMessage(channel, scList, message);
}
}
iterator.remove();
}

}


}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static void forwardMessage(SocketChannel channel, List<SocketChannel> scList, String message) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Iterator<SocketChannel> iterator = scList.iterator();
while(iterator.hasNext()){
SocketChannel sc = iterator.next();
if(channel != sc){
byteBuffer.put(message.getBytes());
try {
byteBuffer.flip();
sc.write(byteBuffer);
byteBuffer.rewind();
} catch (IOException e) {
iterator.remove();
}
}
}
}
}