参考:http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

简答模式(exchange不工作)

import pika

# 链接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost')) channel = connection.channel() # 建立通道
channel.queue_declare(queue='hello') #定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body) # 传入消费者参数
channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 启动消费者

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') # 生成队列,并命名为hello channel.basic_publish(exchange='', # 不设置交换机
routing_key='hello', #将设置传入的消息的队列
body='Hello World!') # 传入数据
print(" [x] Sent 'Hello World!'")
connection.close() # 关闭链接

生产者

Exchange模型1——简单分发

绑定的路由键(routing key)名称与队列名称相同。

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') # 这里type 是fanout message = "info: Hello World!"
channel.basic_publish(exchange='exchanger_name', # exchange要设置一个名称
routing_key='', # 不设置具体的通道,让交换机决定
body=message)
print(" [x] Sent %r" % message)
connection.close()

生产者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='exchanger_name',
type='fanout') result = channel.queue_declare(exclusive=True) # 这里队列并不指定名称
queue_name = result.method.queue # 这里拿到名字 channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name, # 这里用随机的名字
no_ack=True) # 无应答模式 channel.start_consuming()

消费者

exchange模型2——关键字

直连交换机经常用来循环分发任务给多个工作者(workers),消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

关键词的是由消费者决定的,循环完成绑定

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') # 这里type=direct 代表关键字分发模式 severity = "abc"
message = "hello world"
channel.basic_publish(exchange='direct_logs',
routing_key=severity, # 设置分发的关键字"abc"
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

生产者

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') # 这里 type = “direct” result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = ["abc","bcd","awd"] # 三个关键字 for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) # 用这三个关键字设置为routing_key print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

消费者

exchange模型3——模糊关键词

关键用点分割*代表匹配一个词,#代表多个关键词

new.age.rabbit          new.*  -- 不匹配 
new.age.rabbit          new.#  -- 匹配
new.age.rabbit          #.rabbit  -- 匹配
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') # 这里是topic result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = ["abc","cba","bac"] # 设置三个路径 for binding_key in binding_keys: # 循环三次为queue_name这个队列绑定三个关键词
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = "#.bca" # 模糊关键词
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

生产者

exchange模型4——头

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

参数

拿取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。(问题是有可能其中一个消费者处理慢,依然造成排队)

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列。这个参数放在消费者代码部分

持久化:durable

顾名思义,防止数据丢失,但会降低效率

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello', durable=True) # 生产者持久化参数 channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent 'Hello World!'")
connection.close() # 消费者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello', durable=True) # 消费者持久化参数 def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

持久化

应答模式: no-ack

当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.211.55.4'))
channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue='hello',
no_ack=False) # 这里写false 代表会应答 print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者

Exclusive:

只被一个连接(connection)使用,而且当连接关闭后队列即被删除

RPC模式

RPC(Remote Procedure Call)—远程过程调用

通俗的讲:两台电脑A,B,A计算机需要用到B的方法或者函数,由于不共享内存,需要通过通信来传递命令和相关数据。

import pika

# 建立连接,服务器地址为localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost')) # 建立会话
channel = connection.channel() # 声明RPC请求队列
channel.queue_declare(queue='rpc_queue') # 数据处理方法
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2) # 对RPC请求队列中的请求进行处理
def on_request(ch, method, props, body):
n = int(body) print(" [.] fib(%s)" % n) # 调用数据处理方法
response = fib(n) # 将处理结果(响应)发送到回调队列,这里用简单模式
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 这里拿到生产者的这个参数作为返回队列的名字
properties=pika.BasicProperties(correlation_id = \
props.correlation_id), # 这里拿到请求的唯一id作为标识并返回,方便生产者确认
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag) # 负载均衡,同一时刻发送给该服务器的请求不超过一个
channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests")
channel.start_consuming()

消费者,最初队列的建立者

import pika
import uuid class FibonacciRpcClient(object):
def __init__(self):
”“”
客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应 “”“ # 建立连接,指定服务器的ip地址
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost')) # 建立一个会话,每个channel代表一个会话任务
self.channel = self.connection.channel() # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
result = self.channel.queue_declare(exclusive=True)
# 将次队列指定为当前客户端的回调队列
self.callback_queue = result.method.queue # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue) # 对回调队列中的响应进行处理的函数
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id: # 如果验证码通过,就拿到结果对response赋值
self.response = body # 发出RPC请求
def call(self, n): # 初始化 response
self.response = None #生成correlation_id
self.corr_id = str(uuid.uuid4()) # 生成每个任务的id # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', # 队列名字叫rpc_queue
properties=pika.BasicProperties(
reply_to = self.callback_queue, # 把回调的队列的名字传进来
correlation_id = self.corr_id, #这是唯一id
),
body=str(n)) # 传数据 while self.response is None:
self.connection.process_data_events() # 如果没有返回值,就一直阻塞等着callback队列的
return int(self.response) # 建立客户端
fibonacci_rpc = FibonacciRpcClient() # 发送RPC请求
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30) # 这里发起传递
print(" [.] Got %r" % response)

