背景

基于netty实现的框架: Vert.x、RocketMQ、Dubbo

快速开始,创建一个springboot项目,在pom里面引入

1
2
3
4
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

简介: netty3.30版本以后由于netty作者离开了jboss公司,groupId的引用发生了变化.netty4做了模块化设计,如果需要全部引用可直接artfactId依赖netty-all,如果只需要用到部分功能,也可以直接引用单独的包.(来源:https://blog.csdn.net/xiaopingping___/article/details/51280216)

IO框架说明

数据的传输离不开IO框架,IO框架分为BIO、NIO、AIO

BIO

Blocking InputStream OutputStream:从字面意思上理解就是阻塞性的输入输出流,就因为它是阻塞的,所以相对读写性能不高

BIO网络传输例子(以ServerSocket和Socket为例(TCP/IP方面的java网络具体类))

代码如下:

ServerSocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package pers.xiaotian.ioproject.io;

import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ServerSocketTest {

public static void main(String[] args) throws IOException {
//创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//启动一个serverSocket对象
ServerSocket serverSocket = new ServerSocket(6666);
while(true) {
log.info("等待客户端连接,阻塞中。。。");
//阻塞等待获取socket客户端连接
Socket socket = serverSocket.accept();
//创建并异步执行一个Runnable任务
executorService.execute(()->{
try {
//获取客户端的输入流
InputStream socketInputStream = socket.getInputStream();
while(true) {
byte[] bytes = new byte[1024];
int len;
log.info("等待客户端输入信息,阻塞中。。。");
//阻塞等待信息写入到socket流中,有信息会输出到字节数组中
if ((len = socketInputStream.read(bytes)) != -1) {
//将字节数组放入到String字符串中
String result = new String(bytes, 0, len);
log.info("输出的结果为:result = {}", result);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}

});

}
}

}

Socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.io;

import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

@Slf4j
public class SocketTest {
public static void main(String[] args) throws IOException {
//创建一个Socket客户端,绑定SocketServer信息
Socket socket = new Socket("127.0.0.1", 6666);
//获取socket的输出流
OutputStream socketOutputStream = socket.getOutputStream();
//创建键盘绑定事件
Scanner scanner = new Scanner(System.in);
while (true) {
log.info("等待键盘输入。。。阻塞中");
//等待键盘输入
if (scanner.hasNext()) {
//取出键盘输入的字符串信息
String inputStr = scanner.next();
//将字符串信息写入到socket输出流中
socketOutputStream.write(inputStr.getBytes());
}
}


}
}

运行结果如图:

客户端键盘输入事件:

服务端响应事件:

结论:

以上面ServerSocket为例:

1.存在两处阻塞问题:一个是等待连接阻塞,另外一个socket流中等待输入的阻塞.

2.另外一个问题:服务端为了支持多个客户端连接,服务端必须启动多个线程来管理客户端的连接(1对1)

线程资源属于服务器宝贵的资源,创建线程和关闭线程都比较耗服务器方面的性能,线程处于blocked状态又比较浪费线程的资源.

结合上面几点,java方面推出了NIO的包,用来解决上面的问题

NIO

Non-Blocking InputStream OutputStream: 从字面意思上来说,就是非阻塞的输入输出流

NIO的核心三大组件是:Selector、Channel、Buffer(选择器、通道、缓冲区)

关系图如下:

代码示例如下:

SeverSocketChannel代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package pers.xiaotian.ioproject.nio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

@Slf4j
public class ServerSocketChannelTest {

public static void main(String[] args) throws IOException, InterruptedException {

//创建serverSocket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设为非阻塞的状态
serverSocketChannel.configureBlocking(false);
//绑定通道网络端口
serverSocketChannel.bind(new InetSocketAddress(6666));
//创建选择器
Selector selector = Selector.open();
//将serverSocket通道注册到selector获取连接的事件Key中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//服务端已经启动成功了
log.info("服务端启动成功。。。");
while(true){
//等待有事件处理,阻塞中
log.info("选择器阻塞等待事件发生。。。");
selector.select();
//获取选择器事件类型
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
//获取具体事件类型
SelectionKey selectionKey = keyIterator.next();
//从迭代器中移除刚才获取的事件类型
keyIterator.remove();
//是客户端连接请求类型
if(selectionKey.isAcceptable()){
//从serverSocketChannel得到socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
//设置为非阻塞的
socketChannel.configureBlocking(false);
//将socketChannel注入到selector读取事件key中
socketChannel.register(selector, SelectionKey.OP_READ);
log.info("客户端已经连接成功。。。");
}
//监听到了读事件
if(selectionKey.isReadable()){
//从selectKey中获取socketChannel
SocketChannel channel = (SocketChannel)selectionKey.channel();
//创建字节流缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//将socketChannel中的数据读取到缓冲区中
int len = channel.read(buffer);
if(len != -1) {
String result = new String(buffer.array(), 0, len);
log.info("客户端发送的消息为: msg = {}", result);
}else{
channel.close();
selectionKey.cancel();
log.info("客户端已经断开连接了。。。");
}
}
}
}
}
}

SocketChannel代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package pers.xiaotian.ioproject.nio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

@Slf4j
public class SocketChannelTest {
public static void main(String[] args) throws IOException, InterruptedException {
//创建一个socketChannel客户端
SocketChannel socketChannel = SocketChannel.open();
//设置通道为非阻塞的
socketChannel.configureBlocking(false);
//创建Selector
Selector selector = Selector.open();
//将socketChannel注册到selector中,事件为连接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//向服务器发送连接
socketChannel.connect(new InetSocketAddress(6666));
//等待1s,保证socketChannel能和serverSocket完成连接
Thread.sleep(1000);
//判断是否已经完成连接
if (socketChannel.finishConnect()) {
while(true) {
//获取键盘事件
log.info("阻塞等待获取键盘输入。。。");
Scanner scanner = new Scanner(System.in);
if (scanner.hasNext()) {
//获取键盘输入
String msg = scanner.next();
//将msg放入ByteBuffer中,并将buffer写入到socketChannel中
socketChannel.register(selector, SelectionKey.OP_READ);
// //创建字节缓冲区
// ByteBuffer byteBuffer = ByteBuffer.allocate(msg.getBytes().length);
// //将消息放入字节缓冲区
// byteBuffer.put(msg.getBytes());
// //重置position位置,为后续写入socketChannel
// byteBuffer.flip();
// //将byteBuffer的字节数组写入socketChannel
// socketChannel.write(byteBuffer);
//推荐使用该方法创建字符缓冲区并包装字节数据,然后写入到socketChannel中
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
log.info("发送完成。。。");
}
}
} else {
System.exit(1);
}
}
}

运行结果,如图:

client发送消息:

server接收消息:

总结:

服务端通过Selector组件监听事件的方式,用同一个线程完成对不同客户端请求及不同事件类型的处理.

NIO相关组件: Selector、SelectorKey、SocketChannel、ServerSocketChannel、ByteBuffer

零拷贝

1.普通传输

数据传输会从硬盘通过DMA拷贝到内核态(kernel state),内核态数据经过CPU拷贝到用户态(user state),然后用户态的数据经过CPU拷贝到内核态,内核态的数据又经过DMA拷贝到网卡或者目标磁盘位置.这样就经过了四次拷贝以及四次状态的切换(每次CPU拷贝都会有相应的读入和写出的切换).如下图所示:

2.针对上面需要四次拷贝和四次用户态和内核态的切换,下面有两种解决方案可以进行优化:

mmap(共享内存技术)

通过用户缓冲区共享的方式,减少了一次CPU COPY的过程,CPU COPY可以直接从源内核缓冲区拷贝到目标内核缓冲区,具体流程见下图:

四次状态切换

三次拷贝

sendfile(零拷贝技术)

linux 2.1时,通过sendfile技术减少了两次用户态与内核态之间的切换.如图:

linux 2.4时,源内核缓冲区将少量的地址信息,数据大小等拷贝到目标内核缓冲区,其它数据直接从源内核缓冲区直接通过DMA COPY到目标网卡上.从而又减少了一次CPU拷贝.如图:

扩展:linux的splice()技术

代码实现:

1.普通的IO流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package pers.xiaotian.ioproject.zerocopy;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.Socket;
import java.util.Scanner;

@Slf4j
public class CommonIOTest {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
File target = new File("2.zip");
if(!target.exists()){
target.createNewFile();
}
FileOutputStream fileOutputStream = new FileOutputStream(target);
FileInputStream fileInputStream = new FileInputStream("1.zip");
int len;
byte[] b = new byte[4096];
while((len = fileInputStream.read(b))!=-1){
fileOutputStream.write(b, 0, len);
}
log.info("耗费时间:time = {}", System.currentTimeMillis()-startTime);



}
}

2.MMAP,通过MappedByteBuffer实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package pers.xiaotian.ioproject.zerocopy;

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

@Slf4j
public class MMapIOTest {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
FileChannel sourceChannel = new FileInputStream("1.zip").getChannel();
MappedByteBuffer readBuffer = sourceChannel.map(FileChannel.MapMode.READ_ONLY, 0, sourceChannel.size());
File target = new File("2.zip");
if(!target.exists()){
target.createNewFile();
}
FileChannel targetChannel = FileChannel.open(target.toPath(), StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
MappedByteBuffer targetBuffer = targetChannel.map(FileChannel.MapMode.READ_WRITE, 0, sourceChannel.size());
targetBuffer.put(readBuffer);
targetBuffer.force();
log.info("耗费时间:time = {}", System.currentTimeMillis()-startTime);
}
}

3.sendfile,通过FileChannel的transferTo方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.zerocopy;

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

@Slf4j
public class SendFileIOTest {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
File target = new File("2.zip");
if(!target.exists()){
target.createNewFile();
}
FileOutputStream fileOutputStream = new FileOutputStream(target);
FileChannel targetChannel = fileOutputStream.getChannel();
FileInputStream fileInputStream = new FileInputStream("1.zip");
FileChannel sourceChannel = fileInputStream.getChannel();


sourceChannel.transferTo(0, sourceChannel.size(), targetChannel);
log.info("耗费时间:time = {}", System.currentTimeMillis()-startTime);



}
}

