一.概念

rocketMQ是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型,具备异步通信的优势,系统拓扑简单,上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景

二.服务端部署

部署声明

名称 版本 IP
Rocket 4.4.0 192.168.10.99

官方链接

名称 地址
官方文档 https://rocketmq.apache.org/docs/

安装namesrv

1. 拉取镜像

docker pull rocketmqinc/rocketmq

2. 创建挂载目录

mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store

3. 创建和启动容器

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v /docker/rocketmq/data/namesrv/logs:/root/logs \
-v /docker/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv 

安装broker

1. 创建broker数据存储路径

mkdir -p  /docker/rocketmq/data/broker/logs   /docker/rocketmq/data/broker/store /docker/rocketmq/conf

2. 创建配置文件

vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.10.99
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95
#开启自动创建主题
autoCreateTopicEnable=true
#开启过滤消息
enablePropertyFilter=true

3. 构建broker容器

docker run -d  \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /docker/rocketmq/data/broker/logs:/root/logs \
-v  /docker/rocketmq/data/broker/store:/root/store \
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 

安装控制台服务

1. 拉起镜像

docker pull pangliang/rocketmq-console-ng

2. 构建容器

docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.10.99:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false \
-Duser.timezone='Asia/Shanghai'" \
-v /etc/localtime:/etc/localtime \
-p 9999:8080 \
pangliang/rocketmq-console-ng

控制台地址: http://192.168.10.99:9999

image-20220926233248383

错误1

控制台报错 This date have’t data

构建容器时加入jvm 时区及系统时区文件

-v /etc/localtime:/etc/localtime

错误2

集成boot时 CODE: 1 DESC: The broker does not support consumer to filter message by SQL92

在broker.conf配置文件中 #开启过滤消息功能

enablePropertyFilter=true
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=b.youlai.tech:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -Duser.timezone='Asia/Shanghai'" -v /etc/localtime:/etc/localtime -p 9999:8080 pangliang/rocketmq-console-ng

三 客户端部署

1.Maven依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

2.客户端配置

rocketmq:
  name-server: http://192.168.10.99:9876
  producer:
    group: defaultGroup

四. RocketMQ实战

普通消息

生产者

同步发送

    public boolean sync(String message) {
        boolean flag = true;
        String text1 = "发送消息:" + message;
        log.info(text1);
        SendResult sendResult1 = rocketMQTemplate.syncSend("base_topic", text1);
        log.info("同步响应:"+sendResult1.getSendStatus().toString());
        if(!SendStatus.SEND_OK.equals(sendResult1.getSendStatus())){
          flag = false;
        }
        return  flag;
    }

异步发送

    /**
     * 异步消息
     */
    public boolean async(String message) {
        String text1 = "发送消息:" + message;
        log.info(text1);
        rocketMQTemplate.asyncSend("base_topic", text1 , new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息响应成功");
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("异步消息响应发送失败");
            }
        });
        return true;
    }

单向发送

    /**
     * 单向消息,不关心返回结果
     */
    public boolean oneWay(String message) {
        String text1 = "发送消息:" + message;
        log.info(text1);
        rocketMQTemplate.sendOneWay("base_topic", text1);
        log.info("单向发送-已发送...");
        return true;
    }

消费者

@Component
@RocketMQMessageListener(  topic = "base_topic",consumerGroup = "defaultGroup", messageModel = MessageModel.BROADCASTING, consumeMode= ConsumeMode.CONCURRENTLY)
@Slf4j
public class BaseConsumer implements RocketMQListener<String> {


    @Override
    public void onMessage(String message) {
        log.info("普通信息-接受到消息:" + message);
    }
}

测试

image-20230127045144788

image-20230127045213223

顺序消息

生产者

@Service
@Slf4j
public class OrderProducer {
    @Resource
    RocketMQTemplate rocketMQTemplate;

