image

Netty高性能通信框架的简单使用

  • WORDS 18592

Netty入门

Netty

  • 本质:网络应用程序框架
  • 实现:异步、事件驱动
  • 特性:高性能、可维护、快速开发
  • 用途:开发服务端和客户端

Reactor模式

  • 单线程模式
  • 多线程模式
  • 主从线程模式

TCP粘包、半包

  • 什么是粘包、半包

    粘包指服务器一次接收到了多条消息,半包指一次发送的消息服务器用了多次才接收完。根本原因在于TCP是一个流式协议,消息无边界

    一个发送可能被多次接收,多个发送可能被一次接收

    一个发送可能占用多个传输包,多个发送可能共用一个传输包

  • 粘包的主要原因:

    • 发送方每次写入数据 < 套接字缓冲区大小
    • 接收方读取套接字缓冲区数据不够及时
  • 半包的主要原因:

    • 发送写入数据 > 套接字缓冲区大小
    • 发送的数据大于协议的MTU(最大传输单元),需要拆包
  • 解决办法:找出消息的边界

    • TCP连接改成短链接,一个请求一个短链接。请求从连接到释放即为一条消息。效率低下不推荐。
    • 固定长度,消息满足固定长度即可。空间浪费不推荐。
    • 分隔符,将不同的消息之间使用特定分隔符进行分割,内容自身如果出现分隔符号需要转义。推荐
    • 固定字段存储内容的长度信息,先解析出长度,再通过长度获取内容,长度上理论上有限制。推荐
  • Netty对三种常用封帧方式的支持

    • 固定长度:FixedLengthFrameDecoder
    • 分隔符:DelimiterBasedFrameDecoder
    • 固定长度字段存储内容的长度信息:LengthFieldBasedFrameDescder

二次编解码

请求一次解码完成的数据是字节,需要和项目中所使用的对象做转化,这层解码器可以称为二次解码器,其对应的编码器是为了将 Java对象转化为字节流方便存储或运输。

  • 一次解码器 ByteToMessageDecoder

    io.netty.buffer.ByteBuf(原始数据流) => io.netty.buffer.ByteBuf(用户数据)

  • 二次解码器 MessageToMessageDecoder

    io.netty.buffer.ByteBuf(用户数据) => Object对象

如果将一次解码器和二次解码器合并,直接从原始数据流转化为Object对象也是可行的,但是不建议

  • 没有分层,不够清晰
  • 耦合性高,不利用扩展置换方案

常用的二次编解码方式

  • Java序列化
  • Marshalling
  • XML
  • JSON
  • MessagePack
  • Protobuf

选择编解码方式的要点

  • 编码后的占用大小
  • 编解码速度
  • 是否追求可读性

Netty支持的二次编解码方式

  • Base64
  • Bytes:将 NettyByteBuf转为 Javabyte[]
  • JSON
  • Marshalling
  • Protobuf
  • Serialization:类似与 JDK的序列化,但是信息会相对少一点
  • String:把用户数据转换为字符串
  • XML

Google Protobuf简介与使用

Protobuf 是一个灵活高效的用于序列化数据的协议,相较于 XMLJSON格式,Protobuf编码后空间更小、编解码速度更快、使用更方便。Protobuf是跨语言的,并且自带一个编译器(protoc),只需要用它进行编译,可以自动生成代码。

Keepalive与Idle检测

需要 Keepalive的场景:

  • 对端异常崩溃
  • 对端在,但是处理不过来
  • 对端在,但是连接不可达

如果不做 Keepalive,那么会导致连接已坏,但是还在浪费资源维持,下次直接使用会报错。

怎么设计 keepalive,以 TCP Keepalive为例(默认关闭)

# TCP连接在没有数据通过的7200秒后发送keepalive消息
net.ipv4.tcp_keepalive_time = 7200
# 如果没响应 按照每75秒一次的频率重发
net.ipv4.tcp_keepalive_intvl = 75
# 连续发送9个探测包没有响应,则判定连接失效
net.ipv4.tcp_keepalive_probes = 9

为什么还需要应用层 Keepalive

  • 协议分层,各层关注点不同:传输层关注是否可以联通,应用层关注是否可服务。
  • TCP层的 Keepalive默认关闭,且经过路由等中转设备 Keepalive包可能被丢弃
  • TCP层的 Keepalive时间太长

Synchronized

Idle监测负责连接诊断,诊断后,做出不同的行为:

  • 发送 keepalive:一般用户配合 keepalive,减少 keepalive消息。

    keepalive设计改进:v1定时发送消息 => v2 空闲监测 + 判定为 Idle时才发送 keepalive

    • v1:keepalive消息与服务器正常消息不关联,定时就发送
    • v2:有其他数据传输时,不发送 keepalive,无数据传输超过一定时间,判定为 Idle后再发送 keepalive

Netty中开启 TCP KeepaliveIdle检测

  • Server端开启 TCP Keepalive

    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(NioChannelOption.of(StandardScoketOptions.SO_KEEPALIVE), true);
    
  • 开启 Idle检测

    ch.pipeline().addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUint.SECONDS));
    

Netty中锁的使用

同步问题的核心三要素:

  • 原子性
  • 可见性
  • 有序性

锁的分类:

  • 对竞争的态度:乐观锁(java.util.concurrent包中的原子类)、悲观锁 Synchronized
  • 等待锁的人是否公平而言:公平锁 new ReentrantLock(true)和非公平锁 new ReentrantLock()
  • 是否可以共享:共享锁与独享锁:ReadWriteLock,其读锁是共享锁、写锁是独享锁

Netty中是如何使用的锁:

  • 在意锁的对象和范围 => 减少粒度:初始化 channel

    Synchronized method => Synchronized block

  • 注意锁的对象本身大小

  • 注意锁的速度 => 提高并发性

  • 不同的场景选择不同的并发包=> 因需而变

  • 衡量好锁的价值 => 能不用则不用

    Netty应用场景下:局部串行 + 整体并行 > 一个队列 + 多个线程模式

    • 降低开发难度、逻辑简单、提升处理性能
    • 避免锁带来的上下文切换和并发保护等额外开销

Netty中的内存使用

内存使用技巧的目标:

  • 内存占用少
  • 应用速度快

Java而言:减少 Full GC(垃圾回收)的时间

Netty中的内存使用技巧:

  • 减少对象本身大小:

    • 能用基本类型就不要用包装类
    • 应该定义成类变量的不要定义成实例变量(实例越多,浪费越多)
  • 对分配内存进行预估

    • 对于已经可以预知固定 sizeHashMap避免扩容(可以提前计算好初始 size或者直接使用)
  • 预测分配大小

    • Netty根据接收到的数据动态调整下个要分配的 Buffer的大小
  • Zero-Copy

    • 使用逻辑组合,代替实际复制
    • 使用包装,代替实际复制
    • 调用 JdkZero-Copy接口
  • 堆外内存

    • 优点:破除堆空间限制,减轻 GC 压力、避免复制
    • 缺点:创建速度稍慢,堆外内存受操作系统管理
  • 内存池

    需要引入对象池的原因:

    • 创建对象开销大
    • 对象高频率创建且可复用
    • 支持并发又能保护系统
    • 维护、共享有限的资源

    实现对象池:

    • 开源实现:Apache Commons Pool
    • Netty轻量级实现io.netty.util.Recycler

Netty的启动

主线:

Main thread

  • 创建 selector
  • 创建 server socket channel
  • 初始化 server socket channel
  • server socket channelboss group(主线程)中选择一个 NioEventLoop

boss thread

  • server socket channel注册到选择的 NioEventLoopselector(线程切换)
  • 绑定地址启动
  • 注册接受连接事件(OP_ACCEPT)到 selestor

Netty的启动流程

Selector selector = SelectorProviderImpl.openSelector();
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
selectionKey = javaChannel().register(eventLoop().unwrappedSelector, 0, this);
javaChannel().bind(localAddress, config.getBacklog());
// 可以接受连接了
selectionKey.interestOps(OP_ACCEPT);
  • Selector是在 new NioEventLoopGroup()时创建的
  • 第一次 register并不是监听 OP_ACCEPT,而是 0
  • 最终监听 OP_ACCEPT是通过 bind完成后的 fireChannelActive()来触发的
  • NioEventLoop是通过 Register操作的执行来完成启动的
  • 类似 ChannelInitalizer,一些 Handler可以设计成一次性的,用完就移除

构建连接

主线:

boss thread

  • NioEventLoop中的 selector轮询创建连接事件 OP_ACCEPT
  • 创建 Socket Channel
  • 初始化 Socket Channer并从 worker group中选择一个 NioEventLoop

worker thread

  • socket channel注册到选择的 NioEventLoopselector
  • 注册读事件 OP_READselector

Http服务器

使用 Netty作为 Http服务器

