RabbitMQ

author::消息队列之真知灼见

Python三方库:Pika(RabbitMQ基础使用)


一、消息队列

1 定义

MQ:全称为Messge Queue消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。这样发布者和使用者都不用知道对方的存在。

生产者—消费者模式:生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯。所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

作用:消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQRabbitMQZeroMQKafkaMetaMQRocketMQ

具体作用:

  • 解耦:当一个模块需要向另一个模块发送消息时,它不需要直接调用目标模块的接口,而是将消息发送到消息队列中,由目标模块从队列中接收消息。这样,发送方和接收方之间就不再直接耦合,提高了系统的灵活性和可维护性。
  • 异步消息:消息队列支持异步消息传递,发送方在向队列发送消息后即可继续执行其他操作,而无需等待接收方的响应。这种方式提高了系统的并发性能和吞吐量,特别适合处理大量的非实时性任务,如日志处理、邮件发送等。
  • 流量削峰:在高并发场景下,消息队列可作为缓冲层,暂时存储突发的高峰流量,并按照系统的处理能力逐渐释放,从而平滑流量波峰,保护系统不受过载的影响。这种机制提高了系统的稳定性和可用性,同时增强了系统的弹性和容错能力,使系统能够更好地应对各种负载情况。

2 普通队列

import queue

# 创建队列(FIFO模式)
q = queue.Queue(maxsize=10)

# 数据入队
q.put(111)

# 数据出队(block设置阻塞,True:默认,当队列没数据就会阻塞。False:异常)
data = q.get(block=True)

二、RabbitMQ模式

1 其他消息队列

Kafka:常用于日志处理。

RocketMQ:阿里开发,类似RabbitMQ。

2 定义

定义:RabbiMQ是一款由Erlang 语言开发,基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

official::RabbitMQ

RabbitMQ模式:

  • 简单模式
  • 交换机模式:
    • 发布订阅模式
    • 关键字模式
    • 模糊匹配模式

3 安装

Windows:先要安装Erlang,再安装RabbitMQ:

Linux:一键安装。

4 简单模式

生产者流程:

  1. 链接RabbitMQ
  2. 创建队列
  3. 向指定的队列插入数据

消费者流程:

  1. 链接RabbitMQ
  2. 监听模式
  3. 确定回调函数

callback函数:

  1. ch:是一个 Channel 对象,表示调用回调函数的通道。
  2. method:是一个 method 对象,包含有关消息的信息,如交付标签、交换器、路由密钥等。
  3. properties:是一个 BasicProperties 对象,包含消息的属性,如消息类型、消息持久性、消息优先级等。
  4. body:是一个字节串,包含消息的内容。

注意:消费者也需要再链接RabbitMQ后创建队列,因为不确定生产者还是消费者先被运行,如果没有这一步且消费者先于生产者执行,hello队列就会不存在,会监听不到hello队列。

# 生产者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2.创建队列(队列名)
channel.queue_declare(queue="hello")

# 3.向指定的队列插入数据(交换机参数,简单模式为空、指定队列、内容)
channel.basic_public(exchange='',
routing_key='hello',
body='Hello World!')

print("[x] Send 'HelloWorld!'")
# 消费者

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列(如果hello队列存在就什么也不会做)
channel.queue_declare(queue="hello")

# 3.确定回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r"%body)

# 确定监听队列(队列名、应答参数,默认True、回调函数)
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)

# 2.监听模式
print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始监听
channel.start_consuming()

5 发布订阅模式

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

综上所述,生产者需要创建一个交换机,数据存储在交换机中。而每一个消费者需要自己创建一个队列,然后从交换机中获得数据到队列后再进行使用。

生产者流程:

  1. 链接RabbitMQ
  2. 声明交换机
  3. 向指定的交换机插入数据

消费者流程:

  1. 链接RabbitMQ
  2. 创建队列
  3. 将队列绑定到指定交换机
  4. 确定回调函数
# 生产者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2.声明交换机(交换机名,模式(fanout:发布订阅模式))
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# 3.向指定的交换机插入数据(交换机参数、指定队列,交换机模式为空、内容)
message = "info: Hello World!"
channel.basic_public(exchange='logs',
routing_key='',
body=message)

print("[x] Send %r"%message)
connection.close()
# 消费者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 同样声明交换机防止消费者先运行找不到交换机
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# 2.创建队列(不指定,系统自动生成队列名)
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 3.将队列绑定到指定交换机
channel.queue_bind(exchange="logs", queue=queue_name)

print(' [*]Waiting for logs. To exit press CTRL+C')

# 4.确定回调函数
def callback(ch, method, properties, body):
print(" [x] %r" % body)

channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback
)
channel.start_consuming()

6 关键字模式

在发布订阅模式中,无论消费者需不需要,数据都会被获取。而使用关键字模式,在获取数据时就会先将数据和关键字进行匹配,只有匹配成功才会获得关键字。

