一、持久化

如果看到这一篇文章的朋友,都是有经验的开发人员,对持久化的概念就不用再做过多的解析了,经过前面的几篇文章,其实不难发现RabbitMQ 的持久化其实就只分交换器持久化、队列持久化和消息持久化这三个部分;

  • 定义持久化交换器,通过第三个参数 durable 开启/关闭持久化
channel.exchangeDeclare(exchangeName, exchangeType, durable)
  • 定义持久化队列,通过第二个参数 durable 开启/关闭持久化
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
  • 发送持久化消息,需要在消息属性中设置 deliveryMode=2 , 此属性在 BasicProperties 中,通过 basicPublish 方法的 props 参数传入。
channel.basicPublish(exchange, routingKey, props, body);
BasicProperties 对象可以从RabbitMQ 内置的 MessageProperties 类中获取
MessageProperties.PERSISTENT_TEXT_PLAIN 1
如果还需要设置其它属性,可以通过 AMQP.BasicProperties.Builder 去构建一个BasicProperties 对象;这个用法在前两篇文章中都有展示过
new AMQP.BasicProperties.Builder() .deliveryMode(2) .build()

二、持久化代码演示

/**
* 持久化示例
*/
public class Consumer {
private static Runnable receive = new Runnable() {
public void run() {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.0.1");
factory.setUsername("admin");
factory.setPassword("admin"); Connection connection = null;
Channel channel = null;
final String clientName = Thread.currentThread().getName();
String queueName = "routing_test_queue"; try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者-" + clientName); // 4、从链接中创建通道
channel = connection.createChannel(); // 定义一个持久化的,direct类型交换器
channel.exchangeDeclare("routing_test", "direct", true);
// 定义一个持久化队列
channel.queueDeclare(queueName, true, false, false, null); // 将队列和交换器绑定,第三个参数 routingKey是关键,通过此路由键决定接收谁的消息
channel.queueBind(queueName, "routing_test", clientName); // 定义消息接收回调对象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
}); System.out.println(clientName + " 开始接收消息");
System.in.read(); } catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
} // 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}; public static void main(String[] args) {
new Thread(receive, "c1").start();
new Thread(receive, "c2").start();
}
}
public class Producer {

    public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.0.1");
factory.setUsername("admin");
factory.setPassword("admin"); Connection connection = null;
Channel channel = null; try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者"); // 4、从链接中创建通道
channel = connection.createChannel(); // 定义一个持久化的,direct类型交换器
channel.exchangeDeclare("routing_test", "direct", true); // 内存、磁盘预警时用
System.out.println("按回车继续");
System.in.read(); // 消息内容
String message = "Hello A";
// 发送持久化消息到routing_test交换器上
channel.basicPublish("routing_test", "c1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息 " + message + " 已发送!"); // 消息内容
message = "Hello B";
// 发送持久化消息到routing_test交换器上
channel.basicPublish("routing_test", "c2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息 " + message + " 已发送!"); // 内存、池畔预警时用
System.out.println("按回车结束");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
} // 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

二、内存告警

默认情况下 set_vm_memory_high_watermark 的值为 0.4,即内存阈值(临界值)为 0.4,表示当RabbitMQ 使用的内存超过 40%时,就会产生内存告警并阻塞所有生产者的连接。一旦告警被解除(有消息被消费或者从内存转储到磁盘等情况的发生), 一切都会恢复正常。在出现内存告警后,所有的客户端连接都会被阻塞。阻塞分为 blocking 和 blocked 两种。
  • blocking:表示没有发送消息的链接。
  • blocked:表示试图发送消息的链接。
如果出现了内存告警,并且机器还有可用内存,可以通过命令调整内存阈值,解除告警。
rabbitmqctl set_vm_memory_high_watermark 1 1
或者
rabbitmqctl set_vm_memory_high_watermark absolute 1GB
但这种方式只是临时调整,RabbitMQ 服务重启后,会还原。如果需要永久调整,可以修改配置文件。但修改配置文件需要重启RabbitMQ 服务才能生效。

修改配置文件: vim /etc/rabbitmq/rabbitmq.conf
vm_memory_high_watermark.relative = 0.4 1
或者
vm_memory_high_watermark.absolute = 1GB

