Spring Boot 接入 WebSocket 和 STOMP

1 基础接入配置

/**
 * Minimal WebSocket/STOMP setup: enables the simple broker for {@code /topic/}
 * destinations and exposes the {@code /ws} endpoint both with and without SockJS.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Enables the simple broker for destinations under {@code /topic/}.
     *
     * @param config the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        final String brokerDestinationPrefix = "/topic/";
        config.enableSimpleBroker(brokerDestinationPrefix);
        // Application destination prefixes are intentionally left unset in this example:
        //config.setApplicationDestinationPrefixes("/app/");
    }

    /**
     * Registers the {@code /ws} STOMP endpoint twice: first with SockJS enabled,
     * then as a plain WebSocket endpoint.
     *
     * @param registry the endpoint registry to add endpoints to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        final String endpointPath = "/ws";
        // NOTE(review): "*" accepts handshakes from every origin - confirm this is intended.
        final String allowedOrigin = "*";

        // SockJS variant of the endpoint.
        registry.addEndpoint(endpointPath)
                .setAllowedOrigins(allowedOrigin)
                .withSockJS();

        // Plain WebSocket variant of the same path.
        registry.addEndpoint(endpointPath)
                .setAllowedOrigins(allowedOrigin);
    }
}

2 Controller

/**
 * Example STOMP message-handling controller.
 */
@Controller
public class PoetrySofaController extends BaseController {

    /**
     * Handles messages mapped to {@code /xxx/{code}}; the returned value is annotated
     * to be sent to {@code /topic/xxx/{code}}.
     *
     * @param code     the {@code code} destination variable taken from the message destination
     * @param accessor header accessor for the incoming message (unused in this stub)
     * @param message  the incoming payload (unused in this stub)
     * @return a newly created, empty result object
     * @throws Exception declared so that real handler logic may propagate failures
     */
    @MessageMapping("/xxx/{code}")
    @SendTo("/topic/xxx/{code}")
    public XXXOpResultVO sofaOp(
            @DestinationVariable String code,
            SimpMessageHeaderAccessor accessor,
            XXXOp message
    ) throws Exception {
        // Stub: real handling of the operation would populate the result here.
        final XXXOpResultVO result = new XXXOpResultVO();
        return result;
    }

}

3 鉴权(Cookie)

/**
 * WebSocket/STOMP configuration with cookie-based authentication.
 *
 * <p>Authentication is split in two steps: the handshake interceptor resolves the user id
 * from the HTTP upgrade request and stores it in the WebSocket session attributes; the
 * inbound channel interceptor then copies it onto the STOMP CONNECT frame as the user.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Creates the handshake interceptor that authenticates the upgrade request.
     *
     * <p>The handshake is rejected when the request is not a servlet request or when the
     * resolved user id is negative; otherwise the id is stored under {@code USER_ID}.
     *
     * @return the handshake interceptor shared by both endpoint registrations
     */
    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new HandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                if (!(request instanceof ServletServerHttpRequest)) {
                    // Fail closed: without a servlet request there is no cookie to
                    // authenticate with, so the connection must not be let through.
                    return false;
                }
                ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request;
                HttpServletRequest servletRequest = servletServerRequest.getServletRequest();
                // Parse the cookie from servletRequest into the userId variable here
                // (elided in this example).
                if (userId < 0) {
                    // Not logged in: refuse the WebSocket connection (no protocol upgrade).
                    return false;
                }
                // Store the id only for accepted handshakes.
                attributes.put(USER_ID, userId);
                return true;
            }

            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
                // Nothing to do after the handshake.
            }
        };
    }

    /**
     * Enables the simple broker for destinations under {@code /topic/}.
     *
     * @param config the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic/");
        // Application destination prefixes are intentionally left unset in this example:
        //config.setApplicationDestinationPrefixes("/app/");
    }

    /**
     * Registers the {@code /ws} endpoint with and without SockJS, attaching the
     * authentication handshake interceptor to both.
     *
     * @param registry the endpoint registry to add endpoints to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // NOTE(review): "*" accepts handshakes from any origin. Because authentication relies
        // on cookies, consider restricting this to trusted origins to prevent cross-site
        // WebSocket hijacking.
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor());
        registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor());
    }

    /**
     * Installs an inbound interceptor that, on STOMP CONNECT, sets the session user from
     * the id stored during the handshake.
     *
     * @param registration the client inbound channel registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                // getAccessor may return null when the message carries no matching accessor.
                if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
                    // Read the id and set it on the accessor so controllers can use it directly.
                    // Note: once the WebSocket connection is established this value never changes.
                    Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
                    Object storedUserId =
                            sessionAttributes == null ? null : sessionAttributes.get(USER_ID);
                    // Falls back to 0 when no usable id was stored, as the original default did.
                    int userId = storedUserId instanceof Number
                            ? ((Number) storedUserId).intValue()
                            : 0;
                    accessor.setUser(() -> String.valueOf(userId));
                }

                return message;
            }
        });
    }
}

4 检测客户端断线事件

/**
 * Handles a client disconnect by reading the session id and the user id from the event.
 *
 * @param event the session-disconnect event that was published
 */
@EventListener
public void on(SessionDisconnectEvent event) {
    final String sessionId = event.getSessionId();
    final int userId = SimpMessagUtils.getUserId(event.getUser());
    // Placeholder: react to the disconnect here.
}

5 优化broker性能

/**
 * Enables the simple broker for {@code /topic/} destinations and sets the registry's
 * cache limit to 100000 (presented by the surrounding article as a broker performance tweak).
 *
 * @param config the broker registry to configure
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    final int cacheLimit = 100000;
    config.enableSimpleBroker("/topic/");
    config.setCacheLimit(cacheLimit);
}

 

 

Leave a Reply

Your email address will not be published.