生产者,生产了回调通道

其他及补充:

  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
  • Arguments(依赖代理本身)

消费者模型

  消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在AMQP 0-9-1 模型中,有两种途径可以达到此目的:

  • 将消息投递给应用 ("push API")
  • 应用根据需要主动获取消息 ("pull API")

  使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

  每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

消息的属性

    • Content type(内容类型)
    • Content encoding(内容编码)
    • Routing key(路由键)
    • Delivery mode (persistent or not)
      投递模式(持久化 或 非持久化)
    • Message priority(消息优先权)
    • Message publishing timestamp(消息发布的时间戳)
    • Expiration period(消息有效期)
    • Publisher application id(发布应用的ID)

Rabbit mq 简单应用的更多相关文章

  1. 在 Windows 上安装Rabbit MQ 指南

    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. Ra ...

  2. (转)在 Windows 上安装Rabbit MQ 指南

    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. Ra ...

  3. celery rabbit mq 详解

    Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...

  4. Rabbit MQ 消息确认和持久化机制

    一:确认种类 RabbitMQ的消息确认有两种.一种是消息发送确认,用来确认生产者将消息发送给交换器,交换器传递给队列的过程中消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到 ...

  5. Rabbit MQ 入门指南

    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. Ra ...

  6. 分布式消息中间件Rabbit Mq的了解与使用

    MQ(消息队列)作为现代比较流行的技术,在互联网应用平台中作为中间件,主要解决了应用解耦.异步通信.流量削锋.服务总线等问题,为实现高并发.高可用.高伸缩的企业应用提供了条件. 目前市面比较流行的消息 ...

  7. Spring Boot:使用Rabbit MQ消息队列

    综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以 ...

  8. Spring boot集成Rabbit MQ使用初体验

    Spring boot集成Rabbit MQ使用初体验 1.rabbit mq基本特性 首先介绍一下rabbitMQ的几个特性 Asynchronous Messaging Supports mult ...

  9. Rabbit MQ 学习参考

    网上的教程虽然多,但是提供demo的比较少,或者没有详细的说明,因此,本人就照着网上的教程做了几个demo,并把代码托管在码云,供有需要的参考. 项目地址:https://gitee.com/dhcl ...

随机推荐

  1. HDU 5821 Ball (贪心排序) -2016杭电多校联合第8场

    题目:传送门. 题意:T组数据,每组给定一个n一个m,在给定两个长度为n的数组a和b,再给定m次操作,每次给定l和r,每次可以把[l,r]的数进行任意调换位置,问能否在转换后使得a数组变成b数组. 题 ...

  2. Yii源码阅读笔记(二十九)

    动态模型DynamicModel类,用于实现模型内数据验证: namespace yii\base; use yii\validators\Validator; /** * DynamicModel ...

  3. MapReduce最佳成绩统计,男生女生比比看

    上一篇文章我们了解了MapReduce优化方面的知识,现在我们通过简单的项目,学会如何优化MapReduce性能 1.项目介绍 我们使用简单的成绩数据集,统计出0~20.20~50.50~100这三个 ...

  4. hdu4277 USACO ORZ

    USACO ORZ Time Limit: 5000/1500 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Total Sub ...

  5. Android Studio 封装的类的继承

    有个封装好的Firebase.java文件,放到项目中直接使用就可以,这个需要继承一个AbstractFirebase类,在广告代码中,可以等到加广告的时候来加这个文件. 这个地方的继承,因为是ads ...

  6. Mysql中的常用函数:

    Mysql中的常用函数: 1.字符串函数: (1).合并字符串 concat():// concat('M','y',"SQL",'5.5');== MySQL5.5//当传入的参 ...

  7. Python魔法方法详解

      魔法方法 含义   基本的魔法方法 __new__(cls[, ...]) 1.__new__是在一个对象实例化的时候所调用的第一个方法 2.它的第一个参数是这个类,其他的参数是用来直接传递给__ ...

  8. python 全栈开发笔记 4

    反射 1.通过字符串的形式导入模块 2.通过字符串的形式,去模块中寻找指定函数并执行 ''' def f1(): return 'F1' def f2(): return 'F2' ''' #假设上面 ...

  9. 05 JS基础DOM

    JS的window对象定时器: window下一些方法: <script> 弹出 window.alert('hello') 返回布尔值 var ret = window.confirm( ...

  10. 现网环境业务不影响,但是tomcat启动一直有error日志,ERROR org.apache.catalina.startup.ContextConfig- Unable to process Jar entry [module-info.class] from Jar [jar:file:/home/iufs/apache-tomcat/webapps/iufs/WEB-INF/lib/asm

    完整的错误日志信息: 2019-03-19 15:30:42,021 [main] INFO org.apache.catalina.core.StandardEngine- Starting Ser ...