image

SpringBoot中整合使用WebSocket的多种方式

  • WORDS 14424

SpringBoot中使用WebSocket的几种方式

WebSocket是一种在 http协议之前的双向通信协议,支持长连接和服务端主动推送数据。而传统的 http协议想要实时获取信息大多采用轮询的方式,比较浪费性能。采用 WebSocket协议服务端可以在数据发生更新后将数据主动推送给客户端,以此实现数据的实时更新。SpringBoot框架中使用 WebSocket有多种方式

Spring STOMP

STOMP 是一种基于文本的消息传递协议,可用于在客户端和服务器之间进行异步通信。建立在基础的 WebSocket 协议上,并提供了一种简单、灵活的方式来实现消息传递。SpringSTOMP协议进行了封装,提供了注解驱动、消息代理配置、广播和点对点消息、消息转换。

广播消息

先引入 SpringBootWenSocket依赖,其中包含了 Web依赖


    org.springframework.boot
    spring-boot-starter-websocket

配置并开启 WebScoket支持

// WebSocketConfig.java

@Configuration
// 开启WebSocket和消息代理
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 开启一个基于内存的简单消息代理 如果有客户端订阅了已 /topic 开头的地址,当有新的消息到达该地址时,消息代理会广播该消息给所有订阅了的客户端
        config.enableSimpleBroker("/topic");
        // 设置一个消息请求前缀,当该路径收到消息时,会被处理,可以不设置
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册一个WebSocket端点并开启withSockJS支持,用于客户端连接
        registry.addEndpoint("/websocket").withSockJS();
    }
}

编写接收消息的 Controller

// MessageController.java

@Controller
public class WebSocketController {
    // 表明这是一个消息端点 如果设置了消息请求前缀 需要拼接
    @MessageMapping("/message")
    // 将处理后的消息转发给指定目的地 订阅了该目的地的客户端就会收到消息
    // 相当于messagingTemplate.convertAndSend()方法,如果不使用注解,也可以手动发送
    @SendTo("/topic/message")
    public String handleMessage(String message) {
        return "return:" + message;
    }
}

接收 http请求的 Controller类,将接收到的请求参数转发到消息目的地

// DemoController.java

@RestController
@RequestMapping("/send")
public class DemoController {

    private final SimpMessagingTemplate messagingTemplate;

    public DemoController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @GetMapping
    public String getSendMessage(@RequestParam("message") String message) {
        // 当接收到请求时,请求参数会被发送到消息目的地 某个订阅了该地址的客户端都可以接收到
        messagingTemplate.convertAndSend("/topic/message", message);
        return "success";
    }
}

前端代码

<body>
    <div id="app">
        <button onclick="connect()">开始连接</button>
        <div>
            <input id="message" type="text" placeholder="要发送的消息">
            <button onclick="send()">发送</button>
        </div>
        <div id="response">

        </div>
    </div>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script>
        let stompClient = null;
        const connect = () => {
            console.log("ok")
            // 获取一个socket连接对象
            const socket = new SockJS('/websocket')
            // 通过socket连接获取stomp客户端
            stompClient = Stomp.over(socket);
            // 连接成功后的回调方法
            stompClient.connect({}, frame => {
                alert("连接成功")
                // 订阅某个地址 收到消息后的处理
                stompClient.subscribe("/topic/message", res => {
                    console.log(res)
                    const message = res.body;
                    document.getElementById("response").innerText += message + "\n";
                })
            })
        }
        // 发送消息
        const send = () => {
            const message = document.getElementById("message").value;
            // 三个参数 接收消息的路径,headers没有就空{},需要发送的消息
            stompClient.send("/message", {}, message)
        }
    </script>
</body>

点对点消息

点对点消息模式,需要使用认证框架(Spring Security),放一下简单代码,没有整合。

// WebSocketConfig
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker("/topic");
    config.setApplicationDestinationPrefixes("/app");
    // 添加一个点对点的订阅前缀 默认为 /user
    config.setUserDestinationPrefix("/queue");
}
// WebSocketController

// 指定用户发送消息的接口
@MessageMapping("/message/user")
public void handleUserMessage(MessageBody body) {
    // 拿到用户ID和消息
    Integer userId = body.userId();
    String message = body.message();
    // 向指定用户推送消息 用户需要订阅 /queue/message
    messagingTemplate.convertAndSendToUser(userId.toString(), "/queue/message", message);
}
record MessageBody(Integer userId, String message) {}

MVC使用WebSocket处理类

