消息队列


引言

消息是人与计算机进行信息传递的有效载体,消息从发送者到接收者的典型传递方式有两种:

  • 即时消息:即消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是平时最常见的IM聊天消息;

  • 延迟消息:即消息从一端发出后,首先进入消息队列进行临时存储,然后再由消息队列发送给另一端。

一、消息队列的两种消息模式

Java消息服务(JavaMessage Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它使得分布式通信耦合度更低,消息服务更加可靠。在JMS规范中,支持两种消息模型:点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)。

点对点模式(P2P)

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。 消息被消费以后,Queue中不再有存储,所以消息只能被一个消费者消费一次。

点对点模式

发布/订阅模式(Pub/Sub)

消息生产者发布消息到Topic中,多个消费者可以从该Topic中订阅到这条消息并消费。

发布/订阅模式

区别

这两种模式主要区别是:点对模式生产者发送到队列的消息只能被一个消费者消费一次,实现了一个可靠的负载均衡。而发布/订阅模式生产者发布消息到主题中,多个消费者可以订阅该消息,可以重复消费。

二、什么是消息队列?

消息队列(Message Queue),是一种进程间通信或同一进程的不同线程间的通信方式,把数据放到消息队列的叫做生产者,从消息队列里边取数据叫做消费者。通俗的讲就是把要传输的数据放到队列中,需要消费的系统从队列中取需要的数据。

消息队列是分布式系统中重要的组件,主要用来解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。在生产环境中,使用较多的消息队列有Kafka、ActiveMQ、RabbitMQ、RocketMQ 等。已被广泛用于电商、即时通讯、社交等各种中大型分布式应用系统。

三、为什么要使用消息队列?

消息队列在实际生产应用中包含的场景主要有:解耦、异步、削峰、消息通讯、日志等。

场景1 应用解耦

场景说明:一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护很麻烦。

不使用MQ之前

比如我有一个系统A,它会产生一些数据,然后系统B和系统C需要调用系统A的接口来使用这些数据去做相应的操作,过了一段时间,又新增了系统D,系统D也需要调用系统A接口获取数据完成相应的操作,此时系统A就要修改代码,为系统D开放一个接口供其使用,又过了一段时间,系统B不需要调用系统A的接口获取数据了,此时系统A又要继续修改代码,把之前给系统B提供的接口去掉,除此之外,系统A还要考虑系统B、C、D挂了怎么办,系统超时怎么办,要不要重新发送消息等等一系列问题,就这样系统A需要不断进行很多操作,跟其他系统有严重的耦合。

解耦前

使用MQ以后

在系统中加入MQ以后,系统A就只负责把需要发送的数据写到MQ中,至于谁需要数据、谁不需要数据,系统A不需要关心。接下来系统B、系统C如果需要系统A的数据进行操作只需要从MQ中去消费;如果又新增一个系统D也需要系统A的数据,它也直接去MQ中消费即可;如果系统B又不需要这个数据了,那它只需要取消对MQ的消费即可;如果其中某一系统挂了或者出现请求超时等问题,都只跟MQ有关,跟系统A没有关系,这样下来,系统A与其他系统都解耦了。

解耦后

具体场景

1、用户下单后,订单系统需要通知库存系统。

不使用MQ前:用户下单以后,如果出现库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。

使用MQ后:对于订单系统,用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。对于库存系统,订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。如果在下单时库存系统不能正常访问。也不会影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

2、用户在某一系统上传一张图片,人脸识别系统会对该图片进行人脸识别。

不使用MQ前:用户上传照片,服务器接收到图片后,图片上传系统立即调用人脸识别系统,需要人脸识别系统处理完成后,再返回给客户端,这样的方式延迟高,如果人脸识别系统被调失败,则导致图片上传失败,即使用户并不需要立即知道结果。因为图片上传系统与人脸识别系统之间的互相调用存在耦合。

使用MQ后:客户端上传图片后,图片上传系统将图片信息、批次写入消息队列,直接返回成功;而人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。此时图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。

场景2 异步处理

场景说明:用户对某系统发送一个请求,如果系统从接受用户请求操作到响应给用户能够在 200 ms 以内完成,那用户几乎是无感知的,如果请求耗时很长,比如等到将近1s,这几乎不可接受的,用户体验很差。

不使用MQ之前

比如有一个用户,对A系统发送一个请求,A系统接收用户请求后,需要在自己本地写库耗时50ms,同时需要调用系统B、系统C和系统D分别写库,耗时200ms、300ms和300ms,那么通过计算可以知道这次请求总延时需要850ms,耗时很长,对于用户体验是极差的。