执行结果如图:

源文件大小如图:

Reactor模型

具体可以分为: 单reactor单线程模型,单reactor多线程模型,多reactor多线程模型

单reactor单线程模型:

单reactor多线程模型:

多reactor多线程模型:

NIO-netty的Reactor模型就是采用主从Reactor架构设计的,具体架构图如下:

Netty代码简单示例

引入netty的jar包

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>

serverBootstrap代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package pers.xiaotian.ioproject.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServerSocket {
public static void main(String[] args) throws InterruptedException {
//bossGroup 负责处理accept事件
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//workGroup 负责处理读写事件
NioEventLoopGroup workGroup = new NioEventLoopGroup();

//创建一个serverSocket服务
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup) //注册EventLoopGroup
.channel(NioServerSocketChannel.class) //标识channel类型
.childHandler(new ChannelInitializer<SocketChannel>() { //创建serverSocketChannel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler()); //添加对应handler
}
})
.option(ChannelOption.SO_BACKLOG, 128);

ChannelFuture cf = serverBootstrap.bind(6666).sync(); //启动服务端

cf.channel().closeFuture().sync(); //关闭断开的管道监听


}
}

ServerHandler代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package pers.xiaotian.ioproject.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

@Override
//建立连接
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketAddress client = ctx.channel().remoteAddress();
log.info("一个客户端正在建立连接,client = {}", client);
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("你好, 我是服务端!", CharsetUtil.UTF_8));
}

//获取读消息方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取ByteBuf对象
ByteBuf byteBuf = (ByteBuf) msg;
String result = byteBuf.toString(CharsetUtil.UTF_8);
log.info("收到客户端的消息, client = {}: result = {}", ctx.channel().remoteAddress(), result);
}

@Override
//读消息处理完成后方法
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String msg = "hello, 消息已收到";
log.info("msg is complete, msg = {}", msg);
}

@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

clientBootstrap代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package pers.xiaotian.ioproject.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClientSocket {

public static void main(String[] args) throws InterruptedException {
Bootstrap clientBootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap worker = clientBootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});

ChannelFuture cf = worker.connect("127.0.0.1", 6666).sync();

cf.channel().closeFuture();
}
}

ClientHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {

//获取读消息方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取ByteBuf对象
ByteBuf byteBuf = (ByteBuf) msg;
String result = byteBuf.toString(CharsetUtil.UTF_8);
log.info("收到服务端的消息, client = {}: result = {}", ctx.channel().remoteAddress(), result);

String sendMsg = "你好 我是客户端";
ctx.channel().writeAndFlush(Unpooled.copiedBuffer(sendMsg, CharsetUtil.UTF_8));
}


@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

执行结果,如图:

服务端输出:

客户端输出:

Client1输出:

Client2输出:

Netty代码的HTTP请求示例

主要使用netty的HttpServerCodec组件进行http协议的编码和解码处理

netty服务端代码如下:

nettyServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package pers.xiaotian.ioproject.netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class NettyHttpServerSocket {

public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new HttpServerCodec())//重要步骤,加入httpserver的编码和解码的handler
.addLast(new MyHttpHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(6670).sync();

channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

自定义handler代码(传输的数据对象使用HttpObject组件):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package pers.xiaotian.ioproject.netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyHttpHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject s) throws Exception {
System.out.println("==========================");
if(s instanceof HttpRequest) {
log.info("服务器收到的信息, s= {}", s);
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,我是nettyHttpServer", CharsetUtil.UTF_8);
DefaultHttpResponse defaultHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;Charset=utf-8");
defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.channel().writeAndFlush(defaultHttpResponse);
}
}
}

运行结果如下:

启动服务端代码:

浏览器显示结果:

Netty代码的心跳机制请求示例

主要使用Netty的IdleStateHandler组件完成,通过重写ChannelInboundHandler的userEventTriggered完成

netty服务端代码如下:

nettyServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package pers.xiaotian.ioproject.netty.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class NettyHeartBeatServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
//主要步骤,添加IdleStateHandler,传入读间隔时间,写间隔时间,读写间隔使劲啊,以及时间单位来完成
.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS))
.addLast(new MyHeartBeatHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(6666).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

自定义handler代码(通过重写userEventTriggered方法来完成):

IdleStateEvent对应监听读事件,写事件,读写事件来完成,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pers.xiaotian.ioproject.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyHeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){

IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String stateString = "";
switch (idleStateEvent.state()) {
case READER_IDLE:
stateString = "读超时";
break;
case WRITER_IDLE:
stateString = "写超时";
break;
case ALL_IDLE:
stateString = "读写超时";
break;
}
log.info("服务器输出事件结果: stateString = {}", stateString);
}
}
}

启动服务端,执行结果如下:

启动客户端,执行结果如下:

启动完成后,观察服务端的响应事件,如图:

Netty各客户端聊天代码示例

主要通过ChannelGroup进行客户端消息的转发,重写ChannelInboundHandlerAdapter的handlerAdded和channelRead方法.

ChannelGroup组件初始化时需要使用GlobalEventExecutor的实例

nettyServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package pers.xiaotian.ioproject.netty.chatroom;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyChatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChatRoomHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(6677).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

serverHandler代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package pers.xiaotian.ioproject.netty.chatroom;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;

import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ChatRoomHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
log.info("channel = {}", channel);
channelGroup.writeAndFlush(Unpooled.copiedBuffer(("客户端"+channel.remoteAddress()+", 上线了").getBytes()));
channelGroup.add(channel);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
log.info("channel = {}", channel);
channelGroup.writeAndFlush(Unpooled.copiedBuffer(("客户端"+channel.remoteAddress()+", 下线了").getBytes()));
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
log.info("客户端"+channel.remoteAddress()+", 上线了");

}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
log.info("客户端"+channel.remoteAddress()+", 下线了");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel self = ctx.channel();
log.info("channel = {}, msg = {}", self, msg.toString());
for(Channel channel: channelGroup){
if(self == channel){
channel.writeAndFlush(Unpooled.copiedBuffer(("自己说:"+((ByteBuf)msg).toString(CharsetUtil.UTF_8)).getBytes()));
}else{
channel.writeAndFlush(Unpooled.copiedBuffer(("客户端"+self.remoteAddress()+"说:"+((ByteBuf) msg).toString(CharsetUtil.UTF_8)).getBytes()));
}
}
}
}