// 使用单线程的主进程
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 业务处理类
final TaskRunnerManager manager = ManagerFactory.newRunnerManager(config);
final AbstractTaskProcessorHandler taskHandler = TaskProcessHandlerFactory.newTaskHandler();
final TaskProcessorSelector selector = new TaskProcessorSelector(manager, taskHandler, new GoTaskClient(HttpClient.newHttpClient(), config));
// Netty的启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // Info日志
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 添加Http编解码器
            pipeline.addLast(new HttpResponseEncoder())
                .addLast(new HttpRequestDecoder())
                .addLast("httpAggregator", new HttpObjectAggregator(1024 * 1024))
                // 添加自定义的请求处理器
                .addLast(new HttpServerHandler(selector));
        }
    });
ChannelFuture cf = bootstrap.bind(config.getPort()).sync();
logger.debug("HTTP服务器启动成功,绑定端口:{}", config.getPort());
serverChannel = cf.channel();

请求处理器

// 继承SimpleChannelInboundHandler接口
public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
    // 业务配置
    private static final String TASK_URI = "/task/issued";
    private final TaskProcessorSelector selector;

    public HttpServerHandler(TaskProcessorSelector selector) {
        this.selector = selector;
    }

    // 读计算完成 刷新缓存
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    // 连接异常的处理方法
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("连接处理异常,错误消息:{}", cause.getMessage());
    }

    // 请求处理方法
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
        String uri = fullHttpRequest.uri();
        DefaultFullHttpResponse response;
        if ("/task/issued".equals(uri)) {
            if (fullHttpRequest.method().equals(HttpMethod.POST)) {
                // 拿到请求参数并转为Json
                ByteBuf content = fullHttpRequest.content();
                String requestBody = content.toString(CharsetUtil.UTF_8);
                TaskInfo taskInfo = (TaskInfo)JsonUtils.form(requestBody, TaskInfo.class);
                if (taskInfo == null) {
                    response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
                } else {
                    logger.info(taskInfo.toString());
                    this.selector.doSelect(taskInfo);
                    response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                }
            } else {
                response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
            }
        } else {
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
        }
		// 写入resopnse并发送
        ChannelFuture future = ctx.writeAndFlush(response);
        // 监听响应发送结果
        future.addListener(ChannelFutureListener.CLOSE);
    }
}

WebSocket客户端

String url = WS_URI.replace("{id}", UUID.randomUUID().toString());
url = url.formatted(
    URLEncoder.encode(config.getNodeName(), StandardCharsets.UTF_8),
    URLEncoder.encode(String.valueOf(config.getIntervals()), StandardCharsets.UTF_8),
    URLEncoder.encode(String.valueOf(config.getPort()), StandardCharsets.UTF_8)
);
URI uri = new URI("ws://" + config.getAddress() + url);
// 客户端的工作线程
NioEventLoopGroup clientGroup = new NioEventLoopGroup();
// 创建websocket连接
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
// 业务类
final ControlService controlService = ControlServiceFactory.newControlService();
// websocket请求处理器
final WebSocketClientHandler handler = new WebSocketClientHandler(controlService, handshaker, config);
// netty启动器
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // http编解码器
            pipeline.addLast("http-codec", new HttpClientCodec());
            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
            pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
            // 添加请求处理器
            pipeline.addLast("ws-handler", handler);
        }
    });
clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
handler.handshakeFuture().sync();

WebScoket请求处理器

// 继承SimpleChannelInboundHandler接口
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class);
    private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
    private final ControlService controlService;
    private final TaskNodeConfig config;
    private WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(ControlService controlService, TaskNodeConfig config) {
        this.controlService = controlService;
        this.config = config;
    }

    public WebSocketClientHandler(ControlService controlService, WebSocketClientHandshaker handshaker, TaskNodeConfig config) {
        this.controlService = controlService;
        this.handshaker = handshaker;
        this.config = config;
    }

    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.handshakeFuture = ctx.newPromise();
    }
	// 连接成功处理
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        this.handshaker.handshake(channel);
        this.startInfoUpload(channel);
    }
	// 断开连接处理
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.error("WebSocket服务器断开连接");
        ctx.channel().closeFuture();
        throw new WebSocketClientHandshakeException("服务器连接异常");
    }
	// 接受到消息处理
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        // 判断连接是否成功
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                this.handshaker.finishHandshake(ch, (FullHttpResponse)msg);
                logger.info("WebSocket Client connected!");
                this.handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException var6) {
                logger.info("WebSocket Client failed to connect");
                this.handshakeFuture.setFailure(var6);
            }

        } else if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse)msg;
            HttpResponseStatus var10002 = response.status();
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + var10002 + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ")");
        } else {
            // 将消息转为WebSocketFrame消息
            WebSocketFrame frame = (WebSocketFrame)msg;
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame)frame;
                logger.debug("收到服务器回应,响应消息:{}", textWebSocketFrame.text());
            } else if (frame instanceof PongWebSocketFrame) {
                logger.info("Accept Server pong message");
            } else if (frame instanceof CloseWebSocketFrame) {
                logger.info("Websocket connect close");
                ch.close();
            }

        }
    }
	// 连接异常处理
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Websocket连接异常,错误信息:{}", cause.getMessage());
        if (!this.handshakeFuture.isDone()) {
            this.handshakeFuture.setFailure(cause);
        }

        ctx.close();
        EXECUTOR.shutdownNow();
    }
    // 定时任务发送消息
    private void startInfoUpload(Channel channel) throws Exception {
        EXECUTOR.scheduleWithFixedDelay(() -> {
            if (channel.isActive()) {
                ControlInfo systemInfo = this.controlService.getAllInfo();
                String message = JsonUtils.to(systemInfo);
                // 发送消息 使用自定义的消息发送监听器
                channel.writeAndFlush(new TextWebSocketFrame(message)).addListener(new CustomChannelFutureListener());
            }

        }, 500L, (long)this.config.getIntervals() * 1000L, TimeUnit.MILLISECONDS);
    }
}