异步前

使用MQ以后

在系统中加入MQ以后,用户发送请求到系统A,系统A写入本地耗时50ms,然后系统A把消息写入MQ队列中,耗时50ms,然后系统A就直接返回响应给用户,通过将调用其他接口异步化,总延时只需要需要100ms,对于用户来说是几乎是无感知的,提高了用户体验和吞吐量。

异步后

具体场景

用户注册某一网站后,网站系统需要发送注册邮件和注册短信给用户。

不使用MQ之前:在串行方式下,用户发送注册请求后,系统需要将注册信息写入数据库成功后,再发送注册邮件,然后发送注册短信,等所有任务执行完成后,返回信息给客户端;在并行方式下,系统将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作,等所有任务执行完成后,返回信息给客户端,同串行方式相比,并行方式可以提高执行效率,减少执行时间,但耗时还是比较长的。

使用MQ以后:用户发送注册请求,系统将注册信息写入数据库成功后,发送注册邮件、注册短信的消息到消息队列,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,用户的响应时间基本相当于将用户数据写入数据库的时间。

场景3 限流削峰

场景说明:在某一段时间内,系统的每秒并发请求突然暴增,比如达到3000个请求,而处理请求的机器最多只能处理2000个请求,此时将会导致系统崩溃。

不使用MQ之前

比如有一系统A是电商平台,在往常用户并发请求数量只有1000个,但是一到促销活动的时候,并发请求数量就会暴增,比如每秒发送3000个请求,但是该用于处理请求的机器最多能处理2000个请求,这就会导致系统崩溃,但是促销活动一结束,就到了低峰期,每秒的并发请求也就只有1000个,对于系统没有压力。

削峰前

使用MQ以后

在系统加入MQ以后,服务器在接收到大量的用户请求后,先把并发请求写入到MQ,处理请求的机器就会根据自己每秒能处理的最大请求数从MQ中拉取请求,通过这样的方式,即使在高峰期,系统就不会崩溃,这样虽然会将过多的请求积压到MQ中,但是高峰期过后,并发请求数量就会下降,处理请求的机器很快可以把积压在MQ中的请求解决掉。

削峰后

具体场景

在某一段时间秒杀或团抢活动中,大量用户购买商品。

不使用MQ之前:在某一段时间秒杀活动开始,大量的用户参与抢购,并发请求数量过多,导致系统挂掉。

使用MQ以后:在某一段时间秒杀活动开始后,用户并发请求暴增,服务器接收到用户请求后,先将其写入消息队列,处理请求的机器根据能处理的最大请求数从消息队列中拉取请求,进行后续处理,可以缓解系统压力,避免系统崩溃。

场景4 日志处理

日志处理是指将消息队列用在日志处理中,比如Linkedin这种大型职业社交应用架构中Kafka的应用(Kafka就是Linkedin开发并开源的),解决大量日志传输的问题。

日志处理
  • 日志采集客户端:负责日志数据采集,定时写受写入Kafka队列;
  • Kafka消息队列:负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

场景5 消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在线的消息通讯。比如实现点对点消息队列、聊天室等。

点对点通讯

在点对点通讯架构设计中,客户端A和客户端B共用一个消息队列,即可实现消息通讯功能。

点对点通讯

聊天室通讯

客户端A、客户端B、客户端C直至客户端N订阅同一消息队列,进行消息的发布与接收。

聊天室通讯

四、使用消息队列会存在什么问题?

通过上诉介绍,我们知道,在特定场景下,使用消息队列会带来很多好处,但是有优点必然也会存在缺点,接下来就介绍一下使用消息队列以后会存在的一些问题。

系统可用性降低

当我们在系统中加入MQ以后,导致系统引入的外部依赖增加,系统可用性降低,越容易出现问题,如果我们使用单机的消息队列,如果MQ挂掉以后,那我们的整个系统就崩掉了,不能用了,因此我们就需要考虑MQ的高可用问题。

系统复杂度提高

