RabbitMQ详解

  |   0 评论   |   0 浏览

本篇文章基于RabbitMQ中间件,包含消息中间件应用场景、AMQP协议、常用命令、基础架构、可靠性解决方案、死信队列、延迟队列等等,希望对大家有所帮助。

一、消息中间件基础知识

两种常见分布式架构

SOA架构:用Dubbo和Zookeeper进行服务间的远程通信。根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。Dubbo使用自定义的TCP协议,可以让请求报文体积更小,或者使用HTTP2协议,也可以减少报文 的体积,提高传输效率。

微服务架构:SpringCloud中使用Feign解决服务之间远程通信的问题,Feign是轻量级RESTful的HTTP服务客户端,广泛应用于Spring Cloud中。符合面向接口化的编程习惯。

本质:封装了HTTP调用流程,类似Dubbo的服务调用。RPC主要基于TCP/UDP协议,HTTP协议是应用层协议,是构建在传输层协议TCP之上的,RPC效率更高,RPC长连接:不必每次通信都像HTTP一样三次握手,减少网络开销; HTTP服务开发迭代更快:在接口不多,系统与系统之间交互比较少的情况下,HTTP就显得更加方便;相反,在接口比较多,系统与系统之间交互比较多的情况下,HTTP就没有RPC有优势。

分布式通信存在的问题及解决办法

电商项目中,如果后台添加商品信息,该信息放到数据库,我们同时,需要更新搜索引擎的倒排索引。

解决办法一:在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法, 接着调用更新静态化页面的方法,如果更新失败重试,(不推荐,更新失败容易出现死循环且高并发场景不适用)

解决办法二:先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一 个公共的位置,然后由相应的服务从该位置获取任务来执行。比如使用redis,使用阻塞队列轮询异步执行。(单使用redis,不使用消息队列,无法确认消息,不推荐)

解决办法三:分布式异步通信模式,如下图。

优点:系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹性。服务解耦、流量削峰填谷等

缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。

使用异步消息模式需要注意的问题: 1. 哪些业务需要同步处理,哪些业务可以异步处理? 2. 如何保证消息的安全?消息是否会丢失,是否会重复? 3. 请求的延迟如何能够减少? 4. 消息接收的顺序是否会影响到业务流程的正常执行? 5. 消息处理失败后是否需要重发?如果重发如何保证幂等性?

幂等性:不管重发多少次,都要保证结果的一致性。

消息中间件概念

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

消息中间件就是在通信的上下游之间截断:break it,Broker,然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。

常用主流消息中间件介绍

使用最为广泛的三款消息中间件:RabbitMQ、RocketMQ、Kafka。

RabbitMQ

RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。

优点: 1. 轻量级,快速,部署使用方便 2. 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。 3. RabbitMQ的客户端支持大多数的编程语言。

缺点: 1. 如果有大量消息堆积在队列中,性能会急剧下降 2. RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。 3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。

RocketMQ

RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。 RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。

RocketMQ几乎具备了消息队列应该具备的所有特性和功能。 java开发,阅读源代码、扩展、二次开发很方便。 对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。 性能比RabbitMQ高一个数量级,每秒处理几十万的消息。

缺点: 跟周边系统的整合和兼容不是很好。

Kafka

Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka,Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。 如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。 但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

消息中间件应用场景

电商秒杀场景

当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?

在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发的写请求呢?

系统应该如何应对高并发的读请求:

使用缓存策略将请求挡在上层中的缓存中

能静态化的数据尽量做到静态化

加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)

系统应该如何应对高并发的写请求:

生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s内,有 1 万个数据连接同时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用消息队列。

消息队列的作用:

削去秒杀场景下的峰值写流量——流量削峰(削去秒杀场景下的峰值写流量,将秒杀请求暂存于消息队列,业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求)

通过异步处理简化秒杀请求中的业务流程——异步处理(先处理主要的业务,异步处理次要的业务。 如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积分)

解耦,实现秒杀系统模块之间松耦合——解耦(实现秒杀系统模块之间松耦合,将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理)

B端C端数据同步场景