生产者流程:

  1. 链接RabbitMQ
  2. 声明交换机
  3. 向指定的交换机插入数据,需要绑定关键字

消费者流程:

  1. 链接RabbitMQ
  2. 创建队列
  3. 将队列绑定到指定交换机,需要绑定关键字
  4. 确定回调函数
# 生产者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2.声明交换机(交换机名,模式(direct:关键字模式))
channel.exchange_declare(exchange="logs2", exchange_type="direct")

# 3.向指定的交换机插入数据(routing_key:关键字绑定)
message = "info: Hello World!"
channel.basic_public(exchange='logs2',
routing_key='info',
body=message)

print("[x] Send %r"%message)
connection.close()
# 消费者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 同样声明交换机防止消费者先运行找不到交换机
channel.exchange_declare(exchange="logs2", exchange_type="direct")

# 2.创建队列(不指定,系统自动生成队列名)
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 3.将队列绑定到指定交换机(绑定多个关键字)
keys = ["info", "error", "warning"]
for key in keys:
channel.queue_bind(exchange="logs2",
queue=queue_name,
routing_key=key)

print(' [*]Waiting for logs. To exit press CTRL+C')

# 4.确定回调函数
def callback(ch, method, properties, body):
print(" [x] %r" % body)

channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback
)
channel.start_consuming()

7 通配符模式

在关键字模式匹配中,只有所有字符完全匹配成功才算成功。而在通配符模式下,可以进行模糊匹配,类似正则表达式。

“通配符交换机”(Topie Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号#匹配一个或多个词,符号*中仅匹配一个词。因此audit.#能够匹配到audit.irs.corporate,但是audit.*只会匹配到audit.irs。(这里与我们一般的正则表达式的*#刚好相反,这里我们需要注意一下)

生产者流程:

  1. 链接RabbitMQ
  2. 声明交换机
  3. 向指定的交换机插入数据,需要绑定通配符

消费者流程:

  1. 链接RabbitMQ
  2. 创建队列
  3. 将队列绑定到指定交换机,需要绑定通配符
  4. 确定回调函数

消费者案例

  • usa.#:拿到美国的所有相关消息。
  • #.news:拿到任何国家的新闻消息。
  • #.weather:拿到任何国家的天气消息。
  • europe.#:拿到欧洲的所有相关消息。
# 生产者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2.声明交换机(交换机名,模式(topic:通配符模式))
channel.exchange_declare(exchange="logs3", exchange_type="topic")

# 3.向指定的交换机插入数据(routing_key:关键字)
message = "usa.news: Hello World!"
channel.basic_public(exchange='logs3',
routing_key='usa.news',
body=message)

print("[x] Send %r"%message)
connection.close()
# 消费者
import pika

# 1.链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 同样声明交换机防止消费者先运行找不到交换机
channel.exchange_declare(exchange="logs3", exchange_type="topic")

# 2.创建队列(不指定,系统自动生成队列名)
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 3.将队列绑定到指定交换机(循环绑定多个关键字)
keys = ["usa.#"]
for key in keys:
channel.queue_bind(exchange="logs3",
queue=queue_name,
routing_key=key)

print(' [*]Waiting for logs. To exit press CTRL+C')

# 4.确定回调函数
def callback(ch, method, properties, body):
print(" [x] %r" % body)

channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback
)
channel.start_consuming()

三、RabbitMQ参数

1 手动应答

channel.basic_consume

  • auto_ack:默认是True(默认应答),表示当消费者获取数据后,数据就从队列中被删除了。这样如果回调函数取走数据,但是回调函数报错了,就会导致数据丢失。如果要让数据被获取后依旧存在可以改成False(手动应答),同时在回调函数中加上信号,告知消息队列数据处理完毕了。
# 手动应答核心代码
def callback(ch, method, properties, body):
print(" [x] Received %r"%body)
# 告知消息队列数据处理完毕
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='hello', auto_ack=False, on_message_callback=callback)

2 持久化

声明:channel.queue_declare(durable=True)

使用:channel.basic_public(properties=pika.BasicProperties(delivery_mode=2))

# 持久化

# 声明持久化(不能将原非持久化消息队列改成持久化消息队列,必须重新声明一个)
channel.queue_declare(queue="hello3", durable=True)

# 插入数据
channel.basic_public(exchange="",
routing_key="hello3",
body="Hello ALEX!",
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化模式
)
)

3 分发参数

有两个消费者同时监听一个队列。其中一个线程sleep2秒,另一个消费者线程sleep1秒,但是处理的消息是一样多。这种方式叫轮询分发(round-robin)。不管谁忙,都不会多给消息,总是你一个我一个,想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。使用basic_qos(prefetch_count=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

默认是轮询。

# 公平分发

# 限制消息同时手动应答
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', auto_ack=False, on_message_callback=callback)

四、RabbitMQ设计

1 实现RPC

RPC远程调用(Remote Procedure Call)

RPC(远程过程调用)详解_rpc调用-CSDN博客

import pika
import uuid

class FibonacciRpcClient:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()