当我们在系统中加入MQ以后,导致系统需要考虑的问题增多,比如由于系统A与MQ协调问题,系统A可能会把同一条消息发送给系统B两次,这就需要考虑消息被重复消费问题;或者系统A像MQ中发送了一条消息,而MQ把消息丢失了,其他系统去MQ中消费消息时找不到,这就需要考虑消息丢失问题;或者系统A向MQ中连续发送了3条消息,它们是有顺序的,而到了MQ以后消息顺序乱了,导致其他系统拿到的数据顺序也是乱的,这就需要考虑消息队列的顺序性问题;或者系统A在MQ中不断发送消息,而其他系统挂掉了,导致大量的消息在MQ中积压,这也是需要我们考虑的问题。因此MQ会导致系统的复杂性提高。

系统一致性问题

假如用户发送请求给系统A,系统A处理请求并把请求写入MQ,系统B、C需要到MQ中消费,等所有的系统都执行完成后才能返回,然而,在此过程中会出现系统A、B执行成功返回,而系统C执行失败,这就会导致数据不一致,这就需要我们考虑一致性问题。

五、如何解决消息队列存在的问题?

通过上诉分析,我们可以发现,使用消息队列以后也会存在很多问题,接下来就出现的问题给出解决方案。

如何保证消息队列的高可用?

1、RabbitMQ 的高可用性

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

本地启动使用单机模式,不适合实际生产环境中。

普通集群模式(无高可用性)

在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费者消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

普通集群模式

这种集群模式没做到所谓的分布式,就是个普通集群。没有什么所谓的高可用性,主要是提高吞吐量的,让集群中多个节点来服务某个 queue 的读写操作。

这种方式有两种情况:一是消费者每次随机连接一个实例然后拉取数据,增加数据拉取的开销,MQ集群内部会产生大量的数据传输;二是固定连接那个 queue 所在实例消费数据,导致单实例性能瓶颈,如果 queue 所在节点宕机了,会导致接下来其他实例就无法从那个实例拉取,数据丢失,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

镜像集群模式(高可用性)

在镜像集群模式下,创建的 queue,无论元数据还是 queue 里的实际消息数据都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据。然后每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

镜像集群模式

通过这样的集群方式,当任何一个机器宕机了,其它机器(节点)还包含了这个 queue 的完整数据,别的消费者都可以到其它节点上去消费数据。

缺点:第一,性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重。第二,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

Kafka 的高可用性

基本概念:Kafka由多个 broker 组成,每个 broker 是一个节点;创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。

这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据

实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。为什么只能读写 leader呢,要是你可以随意读写每个 follower,那么就要关心数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

kafka集群高可用

这就是所谓的高可用性,因为如果某个 broker 宕机了,那个 broker上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,然后消费者继续读写那个新的 leader 即可。

写数据的时候,生产者就写入 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

如何保证消息消费的幂等性(处理重复数据)?

在开发生产环境中,消息队列都有可能会出现消费重复消费的问题,这问题通常不是MQ自己保证的,是开发保证的。现在主要说明一下kafka是怎么重复消费的并且怎么保证消息的幂等性。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。但是凡事总有意外,在实际生产环境中,如果碰到着急事情需要重启系统,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset。重启之后,少数消息会再次消费一次。这就会出现重复消费问题。

场景

假如数据 1、2、3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152、153、154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1、2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1、2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。如果消费者的任务是拿一条数据就往数据库里写一条,会导致你可能就把数据 1、2 在数据库里插入了 2 次,那么数据就错啦。

消息重复

那怎么保证幂等性呢?

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错

幂等性

保证消息队列消费的幂等性还是得结合业务来思考,这里给几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如去 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

如何保证消息的可靠性传输(处理消息丢失)?

数据的丢失问题,可能出现在生产者、MQ、消费者中,接下来从从 RabbitMQ 和 Kafka 分别来分析一下。

RabbitMQ

生产者弄丢了数据

场景:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题等情况都有可能发生。

解决方案1:此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能

解决方案2:所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

区别:事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

RabbitMQ 弄丢了数据

就是 RabbitMQ 自己弄丢了数据,你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤

  • 创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

消费端弄丢了数据

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

六、常用的消息队列及比较

本部分主要介绍几种常用的消息队列,包括ActiveMQRabbitMQRocketMQKafka

ActiveMQ

ActiveMQ是由Apache出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

主要特性

  1. 服从 JMS 规范:JMS 规范提供了良好的标准和保证,包括:同步或异步的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
  2. 连接性:ActiveMQ 提供了广泛的连接选项,支持的协议有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性。
  3. 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP ;
  4. 持久化插件和安全插件:ActiveMQ 提供了多种持久化选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行自定义鉴权和授权;
  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
  6. 代理集群:多个 ActiveMQ 代理可以组成一个集群来提供服务;
  7. 异常简单的管理:ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通过处理 JMX 的告警消息,通过使用命令行脚本,甚至可以通过监控各种类型的日志。

