Netty入门
Netty:
- 本质:网络应用程序框架
- 实现:异步、事件驱动
- 特性:高性能、可维护、快速开发
- 用途:开发服务端和客户端
Reactor模式
- 单线程模式
- 多线程模式
- 主从线程模式
TCP粘包、半包
-
什么是粘包、半包
粘包指服务器一次接收到了多条消息,半包指一次发送的消息服务器用了多次才接收完。根本原因在于TCP是一个流式协议,消息无边界。
一个发送可能被多次接收,多个发送可能被一次接收
一个发送可能占用多个传输包,多个发送可能共用一个传输包
-
粘包的主要原因:
- 发送方每次写入数据 < 套接字缓冲区大小
- 接收方读取套接字缓冲区数据不够及时
-
半包的主要原因:
- 发送写入数据 > 套接字缓冲区大小
- 发送的数据大于协议的MTU(最大传输单元),需要拆包
-
解决办法:找出消息的边界
- TCP连接改成短链接,一个请求一个短链接。请求从连接到释放即为一条消息。效率低下不推荐。
- 固定长度,消息满足固定长度即可。空间浪费不推荐。
- 分隔符,将不同的消息之间使用特定分隔符进行分割,内容自身如果出现分隔符号需要转义。推荐
- 固定字段存储内容的长度信息,先解析出长度,再通过长度获取内容,长度上理论上有限制。推荐
-
Netty对三种常用封帧方式的支持
- 固定长度:
FixedLengthFrameDecoder - 分隔符:
DelimiterBasedFrameDecoder - 固定长度字段存储内容的长度信息:
LengthFieldBasedFrameDescder
- 固定长度:
二次编解码
请求一次解码完成的数据是字节,需要和项目中所使用的对象做转化,这层解码器可以称为二次解码器,其对应的编码器是为了将 Java对象转化为字节流方便存储或运输。
-
一次解码器
ByteToMessageDecoderio.netty.buffer.ByteBuf(原始数据流) =>io.netty.buffer.ByteBuf(用户数据) -
二次解码器
MessageToMessageDecoderio.netty.buffer.ByteBuf(用户数据) => Object对象
如果将一次解码器和二次解码器合并,直接从原始数据流转化为Object对象也是可行的,但是不建议
- 没有分层,不够清晰
- 耦合性高,不利用扩展置换方案
常用的二次编解码方式:
- Java序列化
MarshallingXMLJSONMessagePackProtobuf
选择编解码方式的要点
- 编码后的占用大小
- 编解码速度
- 是否追求可读性
Netty支持的二次编解码方式
Base64Bytes:将Netty的ByteBuf转为Java的byte[]JSONMarshallingProtobufSerialization:类似与JDK的序列化,但是信息会相对少一点String:把用户数据转换为字符串XML
Google Protobuf简介与使用
Protobuf 是一个灵活高效的用于序列化数据的协议,相较于 XML、JSON格式,Protobuf编码后空间更小、编解码速度更快、使用更方便。Protobuf是跨语言的,并且自带一个编译器(protoc),只需要用它进行编译,可以自动生成代码。
Keepalive与Idle检测
需要 Keepalive的场景:
- 对端异常崩溃
- 对端在,但是处理不过来
- 对端在,但是连接不可达
如果不做 Keepalive,那么会导致连接已坏,但是还在浪费资源维持,下次直接使用会报错。
怎么设计 keepalive,以 TCP Keepalive为例(默认关闭)
# TCP连接在没有数据通过的7200秒后发送keepalive消息
net.ipv4.tcp_keepalive_time = 7200
# 如果没响应 按照每75秒一次的频率重发
net.ipv4.tcp_keepalive_intvl = 75
# 连续发送9个探测包没有响应,则判定连接失效
net.ipv4.tcp_keepalive_probes = 9
为什么还需要应用层 Keepalive
- 协议分层,各层关注点不同:传输层关注是否可以联通,应用层关注是否可服务。
TCP层的Keepalive默认关闭,且经过路由等中转设备Keepalive包可能被丢弃TCP层的Keepalive时间太长
Synchronized
Idle监测负责连接诊断,诊断后,做出不同的行为:
-
发送
keepalive:一般用户配合keepalive,减少keepalive消息。keepalive设计改进:v1定时发送消息 => v2 空闲监测 + 判定为Idle时才发送keepalive- v1:
keepalive消息与服务器正常消息不关联,定时就发送 - v2:有其他数据传输时,不发送
keepalive,无数据传输超过一定时间,判定为Idle后再发送keepalive
- v1:
Netty中开启 TCP Keepalive和 Idle检测
-
Server端开启TCP Keepalivebootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(NioChannelOption.of(StandardScoketOptions.SO_KEEPALIVE), true); -
开启
Idle检测ch.pipeline().addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUint.SECONDS));
Netty中锁的使用
同步问题的核心三要素:
- 原子性
- 可见性
- 有序性
锁的分类:
- 对竞争的态度:乐观锁(
java.util.concurrent包中的原子类)、悲观锁Synchronized - 等待锁的人是否公平而言:公平锁
new ReentrantLock(true)和非公平锁new ReentrantLock() - 是否可以共享:共享锁与独享锁:
ReadWriteLock,其读锁是共享锁、写锁是独享锁
Netty中是如何使用的锁:
-
在意锁的对象和范围 => 减少粒度:初始化
channelSynchronized method => Synchronized block -
注意锁的对象本身大小
-
注意锁的速度 => 提高并发性
-
不同的场景选择不同的并发包=> 因需而变
-
衡量好锁的价值 => 能不用则不用
Netty应用场景下:局部串行 + 整体并行 > 一个队列 + 多个线程模式- 降低开发难度、逻辑简单、提升处理性能
- 避免锁带来的上下文切换和并发保护等额外开销
Netty中的内存使用
内存使用技巧的目标:
- 内存占用少
- 应用速度快
对 Java而言:减少 Full GC(垃圾回收)的时间
Netty中的内存使用技巧:
-
减少对象本身大小:
- 能用基本类型就不要用包装类
- 应该定义成类变量的不要定义成实例变量(实例越多,浪费越多)
-
对分配内存进行预估
- 对于已经可以预知固定
size的HashMap避免扩容(可以提前计算好初始size或者直接使用)
- 对于已经可以预知固定
-
预测分配大小
Netty根据接收到的数据动态调整下个要分配的Buffer的大小
-
Zero-Copy
- 使用逻辑组合,代替实际复制
- 使用包装,代替实际复制
- 调用
Jdk的Zero-Copy接口
-
堆外内存
- 优点:破除堆空间限制,减轻
GC压力、避免复制 - 缺点:创建速度稍慢,堆外内存受操作系统管理
- 优点:破除堆空间限制,减轻
-
内存池
需要引入对象池的原因:
- 创建对象开销大
- 对象高频率创建且可复用
- 支持并发又能保护系统
- 维护、共享有限的资源
实现对象池:
- 开源实现:
Apache Commons Pool Netty轻量级实现:io.netty.util.Recycler
Netty的启动
主线:
Main thread:
- 创建
selector - 创建
server socket channel - 初始化
server socket channel - 给
server socket channel从boss group(主线程)中选择一个NioEventLoop
boss thread:
- 将
server socket channel注册到选择的NioEventLoop的selector(线程切换) - 绑定地址启动
- 注册接受连接事件(
OP_ACCEPT)到selestor上
Netty的启动流程
Selector selector = SelectorProviderImpl.openSelector();
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
selectionKey = javaChannel().register(eventLoop().unwrappedSelector, 0, this);
javaChannel().bind(localAddress, config.getBacklog());
// 可以接受连接了
selectionKey.interestOps(OP_ACCEPT);
Selector是在new NioEventLoopGroup()时创建的- 第一次
register并不是监听OP_ACCEPT,而是0 - 最终监听
OP_ACCEPT是通过bind完成后的fireChannelActive()来触发的 NioEventLoop是通过Register操作的执行来完成启动的- 类似
ChannelInitalizer,一些Handler可以设计成一次性的,用完就移除
构建连接
主线:
boss thread:
NioEventLoop中的selector轮询创建连接事件OP_ACCEPT- 创建
Socket Channel - 初始化
Socket Channer并从worker group中选择一个NioEventLoop
worker thread:
- 将
socket channel注册到选择的NioEventLoop的selector - 注册读事件
OP_READ到selector上
Http服务器
使用 Netty作为 Http服务器
// 使用单线程的主进程
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 业务处理类
final TaskRunnerManager manager = ManagerFactory.newRunnerManager(config);
final AbstractTaskProcessorHandler taskHandler = TaskProcessHandlerFactory.newTaskHandler();
final TaskProcessorSelector selector = new TaskProcessorSelector(manager, taskHandler, new GoTaskClient(HttpClient.newHttpClient(), config));
// Netty的启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// Info日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加Http编解码器
pipeline.addLast(new HttpResponseEncoder())
.addLast(new HttpRequestDecoder())
.addLast("httpAggregator", new HttpObjectAggregator(1024 * 1024))
// 添加自定义的请求处理器
.addLast(new HttpServerHandler(selector));
}
});
ChannelFuture cf = bootstrap.bind(config.getPort()).sync();
logger.debug("HTTP服务器启动成功,绑定端口:{}", config.getPort());
serverChannel = cf.channel();
请求处理器
// 继承SimpleChannelInboundHandler接口
public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
// 业务配置
private static final String TASK_URI = "/task/issued";
private final TaskProcessorSelector selector;
public HttpServerHandler(TaskProcessorSelector selector) {
this.selector = selector;
}
// 读计算完成 刷新缓存
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
// 连接异常的处理方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("连接处理异常,错误消息:{}", cause.getMessage());
}
// 请求处理方法
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
String uri = fullHttpRequest.uri();
DefaultFullHttpResponse response;
if ("/task/issued".equals(uri)) {
if (fullHttpRequest.method().equals(HttpMethod.POST)) {
// 拿到请求参数并转为Json
ByteBuf content = fullHttpRequest.content();
String requestBody = content.toString(CharsetUtil.UTF_8);
TaskInfo taskInfo = (TaskInfo)JsonUtils.form(requestBody, TaskInfo.class);
if (taskInfo == null) {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
} else {
logger.info(taskInfo.toString());
this.selector.doSelect(taskInfo);
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
}
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
}
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
}
// 写入resopnse并发送
ChannelFuture future = ctx.writeAndFlush(response);
// 监听响应发送结果
future.addListener(ChannelFutureListener.CLOSE);
}
}
WebSocket客户端
String url = WS_URI.replace("{id}", UUID.randomUUID().toString());
url = url.formatted(
URLEncoder.encode(config.getNodeName(), StandardCharsets.UTF_8),
URLEncoder.encode(String.valueOf(config.getIntervals()), StandardCharsets.UTF_8),
URLEncoder.encode(String.valueOf(config.getPort()), StandardCharsets.UTF_8)
);
URI uri = new URI("ws://" + config.getAddress() + url);
// 客户端的工作线程
NioEventLoopGroup clientGroup = new NioEventLoopGroup();
// 创建websocket连接
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
// 业务类
final ControlService controlService = ControlServiceFactory.newControlService();
// websocket请求处理器
final WebSocketClientHandler handler = new WebSocketClientHandler(controlService, handshaker, config);
// netty启动器
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// http编解码器
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
// 添加请求处理器
pipeline.addLast("ws-handler", handler);
}
});
clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
handler.handshakeFuture().sync();
WebScoket请求处理器
// 继承SimpleChannelInboundHandler接口
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
private final ControlService controlService;
private final TaskNodeConfig config;
private WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(ControlService controlService, TaskNodeConfig config) {
this.controlService = controlService;
this.config = config;
}
public WebSocketClientHandler(ControlService controlService, WebSocketClientHandshaker handshaker, TaskNodeConfig config) {
this.controlService = controlService;
this.handshaker = handshaker;
this.config = config;
}
public void setHandshaker(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.handshakeFuture = ctx.newPromise();
}
// 连接成功处理
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
this.handshaker.handshake(channel);
this.startInfoUpload(channel);
}
// 断开连接处理
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.error("WebSocket服务器断开连接");
ctx.channel().closeFuture();
throw new WebSocketClientHandshakeException("服务器连接异常");
}
// 接受到消息处理
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
// 判断连接是否成功
if (!this.handshaker.isHandshakeComplete()) {
try {
this.handshaker.finishHandshake(ch, (FullHttpResponse)msg);
logger.info("WebSocket Client connected!");
this.handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException var6) {
logger.info("WebSocket Client failed to connect");
this.handshakeFuture.setFailure(var6);
}
} else if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse)msg;
HttpResponseStatus var10002 = response.status();
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + var10002 + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ")");
} else {
// 将消息转为WebSocketFrame消息
WebSocketFrame frame = (WebSocketFrame)msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame)frame;
logger.debug("收到服务器回应,响应消息:{}", textWebSocketFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
logger.info("Accept Server pong message");
} else if (frame instanceof CloseWebSocketFrame) {
logger.info("Websocket connect close");
ch.close();
}
}
}
// 连接异常处理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Websocket连接异常,错误信息:{}", cause.getMessage());
if (!this.handshakeFuture.isDone()) {
this.handshakeFuture.setFailure(cause);
}
ctx.close();
EXECUTOR.shutdownNow();
}
// 定时任务发送消息
private void startInfoUpload(Channel channel) throws Exception {
EXECUTOR.scheduleWithFixedDelay(() -> {
if (channel.isActive()) {
ControlInfo systemInfo = this.controlService.getAllInfo();
String message = JsonUtils.to(systemInfo);
// 发送消息 使用自定义的消息发送监听器
channel.writeAndFlush(new TextWebSocketFrame(message)).addListener(new CustomChannelFutureListener());
}
}, 500L, (long)this.config.getIntervals() * 1000L, TimeUnit.MILLISECONDS);
}
}
和SpringBoot整合
在 SpringBoot中使用 Netty和普通项目的使用差不多,只不过需要将 Netty的启动和销毁和 SpringBoot的声明周期绑定,同时将对象交由 IOc进行管理。
// 继承ApplicationRunner, DisposableBean接口,实现run和destroy方法
public class BootstrapRunner implements ApplicationRunner, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(BootstrapRunner.class);
private static final String WS_URI = "/node/registration/ws/{id}?name=%s&intervals=%s&port=%s";
private final TaskNodeConfig config;
private final TaskProcessorSelector selector;
private final WebSocketClientHandler clientHandler;
private final NioEventLoopGroup bossGroup;
private final NioEventLoopGroup workerGroup;
private final NioEventLoopGroup clientGroup;
private Channel serverChannel;
private Channel clientChannel;
// 注入需要的对象
public BootstrapRunner(TaskNodeConfig config, TaskProcessorSelector selector, WebSocketClientHandler clientHandler) {
this.config = config;
this.selector = selector;
this.clientHandler = clientHandler;
this.bossGroup = config.isSingleMain() ? new NioEventLoopGroup(1) : new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
this.clientGroup = new NioEventLoopGroup();
}
// run方法会在SpringBoot IOC容器注入完成后运行
@Override
public void run(ApplicationArguments args) throws Exception {
// 启动服务器和客户端
bootstrapWebsocketClient();
bootstrapHttpServer();
logger.info("Go-Task Client Success Bootstrap");
}
// destroy方法会在IOC容器销毁时运行
@Override
public void destroy() throws Exception {
// 关闭通道并停止线程池
if (serverChannel != null) {
serverChannel.close();
logger.info("关闭Http服务器");
}
if (clientChannel != null) {
clientChannel.close();
logger.info("关闭WebSocket客户端");
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
}
// 启动Websocket客户端
private void bootstrapWebsocketClient() throws Exception {
String url = WS_URI.replace("{id}", UUID.randomUUID().toString());
url = url.formatted(
URLEncoder.encode(config.getNodeName(), StandardCharsets.UTF_8),
URLEncoder.encode(String.valueOf(config.getIntervals()), StandardCharsets.UTF_8),
URLEncoder.encode(String.valueOf(config.getPort()), StandardCharsets.UTF_8)
);
URI uri = new URI("ws://" + config.getAddress() + url);
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
clientHandler.setHandshaker(handshaker);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
pipeline.addLast("ws-handler", clientHandler);
}
});
clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
clientHandler.handshakeFuture().sync();
}
// 启动Http服务器
private void bootstrapHttpServer() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new HttpResponseEncoder())
.addLast(new HttpRequestDecoder())
.addLast("httpAggregator", new HttpObjectAggregator(1024 * 1024))
.addLast(new HttpServerHandler(selector));
}
});
ChannelFuture cf = bootstrap.bind(config.getPort()).sync();
logger.debug("HTTP服务器启动成功,绑定端口:{}", config.getPort());
serverChannel = cf.channel();
}
}






0 条评论