消息队列

RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的、可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。

AMQP

即 Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同的开发语言等条件的限制。

MQ

全称为 Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

RabbitMQ 应用场景

  1. 降低耦合
  2. 削峰
  3. 支持持久化
  4. 支持消息的确认机制
  5. 集群扩展
  6. 提供了 Web 可视化管理和监控
RabbitMQ 工作原理

RabbitMQ 工作原理

安装

Ubuntu

为了安装新版本,建议到官网

## 安装依赖包 socat
sudo apt install socat
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server_3.8.5-1_all.deb
sudo dpkg -i rabbitmq-server_3.8.5-1_all.deb

CentOS7

sudo yum install -y socat

# 下载
$ wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3/erlang-22.3-1.el7.x86_64.rpm

# 安装

$ rpm -ivh erlang-22.3-1.el7.x86_64.rpm

使用

启动服务

使用命令启动服务

# 直接启动
sudo rabbitmq-server

# -detached为可选参数,表示后台开启
sudo rabbitmq-server -detached

#关闭RabbitMQ服务:  
sudo rabbitmqctl stop

启动成功后显示

  ##  ##      RabbitMQ 3.8.5
  ##  ##
  ##########  Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com

  Doc guides: https://rabbitmq.com/documentation.html
  Support:    https://rabbitmq.com/contact.html
  Tutorials:  https://rabbitmq.com/getstarted.html
  Monitoring: https://rabbitmq.com/monitoring.html

  Logs: /var/log/rabbitmq/rabbit@william-pc.log
        /var/log/rabbitmq/rabbit@william-pc_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 0 plugins.

插件管理

需要先启动插件,然后在启动 rabbitmq 服务

sudo rabbitmq-plugins enable rabbitmq_management

启动成功后

Enabling plugins on node rabbit@william-pc:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@william-pc...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

这样

  1. 先启动插件管理

  2. 然后启动服务

  3. 产生以下结果

    ##  ##      RabbitMQ 3.8.5
    ##  ##
    ##########  Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
    ######  ##
    ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com
    
    Doc guides: https://rabbitmq.com/documentation.html
    Support:    https://rabbitmq.com/contact.html
    Tutorials:  https://rabbitmq.com/getstarted.html
    Monitoring: https://rabbitmq.com/monitoring.html
    
    Logs: /var/log/rabbitmq/rabbit@william-pc.log
       /var/log/rabbitmq/rabbit@william-pc_upgrade.log
    
    Config file(s): (none)
    
    Starting broker... completed with 3 plugins.
  4. 说明使用了 3 个插件:

    Starting broker... completed with 3 plugins.

Web 访问

Web 管理程序的端口号是 15672,在浏览器中输入 http://localhost:15672,即可以访问。

用户管理

登录以上 Web 管理界面后,需要我们输入账户和密码。

  1. 增加用户

    ## 增加用户,并配置密码
    sudo rabbitmqctl add_user william <Password>
  2. 删除用户

    sudo rabbitmqctl delete_user william
  3. 修改用户密码

    sudo rabbitmqctl change_password william <Password>
  4. 列出用户

    sudo rabbitmqctl list_users
  5. 设置用户权限

    用户的角色可以分为5种:

    • 超级管理员 (administrator):可登陆管理控制台 (启用 management plugin 的情况下),可查看所有的信息,并且可以对用户,策略(policy) 进行操作。
    • 监控者 (monitoring):可登陆管理控制台 (启用 management plugin 的情况下),同时可以查看 rabbitmq 节点的相关信息 (进程数,内存使用情况,磁盘使用情况等)
    • 策略制定者 (policymaker):可登陆管理控制台 (启用 management plugin 的情况下), 同时可以对 policy 进行管理。但无法查看节点的相关信息 (上图红框标识的部分)。
    • 普通管理者 (management):仅可登陆管理控制台 (启用 management plugin 的情况下),无法看到节点信息,也无法对策略进行管理。
    • 其他

    设置用户角色的命令为:

    ## 设置用户角色
    sudo rabbitmqctl set_user_tags william administrator
    
    ## 也可以给同一用户设置多个角色,例如
    
    sudo rabbitmqctl set_user_tags william administrator monitoring policymaker

应用

连接过程

消息队列的使用过程大概如下:

  1. 客户端连接到消息队列服务器(broker),打开一个 channel。

  2. 客户端声明一个 exchange,并设置相关属性。

  3. 客户端声明一个 queue,并设置相关属性。

  4. 客户端使用 routing key,在 exchange 和 queue 之间建立好绑定关系。

  5. 客户端投递消息到 exchange。

Python

Python 环境下可以使用 pika 软件包

python -m pip install pika

server.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()


try:
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout', 
                             durable=True)
except:
    channel = connection.channel()
    channel.exchange_delete( exchange='logs' )
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout', 
                             durable=True)
    

from time import sleep
counter = 0

channel.basic_qos(prefetch_count=1)
while 1:
    counter += 1
    message = "[{}] info: Hello World!".format(counter)
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode = 2))
    print(" [x] Sent %r" % message)
    sleep(1)
connection.close()

client.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout', durable=True)

result = channel.queue_declare(queue = "", exclusive=True, durable=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback = callback,
                      queue = queue_name,
                      auto_ack = True)

channel.start_consuming()

C++

AMQP-CPP

这个项目相对比较活跃,而且支持 C++11 标准。

AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The library can be used to parse incoming data from a RabbitMQ server, and to generate frames that can be sent to a RabbitMQ server.

AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so it can be used in high performance applications without the need for threads.

git clone git@github.com:CopernicaMarketingSoftware/AMQP-CPP.git

mkdir build
cd build
## cmake .. [-DAMQP-CPP_BUILD_SHARED] [-DAMQP-CPP_LINUX_TCP]
cmake .. -DAMQP-CPP_BUILD_SHARED=ON -DAMQP-CPP_LINUX_TCP=ON
sudo cmake --build . --target install

Demo

git clone git@github.com:hoxnox/examples.amqp-cpp.git
cd examples.amqp-cpp
mkdir build
cd build
cmake ..
make -j

./receive

./send