优点

  1. 跨平台(JAVA编写与平台无关有,ActiveMQ几乎可以运行在任何的JVM上)
  2. 可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都是开发人员最熟悉的存储介质。将消息存到数据库,看得见摸得着。而且公司有专门的DBA去对数据库进行调优,主从分离;
  3. 支持JMS :支持JMS的统一接口;
  4. 支持自动重连;
  5. 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权。
  6. 监控完善:拥有完善的监控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;
  7. 界面友善:提供的Web Console可以满足大部分情况,还有很多第三方的组件可以使用,如hawtio;

缺点

  1. 社区活跃度不及RabbitMQ高;
  2. 根据其他用户反馈,会出莫名其妙的问题,会丢失消息;
  3. 目前重心放到activemq6.0产品-apollo,对5.x的维护较少;
  4. 不适合用于上千个队列的应用场景;

RabbitMQ

RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

主要特性

  1. 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;
  2. 灵活的路由: 消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用;
  3. 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用;
  4. 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
  5. 多种协议的支持:支持多种消息队列协议;
  6. 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言;
  7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面;
  8. 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么;
  9. 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件;

优点

  1. 由于erlang语言的特性,mq 性能较好,高并发;
  2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
  3. 有消息确认机制和持久化机制,可靠性高;
  4. 高度可定制的路由;
  5. 管理界面较丰富,在互联网公司也有较大规模的应用;
  6. 社区活跃度高;

缺点

  1. 尽管结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
  2. 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,但是使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;
  3. 需要学习比较复杂的接口和协议,学习和维护成本较高;

RocketMQ

RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

主要特性

  1. 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;
  2. Producer、Consumer、队列都可以分布式;
  3. Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
  4. 能够保证严格的消息顺序;
  5. 提供丰富的消息拉取模式;
  6. 高效的订阅者水平扩展能力;
  7. 实时的消息订阅机制;
  8. 亿级消息堆积能力;
  9. 较少的依赖;

优点

  1. 单机支持 1 万以上持久化队列

  2. RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。

  3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);

  4. 性能非常好,可以大量堆积消息在broker中;

  5. 支持多种消费,包括集群消费、广播消费等。

  6. 各个环节分布式扩展设计,主从HA;

  7. 开发度较活跃,版本更新很快。

缺点

支持的客户端语言不多,目前是java及c++,其中c++不成熟;

RocketMQ社区关注度及成熟度也不及前两者;

没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各种问题;

没有在 mq 核心中去实现JMS等接口;

Kafka

Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

主要特性

  1. 快速持久化,可以在O(1)的系统开销下进行消息持久化;
  2. 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;
  3. .完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡
  4. 支持同步和异步复制两种HA;
  5. 支持数据批量发送和拉取;
  6. zero-copy:减少IO操作步骤;
  7. 数据迁移、扩容对用户透明;
  8. 无需停机即可扩展机器;
  9. 其他特性:严格的消息顺序、丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制;

优点

  1. 客户端语言丰富,支持java、.net、php、ruby、python、go等多种语言;
  2. 性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;
  3. 提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积;
  4. 支持批量操作;
  5. 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
  6. 有优秀的第三方Kafka Web管理界面Kafka-Manager;
  7. 在日志领域比较成熟,被多家公司和多个开源项目使用;

缺点

  1. Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
  2. 使用短轮询方式,实时性取决于轮询间隔时间;
  3. 消费失败不支持重试;
  4. 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
  5. 社区更新较慢;

ActiveMQ、RabbitMQ、RocketMQ、Kafka比较

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

对于中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择。

对于大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

参考资料:

中华石杉–互联网Java进阶面试训练营:https://github.com/cyh756085049/Java-Interview-Advanced

分布式消息队列:https://github.com/cyh756085049/Java-Interview-Advanced

什么是消息队列?:https://juejin.im/post/5cb025fb5188251b0351ef48

消息队列技术介绍:https://www.jianshu.com/p/689ce4205021

消息队列及常见消息队列介绍:https://cloud.tencent.com/developer/article/1006035

通俗易懂,正确理解并用好MQ消息队列:https://cloud.tencent.com/developer/article/1346912

消息队列之 ActiveMQ:https://juejin.im/post/5ad46f34518825651d08265c

消息队列两种模式:https://cloud.tencent.com/developer/article/1116301


评论
评论
  目录