    private String topic = "order_topic";
    public boolean order(String create, String pay, String deliver) {
        boolean flag = true;
        try {
            TimeUnit.MILLISECONDS.sleep(50);

        String text1 = "发送顺序消息1:" + create;
        log.info(text1);
        SendResult sendResult1 = rocketMQTemplate.syncSendOrderly(topic, text1,"order");
        if(!SendStatus.SEND_OK.equals(sendResult1.getSendStatus())){
            flag = false;
        }

        TimeUnit.MILLISECONDS.sleep(50);
        String text2 = "发送顺序消息2:" + pay;
        log.info(text2);
        SendResult sendResult2 = rocketMQTemplate.syncSendOrderly(topic, text2,"order");
        if(!SendStatus.SEND_OK.equals(sendResult2.getSendStatus())){
            flag = false;
        }
        TimeUnit.MILLISECONDS.sleep(50);
        String text3 = "发送顺序消息3:" + deliver;
        log.info(text3);
        SendResult sendResult3 = rocketMQTemplate.syncSendOrderly(topic, text3,"order");
        if(!SendStatus.SEND_OK.equals(sendResult3.getSendStatus())){
            flag = false;
        }
        return  flag;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

消费者

@Component
@RocketMQMessageListener( consumeMode= ConsumeMode.ORDERLY, topic = "order_topic", consumerGroup = "order_group")
@Slf4j
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("顺序消息-接收到消息:" + message);
    }
}

测试

image-20230127045723191

image-20230127045748643

延迟消息

生产者

@Service
@Slf4j
public class ScheduledProducer {
    @Resource
    RocketMQTemplate rocketMQTemplate;

    public void scheduled(Integer delayLevel,String body) {
        String text = "延时"+delayLevel+"消息:"+ body;
        log.info(text);
        // 设置延时等级2,这个消息将在5s之后发送
        // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        Message<String> message = MessageBuilder.withPayload(body).build();
        SendResult sendResult = rocketMQTemplate.syncSend("scheduled_topic", message, 1000, delayLevel);
        log.info("延时消息发送:"+sendResult.getSendStatus().toString());
    }
}

消费者

@Component
@RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group", messageModel = MessageModel.BROADCASTING, consumeMode= ConsumeMode.CONCURRENTLY)
@Slf4j
public class ScheduleConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("延时消息-接受到消息:" + message);
    }
}

测试

image-20230127045943345

image-20230127045957717

批量消息

生产者

@Service
@Slf4j
public class BatchProducer {
    @Resource
    RocketMQTemplate rocketMQTemplate;

    public void batch(String message1,String message2,String message3,String message4,String message5) {
        List<Message> messageList = new ArrayList<>();
        messageList.add(MessageBuilder.withPayload(message1).build());
        messageList.add(MessageBuilder.withPayload(message2).build());
        messageList.add(MessageBuilder.withPayload(message3).build());
        messageList.add(MessageBuilder.withPayload(message4).build());
        messageList.add(MessageBuilder.withPayload(message5).build());
        log.info("开始发送...");
        SendResult result = rocketMQTemplate.syncSend("batch_topic", messageList);
        log.info("已发送...");
    }
}

消费者

@Component
@RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group", messageModel = MessageModel.BROADCASTING, consumeMode= ConsumeMode.CONCURRENTLY)
@Slf4j
public class BatchConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("批量消息-接受到消息:" + message);
    }
}

测试

image-20230127050110095

image-20230127050120998

事务消息

生产者

@Service
@Slf4j
@RequiredArgsConstructor
public class TxProducer {
    @Resource
    RocketMQTemplate rocketMQTemplate;


    public void tx(Boolean isOpenTx) {
        Message<Long> message =  MessageBuilder.withPayload(Long.valueOf(RandomUtil.randomInt(1,1000)))
                // 设置事务Id
                .setHeader(RocketMQHeaders.TRANSACTION_ID,RandomUtil.randomInt(1,1000))
                .build();
        rocketMQTemplate.sendMessageInTransaction("tx:tx_expression", message, isOpenTx);
        log.info("发送事务消息");
    }
}

消费者

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor
public class TxProducerListener implements RocketMQLocalTransactionListener {


    /**
     * 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败
     */
    private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Boolean isOpenTx = (Boolean) arg;
        if(isOpenTx){
            log.info("提交事务");
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            log.info("回滚事务");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    /**
     * 事务超时,回查方法
     * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("检查事务");
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

测试

image-20230127050252118

image-20230127050308986

踩坑1

消费者接收不到消息

@RocketMQMessageListener(  topic = "base_topic",consumerGroup = "defaultGroup", messageModel = MessageModel.BROADCASTING, consumeMode= ConsumeMode.CONCURRENTLY)

消费者默认为集群消费模式,需要修改为广播模式

  • 集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
  • 广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

踩坑2

image-20230127003955396

springboot版本大于2.6时会和knife4j冲突

解决方法

spring:
  mvc:
   pathmatch:
      matching-strategy: ant_path_matcher

五.总结

本文只是简单写了一下rocketmq的基本使用,主要需要理解的是rocketmq里面生产者和消费者,消息类型,了解清楚概念就没什么问题了

所有代码地址:https://gitee.com/youlaiorg/youlai-learning.git