NettyClient代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package pers.xiaotian.ioproject.netty.chatroom;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.Scanner;

@Slf4j
public class NettyChatClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ClientChatHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6677).sync();
log.info("客户端"+channelFuture.channel().localAddress()+"启动了!");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String next = scanner.nextLine();

channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(next.getBytes(CharsetUtil.UTF_8)));
}
} finally {
group.shutdownGracefully();

}
}
}

ClientHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package pers.xiaotian.ioproject.netty.chatroom;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientChatHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("服务端发送的消息:byteBuf = {}", byteBuf.toString(CharsetUtil.UTF_8));

}
}

执行结果如下:

启动server端:

启动两个客户端服务:

对应两个客户端分别发送聊天消息和接收到的消息记录,如图:

断开其中一个客户端,另外一个客户端可以接收到监听事件,如图:

如果出现问题,建议重新换个服务端口绑定再进行测试一次!!!

Netty建立WebSocket连接代码示例

Netty的WebSocket连接是基于http请求的,所以需要使用HttpServerCodec(http的编解码handler)、ChunkedWritedHandler(块)、HttpObjectAggregator(分段,聚合段)、WebSocketServerProtocolHandler(webSocket服务端协议处理器)

nettyServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package pers.xiaotian.ioproject.netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class NettyWebSocketServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new HttpServerCodec())//http编解码handler
.addLast(new ChunkedWriteHandler())//块写handler
.addLast(new HttpObjectAggregator(8196))//分段,聚合handler
.addLast(new WebSocketServerProtocolHandler("/hello"))//ws协议handler
.addLast(new MyWebSocketHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(6678).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

自定义handler(使用TextWebSocketFrame对象传输)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.netty.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;

@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
log.info("服务器收到的消息:"+textWebSocketFrame.text());
String mes = "服务器收到的消息: " + textWebSocketFrame.text();
channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame(mes));
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("客户端"+ctx.channel().id().asLongText()+", 连接被建立了!");

}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("客户端"+ctx.channel().id().asLongText()+", 连接被关闭了!");
}
}

客户端html代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:6678/hello");

socket.onopen = function (ev){
var rt = document.getElementById("responseText");
rt.value = "连接建立成功! ";
}

socket.onmessage = function (ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}

socket.onclose = function (ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "断开连接成功";
}
}else {
alert("该浏览器不支持websocket服务!");
}

function send(mes){
alert(mes);
if(window.socket){
if(socket.readyState == WebSocket.OPEN){
socket.send(mes);
}

}

}
</script>
<form onsubmit="return false">
<textarea name = "message" style="height: 300px;width: 300px">

</textarea>
<input type="button" value="发送消息" onclick="send(form.message.value)">
<textarea id = "responseText" style="height: 300px;width: 300px">

</textarea>
<input type="button" value="清空消息" onclick="document.getElementById('response').value=''">
</form>

</body>
</html>

启动服务端:

启动html页面:

页面输入消息,点击发送,页面接收到消息返回,如图:

主要是通过建立websocket连接后,连接管道一直可以处于通信状态.

关闭页面,服务端看到断开websocket连接的信息,如图:

NettyServer基于ProtoBuf数据交换示例

原因:

ProtoBuf是跨语言的通信协议、数据存储

它比xml比较,小(310倍),快(20100倍)、更为简单

具体参见说明:

https://www.fdevops.com/2020/12/30/protobuf-19380

https://www.tizi365.com/archives/367.html

https://www.cnblogs.com/xy-ouyang/p/12825132.html

是guava的通信协议工具

项目集成ProtoBuf步骤

引入pom

1
2
3
4
5
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.12</version>
</dependency>

protobuf工具包安装:

github地址:

https://github.com/protocolbuffers/protobuf/releases

找到osx-aarch_64版本,如图:

本地下载的目录位置: ~/办公软件安装包/protoc-22.3-osx-aarch_64.zip

运行的安装包目录位置:

~/workapp/protoc-22.3-osx-aarch_64/bin/protoc

可以使用一下指令检测是否可运行

