https://blog.csdn.net/jiangyu1013/article/details/81668671

消息中间件的作用

1. 应用解耦

2. 异步处理

比如用户注册场景,注册主流程完成以后,需要调用邮件系统发送邮件通知用户注册成功,可能还需要调用其他系统。这是串行的,如果一个系统依赖很多系统,那么这个主流程会比较长,耦合度高,整个系统维护成本也会越来越高。那么我们就可以使用消息中间件来进行解耦,通过发布订阅模式,完成用户注册之后,向中间件发送消息,这样就可以马上给用户返回,至于后续工作其他系统向中间件订阅这个消息并完成后续工作就好。这也就是一个解耦和异步处理过程。

中间件有下面两种模型

点对点模型

发布订阅模型

消息中间件的解耦和异步是两个最重要的需求点,除此之外还应该做一些其他事情比如:

  • 保证一致性,产生消息和发送消息是一致的,也就是如果操作成功,那么消息一定发送成功;如果业务操作没有成功那么就不能发送消息

  • 具备一定消息堆积能力,可以为后端挡住一些数据流保证后端不会被压垮

  • 具备消息实时性,保证消息的低延迟

  • 具备消息的可靠性,主要是可靠地存储和投递

消息系统里面应该有这样一个假设:消息一定会堆积。下游系统通常有很多,里面有重要的也不重要的,面对突发流量高峰,一定会有后端系统处理不过来的情况,从而造成消息堆积;当然还有一种情况是后端系统出现问题导致暂时无法消费消息从而造成消息中间件的消息堆积。所以中间件要起到蓄水池的作用。

数据一致性,这个很容易理解,因为是分布式异步的,但是又不能容忍数据出错,所以在性能和数据一致性方面就需要有所妥协,通常在互联网行业中采取最终一致性。需要注意的是最终一致性和弱一致性不同,弱一致性表示允许在异常情况下数据可能不一致,而最终一致性则是在某段时间内允许不一致但是最终会一致。

RocketMQ介绍

基于发布订阅的队列模型消息中间件,它只有发布和订阅的消息方式,消息类型只支持Message,消息可以持久化。服务端使用JAVA编写,客户端支持JAVA、C++。阿里2012年开源,之后作为Apache基金会的一个项目进行维护。是一款低延迟、高可靠、可伸缩、易于使用的中间件。在Github上有相关介绍。

特性

消息可靠性:

  • 生产者的可靠性保证:生产者发送消息后返回SendResult,如果isSuccess返回true,则表示消息已经确认发送到服务器并被服务器接收保存。整个发送过程是一个同步过程。

  • 服务器的可靠性:消息生产者发送的消息,RocketMQ服务收到后在做必要的校验和检查之后马上保存到磁盘,写入成功后返回给生产者。因此可以确认每条发送结果为成功的消息都会被消息服务器写入磁盘。

  • 消费者的可靠性:消费者是一条一条顺序消费的,之后在成功消费一条后才会消费吓一跳。如果在消费某一条消息时失败则会重试消费这条消息,默认为5次,如果超过最大次数仍然无法消费,则将消息保存到本地,后台线程继续重试消费,主线程则会继续往后走,消费队列后面的消息。

消息持久性:RocketMQ收到消息后,会将消息持久化到文件,并利用Linux文件系统内存来提高性能

消息实时性:RocketMQ采取长轮询+PULL模式保证消息的持久性

消息重复:对于消费者来说,通过拉取方式将消息保存到本地,消费完再向服务器返回,在网络异常的情况下可能会出现重复。

消息过滤:

  • 服务器端过滤:减少不必要消息传输,但是会增加服务器负担

  • 客户端过滤:根据客户端需求来定制消息,缺点是客户端会收到对它来说没用的消息,如果客户端无法承载这么多消息就会导致故障

消息堆积:支持10亿级别的消息堆积,不会因为消息堆积影响性能

术语说明

角色 说明
Producer

Producer:

生产者,用于将消息发送到RocketMQ,生产者本身既可以是生成消息,也可以对外提供接口,由外部来调用接口,再由生产者将受到的消息发送给MQ。

Consumer

Consumer:

消费者,从Broker拉取消息进行消费。从应用角度来说有两类消费者:

  • PullConsumer:主动拉取消息,一旦拉取到消息,应用的消费进程进行初始化

  • PushConsumer:封装消息拉取,消费进程和内部

Broker

Broker:

RocketMQ服务器,也是整个服务的核心,它实现了消息的存储、拉取功能。它通常以集群方式启动,并可配置主从,每个broker上提供对指定topic的服务。理解了broker的原理以及它和其他服务交互的过程,也就命令消息中间件的原理,其实都大同小异。它具有2中角色

  • Master:能写、能读

  • Slave:只能读,不能写

Topic

Topic:

消息的主题,由用于定义并在服务端配置,消费者可以按照主题进行订阅,也就是消息分类,通常一个应用一个Topic

Message

Message:

在生产者、消费者、服务器之间传递的消息,一个message必须属于一个Topic

Namesrv

Namesrv:

一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要是接收broker的注册(broker每十秒就会向所有名称服务器发送心跳请求,同时注册topic信息到名称服务器),接收客户端的路由请求并返回路由信息,你可以理解为服务自动发现,就是相当于zookeeper在dubbo框架中的作用。

  • 生产者发消息时会根据Topic向名称服务器获取到指定broker的路由信息

  • 消费者根据Topic到名称服务器获取该Topic到broker的路由信息

Group

Group:

组名,一类消费者或者生产者的集合名称。

  • 消费者组,消费相同Topic内容的消费者,可以并行消费Topic中Partition中的消息。

  • 生产者组,生产相同Topic内容的生产者

Offset

Offset:

偏移量,消费者拉取消息时需要知道上一次消费到了什么位置,这一次从哪里开始。

Partition

Partition:

分区,Topic物理上的分组,一个Topic可以分为多个分区,每个分区是一个有序的队列。分区中的每条消息都会给分配一个有序的ID,也就是偏移量。

分区的目的:

  • 减缓日志文件占用磁盘空间,消息需要持久化到文件,分区可以将消息粒度细分,每个分区可以存放在不同的磁盘空间中

  • 不同消费者同时消费分区中的数据,一个分区仅由一个消费者组中的消费者消费,1个消费者可以同时消费多个分区。

  • 可以实现负载均衡,如果同一个Topic的消息都放在同一个Broker上,那消费的时候同一个Topic的消费者都去同一个Broker上消费,这样会带来压力,如果通过分区放在不同Broker上,这样就可以到不同的Broker上消费,当然同一个ID的消息只能存在一个分区上。你可以想象A这个topic的消息有10个那么每个消息有1个ID,如果分布10个消息分布在不同的分区上,比如3个,那就形成3-3-4,消费者去消费的时候消费10条消息时通过3个分区完成这样就提高了吞吐量。

Topic是消息的逻辑队列,分区是物理队列。可以通过配置文件来设置topic的默认分区数量,也可以在新建立topic的时候指定。建议分区数量和消费者数量一致,因为消费者数量多,多出来的不会去消费消息的,因为一个队列只能被一个消费者消费。如果消费者数量少则消费者就会比较繁忙。

Tag

Tag:

用于对消息进行过滤,理解文件message的子主题,同一业务不同目的的message可以用相同的topic但是可以用不同的tag来区分,在队列中tag在消息的数据结构中被 转换为一个8byte的hashcode,这样节省空间。过滤分两步:

  1. 在Broker端进行Message Tag对比,先遍历Consume Queue,如果存储的Message tag与订阅的tag不符合就跳过,符合则传输给Consumer,在队列中继续比对hashcode

  2. Consumer收到消息后,对比真实的Message Tag字符串,而不是Hashcode,这样避免HASH冲突。

key

key:

消息的KEY字段是为了唯一表示消息的,方便查问题,不是说必须设置,只是说设置为了方便开发和运维定位问题,这个KEY可以是订单ID等。

原理

消费者:

  • Push Consumer,应用向Consumer对象注册一个Listener接口,一但收到消息,Consumer对象立刻回调Listener接口方法

  • Pull Consumer,应用主动调用Consumer的拉取消息方法,从Broker拉消息

消费模式:

  • 广播模式:一条消息被多个消费者消费,即使它们属于同一个消费者组,消息会被组中的每个成员消费一次。

  • 集群模式:消息会被平均分配到消费者组中进行消费。

消息模式:

  • 顺序消息:消息的消费顺序要和发送的顺序一致,一类消息为满足顺序性,生产者必须单线程顺序发送且发送到同一个队列,这样消费者就可以按照生产者发送的顺序去消费。

  • 普通顺序消息:正常情况下可以保证完全顺序消费,但是一旦发生异常,比如broker重启,由于队列总数发生变化,会产生短暂的消息顺序不一致。如果业务可以容忍这种异常情况则可以使用。

  • 严格顺序消息:无论任何情况下都必须保证消息的顺序,但是这就牺牲分布式的高可用功能,也就是Broker集群中只要有一台不可用,那么整个集群就不可用。如果集群部署模式为同步双写模式,那么可以通过备机自动切换来避免,不过仍然存在短暂间隙的服务不可用。

消息的存储

生产者上产消息,根据Topic选择其对应的某一个分区,然后发送到这个分区所在的Brocker上,消费者根据订阅的Topic选择去Topic的某一个分区拉取消息。

RocketMQ收到消息后会把消息保存在本地文件中,每个文件最大上线1G,如果写入消息时超过当前文件大小,会建立一个新文件,文件名为起始字节大小。消息写入是顺序的,读取是随机的,因为数据持久化当前写入文件只有一个,所以可以是顺序写入,但是读取的时候因为有多个逻辑队列,每个逻辑队列由多个分区所以就出现多个逻辑读队列,这样读取的时候就是随机的。如何提高读取性能呢?就是尽可能让读命中系统pageCache,减少磁盘IO次数。RcoketMQ的持久化是先写入pageCache页面高速缓存,然后刷盘,这样保证内存与磁盘都有一份相同的数据,访问时直接从内存读取。另外一方面RocketMQ在文件读写方面做了优化,采用内存映射方式完成,也就是把磁盘文件映射到内存地址空间,避免了内核空间到用户空间的复制。

