首页 > 编程笔记

RabbitMQ入门教程(非常详细)

RabbitMQ 是比较流行的消息中间件(消息队列),通常用于不同的独立程序之间或服务之间的通信。

RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的主要特征是面向消息、队列、路由、可靠性和安全。

AMQP 更多用在企业系统内对数据的一致性、稳定性和可靠性要求很高的场景,对性能吞吐量的要求不高。

一、RabbitMQ 简介

我们先简单了解一下 RabbitMQ 的内部机制,重点是如何使用 RabbitMQ。RabbitMQ 的结构如图1所示,RabbitMQ 相关概念如表1所示。

RabbitMQ的结构
图1:RabbitMQ 的结构

表1:RabbitMQ 相关概念
核心概念 含义
服务器实例(Broker) 简单来说就是消息队列服务器实体。
消息交换机(Exchange) 指定消息按什么规则,路由到哪个队列。
队列(Queue) 每个消息都会被投入一个或多个队列。
绑定(Binding) 按照路由规则绑定消息交换机和队列。
路由关键字(Routing Key) 消息交换机根据这个关键字进行消息投递。
虚拟主机(Virtual Host) 一个 Broker 里可以设置多个 Virtual Host,用作不同用户的权限分离。

Virtual Host 本质上就是迷你版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制,RabbitMQ 默认的 Virtual Host 是/
消息通道(Channel) 一种轻型的共享 TCP 连接。我们可以在客户端的连接中建立多个 Channel,一个 Channel 表示一个会话任务。

生产者和消费者需要与 RabbitMQ 建立 TCP 连接,而一些应用需要多个连接,因为对操作系统来说建立和销毁 TCP 连接都会产生非常昂贵的系统开销,所以为了减少 TCP 连接,引入信道的概念,以复用 TCP 连接。

信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 命令都是通过信道发出去的,发布消息、订阅队列、接收消息等动作都可以通过信道完成。
网络连接(Connection) 例如 TCP 连接。
消息生产者(Producer) 发送消息的程序。
消息消费者(Consumer) 接收消息的程序。

RabbitMQ 的典型应用场景如下:

二、RabbitMQ 的使用

1. 安装部署

我们需要先安装一个 RabbitMQ 的服务中间件,待部署的服务器系统版本为 CentOS Linux release 7.6.1810(Core),内核版本为 3.10.0-1127.13.1.el7.x86_64。

具体步骤如下:

1) 安装必备的一些相关的依赖环境:

yum install -y gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git


2) 下载安装 Erlang 的 rpm 安装包。因为 RabbitMQ 是基于 Erlang 语言开发的,所以需要先部署 Erlang 环境。

注意,从 RabbitMQ 官网选择下载的 rmp 包要和系统内核版本保持一致,例如此处下载的是 el7.x86_64 的 rpm 安装包,el7 对应 CentOS 7,el8 对应 CentOS 8。

下载成功后将 rpm 安装包上传到服务器 CentOS 上,安装命令如下:

rpm -ivh erlang-23.3.4.10-1.el7.x86_64.rpm


3) 下载安装 RabbitMQ-server 的 3.9.13 版本。此处注意要下载 el7 版本,和待部署环境的系统保持一致,即选择 RabbitMQ-server-3.9.13-1.el7.noarch.rpm。

另外,使用命令yum install -y可以一并安装相关的依赖。

安装命令如下:

yum install -y RabbitMQ-server-3.9.13-1.el7.noarch.rpm

图2中的 Installed 和 Complete 表示 RabbitMQ 安装成功。

RabbitMQ 安装成功
图2:RabbitMQ 安装成功

我们可以通过以下命令启动 RabbitMQ 服务:

service Rabbit MQ-server start

启动服务成功如图3所示。

RabbitMQ 启动成功
图3:RabbitMQ 启动成功

在默认情况下 RabbitMQ 的 Web 界面是关闭的,我们需要通过 RabbitMQ-plugins 来启用,以方便后续的使用:

RabbitMQ-plugins enable RabbitMQ_management

成功启用后,我们可以通过 IP:15672 登录 RabbitMQ 的 Web 界面,如图4所示。

登录 RabbitMQ 的 Web 界面
图4:登录 RabbitMQ 的 Web 界面

在登录前,我们需要通过 RabbitMQctl 命令创建用户,如图5所示。

RabbitMQctl add_user admin admin
RabbitMQctl set_user_tags admin administrator


创建用户
图5:创建用户

创建成功后,即可使用该用户登录 RabbitMQ 的 Web 界面。

在部署后使用过程中我们可能遇到如下错误:
ERROR:root:Failed to connect to RabbitMQ: ConnectionClosedByBroker: (530) "NOT_ALLOWED - access to vhost '/' refused for user 'admin'"
ERROR:pika.adapters.blocking_connection:Connection workflow failed: AMQPConnectionWorkflowFailed: 1 exceptions in all; last exception - AMQPConnectorAMQPHandshakeError: ProbableAccessDeniedError: Client was disconnected at a connection stage indicating a probable denial of access to the specified virtual host: ('ConnectionClosedByBroker: (530) "NOT_ALLOWED - access to vhost \'/\' refused for user \'admin\'"',); first exception - None
ERROR:pika.adapters.blocking_connection:Error in _create_connection().

产生这一问题的原因是没有设置 access virtual hosts,如图6所示。

RabbitMQ 的 Web 界面
图6:RabbitMQ 的 Web 界面

我们需要通过图7中的 Set permission 设置 access virtual hosts。

在 RabbitMQ 的 Web 界面设置 permission
图7:在 RabbitMQ 的 Web 界面设置 permission

RabbitMQ 用户角色

RabbitMQ 用户的角色说明如下:
关于权限管理,MySQL 有数据库的概念并且可以指定用户操作库和表等的权限,RabbitMQ 也有类似的权限管理。