1
~/workapp/protoc-22.3-osx-aarch_64/bin/protoc --version

如图所示,protoc命令可以正常运行:

如果出现“无法打开 protoc 因为无法验证开发者”的弹窗错误,需要在隐私目录下放开权限即可,如图:

创建pojo对象代码

1
2
3
4
5
6
syntax = "proto3";

message ResponsePojo{
string name = 1;
int32 age = 2;
}

将该文件放到~/workapp/protoc-22.3-osx-aarch_64/bin/protoc的同级目录,执行

1
./protoc --java_out=. Response.proto

执行完成后,该目录下会生成Response.java文件

将Response文件拷贝到idea对应目录下.

NettyServer代码,添加ProtobufDecoder组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package pers.xiaotian.ioproject.netty.protobuf;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;

public class NettyServerSocket {
public static void main(String[] args) throws InterruptedException {
//bossGroup 负责处理accept事件
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//workGroup 负责处理读写事件
NioEventLoopGroup workGroup = new NioEventLoopGroup();

//创建一个serverSocket服务
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup) //注册EventLoopGroup
.channel(NioServerSocketChannel.class) //标识channel类型
.childHandler(new ChannelInitializer<SocketChannel>() { //创建serverSocketChannel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline()
.addLast(new ProtobufDecoder(Response.ResponsePojo.getDefaultInstance()))// 服务端添加netty的protobuf解码器
.addLast(new ServerHandler()); //添加对应handler
}
})
.option(ChannelOption.SO_BACKLOG, 128);

ChannelFuture cf = serverBootstrap.bind(6666).sync(); //启动服务端

cf.channel().closeFuture().sync(); //关闭断开的管道监听


}
}

ServerHandler代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package pers.xiaotian.ioproject.netty.protobuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

//获取读消息方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取ByteBuf对象
Response.ResponsePojo pojo = (Response.ResponsePojo) msg;
log.info("收到客户端的消息, pojo.name = {}, pojo.age = {}", pojo.getName(), pojo.getAge());
}

@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

NettyClient代码,添加ProtobufEncoder组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package pers.xiaotian.ioproject.netty.protobuf;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

public class NettyClientSocket {

public static void main(String[] args) throws InterruptedException {
Bootstrap clientBootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap worker = clientBootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProtobufEncoder())//客户端添加netty的protobuf编码器
.addLast(new ClientHandler());
}
});

ChannelFuture cf = worker.connect("127.0.0.1", 6666).sync();

cf.channel().closeFuture();
}
}

ClientHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package pers.xiaotian.ioproject.netty.protobuf;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Response.ResponsePojo pojo = Response.ResponsePojo.newBuilder().setAge(20).setName("张三").build();
ctx.channel().writeAndFlush(Unpooled.copiedBuffer(pojo.toByteArray()));
}



@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

执行结果如下:

启动server服务

启动client服务:

然后查看服务端控台,发现Response对象服务端正常接收了

NettyChannel的输入输出pipeline理解

网络传输都是以二进制流的形式进行传输的,所以pipeline对应输出需要编码器,输入需要解码器.

以上面的ProtoBufEncoder和ProtoBufDecoder为例,如图:

Netty网络传输的沾包和拆包的问题

客户端发送时,可能会将多个包一起传输发送,服务端接收时,不知道客户端传输对应的数据包的问题,如图所示:

NettyServer代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.netty.tcppackage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class NettyClientSocket {

public static void main(String[] args) throws InterruptedException {
Bootstrap clientBootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap worker = clientBootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ClientHandler());
}
});

ChannelFuture cf = worker.connect("127.0.0.1", 6666).sync();

cf.channel().closeFuture();
}
}

ServerHandler示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package pers.xiaotian.ioproject.netty.tcppackage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

private int count;
//获取读消息方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取ByteBuf对象
ByteBuf byteBuf = (ByteBuf) msg;
int length = byteBuf.readableBytes();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
log.info("收到客户端的消息, response = {}, count = {}", new String(bytes,CharsetUtil.UTF_8), ++count);
}

@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

NettyClient代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package pers.xiaotian.ioproject.netty.tcppackage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


