基础配置参考
https://blog.csdn.net/llll234/article/details/80966952

查看了基础配置那么会遇到一下几个问题:

1.实际应用中可能会订阅多个通道,而一下这种写法不太通用
container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));

2.使用过程中使用new RedisPmpSub()配置消息接收对象会有问题。
如果RedisPmpSub既是消息接收类,也是消息处理类。那么如果此时需要注入Bean,会成功吗?

3.考虑后期的扩展性是否能尽量不改变原有代码的基础上,进行扩展

发布者

枚举定义

考虑到可维护性,采用枚举的方式定义管道RedisChannelEnums

 public enum RedisChannelEnums {

     /**redis频道code定义 需要与发布者一致*/
     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"),

     ;
     /** 枚举定义+描述 */
     private String code;
     private String description;

     RedisChannelEnums(String code, String description) {
         this.code = code;
         this.description = description;
     }

     /** 根据code获取对应的枚举对象 */
     public static RedisChannelEnums getEnum(String code) {
         RedisChannelEnums[] values = RedisChannelEnums.values();
         if (null != code && values.length > 0) {
             for (RedisChannelEnums value : values) {
                 if (value.code == code) {
                     return value;
                 }
             }
         }
         return null;
     }

     /** 该code在枚举列表code属性是否存在 */
     public static boolean containsCode(String code) {
         RedisChannelEnums anEnum = getEnum(code);
         return anEnum != null;
     }

     /** 判断code与枚举中的code是否相同 */
     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
         return calendarSourceEnum.code == code;
     }

     public String getCode() {
         return code;
     }

     public String getDescription() {
         return description;
     }

 }

消息模板

为了兼容不同的业务场景,需要定义消息模板对象BasePubMessage
其中ToString方法的作用是将对象转成Json字符

 @Data
 public abstract class BasePubMessage {

     /**发布订阅频道名称*/
     protected String channel;

     protected String extra;

     @Override
     public String toString() {
         return GsonUtil.toJson(this, BasePubMessage.class);
     }

 }

消息对象LiveChangeMessage
其中ToString方法的作用是将对象转成Json字符

 @Data
 public class LiveChangeMessage extends BasePubMessage {

     /**直播Ids*/
     private String liveIds;

     @Override
     public String toString() {
         return GsonUtil.toJson(this, LiveChangeMessage.class);
     }

 }

发布者服务

public interface RedisPub {

    /**
     * 集成redis实现消息发布订阅模式-双通道
     * @param redisChannelEnums 枚举定义
     * @param basePubMessage 消息
     */
    void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);

}
 @Service
 public class RedisPubImpl implements RedisPub {

     @Resource
     private StringRedisTemplate stringRedisTemplate;

     @Override
     public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {

         if(redisChannelEnums ==null || basePubMessage ==null){
             return;
         }

         basePubMessage.setChannel(redisChannelEnums.getCode());
         stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
         System.out.println("发布成功!");
     }
 }

订阅者

注解配置

RedisConfig作为订阅者的配置类,主要作用是:Redis消息监听器容器、配置消息接收处理类
同时新加入的功能解决了我们上面提出的几个问题

 @Service
 @Configuration
 @EnableCaching
 public class RedisConfig {

     /**
      * 存放策略实例
      * classInstanceMap : key-beanName value-对应的策略实现
      */
     private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);

     /**
      * 注入所有实现了Strategy接口的Bean
      *
      * @param strategyMap
      *         策略集合
      */
     @Autowired
     public RedisConfig(Map<String, BaseSub> strategyMap) {
         this.classInstanceMap.clear();
         strategyMap.forEach((k, v) ->
                 this.classInstanceMap.put(k.toLowerCase(), v)
         );
     }

     /**
      * Redis消息监听器容器
      *
      * @param connectionFactory
      *
      * @return
      */
     @Bean
     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
         container.setConnectionFactory(connectionFactory);

         RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
         if (redisChannelEnums.length > 0) {
             for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
                 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
                     continue;
                 }
                 //订阅了一个叫pmp和channel 的通道,多通道
                 //一个订阅者接收一个频道信息,新增订阅者需要新增RedisChannelEnums定义+BaseSub的子类

                 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
                 BaseSub baseSub = classInstanceMap.get(toLowerCase);
                 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
             }
         }
         return container;
     }

     /**
      * 配置消息接收处理类
      *
      * @param baseSub
      *         自定义消息接收类
      *
      * @return MessageListenerAdapter
      */
     @Bean()
     @Scope("prototype")
     MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
         //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
         //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
         //注意2个通道调用的方法都要为receiveMessage
         return new MessageListenerAdapter(baseSub, "receiveMessage");
     }

 }

  