三、模拟内存告警

1. 调整内存阈值,模拟出告警,在RabbitMQ 服务器上修改。 注意:修改之前,先在管理页面看一下当前使用了多少,调成比当前值小
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

2.刷新管理页面(可能需要刷新多次),在 Overview -> Nodes 中可以看到Memory变成了红色,表示此节点内存告警了

3. 启动 Producer 和 Consumer(源码链接在最下面)
4. 查看管理界面的 Connections 页面,可以看到生产者和消费者的链接都处于 blocking 状态。
5. 在 Producer 的控制台按回车健,再观察管理界面的 Connections 页面,会发现生产者的状态成了 blocked 。
6. 此时虽然在 Producer 控制台看到了发送两条消息的信息,但 Consumer 并没有收到任何消息。并且在管理界面的 Queues 页面也看到不到队列的消息数量有变化。
7. 解除内存告警后,会发现 Consumer 收到了 Producer 发送的两条消息。

四、内存换页

  • 在Broker节点的使用内存即将达到内存阈值之前,它会尝试将队列中的消息存储到磁盘以释放内存空间,这个动作叫内存换页。
  • 持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一份副本,此时会将持久化的消息从内存中清除掉。
  • 默认情况下,在内存到达内存阈值的 50%时会进行换页动作。也就是说,在默认的内存阈值为 0.4的情况下,当内存超过 0.4 x 0 .5=0.2 时会进行换页动作。
  • 通过修改配置文件,调整内存换页分页阈值(不能通过命令调整)。
# 此值大于1时,相当于禁用了换页功能。
vm_memory_high_watermark_paging_ratio = 0.75

五、磁盘告警

  • 当磁盘剩余空间低于磁盘的阈值时,RabbitMQ 同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃
  • 默认情况下,磁盘阈值为50MB,表示当磁盘剩余空间低于50MB 时会阻塞生产者并停止内存中消息的换页动作
  • 这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩渍的可能性。比如在两次磁盘空间检测期间内,磁盘空间从大于50MB被耗尽到0MB
  • 通过命令可以调整磁盘阈值,临时生效,重启恢复
# disk_limit 为固定大小,单位为MB、GB
rabbitmqctl set_disk_free_limit <disk_limit>
或者
# fraction 为相对比值,建议的取值为1.0~2.0之间
rabbitmqctl set_disk_free_limit mem_relative <fraction>

其实这些内容在官网上都有说明,有兴趣可以直接看官网:https://www.rabbitmq.com/alarms.html

git源码:https://gitee.com/TongHuaShuShuoWoDeJieJu/rabbit.git