支持的部署架构

集群方式 消息可靠性(Master宕机) 服务可用性 特点 其他说明
一组主主 同步刷盘消息一条都不会丢失 整体可用,未被消费的消息无法取得,影响实时性 结构简单、扩容方便、性能最高 适合消息可靠性高,实时性低的需求
一组主从 异步有毫秒级丢失,同步双写不丢失 主备不能切换,且备机只能读不能写,会造成服务整体不可用   不推荐使用

多组主从

(异步复制)

故障是会丢失消息 整体可用,实时性影响是毫秒级别,该组服务只能读不能写 结构复杂、扩容方便,性能很高。 适合消息可靠性中等,实时性要求中等的场景
多组主从(同步双写) 不丢消息 整体可用,不影响实时性。该组服务只能读不能写。不能自动切换。 结构复杂,扩容方便,性能比异步低一点,所以实时性也并不比异步方式高太多。 适合消息可靠性高,实时性中等,性能要求不高的场景。

推荐的架构如下:

高要求则使用多组主从同步双写,低要求使用主主方案。

应用场景

  • RocketMQ应用到Cache,可以用在大量机器同步信息的场景

  • 业务削峰,在大量交易涌入时,后端可能无法及时处理,所以MQ的大量消息堆积功能就可以发挥作用。

  • 日志收集,RocketMQ的设计模型从Kafka衍生而来,kafka在日志收集系统中充当缓冲功能,随意RocketMQ也适用此场景

  • 对可靠性要求很高的场景,尤其是电商里面的订单扣款,因为扣款要涉及到很多第三方支付。

优缺点

优点

  • 顺序性,它支持顺序性,可以做到局部有序,在单线程内使用该生产者发送的消息按照发送的顺序到达服务器并存储,并按照相同顺序被消费,但前提是这些消息发往同一服务器的同一个分区

  • 实时性:采取长轮询+PULL消费消息,你可以自己决定如何在响应性和吞吐量之间做平衡,配合合理的参数设置来获得更高的响应时间,实时性不低于PUSH方式

  • 提供了丰富的拉取模式

  • 支持10亿级别的消息堆积,不会因为堆积导致性能下降

  • 高效的订阅者水平扩展机制

缺点

  • 消息重复问题,它不能保证不重复,只能保证正常情况下不重复

  • 不支持分布式事务

  • 消息过滤功能扩展比较单一

消息顺序

消息顺序是只可以按照消息发送的顺序进行消费。一个订单产生3条消息,订单创建、付款、订单完成。消费时只有按照顺序消费才有意义,不可能先消费付款消息再消费订单创建消息,这样就乱了。另外,多笔订单又可以并行消费。如何保证呢?

一个订单产生的消息只能发送给同一个MQ服务器中的同一个分区,并且按顺序发送,这样才能在理论上保证消费者消费时是按照顺序消费的,因为一个分区就是一个逻辑队列。生产者虽然按顺序发送,但是第一条消息到达MQ的耗时比第二条多,那么第二条则会被先消费,这样就又导致消费时不是顺序的。那么如何解决呢?可以采取只有第一条被消费者消费成功后再发送第二条。看下图:

但是如果第一条被发送到消费者后,消费者没有响应(消费者发送响应但是因为网络问题丢失或者消费者就没有收到消息),那么在这种情况下你是继续发送第二条还是重发第一条呢?如果是严格消息顺序,那肯定是重发第一条,但是如果是消费者消费后的响应丢失了,那么重发第一条就会造成重复消费。

从另外一方面看,如果不考虑网络异常,那么要实现严格消息,就必须采取一种一对一关系,生产者A的消息对应到MQ服务器1的X队列,消费者A消费X队列。这样串行结构就会造成系统吞吐量太低;更多异常需要处理比如消费端出现问题,那么整个消息队列就会出现阻塞。RocketMQ通过轮询所有队列来确定消息发送到哪一个队列(负载均衡),比如相同订单号的消息会被先后发送到统一队列中。所以RocketMQ

消息重复

造成消费重复的根本原因是网络不可达,只要有网络,这种网络的不稳定因素就存在你无法规避。所以解决这个问题的最好办法就是绕过它。这就变成了,消费端收到两个一样的消息后如何处理,而不是从发送端解决不发送2个一样的消息。对于消费端的要求就是:

  • 消费端处理业务消息要保持幂等性,也就是同一个东西执行多次会得到相同结果

  • 保证每条消息都有唯一编号切保证消息处理成功与去重表的日志同时出现

第一条好理解,第二条就是利用一张日志表来记录已经处理成功的消息ID,如果新到的消息ID已经存在表中那么就不再处理这个消息。第一条是在消费端实现的,不属于消息系统的功能;第二条可以是消息系统实现也可以是业务端实现,处于对消息系统的吞吐量和高可用考虑最好还是由消费端去处理。所以这也就是RocketMQ不解决消息重复的原因。