@Autowired
public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来。
解决了我们提出的第二个问题

而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
BaseSub baseSub = classInstanceMap.get(toLowerCase);
container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
是根据不同的管道对应不同的订阅者,也就是一个订阅者对应一个管道。方便根据不同的业务场景进行处理。
使用这种方式主需要配置redisChannelEnum枚举即可,解决了我们提出的第一个问题。
这样一来,订阅者就变得比较通用了

  

枚举

RedisChannelEnums作用:定义不同管道对应的订阅者,后期增加一个管道类型只需要增加一个枚举即可

 public enum RedisChannelEnums {

     /**redis频道名称定义 需要与发布者一致*/
     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"),

     ;
     /** 枚举定义+描述 */
     private String code;
     private Class<? extends BaseSub> className;
     private String description;

     RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {
         this.code = code;
         this.className=className;
         this.description = description;
     }

     /** 根据code获取对应的枚举对象 */
     public static RedisChannelEnums getEnum(String code) {
         RedisChannelEnums[] values = RedisChannelEnums.values();
         if (null != code && values.length > 0) {
             for (RedisChannelEnums value : values) {
                 if (value.code == code) {
                     return value;
                 }
             }
         }
         return null;
     }

     /** 该code在枚举列表code属性是否存在 */
     public static boolean containsCode(String code) {
         RedisChannelEnums anEnum = getEnum(code);
         return anEnum != null;
     }

     /** 判断code与枚举中的code是否相同 */
     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
         return calendarSourceEnum.code == code;
     }

     public String getCode() {
         return code;
     }

     public String getDescription() {
         return description;
     }

     public Class<? extends BaseSub> getClassName() {
         return className;
     }
 }

消息模板

BaseSubMessage定义通用的字段,与json字符的通用转换

 @Data
 abstract class BaseSubMessage {

     /** 发布订阅频道名称 */
     private String channel;

     private String extra;

     private String json;

     BaseSubMessage(String json) {
         if(StringUtils.isEmpty(json)){
             return;
         }

         this.json = json;
         Map map = new Gson().fromJson(this.json, Map.class);
         BeanHelper.populate(this, map);
     }

 }

LiveChangeMessage定义当前业务场景的字段

 @Data
 @ToString(callSuper = true)
 public class LiveChangeMessage extends BaseSubMessage {

     /** 直播Ids */
     private String liveIds;

     public LiveChangeMessage(String json) {
         super(json);
     }

 }

订阅者服务

BaseSub定义接收消息的通用方法

 public interface BaseSub {

     /**
      * 接收消息
      * @param jsonMessage  json字符
      */
     void receiveMessage(String jsonMessage);
 }

LiveChangeSub具体消息接收对象

 @Component
 public class LiveChangeSub implements BaseSub {

     /**只是定义的注解测试,可以换成自己的*/
     @Autowired
     private CategoryMapper categoryMapper;

     @Override
     public void receiveMessage(String jsonMessage) {

         System.out.println("项目aries-server.....................");
         //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
         System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage);

         LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
         System.out.println(liveChangeMessage);

         Category category = categoryMapper.get(1L);
         System.out.println("category:" + category);

     }
 }

