在项目研发发中由于调用了第三方的支付接口,支付接口只有一个异步回调的通知。由于前端和后端是分开部署的,所以引发了一个问题,支付成功后异步回调后端会处理逻辑,但前端并不知道支付的接果。为了解决研发成本,所以就选择了WebSocket。下面主要介绍SpringBoot是怎样集成WebSocket的?

首先呢,我们先引入一个WebSocket依赖。代码如下:

  1. <dependency>
  2. <groupId>org.springframework.boot </groupId>
  3. <artifactId>spring-boot-starter-websocket </artifactId>
  4. </dependency>

加完依赖后,就可以开始编码了,在SpringBoot中添加WebSocket的配置,配置如下:

  1. import org.springframework.context.annotation.Configuration;
  2. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  3. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  4. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  5.  
  6. /**
  7. * websocket 配置
  8. * @author zxm
  9. *
  10. */
  11. @Configuration
  12. @EnableWebSocket
  13. public class WebSocketH5Config implements WebSocketConfigurer {
  14.  
  15. @Override
  16. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  17. // TODO Auto-generated method stub
  18. registry.addHandler(new WebSocketH5Handler(), "/service/{ID}").setAllowedOrigins("*").addInterceptors(new WebSocketInterceptor());
  19. }
  20. }
  21.  

1.@Configuration:注解标识该类为Spring的配置类

2.@EnableWebSocket:开启注解接收和发送消息

3.实现WebSocketConfigurer接口,重写registerWebSocketHandlers方法,这是一个核心实现方法,配置websocket入口,允许访问的域、注册Handler、定义拦截器。客户端通过“/service/{ID}”直接访问Handler核心类,进行socket的连接、接收、发送等操作,这里由于还加了个拦截器,所以建立新的socket访问时,都先进来拦截器再进去Handler类,“new WebSocketInterceptor()”是我实现的拦截器,“new WebSocketInterceptor()”是我实现的一个Handler类

上面提到了WebSocketInterceptor拦截器的实现,下面来看下是如何实现的,代码如下:

  1. import org.springframework.http.server.ServerHttpRequest;
  2. import org.springframework.http.server.ServerHttpResponse;
  3. import org.springframework.http.server.ServletServerHttpRequest;
  4. import org.springframework.web.socket.WebSocketHandler;
  5. import org.springframework.web.socket.server.HandshakeInterceptor;
  6.  
  7. import lombok.extern.log4j.Log4j;
  8.  
  9. /**
  10. * websocket 拦截器
  11. * @author zxm
  12. *
  13. */
  14. @Log4j
  15. public class WebSocketInterceptor implements HandshakeInterceptor {
  16.  
  17. @Override
  18. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
  19. Map<String, Object> attributes) throws Exception {
  20. // TODO Auto-generated method stub
  21. if(request instanceof ServletServerHttpRequest) {
  22. String ID = request.getURI().toString().split("ID=")[1];
  23. log.info("当前的sessionID="+ID);
  24. ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
  25. HttpSession session = serverHttpRequest.getServletRequest().getSession();
  26. attributes.put("WEBSOCKET_USERID", ID);
  27. }
  28. return true;
  29. }
  30.  
  31. @Override
  32. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
  33. Exception exception) {
  34. // TODO Auto-generated method stub
  35. log.info("后置拦截");
  36. }
  37. }
  38.  

上述代码实现了HandshakeInterceptor 接口,并实现了beforeHandshake该方法,该方法是在进入Handler核心类之前进行拦截。

这里主要实现的逻辑是:

截取客户端建立webSocket连接时发送的URL地址字符串,并通过对该字符串进行特殊标识截取操作,获取客户端发送的唯一标识(由自己定义的,一般是系统用户ID唯一标识,用以标识该用户),并把它以键值对的形式放到Session里,这样后期可以通过该session获取它对应的用户ID了。【一个session对应着一个webSocketSession】

