1:RabbitMQ是个啥?(专业术语参考自网络)

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

  RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

Code:

  1 //简单生产端 ui调用者
2
3 using System;
4 namespace RabbitMqPublishDemo
5 {
6 using MyRabbitMqService;
7 using System.Runtime.CompilerServices;
8
9 class Program
10 {
11 static void Main(string[] args)
12 {
13 //就是简单的队列,生产者
14 Console.WriteLine("====RabbitMqPublishDemo====");
15 for (int i = 0; i < 500; i++)
16 {
17 ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
18 }
19 Console.WriteLine("生成完毕!");
20 Console.ReadLine();
21 }
22 }
23 }
24
25 /// <summary>
26 /// 简单生产者 逻辑
27 /// </summary>
28 /// <param name="queueName"></param>
29 /// <param name="msg"></param>
30 public static void PublishSampleMsg(string queueName, string msg)
31 {
32
33 using (IConnection conn = connectionFactory.CreateConnection())
34 {
35 using (IModel channel = conn.CreateModel())
36 {
37 channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
38 var msgBody = Encoding.UTF8.GetBytes(msg);
39 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
40 }
41 }
42 }
43
44
45 //简单消费端
46 using System;
47
48 namespace RabbitMqConsumerDemo
49 {
50 using MyRabbitMqService;
51 using System.Runtime.InteropServices;
52
53 class Program
54 {
55 static void Main(string[] args)
56 {
57 Console.WriteLine("====RabbitMqConsumerDemo====");
58 ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
59 {
60 Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
61 });
62 Console.ReadLine();
63 }
64 }
65 }
66
67 #region 简单生产者后端逻辑
68 /// <summary>
69 /// 简单消费者
70 /// </summary>
71 /// <param name="queueName">队列名称</param>
72 /// <param name="isBasicNack">失败后是否自动放到队列</param>
73 /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
74 public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
75 {
76 Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
77 //我们通常可以使用一个导师器或者后台服务定时处理,whilt(true) 里面的老师变量如达到 sleep 200次就先跳出,交由定时器来重新启动
78 //这样每启动一次生产者,消费者就会自动去消费,而不会只是消费一次就停止了
79 while (true)
80 {
81 IConnection conn = connectionFactory.CreateConnection();
82 IModel channel = conn.CreateModel();
83 channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
84 var consumer = new EventingBasicConsumer(channel);
85 consumer.Received += (sender, ea) =>
86 {
87 byte[] bymsg = ea.Body.ToArray();
88 string msg = System.Text.Encoding.UTF8.GetString(bymsg);
89 if (handleMsgStr != null)
90 {
91 handleMsgStr.Invoke(msg);
92 }
93 else
94 {
95 Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
96 }
97 if (isBasicNack)
98 {
99 channel.BasicNack(ea.DeliveryTag, true, true);
100 }
101 };
102 channel.BasicConsume(queueName, autoAck: true, consumer);
103 conn.Dispose();
104 channel.Dispose();
105 System.Threading.Thread.Sleep(50);
106 }
107 }
108
109 /// <summary>
110 /// 简单生产者后端逻辑
111 /// </summary>
112 /// <param name="queueName"></param>
113 /// <param name="msg"></param>
114 public static void PublishSampleMsg(string queueName, string msg)
115 {
116
117 using (IConnection conn = connectionFactory.CreateConnection())
118 {
119 using (IModel channel = conn.CreateModel())
120 {
121 channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
122 var msgBody = Encoding.UTF8.GetBytes(msg);
123 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
124 }
125 }
126 }
127 #endregion
128
129

7:Work模式

 1 //就如下的code, 多次生产,3个消费者都可以自动开始消费
2
3 //生产者
4 using System;
5 namespace RabbitMqPublishDemo
6 {
7 using MyRabbitMqService;
8 using System.Runtime.CompilerServices;
9 class Program
10 {
11 static void Main(string[] args)
12 {
13 for (int i = 0; i < 500; i++)
14 {
15 ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
16 }
17 Console.WriteLine("工作队列模式 生成完毕......!");
18 Console.ReadLine();
19 }
20 }
21 }
22
23 //生产者后端逻辑
24 public static void PublishWorkQueueModel(string queueName, string msg)
25 {
26 using (var connection = connectionFactory.CreateConnection())
27 using (var channel = connection.CreateModel())
28 {
29 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
30 var body = Encoding.UTF8.GetBytes(msg);
31 var properties = channel.CreateBasicProperties();
32 properties.Persistent = true;
33
34 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
35 Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
36 }
37 }
38
39 //work消费端
40 using System;
41
42 namespace RabbitMqConsumerDemo
43 {
44 using MyRabbitMqService;
45 using System.Runtime.InteropServices;
46 class Program
47 {
48 static void Main(string[] args)
49 {
50 Console.WriteLine("====Work模式开启了====");
51 ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
52 {
53 Console.WriteLine($"work模式获取到消息{msg}");
54 });
55 Console.ReadLine();
56 }
57 }
58 }
59
60 //work后端逻辑
61 public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
62 {
63 var connection = connectionFactory.CreateConnection();
64 var channel = connection.CreateModel();
65
66 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
67 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
68
69 var consumer = new EventingBasicConsumer(channel);
70 Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");
71
72 consumer.Received += (sender, ea) =>
73 {
74 var body = ea.Body.ToArray();
75 var message = Encoding.UTF8.GetString(body);
76 if (handserMsg != null)
77 {
78 if (!string.IsNullOrEmpty(message))
79 {
80 handserMsg.Invoke(message);
81 }
82 }
83 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
84 };
85 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
86 }

