Administrator
Administrator
Published on 2025-01-06 / 0 Visits
0
0

微服务项目教程-RocketMQ-消息队列(八)

Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。 Apache RocketMQ 产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。
阿里文章:https://developer.aliyun.com/article/974434

工作模型

Description

为什么要使用消息队列

不使用情况 (同步调用)

Description

使用情况 (异步解耦)

Description

典型案例

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。

传统的做法有如下:

Description

  1. 您在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。

  2. 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。

  3. 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。

以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为50 ms,则用户需要在注册页面等待总共150 ms才能登录。

异步解构如下:
Description

  1. 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。

  2. 注册信息写入注册系统成功后,再发送消息至消息队列RocketMQ。消息队列RocketMQ会马上返回响应给注册系统,注册完成。用户可立即登录。

  3. 下游的邮件和短信通知系统订阅消息队列RocketMQ的此类注册请求消息,即可向用户发送邮件和短信通知,完成所有的注册流程。

用户只需在注册页面等待注册数据写入注册系统和消息队列RocketMQ的时间,即等待55 ms即可登录。

异步解耦是消息队列RocketMQ的主要特点,主要目的是减少请求响应时间和解耦。主要的适用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列RocketMQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦。

应用场景

削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
●** 分布式事务一致性**:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。

框架对比:

Description

MQ

MQ 描述

RabbitMQ

RabbitMQ erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。

RocketMQ

RocketMQ java开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。。

Kafka

Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。

ActiveMQ

ava开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

RocketMQ教程

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。 Apache RocketMQ 产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。

系统物理架构

RocketMQ是开源的消息中间件,它主要由NameServer,Producer,Broker,Consumer四部分构成。

Description

● Name Server:是一个几乎无状态节点,可集群部署,在消息队列RocketMQ版中提供命名服务,更新和发现Broker服务。可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的
● Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息。
● 生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。
● 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。

系统逻辑架构

Description

  1. group
    一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

  2. topic
    消息主题,一级消息类型,通过Topic对消息进行分类。

  3. Tag
    消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。

  4. key
    4.1 message key 消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑
    4.2 Sharding Key 顺序消息中用来区分Topic中不同分区的关键字段,会将设置了相同Sharding Key的消息路由到同一个分区下,同一个分区内的消息将按照消息发布顺序进行消费。将和普通消息的Key是完全不同的概念。

系统存储架构

RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。
Description

在RocketMQ中,这三种文件类型各自有其特定的用途。

  1. CommitLog文件:这是RocketMQ中存储真实消息数据的文件,所有的消息体都会存储在此。当生产者发送消息至RocketMQ时,消息首先会被存储在CommitLog中。

  2. ConsumeQueue文件:这是消费队列文件,用于存储指向CommitLog中的物理偏移量,消息大小和消息Tag的HashCode。其作用是为了加速消息消费,本质上就是commitLog的一个索引文件,对应逻辑架构中的一个messageQueue。

  3. IndexFile文件:这是索引文件,利用索引可以极大提高查询消息的效率,实现快速的消息查找。一般使用某个消息的key或者id来查找。但要注意的是,并不是所有消费的消息都会在IndexFile中创建索引,只有那些具有特定的关键字(key)的消息才会在indexFile中创建索引。

这三种文件类型共同构成了RocketMQ的消息存储机制,它们各自负责不同的职责,共同保证了RocketMQ的高效率和高性能。

CommitLog文件,一个文件大小为1G, 1GB = 102410241024 = 1073741824 Bytes,所以下一个文件就会被命名为00000000001073741824。
比如第一个文件中第一条消息的偏移量为0,那么第一个文件的名称为00000000000000000000,当这个文件存满之后,需要重新建立一个。

安装MQ

下载地址:https://rocketmq.apache.org/download/

一:官网地址https://rocketmq.apache.org/docs/quick-start/

Description

二:配置rocketmq,添加到windows系统环境里
ROCKETMQ_HOME=“\rocketmq-all-4.8.0-bin-release” #解压后根目录
NAMESRV_ADDR=“localhost:9876” #ip:端口

三:启动NameServer和Borker
window:

.\bin\mqnamesrv.cmd  #先启动服务
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true #再启动节点

linux

$: nohupsh bin/mqnamesrv &   
启动 Start Broker and Proxy
$ nohupsh bin/mqbroker -n localhost:9876 --enable-proxy &

四:排除错误
启动时候,如果出现 rocketmq 找不到或无法加载主类 Files\Java\jdk-1.8\lib等问题,请该更cmd文件

Description

在%CLASSPATH% 处 加双引号就可以解决问题

五:测试
windows:


bin/tools.cmd  org.apache.rocketmq.example.quickstart.Producer

bin/tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

linux:

$ export NAMESRV_ADDR=localhost:9876  
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

安装控制台

//TODO

Springboot集成MQ

添加依赖:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.2</version>
</dependency>

添加配置:

rocketmq:
  name-server: localhost:9876
  producer:
    group: group1

生产者案例:

    @Autowired
    RocketMQTemplate rocketMQTemplate;
    @GetMapping("/orders")
    public  CommonResult  insert(){
        //TODO 订单插入
        rocketMQTemplate.convertAndSend("order-topic","编号2的订单已经成功,请发邮件");
        return  CommonResult.SUCCESS("下订单成功");

    }

消费者案例:

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "group1",topic ="order-topic")
public class UserServiceImpl  implements  RocketMQListener<String> {

    /**
     * mq监听方法
     * @param
     */
    @Override
    public void onMessage(String message) {
        log.info("收到order-topic的来信,信的内容是[{}]",message);
    }
}

发送方式:

从功能上来说,rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)。下面来简单说明一下这三种发送消息的方式,以便了解它们之间的差异。

同步发送 sync

发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

    public void sync() {
        rocketMQTemplate.syncSend("topic-name", "send sync message !");
    }

异步发送 async

发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

    public void async() {
        rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("send successful");
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("send fail; {}", throwable.getMessage());
            }
        });
    }

直接发送 one-way

采用one-way发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

    public void oneWay() {
        rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
    }

消息类型

Apache RocketMQ 支持的消息类型如下:

● Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
● FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
● Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
● Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。

普通消息

普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

顺序消息

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。
Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
● 相同消息组的消息按照先后顺序被存储在同一个队列。
● 不同消息组的消息可以混合在同一个队列中,且不保证连续。

延时消息

省略

事务消息

事务消息是一种可以保证消息投递和业务处理的原子性的特殊类型的消息,在MQ(Message Queue)中,特指RocketMQ,为了解决分布式事务问题引入的一种机制。
而两段提交(Two-Phase Commit)是实现分布式事务的一种机制。它的要点是:在执行事务操作过程中,涉及到的所有节点,都需要对操作成败做出投票,然后根据投票结果决定是否要提交本次操作。
具体到RocketMQ的事务消息,它的两阶段提交可以分解为以下步骤:

  1. 第一阶段(Prepared):生产者向MQ发送一条预备消息(Prepared Message)。RocketMQ只是接收这个消息,但是并不立即把这个消息可见给消费者。

  2. 第二阶段:
    ○ 提交:如果业务操作执行成功,生产者会向MQ提交这条消息,在MQ端,这条消息会变为可见状态,然后即可被消费者消费。
    ○ 回滚:如果业务操作执行失败,生产者将告诉MQ回滚这条消息,在MQ端,这条消息将被删除,不会被消费者消费。
    通过这个流程,RocketMQ可以保证:只有当业务操作成功时,消息才会被消费者消费,以此来保证分布式事务的最终一致性。

Description

面试题:

传送门


Comment