springboot基于Redis发布订阅集群下WebSocket的解决方案
单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。
假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。
为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。
二、解决方案(springboot 基于 Redis发布订阅)1、依赖
<!-- redis --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>
2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。
import com.alibaba.fastjson.JSON;import com.dy.service.impl.OrdersServiceImpl;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.stereotype.Component; import java.util.HashMap; /** * @program: * @description: redis消息订阅-业务处理 * @author: zhang yi * @create: 2021-01-25 16:46 */@Componentpublic class Demo implements MessageListener { Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(Message message, byte[] pattern) { logger.info('消息订阅成功---------'); logger.info('内容:'+message.getBody()); logger.info('交换机:'+message.getChannel()); }}
3、创建PubSubConfig配置类
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.cache.annotation.EnableCaching;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * @program: * @description: redis发布订阅配置 * @author: zhang yi * @create: 2021-01-25 16:49 */@Configuration@EnableCachingpublic class PubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); //如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory, // MessageListenerAdapter listenerAdapter, // MessageListenerAdapter listenerAdapter2) @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多个 messageListener,配置不同的交换机 container.addMessageListener(listenerAdapter, new PatternTopic('channel:demo')); //container.addMessageListener(listenerAdapter2, new PatternTopic('channel:demo2')); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * @param demo 第一步的业务处理类 * @return */ @Bean MessageListenerAdapter listenerAdapter(Demo demo) { logger.info('----------------消息监听器加载成功----------------'); // onMessage 就是方法名,基于反射调用 return new MessageListenerAdapter(demo, 'onMessage'); } /** * 多个交换机就多写一个 * @param subCheckOrder * @return */ //@Bean //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) { // logger.info('----------------消息监听器加载成功----------------'); // return new MessageListenerAdapter(subCheckOrder, 'onMessage'); //} @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); }}
4、消息发布
@Autowiredprivate RedisTemplate<String, Object> redisTemplate; redisTemplate.convertAndSend('channel:demo', '我是内容');三、具体用法 socket连接成功。 socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。
相关文章: