中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

說說MQ之RocketMQ(二)

2018-10-16    來源:importnew

容器云強(qiáng)勢上線!快速搭建集群,上萬Linux鏡像隨意使用

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發(fā)的,因此,其 Java API 相對是比較豐富的,當(dāng)然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  1. 廣播消費(fèi),這個(gè)在之前已經(jīng)提到過;
  2. 消息過濾,支持簡單的 Message Tag 過濾,也支持按 Message Header、body 過濾;
  3. 順序消費(fèi)和亂序消費(fèi),之前也提到過,這里的順序消費(fèi)應(yīng)該指的是普通順序性,這一點(diǎn)與 Kafka 相同;
  4. Pull 模式消費(fèi),這個(gè)是相對 Push 模式來說的,Kafka 就是 Pull 模式消費(fèi);
  5. 事務(wù)消息,這個(gè)好像沒有開源,但是 example 代碼中有示例,總之,不推薦用;
  6. Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用于表示消息類別,可以用來過濾,但是順序性還是以 Topic 來看;

單看功能的話,即使不算事務(wù)消息,也不算 Tag,RocketMQ 也遠(yuǎn)超 Kafka,Kafka 應(yīng)該只實(shí)現(xiàn)了 Pull 模式消費(fèi) + 順序消費(fèi)這2個(gè)功能。RocketMQ 的代碼示例在 rocketmq-example 中,注意,代碼是不能直接運(yùn)行的,因?yàn)樗械拇a都少了設(shè)置 name server 的部分,需要自己手動(dòng)加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先來看一下生產(chǎn)者的 API,比較簡單,只有一種,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.232.23:9876");
        producer.start();
        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag
                        "OrderID188",// key
                        ("RocketMQ "+String.format("%05d", i)).getBytes());// body
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, i));
                    System.out.println(String.format("%05d", i)+sendResult);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

可以發(fā)現(xiàn),相比 Kafka 的 API,只多了 Tag,但實(shí)際上行為有很大不同。Kafka 的生產(chǎn)者客戶端,有同步和異步兩種模式,但都是阻塞模式,send?方法返回發(fā)送狀態(tài)的?Future,可以通過?Future?的?get?方法阻塞獲得發(fā)送狀態(tài)。而 RocketMQ 采用的是同步非阻塞模式,發(fā)送之后立刻返回發(fā)送狀態(tài)(而不是?Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發(fā)生主備切換的時(shí)候,Kafka 的同步可以等待切換完成并重連,最后返回;而 RocketMQ 只能立刻報(bào)錯(cuò),由生產(chǎn)者選擇是否重發(fā)。所以,在生產(chǎn)者的 API 上,其實(shí) Kafka 是要強(qiáng)一些的。

另外,RocketMQ 可以通過指定?MessageQueueSelector?類的實(shí)現(xiàn)來指定將消息發(fā)送到哪個(gè)分區(qū)去,Kafka 是通過指定生產(chǎn)者的?partitioner.class?參數(shù)來實(shí)現(xiàn)的,靈活性上 RocketMQ 略勝一籌。

再來看消費(fèi)者的API,由于 RocketMQ 的功能比較多,我們先看 Pull 模式消費(fèi)的API,如下,

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    long offset = consumer.fetchConsumeOffset(mq, true);
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    if (null != pullResult.getMsgFoundList()) {
                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                            System.out.print(new String(messageExt.getBody()));
                            System.out.print(pullResult);
                            System.out.println(messageExt);
                        }
                    }
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

這部分的 API 其實(shí)是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分區(qū),而 Kafka 可以自動(dòng)管理(當(dāng)然也可以手動(dòng)管理),并且不需要指定分區(qū)(分區(qū)是在 Kafka 訂閱的時(shí)候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore?接口,提供了兩種管理方式,本地文件和遠(yuǎn)程 Broker。這部分感覺兩者差不多。

下面再看看 Push 模式順序消費(fèi),代碼如下,

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                }
                else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                }
                else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

雖然提供了 Push 模式,RocketMQ 內(nèi)部實(shí)際上還是 Pull 模式的 MQ,Push 模式的實(shí)現(xiàn)應(yīng)該采用的是長輪詢,這點(diǎn)與 Kafka 一樣。使用該方式有幾個(gè)注意的地方,

  1. 接收消息的監(jiān)聽類要使用?MessageListenerOrderly
  2. ConsumeFromWhere?有幾個(gè)參數(shù),表示從頭開始消費(fèi),從尾開始消費(fèi),還是從某個(gè) TimeStamp 開始消費(fèi);
  3. 可以控制 offset 的提交,應(yīng)該就是?context.setAutoCommit(false);?的作用;

控制 offset 提交這個(gè)特性非常有用,某種程度上擴(kuò)展一下,就可以當(dāng)做事務(wù)來用了,看代碼?ConsumeMessageOrderlyService?的實(shí)現(xiàn),其實(shí)并沒有那么復(fù)雜,在不啟用 AutoCommit 的時(shí)候,只有返回?COMMIT?才 commit offset;啟用 AutoCommit 的時(shí)候,返回?COMMIT、ROLLBACK(這個(gè)比較扯)、SUCCESS?的時(shí)候,都 commit offset。

后來發(fā)現(xiàn),commit offset 功能在 Kafka 里面也有提供,使用新的 API,調(diào)用?consumer.commitSync。

再看一個(gè) Push 模式亂序消費(fèi) + 消息過濾的例子,消費(fèi)者的代碼如下,

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

這個(gè)例子與之前順序消費(fèi)不同的地方在于,

  1. 接收消息的監(jiān)聽類使用的是?MessageListenerConcurrently;
  2. 回調(diào)方法中,使用的是自動(dòng) offset commit;
  3. 訂閱的時(shí)候增加了消息過濾類?MessageFilterImpl;

消息過濾類?MessageFilterImpl?的代碼如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

RocketMQ 執(zhí)行過濾是在 Broker 端,Broker 所在的機(jī)器會(huì)啟動(dòng)多個(gè) FilterServer 過濾進(jìn)程;Consumer 啟動(dòng)后,會(huì)向 FilterServer 上傳一個(gè)過濾的 Java 類;Consumer 從 FilterServer 拉消息,F(xiàn)ilterServer 將請求轉(zhuǎn)發(fā)給 Broker,F(xiàn)ilterServer 從 Broker 收到消息后,按照 Consumer 上傳的 Java 過濾程序做過濾,過濾完成后返回給 Consumer。這種過濾方法可以節(jié)省網(wǎng)絡(luò)流量,但是增加了 Broker 的負(fù)擔(dān)?上覜]有實(shí)驗(yàn)出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結(jié)了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個(gè)很方便的功能。

還有一種廣播消費(fèi)模式,比較簡單,可以去看代碼,不再列出。

總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。

標(biāo)簽: 代碼 網(wǎng)絡(luò)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:說說MQ之RocketMQ(三)

下一篇:攜程實(shí)時(shí)用戶行為系統(tǒng)實(shí)踐