B端面向企业用户,C端面向求职者。这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。但是各自又需要对方的数据,需要共享:如 1. 当C端求职者在更新简历之后,B端企业用户如何尽早看到该简历更新? 2. 当B端企业用户发布新的职位需求后,C端用户如何尽早看到该职位信息?

如何解决B端C端数据共享的问题?

  1. 同步方式:B端和C端通过RPC或WebService的方式发布服务,让对方来调用,以获取对方的信息。求职者每更新一次简历,就调用一次B端的服务,进行数据的同步;B端企业用户每更新职位需求,就调用C端的服务,进行数据的同步。
  2. 异步方式:使用消息队列,B端将更新的数据发布到消息队列,C端将更新的数据发布到消息队列,B端订阅C端的消息队列,C端订阅B端的消息队列。

使用同步方式,B端和C端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可用。使用消息队列的异步方式,对B端C端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓存数据。

JMS规范

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。 它类似于JDBC(Java Database Connectivity)。消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。

根据有效负载的类型来划分,可以将消息分为几种类型: 1. 简单文本(TextMessage) 2. 可序列化的对象(ObjectMessage) 3. 属性集合(MapMessage) 4. 字节流(BytesMessage) 5. 原始值流(StreamMessage) 6. 无有效负载的消息(Message)。

对象模型:ConnectionFactory 接口(连接工厂)、Connection 接口(连接)、Destination 接口(目标)、Session 接口(会话)、MessageConsumer 接口(消息消费者)、MessageProducer 接口(消息生产者)、Message 接口(消息)

点对点模式

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为: 一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。

发布/订阅模式

支持向一个特定的主题发布消息。0或多个订阅者可能对接收特定消息主题的消息感兴趣。发布者和订阅者彼此不知道对方。多个消费者可以获得消息。

在发布者和订阅者之间存在时间依赖性。 发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。

JMS在应用集群中的问题

点对点和发布订阅模式在集群下都存在问题,点对点浪费空间,发布订阅对业务侵入较大,ActiveMQ通过“虚拟主题”解决了这个问题。

JMS规范文档(jms-1_1-fr-spec.pdf)下载地址: https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/

AMQP协议

AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0,AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。

Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。

Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个 queue中接收消息。 Server:一个具体的MQ服务实例,也称为Broker。

Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。

Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。

Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通 常需要和具体的Exchange类型、Binding的Routing key结合起来使用

Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer。

AMQP 使用的数据类型如下:

Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。 Bits(统一为8个字节):用于表示开/关值。

Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节

Long strings:用于保存二进制数据块。

Field tables:包含键值对,字段值一般为字符串,整数等。

AMQP协议文档下载地址: https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip

二、RabbitMQ架构

RabbitMQ概念及基本架构

RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件。RabbitMQ具有很强大的插件扩展能力,并具备以下特点:

  1. 高可靠性、易扩展、高可用、功能丰富等
  2. 支持大多数(甚至冷门)的编程语言客户端。
  3. RabbitMQ遵循AMQP协议,自身采用Erlang
  4. RabbitMQ也支持MQTT等其他协议。

RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。

Fanout 会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

Direct direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

Topic topic类型的交换器在direct匹配规则上进行了扩展,可以存在两种特殊字符“*”和 “#”,用于模糊匹配。

Headers headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers属性进行匹配。

RabbitMQ消息有两种类型: 1. 持久化消息和非持久化消息。 2. 这两种消息都会被写入磁盘。

持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。

非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ存储层包含两个部分:队列索引和消息存储。

队列索引:rabbit_queue_index 索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。

消息存储:rabbit_msg_store 消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一 个。存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存 储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。

队列结构 通常队列由rabbit_amqqueue_process和backing_queue这两部分组成, rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消 息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形 式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

为什么消息的堆积导致性能下降?

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息 的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继 而情况变得越来越恶化,使得系统的处理能力大大降低。

安装和配置RabbitMQ

第一步:安装依赖

yum install socat -y

第二步:安装Erlang

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm --force --nodeps

第三步:安装RabbitMQ

rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm --force –nodeps

