SpringBoot中使用WebSocket的几种方式
WebSocket是一种在 http协议之前的双向通信协议,支持长连接和服务端主动推送数据。而传统的 http协议想要实时获取信息大多采用轮询的方式,比较浪费性能。采用 WebSocket协议服务端可以在数据发生更新后将数据主动推送给客户端,以此实现数据的实时更新。SpringBoot框架中使用 WebSocket有多种方式
Spring STOMP
STOMP 是一种基于文本的消息传递协议,可用于在客户端和服务器之间进行异步通信。建立在基础的 WebSocket 协议上,并提供了一种简单、灵活的方式来实现消息传递。Spring对 STOMP协议进行了封装,提供了注解驱动、消息代理配置、广播和点对点消息、消息转换。
广播消息
先引入 SpringBoot的 WenSocket依赖,其中包含了 Web依赖
org.springframework.boot
spring-boot-starter-websocket
配置并开启 WebScoket支持
// WebSocketConfig.java
@Configuration
// 开启WebSocket和消息代理
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 开启一个基于内存的简单消息代理 如果有客户端订阅了已 /topic 开头的地址,当有新的消息到达该地址时,消息代理会广播该消息给所有订阅了的客户端
config.enableSimpleBroker("/topic");
// 设置一个消息请求前缀,当该路径收到消息时,会被处理,可以不设置
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个WebSocket端点并开启withSockJS支持,用于客户端连接
registry.addEndpoint("/websocket").withSockJS();
}
}
编写接收消息的 Controller类
// MessageController.java
@Controller
public class WebSocketController {
// 表明这是一个消息端点 如果设置了消息请求前缀 需要拼接
@MessageMapping("/message")
// 将处理后的消息转发给指定目的地 订阅了该目的地的客户端就会收到消息
// 相当于messagingTemplate.convertAndSend()方法,如果不使用注解,也可以手动发送
@SendTo("/topic/message")
public String handleMessage(String message) {
return "return:" + message;
}
}
接收 http请求的 Controller类,将接收到的请求参数转发到消息目的地
// DemoController.java
@RestController
@RequestMapping("/send")
public class DemoController {
private final SimpMessagingTemplate messagingTemplate;
public DemoController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
@GetMapping
public String getSendMessage(@RequestParam("message") String message) {
// 当接收到请求时,请求参数会被发送到消息目的地 某个订阅了该地址的客户端都可以接收到
messagingTemplate.convertAndSend("/topic/message", message);
return "success";
}
}
前端代码
<body>
<div id="app">
<button onclick="connect()">开始连接</button>
<div>
<input id="message" type="text" placeholder="要发送的消息">
<button onclick="send()">发送</button>
</div>
<div id="response">
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script>
let stompClient = null;
const connect = () => {
console.log("ok")
// 获取一个socket连接对象
const socket = new SockJS('/websocket')
// 通过socket连接获取stomp客户端
stompClient = Stomp.over(socket);
// 连接成功后的回调方法
stompClient.connect({}, frame => {
alert("连接成功")
// 订阅某个地址 收到消息后的处理
stompClient.subscribe("/topic/message", res => {
console.log(res)
const message = res.body;
document.getElementById("response").innerText += message + "\n";
})
})
}
// 发送消息
const send = () => {
const message = document.getElementById("message").value;
// 三个参数 接收消息的路径,headers没有就空{},需要发送的消息
stompClient.send("/message", {}, message)
}
</script>
</body>
点对点消息
点对点消息模式,需要使用认证框架(Spring Security),放一下简单代码,没有整合。
// WebSocketConfig
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
// 添加一个点对点的订阅前缀 默认为 /user
config.setUserDestinationPrefix("/queue");
}
// WebSocketController
// 指定用户发送消息的接口
@MessageMapping("/message/user")
public void handleUserMessage(MessageBody body) {
// 拿到用户ID和消息
Integer userId = body.userId();
String message = body.message();
// 向指定用户推送消息 用户需要订阅 /queue/message
messagingTemplate.convertAndSendToUser(userId.toString(), "/queue/message", message);
}
record MessageBody(Integer userId, String message) {}
MVC使用WebSocket处理类
不使用 Spring提供的 STOMP协议的封装,而是自己处理某个地址的 WebSocket请求。也依赖于 spring-boot-starter-websocket。
编写 WebSocket请求处理类
@Component
// 实现WebSocketHandler接口 这是一个顶级接口
public class ServletSocketHandle implements WebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(ServletSocketHandle.class);
// 发送消息的服务层
private final WebSocketService service;
// 所有连接的WebSocket session管理
private final SessionManager sessionManager;
public ServletSocketHandle(WebSocketService service, SessionManager sessionManager) {
this.service = service;
this.sessionManager = sessionManager;
}
// 客户端成功连接后调用的方法
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("客户端连接成功,{}", session.getId());
sessionManager.add(session.getId(), session);
}
// 接收到客户端消息时调用的方法
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage textMessage){
String payload = textMessage.getPayload();
service.sendMessage(session, "发送的消息:" + payload + ",时间:" + System.currentTimeMillis());
}
}
// 和客户端连接发生异常时调用的方法
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.info("{}用户发生异常,错误消息:{}", session.getId(), exception.getMessage());
sessionManager.remove(session.getId());
}
// 和客户端连接断开时调用的方法
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.info("客户端{}断开连接", session.getId());
sessionManager.remove(session.getId());
}
// 是否允许不完整的请求消息
@Override
public boolean supportsPartialMessages() {
return false;
}
}
编写配置类,将某个 WebSocket地址对应到请求处理类
@Configuration
// 开启WebSocket
@EnableWebSocket
public class ServletWebSocketConfig implements WebSocketConfigurer {
private final ServletSocketHandle servletSocketHandle;
public ServletWebSocketConfig(ServletSocketHandle servletSocketHandle) {
this.servletSocketHandle = servletSocketHandle;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 将暴露的/websocket请求关联到编写的请求处理类
registry.addHandler(servletSocketHandle, "/websocket");
}
}
实现 WebSocket Session管理类
@Component
public class SessionManager {
private static final Logger logger = LoggerFactory.getLogger(SessionManager.class);
// 使用线程安全的ConcurrentHashMap
private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
// 将Session添加Map
public void add(String sessionId, WebSocketSession session) {
SESSION_POOL.put(sessionId, session);
}
// 通过sessionId从Map中获取Session
public WebSocketSession get(String sessionId) {
return SESSION_POOL.get(sessionId);
}
// 获取所有Session 用于广播消息
public List<WebSocketSession> getAllWebSocket() {
return SESSION_POOL.values().stream().toList();
}
// 通过sessionId从Map中移除某个Session
public void remove(String sessionId) {
WebSocketSession session = SESSION_POOL.get(sessionId);
if (session != null){
try {
session.close();
} catch (IOException e) {
logger.error("session close error, sessionId:{}, message:{}", sessionId, e.getMessage());
}
SESSION_POOL.remove(sessionId);
}
}
}
编写用于发送消息的 WebSocket服务层
@Service
public class WebSocketService {
private static final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
private final SessionManager sessionManager;
public WebSocketService (SessionManager sessionManager) {
this.sessionManager = sessionManager;
}
// 给单个客户端发送消息
public void sendMessage(WebSocketSession session, String message) {
try{
session.sendMessage(new TextMessage(message));
}catch (IOException e){
logger.error("Error send message, sessionId:{}, error message:{}", session.getId(), e.getMessage());
}
}
// 广播消息到所有客户端
public void allSendMessage(String message) {
sessionManager.getAllWebSocket().forEach(session -> {
try{
session.sendMessage(new TextMessage(message));
} catch (IOException e){
logger.error("Error send message, sessionId:{}, error message:{}", session.getId(), e.getMessage());
}
});
}
}
用于测试的 Runner,在 SpringBoot启动后执行,定时发送广播消息
@Component
public class WebSocketRunner implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(WebSocketRunner.class);
private final WebSocketService service;
public WebSocketRunner (WebSocketService service) {
this.service = service;
}
@Override
public void run(ApplicationArguments args) throws Exception {
// 使用线程安全的原子型整数
AtomicInteger counter = new AtomicInteger(1);
// 调用异步线程 一直循环发送广播消息
CompletableFuture.runAsync(() -> {
while (true) {
service.allSendMessage("广播消息:" + counter);
counter.getAndIncrement();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
logger.error("Error sleep runner thread, message:{}", e.getMessage());
}
}
});
}
}
使用Javax提供的ServerEndpoint规范来处理
ServerEndpoint最开始是由 javax提供的一个类。但从 Java EE8之后,WebSocket API被迁移到了外部,形成了一个独立的规范。
使用 @ServerEndpoint注解编写处理类
@Component
// 被 @ServerEndpoint 标注的类是多对象的 每次连接时都会创建一个,和IOC容器的单例模式冲突,因此即使用了@Component注解也不能使用构造方法注入和自动依赖注入
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServerEndpoint {
// 都使用静态变量
private static final Logger logger = LoggerFactory.getLogger(WebSocketServerEndpoint.class);
private static ObjectMapper objectMapper;
private static SessionManager sessionManager;
private static WebSocketService service;
// 需要从IOC容器中获取的对象使用set方法注入
@Autowired
public void setObjectMapper(ObjectMapper objectMapper) {
WebSocketServerEndpoint.objectMapper = objectMapper;
}
@Autowired
public void setSessionManager(SessionManager sessionManager) {
WebSocketServerEndpoint.sessionManager = sessionManager;
}
@Autowired
public void setWebSocketService(WebSocketService service) {
WebSocketServerEndpoint.service = service;
}
// 连接成功时调用的方法
// 路径参数可以用 @PathParam 获取
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
logger.info("用户{}连接成功", userId);
sessionManager.add(userId, session);
}
// 连接关闭时调用的方法
@OnClose
public void onClose(Session session, CloseReason reason, @PathParam("userId") String userId) {
logger.info("用户{}断开连接", userId);
sessionManager.remove(userId);
}
// 接收文本消息
@OnMessage
public void onMessage(Session session, String message) {
System.out.println(message);
try{
MessageBody messageBody = objectMapper.readValue(message, MessageBody.class);
service.sendMessage(messageBody.userId(), messageBody.message());
}catch (JsonProcessingException e) {
logger.error("请求消息解析失败,错误消息:{}", e.getMessage());
}
}
// 接收pong消息 常用于心跳检测
@OnMessage
public void onMessage(Session session, PongMessage pongMessage) {
}
// 接收二进制数据消息
@OnMessage
public void onMessage(Session session, byte[] bytes) {
}
// 连接发生错误时调用
@OnError
public void onError(Session session, Throwable throwable, @PathParam("userId") String userId) {
logger.info("用户{}连接错误,错误信息:{}", userId, throwable.getMessage());
sessionManager.remove(userId);
}
}
// 消息体的类
record MessageBody(String userId, String message){}
编写配置类,用以注入 ServerEndpointExporter开启 WebSocket端点的自动注册和暴露
@Configuration
public class MyWebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
SessionManager类和WebSocketService类与MVC使用的大致是一样的,把sessionId替换为userId即可,可以通过客户端消息的userId参数给指定的在线用户发送消息
// 通过userId拿到客户端session 然后给指定的客户端发送消息
public void sendMessage(String userId, String message) {
Session session = sessionManager.getSession(userId);
if (session == null) {
logger.warn("用户{}不存在", userId);
return;
}
try{
// 发送文本消息
session.getBasicRemote().sendText(message);
}catch (IOException e) {
logger.error("给用户{}发送消息失败,错误消息:{}", userId, e.getMessage());
}
}
前端代码,不需要依赖第三方库,使用 Js提供的 WebSocket即可
<body>
<div>
<input id="user-id" type="text" placeholder="用户ID">
<button onclick="connect()">开始连接</button><br>
<input id="user" type="text" placeholder="接收消息的用户ID">
<input id="in" type="text" placeholder="发送的消息">
<button onclick="send()">发送</button>
<div id="resp"></div>
</div>
<script>
let socket = null;
const connect = () => {
const userId = document.getElementById('user-id').value;
socket = new WebSocket(`ws://localhost:8080/websocket/${userId}`);
socket.onopen = event => {
alert('连接成功')
}
socket.onclose = err => {
alert('连接断开')
}
socket.onmessage = res => {
document.getElementById('resp').innerText += res.data + "\n";
}
}
const send = () => {
const userId = document.getElementById('user').value;
const message = document.getElementById('in').value;
const data = {userId: userId, message: message}
socket.send(JSON.stringify(data))
}
</script>
</body>
结尾
:::tip
除了上面三种方式外,还有基于 WebFlux和 Web容器框架的,还没了解过,后面了解再补上。
可以整合消息队列和连接鉴权,实现一个简单的实时消息推送服务端,后面再补 :::






0 条评论