不使用 Spring提供的 STOMP协议的封装,而是自己处理某个地址的 WebSocket请求。也依赖于 spring-boot-starter-websocket

编写 WebSocket请求处理类

@Component
// 实现WebSocketHandler接口 这是一个顶级接口
public class ServletSocketHandle implements WebSocketHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServletSocketHandle.class);
    // 发送消息的服务层
    private final WebSocketService service;
    // 所有连接的WebSocket session管理
    private final SessionManager sessionManager;

    public ServletSocketHandle(WebSocketService service, SessionManager sessionManager) {
        this.service = service;
        this.sessionManager = sessionManager;
    }
    // 客户端成功连接后调用的方法
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        logger.info("客户端连接成功,{}", session.getId());
        sessionManager.add(session.getId(), session);
    }

    // 接收到客户端消息时调用的方法
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if (message instanceof TextMessage textMessage){
            String payload = textMessage.getPayload();
            service.sendMessage(session, "发送的消息:" + payload + ",时间:" + System.currentTimeMillis());
        }
    }
	// 和客户端连接发生异常时调用的方法
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        logger.info("{}用户发生异常,错误消息:{}", session.getId(), exception.getMessage());
        sessionManager.remove(session.getId());
    }
	// 和客户端连接断开时调用的方法
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        logger.info("客户端{}断开连接", session.getId());
        sessionManager.remove(session.getId());
    }
	// 是否允许不完整的请求消息
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

编写配置类,将某个 WebSocket地址对应到请求处理类

@Configuration
// 开启WebSocket
@EnableWebSocket
public class ServletWebSocketConfig implements WebSocketConfigurer {
    private final ServletSocketHandle servletSocketHandle;

    public ServletWebSocketConfig(ServletSocketHandle servletSocketHandle) {
        this.servletSocketHandle = servletSocketHandle;
    }
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 将暴露的/websocket请求关联到编写的请求处理类
        registry.addHandler(servletSocketHandle, "/websocket");
    }
}

实现 WebSocket Session管理类

@Component
public class SessionManager {
    private static final Logger logger = LoggerFactory.getLogger(SessionManager.class);
    // 使用线程安全的ConcurrentHashMap
    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
	// 将Session添加Map
    public void add(String sessionId, WebSocketSession session) {
        SESSION_POOL.put(sessionId, session);
    }
    // 通过sessionId从Map中获取Session
    public WebSocketSession get(String sessionId) {
        return SESSION_POOL.get(sessionId);
    }
    // 获取所有Session 用于广播消息
    public List<WebSocketSession> getAllWebSocket() {
        return SESSION_POOL.values().stream().toList();
    }
    // 通过sessionId从Map中移除某个Session
    public void remove(String sessionId) {
        WebSocketSession session = SESSION_POOL.get(sessionId);
        if (session != null){
            try {
                session.close();
            } catch (IOException e) {
                logger.error("session close error, sessionId:{}, message:{}", sessionId, e.getMessage());
            }
            SESSION_POOL.remove(sessionId);
        }
    }
}

编写用于发送消息的 WebSocket服务层

@Service
public class WebSocketService {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketService.class);

    private final SessionManager sessionManager;

    public WebSocketService (SessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }
    // 给单个客户端发送消息
    public void sendMessage(WebSocketSession session, String message) {
        try{
            session.sendMessage(new TextMessage(message));
        }catch (IOException e){
            logger.error("Error send message, sessionId:{}, error message:{}", session.getId(), e.getMessage());
        }
    }
    // 广播消息到所有客户端
    public void allSendMessage(String message) {
        sessionManager.getAllWebSocket().forEach(session -> {
            try{
                session.sendMessage(new TextMessage(message));
            } catch (IOException e){
                logger.error("Error send message, sessionId:{}, error message:{}", session.getId(), e.getMessage());
            }

        });
    }

}

用于测试的 Runner,在 SpringBoot启动后执行,定时发送广播消息

@Component
public class WebSocketRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketRunner.class);
    private final WebSocketService service;

    public WebSocketRunner (WebSocketService service) {
        this.service = service;
    }
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 使用线程安全的原子型整数
        AtomicInteger counter = new AtomicInteger(1);
        // 调用异步线程 一直循环发送广播消息
        CompletableFuture.runAsync(() -> {
            while (true) {
                service.allSendMessage("广播消息:" + counter);
                counter.getAndIncrement();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    logger.error("Error sleep runner thread, message:{}", e.getMessage());
                }
            }
        });
    }
}

使用Javax提供的ServerEndpoint规范来处理

