• 152870

      文章

    • 1250

      評論

    • 6

      友鏈

    • 最近新加了換膚功能,大家多來逛逛吧~~~~
    • 喜歡這個網站的朋友可以加一下QQ群,我們一起交流技術。

    RocketMQ 5.0: POP 消費模式 原理詳解 & 源碼解析

    RocketMQ POP 消費模式 原理詳解 & 源碼解析

    原文地址:http://hscarb.github.io/rocketmq/20221212-rocketmq-consumer-7-pop-consume.md

    1. 背景

    1.1 什么是 Pop 消費

    RocketMQ 5.0 中引入了一種新的消費模式:Pop 消費模式。

    我們知道 RocketMQ 原來有兩種消費模式:Pull 模式消費和 Push 模式消費,其中 Push 模式指的是 Broker 將消息主動“推送”給消費者,它的背后其實是消費者在不斷地 Pull 消息來實現類似于 Broker “推”消息給消費者的效果。

    新引入的 Pop 消費模式主要是用于 Push 消費時將拉消息的動作替換成 Pop 。Pop 消費的行為和 Pull 消費很像,區別在于 Pop 消費的重平衡是在 Broker 端做的,而之前的 Pull 和 Push 消費都是由客戶端完成重平衡。

    1.2 如何使用 Pop 消費

    RocketMQ 提供了 2 種方式,能夠讓 Push 消費切換為使用 Pop 模式拉取消息(Pull 消費暫不支持切換 Pop 模式),分別為命令行方式切換和客戶端代碼方式切換。

    1.2.1 使用命令行方式切換

    利用命令行,用如下命令,指定集群和需要切換的消費組,可以將一個消費組切換成 Pop 消費模式消費某個 Topic

    mqadmin setConsumeMode -c cluster -t topic -g group -m POP -q 8
    

    以下為參數含義

    opt = new Option("c", "clusterName", true, "create subscription group to which cluster");
    opt = new Option("t", "topicName", true, "topic name");
    opt = new Option("g", "groupName", true, "consumer group name");
    opt = new Option("m", "mode", true, "consume mode. PULL/POP");
    opt = new Option("q", "popShareQueueNum", true, "num of queue which share in pop mode");
    

    1.2.2 代碼切換

    在創建 Consumer 之前,先運行 switchPop() 方法,它其實與上面命令行的邏輯一樣,也是發送請求給集群中的所有 Broker 節點,讓它們切換對應消費者組和 Topic 的消費者的消費模式為 Pop 模式。

    // PopPushConsumer.java
    public class PopPushConsumer {
    
        public static final String CONSUMER_GROUP = "CID_JODIE_1";
        public static final String TOPIC = "TopicTest";
    
        // Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
        private static void switchPop() throws Exception {
            DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
            mqAdminExt.start();
    
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
            Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
    
            for (String brokerAddr : brokerAddrs) {
                mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
            }
        }
    
        public static void main(String[] args) throws Exception {
            switchPop();
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
            consumer.subscribe(TOPIC, "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.setClientRebalance(false);
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

    1.3 引入 Pop 消費模式的原因

    引入 Pop 消費主要的原因是由于 Push 消費的機制導致它存在一些痛點。RocketMQ 5.0 云原生化的要求催生著一種能夠解決這些痛點的新消費模式誕生。

    Push 消費模式的重平衡邏輯是在客戶端完成的,這就導致了幾個問題:

    1. 客戶端代碼邏輯較重,要支持一種新語言的客戶端就必須實現完整的重平衡邏輯,此外還需要實現拉消息、位點管理、消費失敗后將消息發回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
    2. 當客戶端升級或者下線時,都要進行重平衡操作,可能造成消息堆積。

    此外,Push 消費的特性是重平衡后每個消費者都分配到消費一定數量的隊列,而每個隊列最多只能被一個消費者消費。這就決定了消費者的橫向擴展能力受到 Topic 中隊列數量的限制。這里有引入了如下痛點

    1. 消費者無法無限擴展,當消費者數量擴大到大于隊列數量時,有的消費者將無法分配到隊列。
    2. 當某些消費者僵死(hang ?。r(與 Broker 的心跳未斷,但是無法消費消息),會造成其消費的隊列的消息堆積,遲遲無法被消費,也不會主動重平衡來解決這個問題。

    引入 Pop 消費模式之后,可以解決 Push 消費導致的可能的消息堆積問題和橫向擴展能力問題。此外,RocketMQ 5.0 中引入了的輕量化客戶端就用到了 Pop 消費能力,將 Pop 消費接口用 gRPC 封裝,實現了多語言輕量化客戶端,而不必在客戶端實現重平衡邏輯。詳見該項目 rocketmq-clients。

    2. 概要設計

    Pop 消費主要的設計思想是將繁重的客戶端邏輯如重平衡、消費進度提交、消費失敗后發到 Broker 重試等邏輯放到 Broker 端。

    客戶端只需要不斷發送 Pop 請求,由 Broker 端來分配每次拉取請求要拉取的隊列并返回消息。這樣就可以實現多個客戶端同時拉取一個隊列的效果,不會存在一個客戶端 hang 住導致隊列消息堆積,也不會存在頻繁的重平衡導致消息積壓。

    2.1 Pop 消費流程

    為了保證消費速度,Pop 消費一次請求可以拉取一批消息,拉取到的消息系統屬性中有一個比較重要的屬性叫做 POP_CK,它是該消息的句柄,ACK 時要通過句柄來定位到它。在 Broker 端會為這批消息保存一個 CheckPoint,它里面包含一批消息的句柄信息。

    對于長時間沒有 ACK 的消息,Broker 端并非毫無辦法。Pop 消費引入了消息不可見時間(invisibleTime)的機制。當 Pop 出一條消息后,這條消息對所有消費者不可見,即進入不可見時間,當它超過該時刻還沒有被 ACK,Broker 將會把它放入 Pop 專門的重試 Topic(這個過程稱為 Revive),這條消息重新可以被消費。

    Push 消費的重試間隔時間會隨著重試次數而增加,Pop 消費也沿用了這個設計。此外,Pop 消費提供了一個接口 changeInvisibleTime() 來修改單條消息的不可見時間。

    從圖上可以看見,本來消息會在中間這個時間點再一次的可見的,但是我們在可見之前提前使用 changeInvisibleTime 延長了不可見時間,讓這條消息的可見時間推遲了。

    當消費失?。ㄓ脩魳I務代碼返回 reconsumeLater 或者拋異常)的時候,消費者就通過 changeInvisibleTime 按照重試次數來修改下一次的可見時間。另外如果消費消息用時超過了 30 秒(默認值,可以修改),則 Broker 也會把消息放到重試隊列。

    2.2 客戶端-服務端交互

    Pop 消費的流程與 Push 消費較為相似,這里我分為 5 個步驟。

    1. 向 Broker 端發送請求,切換消息拉取模式為 Pop 模式
    2. 重平衡服務執行重平衡,此時已經切換為 Pop 模式,所以是向 Broker 端發起請求,請求中帶有重平衡策略,Broker 會返回重平衡的結果。
    3. 重平衡完畢之后開始拉取消息,拉取消息服務發送 POP_MESSAGE 請求給 Broker,獲取一批消息
    4. 消費這批消息
    5. 對成功消費的消息,發送 ACK 請求給 Broker

    2.3 服務端實現

    服務端收到 Pop 請求后,會先在 Queue 維度上加鎖,保證同一時間只有一個消費者可以拉取該隊列的消息。

    隨后服務端會在存儲中查詢一批消息,將這批消息的構建的 CheckPoint 保存在 Broker 中,以便與 ACK 的消息匹配。

    CheckPoint 的存在目的是與 ACK 的消息匹配,并將沒有匹配的消息重試。CheckPointReviveTime 就是它這批消息需要被嘗試重試(喚醒)的時間。

    CheckPoint會先被保存在內存中,一般來說消息消費很快,所以在內存中就能夠與 ACK 消息匹配成功后刪除。如果在一段時間(默認 3s)內沒有匹配成功,它將會從內存中被刪除,轉入磁盤等待匹配。

    對于 ACK 消息也一樣,它先被放入內存中匹配,如果在內存中找不到對應的 CheckPoint,也會放入磁盤。


    RocketMQ 的磁盤存儲實際上就是 Topic 和隊列。為了避免頻繁檢查匹配狀態,我們只在 CheckPoint 需要被喚醒時做檢查,這里就可以用到定時消息,將 CheckPoint 和 ACK 消息定時到 ReviveTime 投遞。這里 RocketMQ 將 CheckPoint 的投遞時間提前 1s,以便能先消費到,與 ACK 消息匹配。

    當定時到期,它們會被投遞到 REVIVE_TOPIC。有個后臺線程消費這個 Topic,把 CheckPoint 放到一個 map 中,對于 ACK 消息則從 map 中查找 CheckPoint 來嘗試匹配,如果匹配成功則更新 REVIVE_TOPIC 的消費位點。對于超過 ReviveTime 還沒有被匹配的 CheckPoint,查出這批消息中要重試消息對應的真實消息,并放到 Pop 消費重試 Topic 中。

    Broker 端的 Pop 消費邏輯會概率性消費到重試 Topic 中的消息。

    3. 詳細設計

    3.1 Broker 端重平衡

    Pop 消費的重平衡在 Broker 端完成,客戶端的重平衡服務重平衡時會向 Broker 端發送查詢請求,查詢自己的分配結果。

    重平衡的主要邏輯其實與在客戶端重平衡類似,只不過變成了 Broker 接收客戶端的參數之后根據這些參數進行重平衡,然后把重平衡結果返回給客戶端。

    Broker 端重平衡入口為 QueryAssignmentProcessor#doLoadBalance()。

    對于廣播模式,直接返回 Topic 下所有的隊列。

    對于集群模式,Pop 模式的重平衡與 Push 模式不同,它允許一個隊列被多個消費者 Pop 消費。在切換 Pop 模式時引入了 popShareQueueNum 參數,表示允許消費者進行額外的負載獲取隊列的次數(可以被共享的隊列數),0 表示可以消費所有隊列。

    所以重平衡時對每個消費者執行 popShareQueueNum 次重平衡策略,將多次重平衡分配到的隊列都分給這個消費者消費。這樣,每個隊列就會被多個消費者消費。

    下圖為 popShareQueueNum = 1 時的重平衡情況,每個消費者被負載了 2 次,每個隊列被 2 個消費者共享(1 + popShareQueueNum)。

    3.2 Broker 端 Pop 消息

    3.2.1 請求處理入口

    Pop 消息的 Broker 端處理是由 PopMessageProcessor#processRequest() 完成。

    該方法邏輯為

    1. 完成請求體解析和一些參數和權限的校驗
    2. 生成一個 0 到 99 的隨機整數,如果能被 5 整除,則先拉取重試 Topic。
    3. 從重試 Topic 的每個 Queue 中 Pop 消息
    4. 根據請求的隊列 Pop 對應的隊列的消息。如果 Pop 請求指定了隊列,只會消費一個隊列的消息;如果沒有指定隊列,則 Pop 所有隊列的消息
    5. 如果 Pop 的消息沒有滿(達到請求的最大消息數量),且之前沒有拉取過重試消息,則 Pop 重試 Topic 所有隊列的消息(期望填充滿 Pop 請求要求的數量)
    6. 判斷是否 Pop 到消息,如果有則傳輸回客戶端,如果沒有則掛起輪詢,直到超過請求的 timeout 參數指定的時間

    3.2.2 Pop 消息方法

    上面的 3、4、5 都涉及到從存儲中 Pop 消息,它們都調用同一個方法:popMsgFromQueue,它是真正查詢消息的方法,下面看一下它的邏輯

    1. 將需要 Pop 的隊列上鎖(用 AtomicBoolean 實現)
    2. 計算 Pop 消息的起始偏移量,會返回內存中 CheckPoint 與 ACK 消息匹配后的最新位點
    3. 從磁盤中根據起始偏移量查詢一批消息
    4. 計算隊列剩余的消息數量(用作返回值)
    5. 拉取的這批消息將生成一個 CheckPoint,存入內存和磁盤
    6. 解鎖隊列
    7. 返回 Pop 到的消息

    上面方法第 5 步會將生成的 CheckPoint 放入內存和磁盤,注意這個 CheckPoint 會保存一批獲取到的消息的起始偏移量和相對偏移量(相對于起始偏移量),所以一個 CheckPoint 在保存和匹配時都對應一批消息。

    3.2.3 保存 CheckPoint 用于匹配

    1. 構造 CheckPoint,添加起始偏移量和所有 Pop 出的消息的相對偏移量
    2. 嘗試將 CheckPoint 添加到內存 Buffer,如果成功則直接返回。但是在內存中匹配 CheckPointAckMsg 的開關默認是關閉的,所以這里不會加入到內存,會繼續后面的邏輯放入磁盤
    3. CheckPoint 構造成一個消息,數據都放到消息體中,然后這個消息定時到 ReviveTime(喚醒重試的時間)- 1s(為了留時間與 AckMsg 匹配)發送。會發送到 ReviveTopic 的一個隊列。

    3.3 Broker 端 ACK 消息

    Ack 消息接口每次只允許 Ack 一條消息,入口是 AckMessageProcessor#processRequest()

    1. 從請求頭解析和構造 Ack 消息,并作一些校驗
    2. 順序消息 Ack 和普通消息 Ack 分別處理,這里針對普通消息
    3. 先嘗試將 Ack 消息放入內存 Buffer,如果成功則直接返回。失敗則有可能是內存匹配未開啟。
    4. 如果放入內存失敗,構造一個用于存到磁盤的消息,定時到喚醒重試時間投遞(到 ReviveTopic)。

    3.4 Broker 端 CheckPointAckMsg 匹配

    CheckPointAckMsg 都被設計成先嘗試放入內存中匹配,然后再磁盤中匹配,因為通常情況下消息消費之后都能很快 ACK,內存匹配性能較高。如果 CheckPoint 在內存中停留太久沒有被匹配,則會轉移到磁盤中(ReviveTopic),有個線程消費這個 ReviveTopic 來匹配。到達喚醒重試時間(ReviveTime)還沒有被匹配的 CheckPoint 里面的消息將會重試(發送到 Pop 消息重試 Topic,后面的 Pop 有概率消費到)。

    3.4.1 內存匹配

    內存匹配邏輯由一個線程 PopBufferMergeService 完成,只有主節點運行該匹配線程。

    Pop 消息時會先添加 CheckPoint 到 buffer,Ack 消息時嘗試從內存 buffer 中的 CheckPoint 匹配。同時,它每 5ms 執行一次掃描,將不符合內存中存活條件的 CheckPoint 移除,放入磁盤存儲。

    addCk 方法將 CheckPoint 放入內存 Buffer。CheckPoint 中有一個碼表 BitMap,用來表示它里面的每個條消息是否被 Ack 和被存到磁盤。用 BitMap 可以加速匹配。

    addAk 方法會嘗試從 buffer 中找 CheckPoint 來匹配。如果找到對應的 CheckPoint,則修改它碼表的對應位,表示這條消息被 ACK。

    scan 方法每 5ms 執行一次

    1. 將已經匹配或存盤的 CheckPoint 移出 buffer
    2. 把超時的 CheckPoint 存入磁盤
    3. 對于匹配完成或者存盤的 CheckPoint,為他們提交消息偏移量

    3.4.2 Store 匹配和消息重試

    從內存中移除保存到磁盤的 CheckPointAckMsg 都會封裝成消息進行定時投遞(定時到重試時間),最終投遞到 ReviveTopic。存儲中匹配也由一個線程 PopReviveService 完成,它消費 ReviveTopic 的消息進行匹配和重試。

    Pop 消費由于要根據 Topic 來 Pop 消息,重試 Topic 需要針對每個 [消費組-Topic] 隔離,所以它不能用普通消息的消費組維度的重試 Topic,而是用專門的 Pop 重試 Topic %RETRY%{消費組}_{TOPIC}。

    PopReviveService#run 方法是該處理線程的入口,它每秒都會調用 consumeReviveMessage 消費和匹配 ReviveTopic 消息,然后調用 mergeAndRevive 方法檢查匹配的情況并對達到喚醒時間還沒有成功匹配的消息重試。

    這兩個方法會先初始化一個 map,用于存放 CheckPoint,供 AckMsg 根據 map key 查找 CheckPoint。


    consumeReviveMessage 會消費 2s 內的一批 ReviveTopic 消息,CK 消息放入 map,Ack 消息則從 map 中查找 CK,在碼表上標記對應的消息為 Acked。

    mergeAndRevive 方法如其名,遍歷消費到的 CK 消息,對于已經到重試時間的,對沒有 Ack 的消息進行重試。

    重試邏輯為先從 MessageStore 查詢對應的真正消息,然后將該消息發送到 Pop 重試隊列。

    4. 源碼解析

    4.1 Broker 端重平衡

    4.1.1 QueryAssignmentProcessor#doLoadBalance

    /**
     * Broker 端重平衡
     * Returns empty set means the client should clear all load assigned to it before, null means invalid result and the
     * client should skip the update logic
     *
     * @param topic
     * @param consumerGroup
     * @param clientId
     * @param messageModel 消費模型(廣播/集群)
     * @param strategyName 重平衡策略名
     * @return the MessageQueues assigned to this client
     */
    private Set<MessageQueue> doLoadBalance(final String topic, final String consumerGroup, final String clientId,
                                            final MessageModel messageModel, final String strategyName,
                                            SetMessageRequestModeRequestBody setMessageRequestModeRequestBody, final ChannelHandlerContext ctx) {
        Set<MessageQueue> assignedQueueSet = null;
        final TopicRouteInfoManager topicRouteInfoManager = this.brokerController.getTopicRouteInfoManager();
    
        switch (messageModel) {
            case BROADCASTING: {
                // 廣播模式,返回該 Topic 下所有隊列
                assignedQueueSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
                if (assignedQueueSet == null) {
                    log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // 集群模式
                // 獲取 Topic 下所有隊列
                Set<MessageQueue> mqSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
                    }
                    return null;
                }
    
                if (!brokerController.getBrokerConfig().isServerLoadBalancerEnable()) {
                    return mqSet;
                }
    
                List<String> cidAll = null;
                // 獲取發起請求的消費組信息
                ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
                if (consumerGroupInfo != null) {
                    cidAll = consumerGroupInfo.getAllClientId();
                }
                if (null == cidAll) {
                    log.warn("QueryLoad: no assignment for group[{}] topic[{}], get consumer id list failed", consumerGroup, topic);
                    return null;
                }
    
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                // 將隊列和消費者客戶端ID 排序
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                List<MessageQueue> allocateResult = null;
    
                try {
                    // 根據重平衡策略名稱獲取策略
                    AllocateMessageQueueStrategy allocateMessageQueueStrategy = name2LoadStrategy.get(strategyName);
                    if (null == allocateMessageQueueStrategy) {
                        log.warn("QueryLoad: unsupported strategy [{}],  {}", strategyName, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                        return null;
                    }
    
                    if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
                        // POP 模式重平衡
                        allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
                                                      cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());
    
                    } else {
                        // 普通重平衡
                        allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
                    }
                } catch (Throwable e) {
                    log.error("QueryLoad: no assignment for group[{}] topic[{}], allocate message queue exception. strategy name: {}, ex: {}", consumerGroup, topic, strategyName, e);
                    return null;
                }
    
                assignedQueueSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    assignedQueueSet.addAll(allocateResult);
                }
                break;
            }
            default:
                break;
        }
        return assignedQueueSet;
    }
    

    4.1.2 QueryAssignmentProcessor#allocate4Pop

    /**
     * POP 模式重平衡
     *
     * @param allocateMessageQueueStrategy 重平衡策略
     * @param consumerGroup 消費組
     * @param clientId 消費組客戶端 ID
     * @param mqAll 全部消息隊列
     * @param cidAll 全部客戶端ID
     * @param popShareQueueNum Pop 模式下可允許被共享的隊列數,0 表示無限
     * @return 該消費者負載的隊列列表
     */
    public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
                                           final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
                                           int popShareQueueNum) {
    
        List<MessageQueue> allocateResult;
        if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
            // 每個消費者能消費所有隊列,返回全部隊列。隊列 ID 為 -1 表示 Pop 消費時消費全部隊列
            //each client pop all messagequeue
            allocateResult = new ArrayList<>(mqAll.size());
            for (MessageQueue mq : mqAll) {
                //must create new MessageQueue in case of change cache in AssignmentManager
                MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
                allocateResult.add(newMq);
            }
    
        } else {
            if (cidAll.size() <= mqAll.size()) {
                // 消費者數量小于等于隊列數量,每個消費者分配 N 個隊列,每個隊列也會被分配給多個消費者
                //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
                allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
                int index = cidAll.indexOf(clientId);
                if (index >= 0) {
                    // 負載 popShareQueueNum 次,將每次負載的結果加入最終結果
                    for (int i = 1; i <= popShareQueueNum; i++) {
                        index++;
                        index = index % cidAll.size();
                        List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
                        allocateResult.addAll(tmp);
                    }
                }
            } else {
                // 消費者數量大于隊列數量,保證每個消費者都有隊列消費
                //make sure each cid is assigned
                allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
            }
        }
    
        return allocateResult;
    }
    

    4.2 Broker 端 Pop 消息

    4.2.1 PopMessageProcessor#processRequest

    /**
     * 處理 POP 消息請求
     *
     * @param channel
     * @param request
     * @return
     * @throws RemotingCommandException
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request)
        throws RemotingCommandException {
        // ... 解析請求體和一系列校驗
    
        // 生成隨機數
        int randomQ = random.nextInt(100);
        int reviveQid;
        if (requestHeader.isOrder()) {
            reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;
        } else {
            // 輪詢選一個 Revive 隊列
            reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
        }
    
        int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
        GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);
    
        // 隊列中剩余的消息數量
        long restNum = 0;
        // 1/5 的概率拉取重試消息
        boolean needRetry = randomQ % 5 == 0;
        long popTime = System.currentTimeMillis();
        // 拉取重試消息
        if (needRetry && !requestHeader.isOrder()) {
            TopicConfig retryTopicConfig =
                this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
            if (retryTopicConfig != null) {
                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
                    restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,
                                              channel, popTime, messageFilter,
                                              startOffsetInfo, msgOffsetInfo, orderCountInfo);
                }
            }
        }
        // 如果拉取請求沒有指定隊列(-1),則拉取所有隊列
        if (requestHeader.getQueueId() < 0) {
            // read all queue
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
                restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,
                                          startOffsetInfo, msgOffsetInfo, orderCountInfo);
            }
        } else {
            // 拉取請求指定了隊列,拉取對應的隊列
            int queueId = requestHeader.getQueueId();
            restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel,
                                      popTime, messageFilter,
                                      startOffsetInfo, msgOffsetInfo, orderCountInfo);
        }
        // 如果前面拉取普通消息之后,沒有滿,則再拉取一次重試消息
        // if not full , fetch retry again
        if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
            TopicConfig retryTopicConfig =
                this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
            if (retryTopicConfig != null) {
                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
                    restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,
                                              channel, popTime, messageFilter,
                                              startOffsetInfo, msgOffsetInfo, orderCountInfo);
                }
            }
        }
        // 拉取消息成功
        if (!getMessageResult.getMessageBufferList().isEmpty()) {
            response.setCode(ResponseCode.SUCCESS);
            getMessageResult.setStatus(GetMessageStatus.FOUND);
            if (restNum > 0) {
                // all queue pop can not notify specified queue pop, and vice versa
                notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
                                      requestHeader.getQueueId());
            }
        } else {
            // 沒有拉取到消息,長輪詢
            int pollingResult = polling(channel, request, requestHeader);
            if (POLLING_SUC == pollingResult) {
                return null;
            } else if (POLLING_FULL == pollingResult) {
                response.setCode(ResponseCode.POLLING_FULL);
            } else {
                response.setCode(ResponseCode.POLLING_TIMEOUT);
            }
            getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
        }
        responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
        responseHeader.setPopTime(popTime);
        responseHeader.setReviveQid(reviveQid);
        responseHeader.setRestNum(restNum);
        responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
        responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
        if (requestHeader.isOrder() && orderCountInfo != null) {
            responseHeader.setOrderCountInfo(orderCountInfo.toString());
        }
        response.setRemark(getMessageResult.getStatus().name());
        // 傳輸消息
        return response;
    }
    

    4.2.2 PopMessageProcessor#popMsgFromQueue

    /**
     * 從消息隊列中 POP 消息
     *
     * @param isRetry 是否是重試 Topic
     * @param getMessageResult
     * @param requestHeader
     * @param queueId 消息隊列 ID
     * @param restNum 隊列剩余消息數量
     * @param reviveQid 喚醒隊列 ID
     * @param channel Netty Channel,用于獲取客戶端 host,來提交消費進度
     * @param popTime Pop 時間
     * @param messageFilter
     * @param startOffsetInfo 獲取 Pop 的起始偏移量
     * @param msgOffsetInfo 獲取所有 Pop 的消息的邏輯偏移量
     * @param orderCountInfo
     * @return 隊列剩余消息
     */
    private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
                                 PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
                                 Channel channel, long popTime,
                                 ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
                                 StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
        String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
                                                               requestHeader.getConsumerGroup()) : requestHeader.getTopic();
        // {TOPIC}@{GROUP}@{QUEUE_ID}
        String lockKey =
            topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
        boolean isOrder = requestHeader.isOrder();
        long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
        // Queue 上加鎖,保證同一時刻只有一個消費者可以拉取同一個 Queue 的消息
        if (!queueLockManager.tryLock(lockKey)) {
            // 返回該隊列中待 Pop 的消息數量
            restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
            return restNum;
        }
        // 計算要 POP 的消息偏移量
        offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
        GetMessageResult getMessageTmpResult = null;
        try {
            // 順序消費,阻塞
            if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
                                                                                     requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
                return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
            }
    
            // 已經拉取到足夠的消息
            if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
                restNum =
                    this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
                return restNum;
            }
            // 從磁盤消息存儲中根據邏輯偏移量查詢消息
            getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup()
                                                                                     , topic, queueId, offset,
                                                                                     requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
            if (getMessageTmpResult == null) {
                return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
            }
            // maybe store offset is not correct.
            if (GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageTmpResult.getStatus())
                || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
                || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
                // commit offset, because the offset is not correct
                // If offset in store is greater than cq offset, it will cause duplicate messages,
                // because offset in PopBuffer is not committed.
                POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
                                lockKey, offset, getMessageTmpResult.getNextBeginOffset());
                offset = getMessageTmpResult.getNextBeginOffset();
                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
                                                                              queueId, offset);
                getMessageTmpResult =
                    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic,
                                                                       queueId, offset,
                                                                       requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
            }
    
            // 計算隊列還剩下的消息數量
            restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
            if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
                // 更新統計數據
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageTmpResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), topic,
                                                                              getMessageTmpResult.getMessageCount());
                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
                                                                              getMessageTmpResult.getBufferTotalSize());
    
                if (isOrder) {
                    // 順序消費,更新偏移量
                    int count = brokerController.getConsumerOrderInfoManager().update(topic,
                                                                                      requestHeader.getConsumerGroup(),
                                                                                      queueId, getMessageTmpResult.getMessageQueueOffset());
                    this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                                                                                  requestHeader.getConsumerGroup(), topic, queueId, offset);
                    ExtraInfoUtil.buildOrderCountInfo(orderCountInfo, isRetry, queueId, count);
                } else {
                    // 添加 CheckPoint 到內存,用于等待 ACK
                    appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
                }
                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, isRetry, queueId, offset);
                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, isRetry, queueId,
                                                 getMessageTmpResult.getMessageQueueOffset());
            } else if ((GetMessageStatus.NO_MATCHED_MESSAGE.equals(getMessageTmpResult.getStatus())
                        || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())
                        || GetMessageStatus.MESSAGE_WAS_REMOVING.equals(getMessageTmpResult.getStatus())
                        || GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(getMessageTmpResult.getStatus()))
                       && getMessageTmpResult.getNextBeginOffset() > -1) {
                // 沒有拉取到消息,添加假的消息 CheckPoint 到隊列
                popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, queueId, offset,
                                                requestHeader.getInvisibleTime(), popTime, reviveQid, getMessageTmpResult.getNextBeginOffset(), brokerController.getBrokerConfig().getBrokerName());
                //                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
                //                        queueId, getMessageTmpResult.getNextBeginOffset());
            }
        } catch (Exception e) {
            POP_LOGGER.error("Exception in popMsgFromQueue", e);
        } finally {
            // Pop 完后解鎖
            queueLockManager.unLock(lockKey);
        }
        // 將拉取到的消息放入結果容器中
        if (getMessageTmpResult != null) {
            for (SelectMappedBufferResult mapedBuffer : getMessageTmpResult.getMessageMapedList()) {
                getMessageResult.addMessage(mapedBuffer);
            }
        }
        return restNum;
    }
    

    4.2.3 PopMessageProcessor#appendCheckPoint

    /**
     * 在 POP 拉取消息后調用,添加 CheckPoint,等待 ACK
     *
     * @param requestHeader
     * @param topic POP 的 Topic
     * @param reviveQid Revive 隊列 ID
     * @param queueId POP 的隊列 ID
     * @param offset POP 消息的起始偏移量
     * @param getMessageTmpResult POP 一批消息的結果
     * @param popTime POP 時間
     * @param brokerName
     */
    private void appendCheckPoint(final PopMessageRequestHeader requestHeader,
                                  final String topic, final int reviveQid, final int queueId, final long offset,
                                  final GetMessageResult getMessageTmpResult, final long popTime, final String brokerName) {
        // add check point msg to revive log
        final PopCheckPoint ck = new PopCheckPoint();
        // ... 構造 PopCheckPoint,賦值過程省略
        
        for (Long msgQueueOffset : getMessageTmpResult.getMessageQueueOffset()) {
            // 添加所有拉取的消息的偏移量與起始偏移量的差值
            ck.addDiff((int) (msgQueueOffset - offset));
        }
    
        // 將 Offset 放入內存
        final boolean addBufferSuc = this.popBufferMergeService.addCk(
            ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
        );
    
        if (addBufferSuc) {
            return;
        }
    
        // 放入內存匹配失?。▋却嫫ヅ湮撮_啟),將 Offset 放入內存和磁盤
        this.popBufferMergeService.addCkJustOffset(
            ck, reviveQid, -1, getMessageTmpResult.getNextBeginOffset()
        );
    }
    

    4.3 Broker 端 Ack 消息

    4.3.1 AckMessageProcessor#processRequest

    /**
     * 處理 Ack 消息請求,每次 Ack 一條消息
     *
     * @param channel
     * @param request
     * @param brokerAllowSuspend
     * @return
     * @throws RemotingCommandException
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
                                           boolean brokerAllowSuspend) throws RemotingCommandException {
        // 解析請求頭
        final AckMessageRequestHeader requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        AckMsg ackMsg = new AckMsg();
        RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
        response.setOpaque(request.getOpaque());
        // ... 校驗
        
        // 拆分消息句柄字符串
        String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
    
        // 用請求頭中的信息構造 AckMsg
        ackMsg.setAckOffset(requestHeader.getOffset());
        ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
        ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
        ackMsg.setTopic(requestHeader.getTopic());
        ackMsg.setQueueId(requestHeader.getQueueId());
        ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
        ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
    
        int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
    
        this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
        this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
    
        if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
            // ... 順序消息 ACK
        }
    
        // 普通消息 ACK
        // 先嘗試放入內存匹配,成功則直接返回。失敗可能是內存匹配未開啟
        if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
            return response;
        }
    
        // 構造 Ack 消息
        msgInner.setTopic(reviveTopic);
        msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
        //msgInner.setQueueId(Integer.valueOf(extraInfo[3]));
        msgInner.setQueueId(rqId);
        msgInner.setTags(PopAckConstants.ACK_TAG);
        msgInner.setBornTimestamp(System.currentTimeMillis());
        msgInner.setBornHost(this.brokerController.getStoreHost());
        msgInner.setStoreHost(this.brokerController.getStoreHost());
        // 定時消息,定時到喚醒重試時間投遞
        msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
        msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // 保存 Ack 消息到磁盤
        PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
        if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
            && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
            && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
            && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
            POP_LOGGER.error("put ack msg error:" + putMessageResult);
        }
        return response;
    }
    

    4.4 Broker 端 CheckPointAckMsg 匹配

    4.4.1 PopBufferMergeService#addCk

    /**
     * POP 消息后,新增 CheckPoint,放入內存 Buffer
     *
     * @param point
     * @param reviveQueueId
     * @param reviveQueueOffset
     * @param nextBeginOffset
     * @return 是否添加成功
     */
    public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
        // key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()
        if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
            return false;
        }
        // 內存匹配服務是否開啟
        if (!serving) {
            return false;
        }
    
        // 距離下次可重試 Pop 消費的時刻 < 4.5s
        long now = System.currentTimeMillis();
        if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);
            }
            return false;
        }
    
        if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {
            POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());
            return false;
        }
    
        PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);
    
        if (!checkQueueOk(pointWrapper)) {
            return false;
        }
    
        // 將 CheckPoint 放入 Offset 隊列
        putOffsetQueue(pointWrapper);
        // 將 CheckPoint 放入內存 Buffer
        this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
        this.counter.incrementAndGet();
        if (brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);
        }
        return true;
    }
    

    4.4.2 PopBufferMergeService#addAk

    /**
     * 消息 ACK,與內存中的 CheckPoint 匹配
     *
     * @param reviveQid
     * @param ackMsg
     * @return 是否匹配成功
     */
    public boolean addAk(int reviveQid, AckMsg ackMsg) {
        // 如果未開啟內存匹配,直接返回
        if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
            return false;
        }
        if (!serving) {
            return false;
        }
        try {
            // 根據 ACK 的消息找到內存 Buffer 中的 CheckPoint
            PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
            if (pointWrapper == null) {
                // 找不到 CheckPoint
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);
                }
                return false;
            }
    
            // 內存中僅保存 Offset,實際已經保存到磁盤,內存中不處理 ACK 消息的匹配,直接返回
            if (pointWrapper.isJustOffset()) {
                return false;
            }
    
            PopCheckPoint point = pointWrapper.getCk();
            long now = System.currentTimeMillis();
    
            if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
                }
                return false;
            }
    
            if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
                }
                return false;
            }
    
            // 標記該 CheckPoint 已經被 ACK
            int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
            if (indexOfAck > -1) {
                // 設置 CheckPoint 中被 Ack 消息的 bit 碼表為 1
                markBitCAS(pointWrapper.getBits(), indexOfAck);
            } else {
                POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
                return true;
            }
    
            return true;
        } catch (Throwable e) {
            POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);
        }
    
        return false;
    }
    

    4.4.3 PopBufferMergeService#scan

    /**
     * 掃描內存中的 CheckPoint
     * 把已經匹配或存盤的 CheckPoint 移出 buffer
     * 把已經全部 Ack 的 CheckPoint 存盤
     */
    private void scan() {
        long startTime = System.currentTimeMillis();
        int count = 0, countCk = 0;
        Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();
        // 遍歷所有內存中的 CheckPoint
        while (iterator.hasNext()) {
            Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
            PopCheckPointWrapper pointWrapper = entry.getValue();
    
            // 如果 CheckPoint 已經在磁盤中,或者全部消息都匹配成功,從內存中 buffer 中移除
            // just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
            if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
                || isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
                iterator.remove();
                counter.decrementAndGet();
                continue;
            }
    
            PopCheckPoint point = pointWrapper.getCk();
            long now = System.currentTimeMillis();
    
            // 是否要從內存中移除 CheckPoint
            boolean removeCk = !this.serving;
            // 距離 ReviveTime 時間小于閾值(默認3s)
            // ck will be timeout
            if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
                removeCk = true;
            }
    
            // 在內存中時間大于閾值(默認10s)
            // the time stayed is too long
            if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
                removeCk = true;
            }
    
            if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
                POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);
            }
    
            // double check
            if (isCkDone(pointWrapper)) {
                continue;
            } else if (pointWrapper.isJustOffset()) {
                // just offset should be in store.
                if (pointWrapper.getReviveQueueOffset() < 0) {
                    putCkToStore(pointWrapper, false);
                    countCk++;
                }
                continue;
            } else if (removeCk) {
                // 將 CheckPoint 包裝成消息放入磁盤,從內存中移除
                // put buffer ak to store
                if (pointWrapper.getReviveQueueOffset() < 0) {
                    putCkToStore(pointWrapper, false);
                    countCk++;
                }
    
                if (!pointWrapper.isCkStored()) {
                    continue;
                }
    
                // 在內存中移除 CheckPoint 前,把它當中已經 Ack 的消息也作為 Ack 消息存入磁盤
                for (byte i = 0; i < point.getNum(); i++) {
                    // 遍歷 CheckPoint 中消息 bit 碼表每一位,檢查是否已經 Ack 并且沒有存入磁盤
                    // reput buffer ak to store
                    if (DataConverter.getBit(pointWrapper.getBits().get(), i)
                        && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
                        if (putAckToStore(pointWrapper, i)) {
                            count++;
                            markBitCAS(pointWrapper.getToStoreBits(), i);
                        }
                    }
                }
    
                if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
                        POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);
                    }
                    iterator.remove();
                    counter.decrementAndGet();
                    continue;
                }
            }
        }
    
        // 掃描已經完成的 CheckPoint,為它們提交消息消費進度
        int offsetBufferSize = scanCommitOffset();
    
        scanTimes++;
    
        if (scanTimes >= countOfMinute1) {
            counter.set(this.buffer.size());
            scanTimes = 0;
        }
    }
    

    4.4.4 PopReviveService#consumeReviveMessage

    /**
     * 消費 Revive Topic 中的消息,匹配 ACK 消息和 CheckPoint
     * CK 消息放到 Map 中,ACK 消息根據 Map key 匹配 CK 消息,更新 CK 消息的碼表以完成 ACK
     * 只對 CK 進行標記
     * 消費時間差 2s 內的 CK、ACK 消息,或 4s 沒有消費到新消息
     *
     * @param consumeReviveObj CK 與 ACK 匹配對象,用于 Revive 需要重試 Pop 消費的消息
     */
    protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
        // CheckPoint 匹配 map,key = point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime()
        HashMap<String, PopCheckPoint> map = consumeReviveObj.map;
        long startScanTime = System.currentTimeMillis();
        long endTime = 0;
        // 查詢 ReviveTopic queue 之前的消費進度
        long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);
        consumeReviveObj.oldOffset = oldOffset;
        POP_LOGGER.info("reviveQueueId={}, old offset is {} ", queueId, oldOffset);
        long offset = oldOffset + 1;
        // 沒有查詢到消息的次數
        int noMsgCount = 0;
        long firstRt = 0;
        // offset self amend
        while (true) {
            if (!shouldRunPopRevive) {
                POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
                break;
            }
            // 查詢一批 Revive Topic 中的消息(32條)
            List<MessageExt> messageExts = getReviveMessage(offset, queueId);
            if (messageExts == null || messageExts.isEmpty()) {
                long old = endTime;
                long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getReadBehind();
                long commitLogDelay = brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
                // move endTime
                if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
                    endTime = System.currentTimeMillis();
                }
                POP_LOGGER.info("reviveQueueId={}, offset is {}, can not get new msg, old endTime {}, new endTime {}",
                                queueId, offset, old, endTime);
                // 最后一個 CK 的喚醒時間與第一個 CK 的喚醒時間差大于 2s,中斷消費
                if (endTime - firstRt > PopAckConstants.ackTimeInterval + PopAckConstants.SECOND) {
                    break;
                }
                noMsgCount++;
                // Fixme: why sleep is useful here?
                try {
                    Thread.sleep(100);
                } catch (Throwable ignore) {
                }
                // 連續 4s 沒有消費到新的消息,中斷消費
                if (noMsgCount * 100L > 4 * PopAckConstants.SECOND) {
                    break;
                } else {
                    continue;
                }
            } else {
                noMsgCount = 0;
            }
            if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
                POP_LOGGER.info("reviveQueueId={}, scan timeout  ", queueId);
                break;
            }
            // 遍歷查詢到的消息
            for (MessageExt messageExt : messageExts) {
                if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
                    // 如果是 CheckPoint
                    String raw = new String(messageExt.getBody(), DataConverter.charset);
                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
                        POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                    }
                    PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);
                    if (point.getTopic() == null || point.getCId() == null) {
                        continue;
                    }
                    // 放入 HashMap,等待 ACK 消息匹配
                    map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
                    // 設置 reviveOffset 為 revive 隊列中消息的邏輯 offset
                    point.setReviveOffset(messageExt.getQueueOffset());
                    if (firstRt == 0) {
                        firstRt = point.getReviveTime();
                    }
                } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
                    // 如果是 ACK 消息
                    String raw = new String(messageExt.getBody(), DataConverter.charset);
                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
                        POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                    }
                    AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
                    PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());
                    if (point == null) {
                        continue;
                    }
                    // 如果 HashMap 中有 CheckPoint,計算 ACK 的 bit 碼表
                    int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
                    if (indexOfAck > -1) {
                        // Ack 消息 bit 碼表為 1 的位 Ack 成功
                        point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
                    } else {
                        POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
                    }
                }
                long deliverTime = messageExt.getDeliverTimeMs();
                if (deliverTime > endTime) {
                    endTime = deliverTime;
                }
            }
            offset = offset + messageExts.size();
        }
        consumeReviveObj.endTime = endTime;
    }
    

    4.4.5 PopReviveService#mergeAndRevive

    /**
     * 匹配消費到的一批 CK 和 ACK 消息,對于沒有成功 ACK 的消息,重發到重試 Topic
     */
    protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
        // 獲取排序后的 CheckPoint 列表
        ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
    	// ...
        long newOffset = consumeReviveObj.oldOffset;
        for (PopCheckPoint popCheckPoint : sortList) {
            // ...
            // 如果沒有到 Revive 時間,跳過
            if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
                break;
            }
    
            // 從 CK 中解析原 Topic 并檢查該 Topic 是否存在,如果不存在則跳過
            // check normal topic, skip ck , if normal topic is not exist
            String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
            if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
                POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
                newOffset = popCheckPoint.getReviveOffset();
                continue;
            }
            if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
                POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
                newOffset = popCheckPoint.getReviveOffset();
                continue;
            }
    
            // 重發 CK 中沒有 Ack 的所有消息
            reviveMsgFromCk(popCheckPoint);
    
            newOffset = popCheckPoint.getReviveOffset();
        }
        // 匹配和重試完成后,更新 ReviveTopic 消費進度
        if (newOffset > consumeReviveObj.oldOffset) {
            if (!shouldRunPopRevive) {
                POP_LOGGER.info("slave skip commit, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
                return;
            }
            this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
        }
        consumeReviveObj.newOffset = newOffset;
    }
    

    4.4.6 PopReviveService: 重試消息

    /**
     * 重發 CK 中沒有 Ack 的所有消息
     */
    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
        // 遍歷 CK 中的所有消息
        for (int j = 0; j < popCheckPoint.getNum(); j++) {
            if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
                continue;
            }
    
            // retry msg
            long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
            // 查詢 CK 消息對應的真正消息
            MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
            if (messageExt == null) {
                POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
                                queueId, popCheckPoint.getTopic(), msgOffset);
                continue;
            }
            //skip ck from last epoch
            if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
                POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
                continue;
            }
            // 喚醒沒有被 ACK 的消息,發到重試隊列
            reviveRetry(popCheckPoint, messageExt);
        }
    }
    
    /**
     * 根據 CheckPoint 喚醒沒有被 ACK 的消息,發到重試隊列
     *
     * @param popCheckPoint CK
     * @param messageExt 要被重試的消息
     * @throws Exception
     */
    private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) throws Exception {
        if (!shouldRunPopRevive) {
            POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
            return;
        }
        // 構造新的消息
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // 喚醒的消息發到重試 Topic
        if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()));
        } else {
            msgInner.setTopic(popCheckPoint.getTopic());
        }
        msgInner.setBody(messageExt.getBody());
        msgInner.setQueueId(0);
        if (messageExt.getTags() != null) {
            msgInner.setTags(messageExt.getTags());
        } else {
            MessageAccessor.setProperties(msgInner, new HashMap<String, String>());
        }
        msgInner.setBornTimestamp(messageExt.getBornTimestamp());
        msgInner.setBornHost(brokerController.getStoreHost());
        msgInner.setStoreHost(brokerController.getStoreHost());
        // 重試次數 += 1
        msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
        msgInner.getProperties().putAll(messageExt.getProperties());
        if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
            msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
        }
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // 添加 Pop 重試 Topic
        addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
        // 保存重試消息到存儲
        PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
        if (brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
                            queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
                            (System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
        }
        if (putMessageResult.getAppendMessageResult() == null || putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
            throw new Exception("reviveQueueId=" + queueId + ",revive error ,msg is :" + msgInner);
        }
        // ... 更新統計數據
        if (brokerController.getPopMessageProcessor() != null) {
            brokerController.getPopMessageProcessor().notifyMessageArriving(
                KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
                popCheckPoint.getCId(),
                -1
            );
        }
    }
    

    參考資料

    • [RIP 19] Server side rebalance, lightweight consumer client support
    • RocketMQ 5.0 POP 消費模式探秘

    歡迎關注公眾號【消息中間件】(middleware-mq),更新消息中間件的源碼解析和最新動態!

    本文由博客一文多發平臺 OpenWrite 發布!


    695856371Web網頁設計師②群 | 喜歡本站的朋友可以收藏本站,或者加入我們大家一起來交流技術!

    自定義皮膚 主體內容背景
    打開支付寶掃碼付款購買視頻教程
    遇到問題聯系客服QQ:419400980
    注冊梁鐘霖個人博客
    图片区乱小说区电影区