和SpringBoot整合

SpringBoot中使用 Netty和普通项目的使用差不多,只不过需要将 Netty的启动和销毁和 SpringBoot的声明周期绑定,同时将对象交由 IOc进行管理。

// 继承ApplicationRunner, DisposableBean接口,实现run和destroy方法
public class BootstrapRunner implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(BootstrapRunner.class);
    private static final String WS_URI = "/node/registration/ws/{id}?name=%s&intervals=%s&port=%s";
    private final TaskNodeConfig config;
    private final TaskProcessorSelector selector;
    private final WebSocketClientHandler clientHandler;
    private final NioEventLoopGroup bossGroup;
    private final NioEventLoopGroup workerGroup;
    private final NioEventLoopGroup clientGroup;
    private Channel serverChannel;
    private Channel clientChannel;
    // 注入需要的对象
    public BootstrapRunner(TaskNodeConfig config, TaskProcessorSelector selector, WebSocketClientHandler clientHandler) {
        this.config = config;
        this.selector = selector;
        this.clientHandler = clientHandler;
        this.bossGroup = config.isSingleMain() ? new NioEventLoopGroup(1) : new NioEventLoopGroup();
        this.workerGroup = new NioEventLoopGroup();
        this.clientGroup = new NioEventLoopGroup();
    }
    // run方法会在SpringBoot IOC容器注入完成后运行
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 启动服务器和客户端
        bootstrapWebsocketClient();
        bootstrapHttpServer();
        logger.info("Go-Task Client Success Bootstrap");
    }
    // destroy方法会在IOC容器销毁时运行
    @Override
    public void destroy() throws Exception {
        // 关闭通道并停止线程池
        if (serverChannel != null) {
            serverChannel.close();
            logger.info("关闭Http服务器");
        }
        if (clientChannel != null) {
            clientChannel.close();
            logger.info("关闭WebSocket客户端");
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        clientGroup.shutdownGracefully();
    }

    // 启动Websocket客户端
    private void bootstrapWebsocketClient() throws Exception {
        String url = WS_URI.replace("{id}", UUID.randomUUID().toString());
        url = url.formatted(
                URLEncoder.encode(config.getNodeName(), StandardCharsets.UTF_8),
                URLEncoder.encode(String.valueOf(config.getIntervals()), StandardCharsets.UTF_8),
                URLEncoder.encode(String.valueOf(config.getPort()), StandardCharsets.UTF_8)
        );
        URI uri = new URI("ws://" + config.getAddress() + url);
        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        clientHandler.setHandshaker(handshaker);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(clientGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast("http-codec", new HttpClientCodec());
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                        pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
                        pipeline.addLast("ws-handler", clientHandler);
                    }
                });
        clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
        clientHandler.handshakeFuture().sync();
    }
    // 启动Http服务器
    private void bootstrapHttpServer() throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new HttpResponseEncoder())
                                .addLast(new HttpRequestDecoder())
                                .addLast("httpAggregator", new HttpObjectAggregator(1024 * 1024))
                                .addLast(new HttpServerHandler(selector));
                    }
                });
        ChannelFuture cf = bootstrap.bind(config.getPort()).sync();
        logger.debug("HTTP服务器启动成功,绑定端口:{}", config.getPort());
        serverChannel = cf.channel();
    }
}

关联文章

0 条评论