在 RabbitMQ 中,我们可以使用虚拟消息服务器 Virtual Host,每个 Virtual Host 相当于一个相对独立的 RabbitMQ 服务器,每个 Virtual Host 之间是相互隔离的,消息交换机、队列和消息不能互通。

Virtual Host 的使用场景是多租户的场景,例如在主机资源紧缺的情况下,开发和测试共用一个 RabbitMQ,我们可以使用 Virtual Hosts 将开发和测试隔离。

注意:常见的安装失败的原因就是 Erlang 版本和 CentOS 版本不匹配,或 Erlang 版本和 RabbitMQ-server 版本不匹配。

2. 使用说明

RabbitMQ 支持不同的语言,并针对不同语言提供相应的库,pika 是 Python 用于连接 RabbitMQ 的主流客户端第三方库。

pika 使用 RabbitMQ 涉及的概念如表2所示。

表2:pika 使用 RabbitMQ 涉及的概念
概念 含义 示例
路由 路由键在发送消息的时候由 routing_key 参数指定,即调用 basic_publish 函数的时候。 channel.basic_publish(exchange='logs',routing_key='',
body=message)
队列绑定 将交换机 exchange 和队列进行绑定。 channel.queue_bind(exchange=exchange_name,
queue=queue_name)
排他队列 仅自己可见的队列,即不允许其他用户访问,RabbitMQ 允许我们将一个队列声明成排他性的。 channel.queue_declare(exclusive=True)

三、封装示例

利用 pika 进行生产者和消费者的模拟流程流程如下。

生产者程序的步骤如下:
消费者程序的步骤除了最后一步是通过 basic_consume 方法消费消息,其他步骤与生产者程序的步骤相同。

结合实际的需求,封装代码如代码清单6所示,封装的代码包括 RabbitMQClient 类,该类包含连接 RabbitMQ 的连接管理、生产消息和消费消息的方法,以及获取队列消息后的回调函数。

代码清单6:RabbitMQUtil.py
# -*- coding: utf-8 -*-
# @Time : 2023/7/27 6:49 下午
# @Project : msgUtil
# @File : RabbitMQUtil.py
# @Version: Python3.9.8
 
import pika
import traceback
import logging
 
# 配置文件
MQ_CONFIG = {
    "host": "172.21.26.54",
    "port": 5672,
    "user": "admin",
    "password": "admin"
}
 
def check_connection(func):
    def wrapper(self, *args, **kwargs):
        if not all([self.channel, self.connection]) or \
                any([self.channel.is_closed, self.connection.is_closed]):
            self.clean_up()
            self.connect_mq()
        return func(self, *args, **kwargs)
    return wrapper
 
class RabbitMQClient(object):
    '''RabbitMQClient using pika library'''
 
    def __init__(self, queue, on_message_callback=None):
        self.mq_config = MQ_CONFIG
        self.connection = None
        self.channel = None
        self.queue = queue
        self.on_message_callback = on_message_callback
        self.connect_mq()
 
    def connect_mq(self):
        """连接RabbitMQ,创建连接、通道,声明队列"""
        try:
            credentials = pika.PlainCredentials(self.mq_config['user'], self.mq_config['password'])
            connect_params = pika.ConnectionParameters(self.mq_config['host'],
                                                     self.mq_config['port'],
                                                     credentials=credentials,
                                                     heartbeat=0)
            self.connection = pika.BlockingConnection(connect_params)
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue, durable=True)
            # self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
            # self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)
            logging.info("Succeeded to connect to RabbitMQ.")
        except Exception as e:
            logging.error("Failed to connect to RabbitMQ: {}".format(str(e)))
            traceback.print_exc()
            return False
        return True
 
    def clean_up(self):
        """断开通道、连接"""
        try:
            if self.channel and self.channel.is_open:
                self.channel.close()
            if self.connection and self.connection.is_open:
                self.connection.close()
        except Exception as e:
            logging.error("Failed to close connection with RabbitMQ: {}".format(str(e)))
            traceback.print_exc()
 
    @check_connection
    def producer(self, message):
        """向队列发送消息"""
        if not isinstance(message, bytes):
            message = str(message).encode()
        try:
            self.channel.basic_publish(
                exchange='',
                routing_key=self.queue,  # 队列名字
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # 消息持久化
                    content_type="application/json"
                )
            )
        except Exception as e:
            logging.error('Failed to send message to RabbitMQ: {}'.format(str(e)))
            traceback.print_exc()
 
    @check_connection
    def consumer(self):
        """从队列获取消息"""
        self.channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不再给我们发
        self.channel.basic_consume(  # 消费消息
            on_message_callback=self.callback,  # 如果收到消息,则回调
            queue=self.queue,
        )
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
            self.connection.channel()
 
    def callback(self, ch, method, properties, body):
        """获取消息后的回调函数"""
        # message = ast.literal_eval(body.decode())
        print("consumed %r " % body)
        # self.on_message_callback(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 告诉生产者,消息处理完成
 
    # 统计消息数目
    def msg_count(self, queue_name, is_durable=True):
        queue = self.channel.queue_declare(queue=queue_name, durable=is_durable)
        count = queue.method.message_count
        return count
 
if __name__ == '__main__':
    mq = RabbitMQClient('testQueue2')
    import json
    msg = json.dumps({'name': 'hutong'})
    mq.producer(msg)
    mq.consumer()
运行上述示例代码,我们可以发现生产者生产了消息,并往名为 testQueue2 的队列中发送了消息,如图8所示。

消息队列接收消息
图8:消息队列接收消息

调用消费者代码,我们可以看到刚刚发送的消息被成功消费了,如图9所示。

消费者消费消息
图9:消费者消费消息

推荐阅读