public class NettyClientSocket {

public static void main(String[] args) throws InterruptedException {
Bootstrap clientBootstrap = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap worker = clientBootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ClientHandler());
}
});

ChannelFuture cf = worker.connect("127.0.0.1", 6666).sync();

cf.channel().closeFuture();
}
}

ClientHandler代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package pers.xiaotian.ioproject.netty.tcppackage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0; i<10; i++){
ByteBuf byteBuf = Unpooled.copiedBuffer("客户端发送消息" + i, CharsetUtil.UTF_8);
ctx.writeAndFlush(byteBuf);
}
}



@Override
//异常后处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("方法异常,原因为 e = {}", cause);
}
}

运行结果如下

先启动server端,如图:

启动多个client端,观察server端接收情况,如图:

可以看到服务端收到的包的数量是不一样的,有1,2,3,5次包

解决方案:

可以通过自定义传输协议和编解码handler解决

代码示例如下:

自定义传输协议对象

1
2
3
4
5
6
7
8
9
10
package pers.xiaotian.io;

import lombok.Data;

@Data
public class MyProtocol {
private int len;
private byte[] content;
}

自定义编码器继承netty的MessageToByteEncoder组件

1
2
3
4
5
6
7
8
9
10
11
12
13
package pers.xiaotian.io;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocol> {
protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf byteBuf) throws Exception {
byteBuf.writeInt(myProtocol.getLen()); //先写入传输对象体字节长度
byteBuf.writeBytes(myProtocol.getContent()); //在传入传输对象体字节内容
}
}

自定义解码器继承netty的ByteToMessageDecoder组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package pers.xiaotian.io;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyProtocolDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//先读入字节内容长度,在根据字节内容长度创建byte数组,然后封装自定义协议对象到list
int len = byteBuf.readInt();
byte[] bytes = new byte[len];
byteBuf.readBytes(bytes);
MyProtocol myProtocol = new MyProtocol();
myProtocol.setLen(len);
myProtocol.setContent(bytes);
list.add(myProtocol);

}
}

定义客户端处理器给服务端发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package pers.xiaotian.io;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0; i<10; i++){
byte[] content = ("你好,我是客户端"+i).getBytes(CharsetUtil.UTF_8);
MyProtocol myProtocol = new MyProtocol();
myProtocol.setContent(content);
myProtocol.setLen(content.length);
ctx.writeAndFlush(myProtocol);
}
}
}

定义服务端处理器用于接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package pers.xiaotian.io;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

private int count;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyProtocol myProtocol = (MyProtocol) msg;
String content = new String(myProtocol.getContent(), CharsetUtil.UTF_8);
log.info("服务端收到消息,content = {}, count = {}", content, ++count);
}
}

NettyClient代码,加入自定义编码器和处理器到pipeline组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package pers.xiaotian.io;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;


public class NettyClient {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast(new MyProtocolEncoder()) //编码器
.addLast(new ClientHandler()); //客户端处理器
}
});

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}

NettyServer端代码,自定义解码器和处理器到pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package pers.xiaotian.io;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast(new MyProtocolDecoder()) //添加一个解码处理器
.addLast(new ServerHandler()); //添加一个server处理器
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7777).sync();
channelFuture.channel().closeFuture().sync();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

运行代码:

启动服务端代码:

启动多个客户端,观察服务端接收结果,如图:

通过自定义协议+编解码处理器,就可以解决服务端拆包的问题了!

NettyRpc相关代码实现

1.定义rpc调用接口

1
2
3
4
5
6
7
package pers.xiaotian.ioproject.netty.rpc.rpcinterface;

public interface HelloService {

String sayHello(String param);
}

2.服务提供者对应代码

1
2
3
4
5
6
7
8
9
10
11
12
package pers.xiaotian.ioproject.netty.rpc.provider;

import pers.xiaotian.ioproject.netty.rpc.rpcinterface.HelloService;

public class HelloServiceImpl implements HelloService {

@Override
public String sayHello(String param) {
return "你好!"+param;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
package pers.xiaotian.ioproject.netty.rpc.provider;

import lombok.extern.slf4j.Slf4j;
import pers.xiaotian.ioproject.netty.rpc.netty.ServerNetty;

@Slf4j
public class ServerBootstrap {
public static void main(String[] args) {
ServerNetty.start("127.0.0.1", 7779);

}
}

3.服务消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package pers.xiaotian.ioproject.netty.rpc.consumer;

import lombok.extern.slf4j.Slf4j;
import pers.xiaotian.ioproject.netty.rpc.netty.ClientNetty;
import pers.xiaotian.ioproject.netty.rpc.rpcinterface.HelloService;

@Slf4j
public class ClientBootstrap {
static String provideName = "HelloService#sayHello#";
public static void main(String[] args) {
ClientNetty clientNetty = new ClientNetty();

HelloService bean = (HelloService) clientNetty.getBean(HelloService.class, provideName);

String response = bean.sayHello("你好!我是客户端");

log.info("客户端收到的响应,response = {}", response);

}
}

4.对应netty通信服务代码

nettyServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package pers.xiaotian.ioproject.netty.rpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ServerNetty {

public static void start(String host, int port){
serverStart0(host, port);
}

private static void serverStart0(String host, int port){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
log.info("provider start ...");
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
log.error("serverStart0 error, e = {}", e);
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

serverHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package pers.xiaotian.ioproject.netty.rpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import pers.xiaotian.ioproject.netty.rpc.provider.HelloServiceImpl;

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String result = (String) msg;
log.info("服务端收到消息, msg = {}", result);
if(result.startsWith("HelloService#sayHello")){
String param = result.substring(result.lastIndexOf("#")+1);
HelloServiceImpl helloService = new HelloServiceImpl();
String response = helloService.sayHello(param);
ctx.writeAndFlush(response);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("exceptionCaught! cause = {}", cause);
ctx.close();
}
}

nettyClient代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package pers.xiaotian.ioproject.netty.rpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ClientNetty {

private static ExecutorService executorService = Executors.newCachedThreadPool();

private static ClientHandler clientHandler;

public Object getBean(final Class<?> serviceClass, final String provideName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{ serviceClass},
(proxy, method, args) -> {
try {
log.info("代理方法被调用了, method = {}", method);
if (clientHandler == null) {
connect("127.0.0.1", 7779);
}
clientHandler.setParam(provideName + "#" + args[0]);
return executorService.submit(clientHandler).get();
}catch (Exception e){
log.error("代理方法出错了, method = {}", method);
return null;
}
});
}


public static void connect(String host, int port){
clientHandler = new ClientHandler();
EventLoopGroup clientGroup = new NioEventLoopGroup();
//注意 这里的clientGroup不能shutdown
//channelFuture也不能close,不然就收到服务端的回调信息了
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(clientHandler);
}
});
bootstrap.connect(host, port).sync();
}catch (Exception e){
log.error("connect error, e = {}", e);
}
}
}

ClientHandler代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package pers.xiaotian.ioproject.netty.rpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable {

private ChannelHandlerContext context;
private String param;
private String response;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端建立连接成功。。。");
context = ctx;
}

@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = (String) msg;
log.info("客户端收到服务端的响应,response = {}", response);
notify();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("exceptionCaught cause = {}", cause);
ctx.close();
}

@Override
public synchronized Object call() throws Exception {
log.info("call被调用了。。。");
context.writeAndFlush(param);
wait();

return response;
}

public void setParam(String param) {
this.param = param;
}
}

执行结果如图:

先启动服务端代码:

在启动客户端代码:

并能正常收到服务端返回的响应信息!

注意:

1.遇到的问题,客户端的EventLoopGroup被shutdown了,channelFuture启动后又被close关闭了,导致服务端一直收不到响应信息

2.idea执行debug方法,会默认先执行代理对象的toString方法引起多余的代理调用,下图为去除debug方法的配置:

把这个toString方法去除就行.

帮助文档:

https://netty.io/ –官网

https://waylau.com/netty-4-user-guide/ –中文翻译博客

https://juejin.cn/post/6844904197129764871 –netty整合springboot

https://blog.csdn.net/weixin_41307800/article/details/124729710 –netty整合springboot

https://unclecatmyself.github.io/2020/02/07/netty01rumen/ –netty bootstrap示例参考博客