springboot集成redis实现消息发布订阅模式-双通道(跨多服务器)的更多相关文章

  1. 15天玩转redis —— 第九篇 发布/订阅模式

    本系列已经过半了,这一篇我们来看看redis好玩的发布订阅模式,其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子 就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果 ...

  2. redis实现消息发布/订阅

    redis实现简单的消息发布/订阅模式. 消息订阅者: package org.common.component; import org.slf4j.Logger; import org.slf4j. ...

  3. redis 的消息发布订阅

    redis支持pub/sub功能(可以用于消息服务器),这个功能类似mq,这里做一个简单的介绍 Pub/Sub Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在R ...

  4. Spring Data Redis实现消息队列——发布/订阅模式

    一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现. 定义:生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列, ...

  5. redis消息通知(任务队列/优先级队列/发布订阅模式)

    1.任务队列 对于发送邮件或者是复杂计算这样的操作,常常需要比较长的时间,为了不影响web应用的正常使用,避免页面显示被阻塞,常常会将此类任务存入任务队列交由专门的进程去处理. 队列最基础的方法如下: ...

  6. redis实现消息队列&amp;发布/订阅模式使用

    在项目中用到了redis作为缓存,再学习了ActiveMq之后想着用redis实现简单的消息队列,下面做记录.   Redis的列表类型键可以用来实现队列,并且支持阻塞式读取,可以很容易的实现一个高性 ...

  7. Redis消息通知(任务队列和发布订阅模式)

    Redis学习笔记(十)消息通知(任务队列和发布订阅模式) 1. 任务队列 1.1 任务队列的特点 任务队列:顾名思义,就是“传递消息的队列”.与任务队列进行交互的实体有两类,一类是生产者(produ ...

  8. redis的发布订阅模式

    概要 redis的每个server实例都维护着一个保存服务器状态的redisServer结构 struct redisServer {     /* Pubsub */     // 字典,键为频道, ...

  9. redis发布/订阅模式

    其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子 就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果博主发表了文章,那么100个人就会同时收到通知邮件,除了这个 场 ...

  10. Redis - 发布/订阅模式

    Redis 提供了一组命令可以让开发者实现 “发布/订阅” 模式.“发布/订阅” 可以实现进程间的消息传递,其原理是这样的: “发布/订阅” 模式中包含两种角色,分别是发布者和订阅者.订阅者可以订阅一 ...

随机推荐

  1. hession

    Hessian是一个轻量级的remoting onhttp工具,使用简单的方法提供了RMI的功能. 相比WebService,Hessian更简单.快捷.采用的是二进制RPC协议,因为采用的是二进制协 ...

  2. centos安装lamp环境

    通过yum安装,需要联网且为su账号 yum -y install httpd php mysql mysql-server php-mysql 设置开启启动mysql,httpd     /sbin ...

  3. maven_项目的依赖、聚合、继承

      一.假设目前有三个maven项目,分别是project.A.project.B.project.C 要求B依赖A.C依赖B但不依赖C 1.B添加对A的依赖 1 2 3 4 5 <depend ...

  4. http://blog.163.com/zhangmihuo_2007/blog/static/27011075201392685751232/

    http://blog.163.com/zhangmihuo_2007/blog/static/27011075201392685751232/

  5. Kali linux 2016无法打开virtualbox问题解决

    Kali Linux在安装完virtualbox后,打开虚拟机会出现:kernel driver not installed (rc=1908)错误提示,根据提示,大概可以看出是由于缺少内核模块引起的 ...

  6. Java、JSP获得当前日期的年、月、日

    Java package com.ob; import java.text.ParseException; import java.text.SimpleDateFormat; import java ...

  7. Js之Screen对象

    Window Screen window.screen 对象在编写时可以不使用 window 这个前缀. 属性: screen.availWidth - 可用的屏幕宽度,以像素计,减去界面特性,比如窗 ...

  8. 菱形java代码

    public class boy { //菱形 public static void main(String[] args) { int m=4; for (int i=0;i<=m;i++){ ...

  9. Cocos2d-x 地图行走的实现3:A*算法

    本文乃Siliphen原创,转载请注明出处:http://blog.csdn.net/stevenkylelee 上一节<Cocos2d-x 地图行走的实现2:SPFA算法>: http: ...

  10. 搭建hadoop、hdfs环境--ubuntu

    最近在学习hadoop相关知识,就在本机上安装了hadoop,遇到了一些坑,也学到了不少.仅此记录我的安装过程,及可能遇到的问题.供参考.交流沟通见页末. 软件准备 >  虚拟机(VMware) ...