springboot集成redis实现消息发布订阅模式-双通道(跨多服务器)
基础配置参考
https://blog.csdn.net/llll234/article/details/80966952
查看了基础配置那么会遇到一下几个问题:
1.实际应用中可能会订阅多个通道,而一下这种写法不太通用
container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
2.使用过程中使用new RedisPmpSub()配置消息接收对象会有问题。
如果RedisPmpSub既是消息接收类,也是消息处理类。那么如果此时需要注入Bean,会成功吗?
3.考虑后期的扩展性是否能尽量不改变原有代码的基础上,进行扩展
发布者
枚举定义
考虑到可维护性,采用枚举的方式定义管道RedisChannelEnums
public enum RedisChannelEnums { /**redis频道code定义 需要与发布者一致*/ LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"), ; /** 枚举定义+描述 */ private String code; private String description; RedisChannelEnums(String code, String description) { this.code = code; this.description = description; } /** 根据code获取对应的枚举对象 */ public static RedisChannelEnums getEnum(String code) { RedisChannelEnums[] values = RedisChannelEnums.values(); if (null != code && values.length > 0) { for (RedisChannelEnums value : values) { if (value.code == code) { return value; } } } return null; } /** 该code在枚举列表code属性是否存在 */ public static boolean containsCode(String code) { RedisChannelEnums anEnum = getEnum(code); return anEnum != null; } /** 判断code与枚举中的code是否相同 */ public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) { return calendarSourceEnum.code == code; } public String getCode() { return code; } public String getDescription() { return description; } }
消息模板
为了兼容不同的业务场景,需要定义消息模板对象BasePubMessage
其中ToString方法的作用是将对象转成Json字符
@Data public abstract class BasePubMessage { /**发布订阅频道名称*/ protected String channel; protected String extra; @Override public String toString() { return GsonUtil.toJson(this, BasePubMessage.class); } }
消息对象LiveChangeMessage
其中ToString方法的作用是将对象转成Json字符
@Data public class LiveChangeMessage extends BasePubMessage { /**直播Ids*/ private String liveIds; @Override public String toString() { return GsonUtil.toJson(this, LiveChangeMessage.class); } }
发布者服务
public interface RedisPub { /** * 集成redis实现消息发布订阅模式-双通道 * @param redisChannelEnums 枚举定义 * @param basePubMessage 消息 */ void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage); }
@Service public class RedisPubImpl implements RedisPub { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) { if(redisChannelEnums ==null || basePubMessage ==null){ return; } basePubMessage.setChannel(redisChannelEnums.getCode()); stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString()); System.out.println("发布成功!"); } }
订阅者
注解配置
RedisConfig作为订阅者的配置类,主要作用是:Redis消息监听器容器、配置消息接收处理类
同时新加入的功能解决了我们上面提出的几个问题
@Service @Configuration @EnableCaching public class RedisConfig { /** * 存放策略实例 * classInstanceMap : key-beanName value-对应的策略实现 */ private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20); /** * 注入所有实现了Strategy接口的Bean * * @param strategyMap * 策略集合 */ @Autowired public RedisConfig(Map<String, BaseSub> strategyMap) { this.classInstanceMap.clear(); strategyMap.forEach((k, v) -> this.classInstanceMap.put(k.toLowerCase(), v) ); } /** * Redis消息监听器容器 * * @param connectionFactory * * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values(); if (redisChannelEnums.length > 0) { for (RedisChannelEnums redisChannelEnum : redisChannelEnums) { if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) { continue; } //订阅了一个叫pmp和channel 的通道,多通道 //一个订阅者接收一个频道信息,新增订阅者需要新增RedisChannelEnums定义+BaseSub的子类 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase(); BaseSub baseSub = classInstanceMap.get(toLowerCase); container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode())); } } return container; } /** * 配置消息接收处理类 * * @param baseSub * 自定义消息接收类 * * @return MessageListenerAdapter */ @Bean() @Scope("prototype") MessageListenerAdapter listenerAdapter(BaseSub baseSub) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 //注意2个通道调用的方法都要为receiveMessage return new MessageListenerAdapter(baseSub, "receiveMessage"); } }
@Autowired public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来。 解决了我们提出的第二个问题 而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase(); BaseSub baseSub = classInstanceMap.get(toLowerCase); container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode())); 是根据不同的管道对应不同的订阅者,也就是一个订阅者对应一个管道。方便根据不同的业务场景进行处理。 使用这种方式主需要配置redisChannelEnum枚举即可,解决了我们提出的第一个问题。 这样一来,订阅者就变得比较通用了
枚举
RedisChannelEnums作用:定义不同管道对应的订阅者,后期增加一个管道类型只需要增加一个枚举即可
public enum RedisChannelEnums { /**redis频道名称定义 需要与发布者一致*/ LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"), ; /** 枚举定义+描述 */ private String code; private Class<? extends BaseSub> className; private String description; RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) { this.code = code; this.className=className; this.description = description; } /** 根据code获取对应的枚举对象 */ public static RedisChannelEnums getEnum(String code) { RedisChannelEnums[] values = RedisChannelEnums.values(); if (null != code && values.length > 0) { for (RedisChannelEnums value : values) { if (value.code == code) { return value; } } } return null; } /** 该code在枚举列表code属性是否存在 */ public static boolean containsCode(String code) { RedisChannelEnums anEnum = getEnum(code); return anEnum != null; } /** 判断code与枚举中的code是否相同 */ public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) { return calendarSourceEnum.code == code; } public String getCode() { return code; } public String getDescription() { return description; } public Class<? extends BaseSub> getClassName() { return className; } }
消息模板
BaseSubMessage定义通用的字段,与json字符的通用转换
@Data abstract class BaseSubMessage { /** 发布订阅频道名称 */ private String channel; private String extra; private String json; BaseSubMessage(String json) { if(StringUtils.isEmpty(json)){ return; } this.json = json; Map map = new Gson().fromJson(this.json, Map.class); BeanHelper.populate(this, map); } }
LiveChangeMessage定义当前业务场景的字段
@Data @ToString(callSuper = true) public class LiveChangeMessage extends BaseSubMessage { /** 直播Ids */ private String liveIds; public LiveChangeMessage(String json) { super(json); } }
订阅者服务
BaseSub定义接收消息的通用方法
public interface BaseSub { /** * 接收消息 * @param jsonMessage json字符 */ void receiveMessage(String jsonMessage); }
LiveChangeSub具体消息接收对象
@Component public class LiveChangeSub implements BaseSub { /**只是定义的注解测试,可以换成自己的*/ @Autowired private CategoryMapper categoryMapper; @Override public void receiveMessage(String jsonMessage) { System.out.println("项目aries-server....................."); //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同 System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage); LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage); System.out.println(liveChangeMessage); Category category = categoryMapper.get(1L); System.out.println("category:" + category); } }
springboot集成redis实现消息发布订阅模式-双通道(跨多服务器)的更多相关文章
- 15天玩转redis —— 第九篇 发布/订阅模式
本系列已经过半了,这一篇我们来看看redis好玩的发布订阅模式,其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子 就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果 ...
- redis实现消息发布/订阅
redis实现简单的消息发布/订阅模式. 消息订阅者: package org.common.component; import org.slf4j.Logger; import org.slf4j. ...
- redis 的消息发布订阅
redis支持pub/sub功能(可以用于消息服务器),这个功能类似mq,这里做一个简单的介绍 Pub/Sub Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在R ...
- Spring Data Redis实现消息队列——发布/订阅模式
一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现. 定义:生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列, ...
- redis消息通知(任务队列/优先级队列/发布订阅模式)
1.任务队列 对于发送邮件或者是复杂计算这样的操作,常常需要比较长的时间,为了不影响web应用的正常使用,避免页面显示被阻塞,常常会将此类任务存入任务队列交由专门的进程去处理. 队列最基础的方法如下: ...
- redis实现消息队列&;发布/订阅模式使用
在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面做记录. Redis的列表类型键可以用来实现队列,并且支持阻塞式读取,可以很容易的实现一个高性 ...
- Redis消息通知(任务队列和发布订阅模式)
Redis学习笔记(十)消息通知(任务队列和发布订阅模式) 1. 任务队列 1.1 任务队列的特点 任务队列:顾名思义,就是“传递消息的队列”.与任务队列进行交互的实体有两类,一类是生产者(produ ...
- redis的发布订阅模式
概要 redis的每个server实例都维护着一个保存服务器状态的redisServer结构 struct redisServer { /* Pubsub */ // 字典,键为频道, ...
- redis发布/订阅模式
其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子 就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果博主发表了文章,那么100个人就会同时收到通知邮件,除了这个 场 ...
- Redis - 发布/订阅模式
Redis 提供了一组命令可以让开发者实现 “发布/订阅” 模式.“发布/订阅” 可以实现进程间的消息传递,其原理是这样的: “发布/订阅” 模式中包含两种角色,分别是发布者和订阅者.订阅者可以订阅一 ...
随机推荐
- hession
Hessian是一个轻量级的remoting onhttp工具,使用简单的方法提供了RMI的功能. 相比WebService,Hessian更简单.快捷.采用的是二进制RPC协议,因为采用的是二进制协 ...
- centos安装lamp环境
通过yum安装,需要联网且为su账号 yum -y install httpd php mysql mysql-server php-mysql 设置开启启动mysql,httpd /sbin ...
- maven_项目的依赖、聚合、继承
一.假设目前有三个maven项目,分别是project.A.project.B.project.C 要求B依赖A.C依赖B但不依赖C 1.B添加对A的依赖 1 2 3 4 5 <depend ...
- http://blog.163.com/zhangmihuo_2007/blog/static/27011075201392685751232/
http://blog.163.com/zhangmihuo_2007/blog/static/27011075201392685751232/
- Kali linux 2016无法打开virtualbox问题解决
Kali Linux在安装完virtualbox后,打开虚拟机会出现:kernel driver not installed (rc=1908)错误提示,根据提示,大概可以看出是由于缺少内核模块引起的 ...
- Java、JSP获得当前日期的年、月、日
Java package com.ob; import java.text.ParseException; import java.text.SimpleDateFormat; import java ...
- Js之Screen对象
Window Screen window.screen 对象在编写时可以不使用 window 这个前缀. 属性: screen.availWidth - 可用的屏幕宽度,以像素计,减去界面特性,比如窗 ...
- 菱形java代码
public class boy { //菱形 public static void main(String[] args) { int m=4; for (int i=0;i<=m;i++){ ...
- Cocos2d-x 地图行走的实现3:A*算法
本文乃Siliphen原创,转载请注明出处:http://blog.csdn.net/stevenkylelee 上一节<Cocos2d-x 地图行走的实现2:SPFA算法>: http: ...
- 搭建hadoop、hdfs环境--ubuntu
最近在学习hadoop相关知识,就在本机上安装了hadoop,遇到了一些坑,也学到了不少.仅此记录我的安装过程,及可能遇到的问题.供参考.交流沟通见页末. 软件准备 > 虚拟机(VMware) ...