下面再看看handler处理器的具体实现,代码如下:

  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Set;
  5.  
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.util.StringUtils;
  8. import org.springframework.web.socket.CloseStatus;
  9. import org.springframework.web.socket.TextMessage;
  10. import org.springframework.web.socket.WebSocketMessage;
  11. import org.springframework.web.socket.WebSocketSession;
  12.  
  13. import com.alibaba.fastjson.JSON;
  14. import com.cupiday.common.utils.PublicDictUtil;
  15. import com.google.common.collect.Maps;
  16.  
  17. import lombok.extern.log4j.Log4j;
  18.  
  19. /**
  20. * websocket 消息处理器
  21. * @author zxm
  22. *
  23. */
  24. @Log4j
  25. @Component
  26. public class WebSocketH5Handler implements org.springframework.web.socket.WebSocketHandler {
  27. private static final Map<String, WebSocketSession> users = Maps.newConcurrentMap();
  28.  
  29. @Override
  30. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  31. // TODO Auto-generated method stub
  32. log.info("成功建立连接");
  33. String ID = session.getUri().toString().split("ID=")[1];
  34. log.info("连接ID="+ID);
  35. if(ID != null) {
  36. users.put(ID, session);
  37. HashMap<String,Object> message = Maps.newHashMap();
  38. message.put(PublicDictUtil.KEY , PublicDictUtil.SUCCESS_VALUE);
  39. message.put(PublicDictUtil.MSG_KEY, "成功建立socket连接");
  40. session.sendMessage(new TextMessage(JSON.toJSONString(message)));
  41. log.info("当前session="+session);
  42. }
  43. log.info("当前在线人数:"+users.size());
  44. }
  45.  
  46. @Override
  47. public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
  48. // TODO Auto-generated method stub
  49. log.info(message.getPayload().toString());
  50. log.info(message.getPayload().toString()+" :来自"+(String)session.getAttributes().get("WEBSOCKET_USERID")+"的消息");
  51. sendMessageToUser(session.getAttributes().get("WEBSOCKET_USERID").toString(),new TextMessage("服务器收到了,hello"));
  52. }
  53. /**
  54. * 发送消息给指定连接
  55. * @param clientId
  56. * @param message
  57. * @return
  58. */
  59. public boolean sendMessageToUser(String clientId,TextMessage message) {
  60. if(!StringUtils.hasLength(clientId)) {
  61. return false;
  62. }
  63. WebSocketSession webSocketSession = users.get(clientId);
  64. if(webSocketSession == null) {
  65. return false;
  66. }
  67. if(!webSocketSession.isOpen()) {
  68. return false;
  69. }
  70. try {
  71. webSocketSession.sendMessage(message);
  72. } catch (IOException e) {
  73. // TODO Auto-generated catch block
  74. e.printStackTrace();
  75. return false;
  76. }
  77. return true;
  78. }
  79. /**
  80. * 广播消息
  81. * @param message
  82. * @return
  83. */
  84. public boolean sendMessageToAllUsers(TextMessage message) {
  85. boolean allSendSuccess = true;
  86. Set clientIds = users.keySet();
  87. WebSocketSession session = null;
  88. for (String clientId : clientIds) {
  89. try {
  90. session = users.get(clientId);
  91. if (session.isOpen()) {
  92. session.sendMessage(message);
  93. }
  94. } catch (IOException e) {
  95. e.printStackTrace();
  96. allSendSuccess = false;
  97. }
  98. }
  99. return true;
  100. }
  101. /**
  102. * 关闭
  103. * @param clientId
  104. */
  105. public void close(String clientId) {
  106. WebSocketSession webSocketSession = users.get(clientId);
  107. if(webSocketSession != null) {
  108. try {
  109. webSocketSession.close();
  110. } catch (IOException e) {
  111. // TODO Auto-generated catch block
  112. e.printStackTrace();
  113. }
  114. }
  115. }
  116.  
  117. @Override
  118. public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
  119. // TODO Auto-generated method stub
  120. if(session.isOpen()) {
  121. session.close();
  122. }
  123. log.info("连接出错");
  124. users.remove(getClientId(session));
  125. }
  126.  
  127. @Override
  128. public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
  129. // TODO Auto-generated method stub
  130. log.info("连接已经关闭:"+closeStatus);
  131. users.remove(getClientId(session));
  132. }
  133.  
  134. @Override
  135. public boolean supportsPartialMessages() {
  136. // TODO Auto-generated method stub
  137. return false;
  138. }
  139. /**
  140. * 获取连接id
  141. * @param session
  142. * @return
  143. */
  144. private String getClientId(WebSocketSession session) {
  145. try {
  146. String id = (String)session.getAttributes().get("WEBSOCKET_USERID");
  147. return id;
  148. } catch (Exception e) {
  149. // TODO Auto-generated catch block
  150. e.printStackTrace();
  151. }
  152. return null;
  153. }
  154.  

嗯,代码有点长啊,来具体说说这些代码都是干啥的:
1.实现了WebSocketHandler接口,并实现了关键的几个方法。

① afterConnectionEstablished(接口提供的):建立新的socket连接后回调的方法。主要逻辑是:将成功建立连接的webSocketSssion放到定义好的常量[private static final Map<String, WebSocketSession> users;]中去。这里也截取客户端访问的URL的字符串,拿到标识,以键值对的形式讲每一个webSocketSession存到users里,以记录每个Socket。

② handleMessage(接口提供的):接收客户端发送的Socket。主要逻辑是:获取客户端发送的信息。这里之所以可以获取本次Socket的ID,是因为客户端在第一次进行连接时,拦截器进行拦截后,设置好ID,这样也说明,双方在相互通讯的时候,只是对第一次建立好的socket持续进行操作。

③ sendMessageToUser(自己定义的):发送给指定用户信息。主要逻辑是:根据用户ID从常量users(记录每一个Socket)中,获取Socket,往该Socket里发送消息,只要客户端还在线,就能收到该消息。

④sendMessageToAllUsers (自己定义的):这个广播消息,发送信息给所有socket。主要逻辑是:跟③类型,只不过是遍历整个users获取每一个socket,给每一个socket发送消息即可完广播发送

⑤handleTransportError(接口提供的):连接出错时,回调的方法。主要逻辑是:一旦有连接出错的Socket,就从users里进行移除,有提供该Socket的参数,可直接获取ID,进行移除。这个在客户端没有正常关闭连接时,会进来,所以在开发客户端时,记得关闭连接

⑥afterConnectionClosed(接口提供的):连接关闭时,回调的方法。主要逻辑:一旦客户端/服务器主动关闭连接时,将个socket从users里移除,有提供该Socket的参数,可直接获取ID,进行移除。

至此,后端的开发工作已经完了,剩下的就是前端的实现了,前端JavaScript代码如下:

  1. var ID = 12345865546544; // 随机数
  2. //建立webSocket连接,若服务器端开通了https协议,则客户端必须使用wss协议
  3. var websocket = new WebSocket("ws://192.168.1.18:8080/service/ID="+ID);
  4. //打开webSokcet连接时,回调该函数
  5. websocket.onopen = function () {
  6. console.log("onpen");
  7. }
  8. //关闭webSocket连接时,回调该函数
  9. websocket.onclose = function () {
  10. //关闭连接
  11. console.log("onclose");
  12. }
  13.  
  14. //接收信息
  15. websocket.onmessage = function (msg) {
  16. console.log(msg.data);
  17. }

怎么样,比较简单吧。至此,SpringBoot 与 WebSocket 已经集成完了。