第四步:启用RabbitMQ的管理插件

cd ../usr/lib/rabbitmq

rabbitmq-plugins enable rabbitmq_management

第五步:启动RabbitMQ

systemctl start rabbitmq-server 前台启动

rabbitmq-server -detached 后台启动

第六步:添加用户

rabbitmqctl add_user root 123456

第七步:给用户添加权限

rabbitmqctl set_permissions root -p / "." "." ".*"

第八步: 给用户设置标签

rabbitmqctl set_user_tags root administrator

第九步:打开浏览器,登录客户端

RabbitMQ常用操作命令

前台启动Erlang VM和RabbitMQ

rabbitmq-server

后台启动

rabbitmq-server -detached

停止RabbitMQ和Erlang VM

rabbitmqctl stop

查看所有队列

rabbitmqctl list_queues

查看所有虚拟主机

rabbitmqctl list_vhosts

在Erlang VM运行的情况下启动RabbitMQ应用

rabbitmqctl start_app rabbitmqctl stop_app

查看节点状态

rabbitmqctl status

查看所有可用的插件

rabbitmq-plugins list

启用插件

rabbitmq-plugins enable

停用插件

rabbitmq-plugins disable

添加用户

rabbitmqctl add_user username password

列出所有用户:

rabbitmqctl list_users

删除用户:

rabbitmqctl delete_user username

清除用户权限: r

abbitmqctl clear_permissions -p vhostpath username

列出用户权限:

rabbitmqctl list_user_permissions username

修改密码:

rabbitmqctl change_password username newpassword

设置用户权限:

rabbitmqctl set_permissions -p vhostpath username "." "." ".*"

创建虚拟主机:

rabbitmqctl add_vhost vhostpath

列出所以虚拟主机:

rabbitmqctl list_vhosts

列出虚拟主机上的所有权限:

rabbitmqctl list_permissions -p vhostpath

删除虚拟主机:

rabbitmqctl delete_vhost vhost vhostpath

移除所有数据,要在 rabbitmqctl stop_app 之后使用:

rabbitmqctl reset

RabbitMQ工作流程详解

生产者发送消息的流程

  1. 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
  2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
  5. 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
  6. 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道。
  10. 关闭连接。

消费者接收消息的过程

  1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
  2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及 做一些准备工作 3. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  3. 消费者确认( ack) 接收到的消息。
  4. RabbitMQ 从队列中删除相应己经被确认的消息。
  5. 关闭信道。
  6. 关闭连接。

Connection 和Channel关系

生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。

为什么不直接使用TCP连接,而是使用信道?

RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。 当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个 Connection ,分摊信道。

RabbitMQ工作模式详解

工作队列模式Work Queue

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

结果:生产者生产消息发送给交换器,交换器向绑定他的消费者分发消息,负载均衡。

发布订阅模式fanout

使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。消息广播给所有订阅该消息的消费者。生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器的类型: direct 、 topic 、 headers 和 fanout 四种类型。发布订阅使 用fanout。不指定交换器会使用默认交换器default。

实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。 实现推模式推荐的方式 是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。

结果:生产者往交换器发送消息,消费者会声明一个临时队列,绑定到交换器,当消息过来,交换器会复制N份发送给订阅的消费者。实现将消息广播到很多接收者。

路由模式direct

使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费,通过路由模式实现让接收者只接收部分消息。

结果:生产者往交换器发送消息,消费者根据key绑定交换器,交换器根据匹配路由和key推送消息。实现通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

主题模式topic

使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。

结果:生产者往交换器发送消息,消费者根据key绑定交换器,通过绑定器里面的通配符让交换器发送消息时进行分类发送,最后达到定制分发效果。

SpringBoot整合RabbitMQ

  1. 添加starter依赖
org.springframework.boot spring-boot-starter-amqp
  1. application.properties中添加连接信息

spring.application.name=springboot_rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

  1. 主入口类

4.RabbitConfig类

5.使用RestController发送消息

6.使用监听器,用于推消息

三、RabbitMQ特性