ServerEndpoint最开始是由 javax提供的一个类。但从 Java EE8之后,WebSocket API被迁移到了外部,形成了一个独立的规范。

使用 @ServerEndpoint注解编写处理类

@Component
// 被 @ServerEndpoint 标注的类是多对象的 每次连接时都会创建一个,和IOC容器的单例模式冲突,因此即使用了@Component注解也不能使用构造方法注入和自动依赖注入
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServerEndpoint {
    // 都使用静态变量
    private static final Logger logger = LoggerFactory.getLogger(WebSocketServerEndpoint.class);
    private static ObjectMapper objectMapper;
    private static SessionManager sessionManager;
    private static WebSocketService service;

    // 需要从IOC容器中获取的对象使用set方法注入
    @Autowired
    public void setObjectMapper(ObjectMapper objectMapper) {
        WebSocketServerEndpoint.objectMapper = objectMapper;
    }

    @Autowired
    public void setSessionManager(SessionManager sessionManager) {
        WebSocketServerEndpoint.sessionManager = sessionManager;
    }

    @Autowired
    public void setWebSocketService(WebSocketService service) {
        WebSocketServerEndpoint.service = service;
    }

    // 连接成功时调用的方法
    // 路径参数可以用 @PathParam 获取
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        logger.info("用户{}连接成功", userId);
        sessionManager.add(userId, session);
    }
    // 连接关闭时调用的方法
    @OnClose
    public void onClose(Session session, CloseReason reason, @PathParam("userId") String userId) {
        logger.info("用户{}断开连接", userId);
        sessionManager.remove(userId);
    }
    // 接收文本消息
    @OnMessage
    public void onMessage(Session session, String message) {
        System.out.println(message);
        try{
            MessageBody messageBody = objectMapper.readValue(message, MessageBody.class);
            service.sendMessage(messageBody.userId(), messageBody.message());
        }catch (JsonProcessingException e) {
            logger.error("请求消息解析失败,错误消息:{}", e.getMessage());
        }

    }
    // 接收pong消息 常用于心跳检测
    @OnMessage
    public void onMessage(Session session, PongMessage pongMessage) {

    }
    // 接收二进制数据消息
    @OnMessage
    public void onMessage(Session session, byte[] bytes) {

    }
    // 连接发生错误时调用
    @OnError
    public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) {
        logger.info("用户{}连接错误,错误信息:{}", userId, throwable.getMessage());
        sessionManager.remove(userId);
    }
}
// 消息体的类
record MessageBody(String userId, String message){}

编写配置类,用以注入 ServerEndpointExporter开启 WebSocket端点的自动注册和暴露

@Configuration
public class MyWebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

SessionManager类和WebSocketService类与MVC使用的大致是一样的,把sessionId替换为userId即可,可以通过客户端消息的userId参数给指定的在线用户发送消息

// 通过userId拿到客户端session 然后给指定的客户端发送消息
public void sendMessage(String userId, String message) {
    Session session = sessionManager.getSession(userId);
    if (session == null) {
        logger.warn("用户{}不存在", userId);
        return;
    }
    try{
        // 发送文本消息
        session.getBasicRemote().sendText(message);
    }catch (IOException e) {
        logger.error("给用户{}发送消息失败,错误消息:{}", userId, e.getMessage());
    }

}

前端代码,不需要依赖第三方库,使用 Js提供的 WebSocket即可

<body>
    <div>
        <input id="user-id" type="text" placeholder="用户ID">
        <button onclick="connect()">开始连接</button><br>
        <input id="user" type="text" placeholder="接收消息的用户ID">
        <input id="in" type="text" placeholder="发送的消息">
        <button onclick="send()">发送</button>
        <div id="resp"></div>
    </div>
    <script>
        let socket = null;

        const connect = () => {
            const userId = document.getElementById('user-id').value;
            socket = new WebSocket(`ws://localhost:8080/websocket/${userId}`);
            socket.onopen = event => {
                alert('连接成功')
            }
            socket.onclose = err => {
                alert('连接断开')
            }
            socket.onmessage = res => {
                document.getElementById('resp').innerText += res.data + "\n";
            }
        }
        const send = () => {
            const userId = document.getElementById('user').value;
            const message = document.getElementById('in').value;
            const data = {userId: userId, message: message}
            socket.send(JSON.stringify(data))
        }
    </script>
</body>

结尾

:::tip

除了上面三种方式外,还有基于 WebFluxWeb容器框架的,还没了解过,后面了解再补上。

可以整合消息队列和连接鉴权,实现一个简单的实时消息推送服务端,后面再补 :::

关联文章

0 条评论