8:Fanout

Code:

 1 //同一个消息会被多个订阅者消费
2
3 //发布者
4 using System;
5
6 namespace RabbitMqPublishDemo
7 {
8 using MyRabbitMqService;
9 using System.Runtime.CompilerServices;
10
11 class Program
12 {
13 static void Main(string[] args)
14 {
15
16 #region 发布订阅模式,带上了exchange
17 for (int i = 0; i < 500; i++)
18 {
19 ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
20 }
21 Console.WriteLine("发布ok!");
22 #endregion
23 Console.ReadLine();
24 }
25 }
26 }
27 //发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
28 public static void PublishExchangeModel(string exchangeName, string message)
29 {
30 using (var connection = connectionFactory.CreateConnection())
31 using (var channel = connection.CreateModel())
32 {
33 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
34 var body = Encoding.UTF8.GetBytes(message);
35 channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
36 Console.WriteLine($" Sent {message}");
37 }
38 }
39
40
41 //订阅者
42 using System;
43 namespace RabbitMqConsumerDemo
44 {
45 using MyRabbitMqService;
46 using System.Runtime.InteropServices;
47 class Program
48 {
49 static void Main(string[] args)
50 {
51
52 #region 发布订阅模式 Exchange
53 ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
54 {
55 Console.WriteLine($"订阅到消息:{msg}");
56 });
57 #endregion
58 Console.ReadLine();
59 }
60 }
61 }
62
63 //订阅者后端的逻辑
64 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
65 {
66 var connection = connectionFactory.CreateConnection();
67 var channel = connection.CreateModel();
68
69 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉
70
71 var queueName = channel.QueueDeclare().QueueName;
72 channel.QueueBind(queue: queueName,
73 exchange: exchangeName,
74 routingKey: "");
75
76 Console.WriteLine(" Waiting for msg....");
77
78 var consumer = new EventingBasicConsumer(channel);
79 consumer.Received += (model, ea) =>
80 {
81 var body = ea.Body.ToArray();
82 var message = Encoding.UTF8.GetString(body);
83 if (handlerMsg != null)
84 {
85 if (!string.IsNullOrEmpty(message))
86 {
87 handlerMsg.Invoke(message);
88 }
89 }
90 else
91 {
92 Console.WriteLine($"订阅到消息:{message}");
93 }
94 };
95 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
96 }

9:Direct

Code:

  1 //发布者
2 using System;
3
4 namespace RabbitMqPublishDemo
5 {
6 using MyRabbitMqService;
7 using System.Runtime.CompilerServices;
8
9 class Program
10 {
11 static void Main(string[] args)
12 {
13 #region 发布订阅 交换机路由模式 Direct
14 string routerKeyValue = args[0].Split("=")[1];//如 abc.exe --name='qq'
15 Console.WriteLine("开始发布中。。。");
16 for (int i = 0; i < 20; i++)
17 {
18 string msg = $"小明有{i}只宝剑";
19 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue);
20
21 //下面的为固定的写法
22 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg);
23 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish($"你好我好大家好{i}", routerKey:"onlylog");
24 }
25 Console.WriteLine("这次发布完毕。。。");
26 #endregion
27 Console.ReadLine();
28 }
29 }
30 }
31
32 //发布者后端逻辑 发布订阅的路由模式 Direct
33 /// <summary>
34 /// 发布 Direct 路由模式 Direct
35 /// </summary>
36 /// <param name="message"></param>
37 /// <param name="exchangeName"></param>
38 /// <param name="routerKey"></param>
39 public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent")
40 {
41 using (IConnection connection = connectionFactory.CreateConnection())
42 {
43 using (IModel channelmodel = connection.CreateModel())
44 {
45 channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
46 byte[] bymsg = Encoding.UTF8.GetBytes(message);
47 channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg);
48
49 // ​byte[] bytemsg = Encoding.UTF8.GetBytes(message);
50 // ​channelmodel.BasicPublish(exchange: exchangeName,routingKey: routerKey,basicProperties: null,body: bytemsg);
51 }
52 }
53 }
54
55 //订阅者 Exchange Router路由 Director
56 using System;
57
58 namespace RabbitMqConsumerDemo
59 {
60 using MyRabbitMqService;
61 using System.Runtime.InteropServices;
62
63 class Program
64 {
65 static void Main(string[] args)
66 {
67 Console.WriteLine("开始消费中。。!");
68 if (args.Length > 0)
69 {
70 string routerKeyValue = args[0].Split("=")[1];
71 Console.WriteLine($"routerKey=>{routerKeyValue}");
72 if (!string.IsNullOrEmpty(routerKeyValue))
73 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg =>
74 {
75 Console.WriteLine($"拿到消息:{msg}");
76 });
77 else
78 Console.WriteLine("没有获取到routerKey !");
79 }
80 //else
81 //{
82 // ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(handler: msg =>
83 // {
84 // Console.WriteLine($"拿到消息:{msg}");
85 // });
86 //}
87 Console.ReadLine();
88 }
89 }
90 }
91
92 //订阅者 Exchange Router路由 Director 后端逻辑
93 public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null)
94 {
95 var connection = connectionFactory.CreateConnection();
96 var channel = connection.CreateModel();
97 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
98 var queueName = channel.QueueDeclare().QueueName;
99 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);
100
101 Console.WriteLine("wating for message...!");
102 var consumer = new EventingBasicConsumer(channel);
103 //(object sender, BasicDeliverEventArgs e)
104 consumer.Received += (sender, e) =>
105 {
106 var bytedata = e.Body.ToArray();
107 var getRoutekey = e.RoutingKey;
108 string msg = Encoding.UTF8.GetString(bytedata);
109 if (handler != null)
110 handler.Invoke(msg);
111 else
112 Console.WriteLine($"路由{getRoutekey},订阅到消息{msg}!");
113 };
114 channel.BasicConsume(queue: queueName, autoAck: true, consumer);
115 }
116

