说在前面
在参阅本篇博文之前建议先对队列进行学习。
消息队列
MQ全称为Message Queue消息队列(MQ),是一种应用程序对应用程序的通信方法,MQ是消费-生产者模型的一个典型代表,一端往消息队列中不断写入消息,而另一端可以读取队列中的消息,这样发布者和使用者都不知道对方的存在。
生产者消费者模式是通过一个容器来解决生产者消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不用找生产者要数据,而是直接从阻塞队列中取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
同步处理
异步处理
为什么要用消息队列
消息队列中间件是分布式系统中的重要组件,主要解决应用解耦,异步消息,流量削峰等问题。
消息队列的优点
解耦
允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
可恢复性
系统的一部分组件失效时,不会影响到整个系统,消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
灵活性/峰值处理能力(削峰)
在访问量剧增的情况下,应用仍需发挥作用,但是这样的突发流量并不常见,如果以处理这类峰值访问为标准来投入资源随时待命无疑会是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而完全崩溃
异步通信
很多时候,用户不想也不需要立即处理消息,消息队列提供了异步处理机制,允许用户把第一个消息放入队列,但不立即处理它,想向队列中放入多少消息就放多少,然后在需要的时候去处理它。
消息队列的两种模式
点对点模式(一对一)
一对一,消费者主动拉取数据,收到消息后会将队列中的消息清除。
消费生产者生产消息发送到队列以后,消费者从队列中取出消息并消费它,消息消费以后,队列中不再有存储,所以消费者不可能消费到已经被消费的消息。队列支持存在多个消费者,但对一个消息而言,只会有一个消费者可以消费
发布/订阅模式(一对多)
一对多,消费者消费数据后不会清除消息,消息生产者将消息发布到topic中,同时有多个消息消费者消费该消息。生产者发布到topic的消息会被所有订阅者消费。
消费可以分为消费者主动从队列获取消息,也可以为队列主动推送给消费者消息进行消费(比如公众号)
RabbitMQ模式
RabbitMQ分为简单模式和交换机模式
安装
建议安装在linux系统下,windows系统配置容易出错
客户端
在anaconda环境下输入以下命令:
pip install pika
简单模式
生产者要向RabbitMQ队列中插入数据,消费者监听队列,当RabbitMQ中有消息时取出消费。
生产者
- 连接RabbitMQ
- 在RabbitMQ中创建一个队列对象
- 向指定的队列中插入数据
import pika #1、连接RabbitMQ connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #连接RabbitMQ channel = connection.channel() #获取RabbitMQ对象,通过channel对RabbitMQ进行操作 # 2、在RabbitMQ中创建一个队列对象 channel.queue_declare(queue='hello') #3、向指定队列插入数据 channel.basic_publish(exchange='', #简单模式 routing_key = 'hello', #指定队列 body = 'hello world') #指定数据
消费者
- 连接RabbitMQ
- 监听模式
- 确定回调函数
默认应答
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connection = connection.channel()
#创建队列
channel.queue_declare(queue='hello') #如果已存在hello队列则没有作用,如果不存在hello队列则新建一个队列
def callback(ch,method,properties,body): #body是从队列中取出的数据
pass
#确定监听队列
channel.basic_consume(queue='hello',
auto_ack=True, #默认应答,数据从队列中取走之后,不论消费者是否处理成功,该数据就不存在队列中。如果为手动应答,该项置为False,数据被取走后会在队列中进行拷贝,等消费者返回处理成功信号才删除数据
on_message_callback=callback)
channel.start_consuming() #开始监听,队列有数据时调用回调函数,没有数据时进行监听
手动应答
def callback(ch,method,properties,body):
ch.basic_ack(delivery_tag =method.delivery_tag) #应答信息
channel.basic_consume(queue='hello',
auto_ack=False,
on_message_callback=callback)
持久化参数
用于处理RabbitMQ服务器崩溃造成的数据丢失问题。创建队列的语句参数durable:(消费者/生产者端均需要设置)
channel.queue_declare(queue='hello',durable=True)
对于插入的数据是否持久化,要自己设置(生产者端):
channel.basic_publish(exchange='', #简单模式
routing_key = 'hello', #指定队列
body = 'hello world'
properties=pika.BasicProperties(delivery_mode=2 #数据持久化保存
),) #指定数据
分发参数
分发默认的机制是轮询机制(一个消费者一条消息的分发,每个消费者都要收到队列中的所有)
交换机模式
发布订阅
消费者
要先运行消费者创建消息队列
- 连接rabbitMQ
- 声明交换机(有就,没有)
- 创建消息队列
- 绑定交换机
- 回调函数(后续步骤与简单模式相同)
import pika
#1. 连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connection = connection.channel()
#2.声明一个名为logs类型为fanout的交换机(要与生产者一致)
channel.exchange_declare(exchange='logs',
exchange_type='fanout') #fanout是发布订阅模式参数
#3. 创建队列
result = channel.queue_declare("",exclusive=True) #让rabbitNQ随机分配队列名
queue_name = result.method.queue #获取队列名
#4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs',
queue=queue_name)
def callback(ch,method,properties,body): #body是从队列中取出的数据
pass
#确定监听队列
channel.basic_consume(queue=queue_name,
auto_ack=True, #默认应答,数据从队列中取走之后,不论消费者是否处理成功,该数据就不存在队列中。如果为手动应答,该项置为False,数据被取走后会在队列中进行拷贝,等消费者返回处理成功信号才删除数据
on_message_callback=callback)
channel.start_consuming() #
因为订阅模式有多个消费者,所以运行时要启动多个consumer.py,在pycharm中需要在edit Configuration中设置allow parallel run
生产者
- 连接rabbitMQ
- 创建交换机
- 往交换机里生产数据
import pika
#1. 连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connection = connection.channel()
#2.声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout') #fanout是发布订阅模式参数
#3. 向交换机中插入数据
data = "hello world"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
connection.close()
关键字模式
在发布订阅模式的基础上进行增强,交换机的消息绑定特定的消息队列实现不同的消息传给不同的消费者。
消费者
要先运行消费者生成消息队列
import pika
#1. 连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#2.声明一个名为logs类型为fanout的交换机(要与生产者一致)
channel.exchange_declare(exchange='logs',
exchange_type='direct',
durable=True) #fanout是发布订阅模式参数
#3. 创建队列
result = channel.queue_declare("",exclusive=True) #让rabbitNQ随机分配队列名,如果要使用持久化需要自己命名
queue_name = result.method.queue #获取队列名
#4.将指定队列绑定到交换机上
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='keyword1') #关键字,可以绑定多个
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='keyword2') #绑定第二个关键字
def callback(ch,method,properties,body): #body是从队列中取出的数据
ch.basic_ack(delivery_tag=method.delivery_tag)
#确定监听队列
channel.basic_consume(queue=queue_name,
auto_ack=True, #默认应答,数据从队列中取走之后,不论消费者是否处理成功,该数据就不存在队列中。如果为手动应答,该项置为False,数据被取走后会在队列中进行拷贝,等消费者返回处理成功信号才删除数据
on_message_callback=callback)
channel.start_consuming() #
生产者
与订阅模式相比,需要修改exchange_type,
import pika
#1. 连接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
connection = connection.channel()
#2.声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='direct',
durable=True) #fanout是发布订阅模式参数
#3. 向交换机中插入数据
data = "hello world"
channel.basic_publish(exchange='logs',
routing_key='keyword1', #要与消费者一致
body=message,
properties=pika.BasicProperties(delivery_mode=2),)
connection.close()