1 基础接入配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic/");
//config.setApplicationDestinationPrefixes("/app/");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
registry.addEndpoint("/ws").setAllowedOrigins("*");
}
}
2 Controller
@Controller
public class PoetrySofaController extends BaseController {
@MessageMapping("/xxx/{code}")
@SendTo("/topic/xxx/{code}")
public XXXOpResultVO sofaOp(
@DestinationVariable String code,
SimpMessageHeaderAccessor accessor, XXXOp message) throws Exception {
return new XXXOpResultVO();
}
}
3 鉴权(Cookie)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new HandshakeInterceptor() {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerRequest = (ServletServerHttpRequest) request;
HttpServletRequest servletRequest = servletServerRequest.getServletRequest();
// 这里解析cookie到userId变量
attributes.put(USER_ID, userId);
if (userId < 0) {
// 未登陆,直接websocket拒绝连接(不让升级)
return false;
}
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
};
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic/");
//config.setApplicationDestinationPrefixes("/app/");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS().setInterceptors(handshakeInterceptor());
registry.addEndpoint("/ws").setAllowedOrigins("*").addInterceptors(handshakeInterceptor());
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// 读取并设置到accessor中,后续Controller中就可以直接用了,注意一旦Websocket建立连接,这个字端就不变了
Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
int userId = (int) sessionAttributes.getOrDefault(USER_ID, 0);
accessor.setUser(() -> String.valueOf(userId));
}
return message;
}
});
}
}
4 检测客户端断线事件
@EventListener
public void on(SessionDisconnectEvent event) {
int userId = SimpMessagUtils.getUserId(event.getUser());
String sessionId = event.getSessionId();
// do something
}
5 优化broker性能
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic/");
config.setCacheLimit(100000);
}