需要完整的code,可以留言获取!

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的更多相关文章

  1. RabbitMQ三种Exchange模式(fanout,direct,topic)的特性 -摘自网络

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  2. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  3. RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  4. 【RabbitMQ】三种类型交换器 Fanout,Direct,Topic(转)

    出处:https://blog.csdn.net/fxq8866/article/details/62049393 RabbitMQ服务器会根据路由键将消息从交换器路由到队列中,如何处理投递到多个队列 ...

  5. [转]RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较

    RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...

  6. 8、RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较

    RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较 RabbitMQ中,除了Simple Queue和Work Queue之外的所有生产者提交的消息都由Exc ...

  7. Python开发【十一章】:RabbitMQ队列

    RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或 ...

  8. Python自动化 【第十一篇】:Python进阶-RabbitMQ队列/Memcached/Redis

     本节内容: RabbitMQ队列 Memcached Redis 1.  RabbitMQ 安装 http://www.rabbitmq.com/install-standalone-mac.htm ...

  9. python RabbitMQ队列/redis

    RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或 ...

随机推荐

  1. css响应有media,js响应有这种

    比如,我不想在移动端执行某js特效可以参考(function(doc, win) { var screenWidth = 0, size = 'M', root = doc.documentEleme ...

  2. Nginx+Lua(OpenResty)开发高性能Web应用

    使用Nginx+Lua(OpenResty)开发高性能Web应用 博客分类: 跟我学Nginx+Lua开发 架构 ngx_luaopenresty 在互联网公司,Nginx可以说是标配组件,但是主要场 ...

  3. The Monocycle(BFS)

    The Monocycle Time Limit: 3000MS64bit IO Format: %lld & %llu [Submit]   [Go Back]   [Status] Des ...

  4. Spring学习8-用MyEclipse搭建SSH框架 Struts Spring Hibernate

    1.new一个web project. 2.右键项目,为项目添加Struts支持. 点击Finish.src目录下多了struts.xml配置文件. 3.使用MyEclipse DataBase Ex ...

  5. jquery iframe高度自适应

    $(document).ready(function () { $("#test").load(function () { var thisheight = $(this).con ...

  6. CI在CentOS中的部署与实践LNMP

    1. 平台:lnmp CentOS6.4 (64bit) nginx1.2.4+php5.5.7配置过程中遇到的问题与处理方式: 1. 404错误: 原因:nginx中的配置请求路径的问题 2. 40 ...

  7. PHP检查表单提交是否来自于本站(验证HTTP_REFERER等)

    方法一: 你可以把处理提交数据的代码写到一个单独的文件里,比如form.php.      <?php      if   (defined(’INSIDE’))   {//判断是否有定义INS ...

  8. Hibernate查询之SQL查询,查询结果用new新对象的方式接受,hql查询,通过SQL查询的结果返回到一个实体中,查询不同表中内容,并将查到的不同表中的内容放到List中

     package com.ucap.netcheck.dao.impl; import java.util.ArrayList;import java.util.List; import org. ...

  9. DML、DDL、DCL的分别是什么

    DML.DDL.DCL的分别是什么 一直以来,分不清这三者的简称代表什么,甚至在面试中遇到可能会张冠李戴.今天特意记录一下. 一.DML(data manipulation language) 数据操 ...

  10. 查找一个Class到底在那一个jar文件里

    整理自己的一些笔记,发觉这个命令 ,看起来是用来找一个Class到底在那一个jar文件里的. 虽然没有再测一下,估计是好使的. 先在博客园里记下来,防止自己忘掉. findstr /S /M org. ...