RabbitMQ持久化机制、内存磁盘控制(四)的更多相关文章

  1. 消息中间件-RabbitMQ持久化机制、内存磁盘控制

    RabbitMQ持久化机制 RabbitMQ内存控制 RabbitMQ磁盘控制 RabbitMQ持久化机制 重启之后没有持久化的消息会丢失 package com.study.rabbitmq.a13 ...

  2. RabbitMQ的持久化机制

    一.问题的引出 RabbitMQ的一大特色是消息的可靠性,那么它是如何保证消息可靠性的呢?——消息持久化.为了保证RabbitMQ在退出,服务重启或者crash等异常情况下,也不会丢失消息,我们可以将 ...

  3. 【转】RabbitMQ基础——和——持久化机制

    这里原来有一句话,触犯啦天条,被阉割!!!! 首先不去讨论我的日志组件怎么样.因为有些日志需要走网络,有的又不需要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,我们只谈消息RabbitMQ ...

  4. Redis基础篇(四)持久化:内存快照(RDB)

    AOF好处是每次执行只需要记录操作命令,记录量不大.但在故障恢复时,需要逐一执行AOF的操作命令,如果日志很大,恢复就很慢. 今天学习另一种持久化方式:内存快照.内存快照,是Redis某一时刻的状态, ...

  5. 数据库并发事务控制四:postgresql数据库的锁机制二:表锁 &lt;转&gt;

    在博文<数据库并发事务控制四:postgresql数据库的锁机制 > http://blog.csdn.net/beiigang/article/details/43302947 中后面提 ...

  6. 分析RedisRDB和AOF两种持久化机制的工作原理及优劣势

    一.RDB和AOF两种持久化机制的介绍 RDB持久化机制,对redis中的数据执行周期性的持久化 AOF机制对每条写入命令作为日志,以append-only(追加)的模式写入一个日志文件中,在redi ...

  7. Redis提供的持久化机制(RDB和AOF)【转载】

    Redis提供的持久化机制    Redis是一种面向“key-value”类型数据的分布式NoSQL数据库系统,具有高性能.持久存储.适应高并发应用场景等优势.它虽然起步较晚,但发展却十分迅速. 近 ...

  8. Redis的持久化机制

    持久化机制 RDB:快照模式AOF :日志模式 多数据库– 一个redis服务器内部默认有16个数据,编号О0-15– 默认操作是编号为0的数据库– 可以在命令行用select选择数据库127.0.0 ...

  9. 《【面试突击】— Redis篇》-- Redis哨兵原理及持久化机制

    能坚持别人不能坚持的,才能拥有别人未曾拥有的.关注编程大道公众号,让我们一同坚持心中所想,一起成长!! <[面试突击]— Redis篇>-- Redis哨兵原理及持久化机制 在这个系列里, ...

  10. 源码级别理解 Redis 持久化机制

    文章首发于公众号"蘑菇睡不着",欢迎来访~ 前言 大家都知道 Redis 是一个内存数据库,数据都存储在内存中,这也是 Redis 非常快的原因之一.虽然速度提上来了,但是如果数据 ...

随机推荐

  1. SQL Server 获取最后一天(指定时间的月最后一天日期)

    /* author OceanHo @ 2015-10-23 10:14:21 获取指定时间字符串指定日期的月最后一天日期 */ IF OBJECT_ID('get_LastDayDate') IS ...

  2. EFW框架源代码版本升级记录说明

    回<[开源]EFW框架系列文章索引>        EFW框架源代码下载V1.3:http://pan.baidu.com/s/1c0dADO0 EFW框架实例源代码下载:http://p ...

  3. 深入学习APC

    一.前言 在NT中,有两种类型的APCs:用户模式和内核模式.用户APCs运行在用户模式下目标线程当前上下文中,并且需要从目标线程得到许可来运行.特别是,用户模式的APCs需要目标线程处在alerta ...

  4. Codeforces 509C Sums of Digits 贪心

    这道题目有人用DFS.有人用DP 我觉得还是最简单的贪心解决也是不错的选择. Ok,不废话了,这道题目的意思就是 原先存在一个严格递增的Arrary_A,然后Array_A[i] 的每位之和为Arra ...

  5. 《NLTK基础教程》译者序

    购买<NLTK基础教程> 说来也凑巧,在我签下这本书的翻译合同时,这个世界好像还不知道AlphaGo的存在.而在我完成这本书的翻译之时,Master已经对人类顶级高手连胜60局了.至少从媒 ...

  6. URLConnection调用接口

    写在前面: 项目是java web,jdk1.4,weblogic 7;对方.net系统,用wcf开发的接口.对方提供接口url地址,以及说明用post方式去调用,无需传递参数,直接返回json ar ...

  7. 一篇深入剖析PCA的好文

    主成分分析(Principal components analysis)-最大方差解释 在这一篇之前的内容是<Factor Analysis>,由于非常理论,打算学完整个课程后再写.在写这 ...

  8. Android为TV端助力 内存溢出与内存泄露

    内存溢出就是软件运行需要的内存,超出了java虚拟机给他分配的可用的最大内存 内存泄露就是在缓存图片文字等等的时候,没有关闭流所导致的内存泄露

  9. gulp 技巧

    install npm install --save-dev jshint gulp-jshint 压缩js npm install --save-dev gulp-minify-css xxCSS ...

  10. springboot报错Whitelabel Error Page

    第一次使用springboot没有问题.隔了两天继续看.一直报错Whitelabel Error Page. 重新搭建试了任何方法都错了. 报的就是一个404错误,犯了一个习惯性错误,一般都是loca ...