消息可靠性问题

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。 支付平台通过如下几种方式保证数据一致性:

  1. 分布式锁,用redis或zookeeper等常用框架来实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等 待上一个操作的锁释放后再依次执行。

优点:能够保证数据强一致性。 缺点:高并发场景下可能有性能问题。

  1. 消息队列,保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消 息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收 到ack消息,则定时去重发消息。

优点:异步、高并发 缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成 功完成,不可能失败。

消息可靠性解决之异常捕获机制

先执行业务操作,业务操作成功后执行消息发送,消息发送过程通过try catch 方式捕获异常, 在异常处理理的代码块中执行回滚业务操作或者执行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

消息可靠性解决之AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。一直到事务提交后都没有异常,确实就说明消息是投递成功了。这种方式在性能方面的开销比较大,不推荐使用。

消息可靠性解决之发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

消息可靠性解决之持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  1. Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不丢失。
  2. Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不丢失。

3.消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。

RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不足时,非持久化的消息也会被刷盘处理),这些处理动作都是在“持久层”中完成的。

  1. 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括 消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应 的rabbit_queue_index。
  2. 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列 共享,在每个节点中有且只有一个。

消息可靠性解决之Consumer ACK

如何保证消息被消费者成功消费?

生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我 们又没有任何重试,那结果跟消息丢失没什么分别。 因此RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。

  1. 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
  2. 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新 被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
  3. 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回 Ack

SpringBoot项目中支持如下的一些配置:

#最大重试次数

spring.rabbitmq.listener.simple.retry.max-attempts=5

#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时)

spring.rabbitmq.listener.simple.retry.enabled=true

#重试间隔时间(单位毫秒)

spring.rabbitmq.listener.simple.retry.initial-interval=5000

#重试超过最大次数后是否拒绝

spring.rabbitmq.listener.simple.default-requeue-rejected=false

#ack模式

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息可靠性解决之消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,而分布式系统的故障往往会发生上下游传递,产生连锁反应。

  1. RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。
  2. RabbitMQ 还默认提供了一种基于credit flow 的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。
  3. RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果 超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。

提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间(需要时间)
  2. 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
  3. 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)

消息可靠性保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障 分为三个层级:

  1. At most once:最多一次。消息可能会丢失,但绝不会重复传输
  2. At least once:最少一次。消息绝不会丢失,但可能会重复传输
  3. Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

RabbitMQ 支持其中的“最多一次”和“最少一次”。

其中“最少一次”投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确 保消息不会丢失。

消息幂等性处理

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方 法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。 对于幂等的方法,不用担心重复执行会对系统造成任何改变。

对于幂等性的一些常见做法:

  1. 借助数据库唯一索引,重复插入直接报错,事务回滚。
  2. 前置检查机制。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件。
  3. 唯一Id机制,比较通用的方式。

消息可靠性分析

在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。 在RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费 消息的记录,方便RabbitMQ 的使用者进行调试、排错等。Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。

开启Firehose命令: rabbitmqctl trace_on [-p vhost]

关闭命令为:rabbitmqctl trace_off [-p vhost]

Firehose 默认情况下处于关闭状态,并且Firehose 的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose 开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额 外的消息生成、路由和存储。

TTL机制

在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内 用户没有支付,则默认订单取消。

常用实现办法:

  1. 定期轮询检查(数据库等)
  2. Timer定时器
  3. ScheduledExecutorService多线程
  4. RabbitMQ消息队列

TTL,Time to Live 的简称,即过期时间。 RabbitMQ 可以对消息和队列两个维度来设置TTL。

两种方法可以设置消息的TTL:

  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息自身进行单独设置,每条消息的TTL可以不同。

默认规则:

  1. 如果不设置TTL,则表示此消息不会过期;
  2. 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

死信队列

用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统采用 MQ异步通讯。在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消 息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。 DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter) 之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。

以下几种情况导致消息变为死信:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  2. 消息过期;
  3. 队列达到最大长度。

延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

  1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列 (queue)并把消息给它 3. 队列(queue)再把消息发送给监听它的消费者(customer)

原文链接https://zhuanlan.zhihu.com/p/494448561?utm_id=0