中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊列

2018-07-20    來源:編程學(xué)習(xí)網(wǎng)

容器云強(qiáng)勢上線!快速搭建集群,上萬Linux鏡像隨意使用

顧名思義,延遲隊列就是進(jìn)入該隊列的消息會被延遲消費(fèi)的隊列。而一般的隊列,消息一旦入隊了之后就會被消費(fèi)者馬上消費(fèi)。

延遲隊列能做什么?

延遲隊列多用于需要延遲工作的場景。最常見的是以下兩種場景:

  1. 延遲消費(fèi)。比如:
    • 用戶生成訂單之后,需要過一段時間校驗(yàn)訂單的支付狀態(tài),如果訂單仍未支付則需要及時地關(guān)閉訂單。
    • 用戶注冊成功之后,需要過一段時間比如一周后校驗(yàn)用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來提醒用戶使用。
  2. 延遲重試。比如消費(fèi)者從隊列里消費(fèi)消息時失敗了,但是想要延遲一段時間后自動重試。

如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務(wù)便于開發(fā)人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。

如何實(shí)現(xiàn)?

別急,在下文中,我們將詳細(xì)介紹如何利用 Spring BootRabbitMQ 來實(shí)現(xiàn)延遲隊列。

本文出現(xiàn)的示例代碼都已push到Github倉庫中: https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

實(shí)現(xiàn)思路

在介紹具體的實(shí)現(xiàn)思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們?yōu)橄⒒蛘哧犃性O(shè)置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當(dāng)某條消息被設(shè)置了TTL或者當(dāng)某條消息進(jìn)入了設(shè)置了TTL的隊列時,這條消息會在經(jīng)過TTL秒后“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那么較小的那個值會被取用。更多資料請查閱 官方文檔 。

Dead Letter Exchange

剛才提到了,被設(shè)置了TTL的消息在過期后會成為Dead Letter。其實(shí)在RabbitMQ中,一共有三種消息的“死亡”形式:

  1. 消息被拒絕。通過調(diào)用basic.reject或者basic.nack并且設(shè)置的requeue參數(shù)為false。
  2. 消息因?yàn)樵O(shè)置了TTL而過期。
  3. 消息進(jìn)入了一條已經(jīng)達(dá)到最大長度的隊列。

如果隊列設(shè)置了Dead Letter Exchange(DLX),那么這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱 官方文檔 。

流程圖

聰明的你肯定已經(jīng)想到了,如何將RabbitMQ的TTL和DLX特性結(jié)合在一起,實(shí)現(xiàn)一個延遲隊列。

針對于上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:

延遲消費(fèi)

延遲消費(fèi)是延遲隊列最為常用的使用模式。如下圖所示,生產(chǎn)者產(chǎn)生的消息首先會進(jìn)入緩沖隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴(kuò)展,這些消息會被設(shè)置過期時間,也就是延遲消費(fèi)的時間。等消息過期之后,這些消息會通過配置好的DLX轉(zhuǎn)發(fā)到實(shí)際消費(fèi)隊列(圖中藍(lán)色隊列),以此達(dá)到延遲消費(fèi)的效果。

延遲重試

延遲重試本質(zhì)上也是延遲消費(fèi)的一種,但是這種模式的結(jié)構(gòu)與普通的延遲消費(fèi)的流程圖較為不同,所以單獨(dú)拎出來介紹。

如下圖所示,消費(fèi)者發(fā)現(xiàn)該消息處理出現(xiàn)了異常,比如是因?yàn)榫W(wǎng)絡(luò)波動引起的異常。那么如果不等待一段時間,直接就重試的話,很可能會導(dǎo)致在這期間內(nèi)一直無法成功,造成一定的資源浪費(fèi)。那么我們可以將其先放在緩沖隊列中(圖中紅色隊列),等消息經(jīng)過一段的延遲時間后再次進(jìn)入實(shí)際消費(fèi)隊列中(圖中藍(lán)色隊列),此時由于已經(jīng)過了“較長”的時間了,異常的一些波動通常已經(jīng)恢復(fù),這些消息可以被正常地消費(fèi)。

代碼實(shí)現(xiàn)

接下來我們將介紹如何在Spring Boot中實(shí)現(xiàn)基于RabbitMQ的延遲隊列。我們假設(shè)讀者已經(jīng)擁有了Spring Boot與RabbitMQ的基本知識。如果想快速了解Spring Boot的相關(guān)基礎(chǔ)知識,可以參考我之前寫的一篇文章。

初始化工程

首先我們在Intellij中創(chuàng)建一個Spring Boot工程,并且添加 spring-boot-starter-amqp 擴(kuò)展。

配置隊列

從上述的流程圖中我們可以看到,一個延遲隊列的實(shí)現(xiàn),需要一個緩沖隊列以及一個實(shí)際的消費(fèi)隊列。又由于在RabbitMQ中,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:

  • delay_queue_per_message_ttl:TTL配置在消息上的緩沖隊列。
  • delay_queue_per_queue_ttl:TTL配置在隊列上的緩沖隊列。
  • delay_process_queue:實(shí)際消費(fèi)隊列。

我們通過Java Config的方式將上述的隊列配置為Bean。由于我們添加了 spring-boot-starter-amqp 擴(kuò)展,Spring Boot在啟動時會根據(jù)我們的配置自動創(chuàng)建這些隊列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的消息都會通過DLX轉(zhuǎn)發(fā)到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

@Bean
QueuedelayQueuePerMessageTTL(){
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter發(fā)送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .build();
}

其中, x-dead-letter-exchange 聲明了隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱, x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置代碼:

@Bean
QueuedelayQueuePerQueueTTL(){
    return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設(shè)置隊列的過期時間
                       .build();
}

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個 x-message-ttl ,該配置用來設(shè)置隊列的過期時間。

delay_process_queue

delay_process_queue的配置最為簡單:

@Bean
QueuedelayProcessQueue(){
    return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                       .build();
}

配置Exchange

配置DLX

首先,我們需要配置DLX,代碼如下:

@Bean
DirectExchangedelayExchange(){
    return new DirectExchange(DELAY_EXCHANGE_NAME);
}

然后再將該DLX綁定到實(shí)際消費(fèi)隊列即delay_process_queue上。這樣所有的死信都會通過DLX被轉(zhuǎn)發(fā)到delay_process_queue:

@Bean
BindingdlxBinding(Queue delayProcessQueue, DirectExchange delayExchange){
    return BindingBuilder.bind(delayProcessQueue)
                         .to(delayExchange)
                         .with(DELAY_PROCESS_QUEUE_NAME);
}

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之后,我們需要將消息轉(zhuǎn)發(fā)到緩沖隊列,所以緩沖隊列也需要綁定一個Exchange。 在本例中,我們將delay_process_per_queue_ttl作為延遲重試?yán)锏木彌_隊列 。具體代碼是如何配置的,這里就不贅述了,大家可以查閱我 Github 中的代碼。

定義消費(fèi)者

我們創(chuàng)建一個最簡單的消費(fèi)者ProcessReceiver,這個消費(fèi)者監(jiān)聽delay_process_queue隊列,對于接受到的消息,他會:

  • 如果消息里的消息體不等于FAIL_MESSAGE,那么他會輸出消息體。
  • 如果消息里的消息體恰好是FAIL_MESSAGE,那么他會模擬拋出異常,然后將該消息重定向到緩沖隊列(對應(yīng)延遲重試場景)。

另外,我們還需要新建一個監(jiān)聽容器用于存放消費(fèi)者,代碼如下:

@Bean
SimpleMessageListenerContainerprocessContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
}

至此,我們前置的配置代碼已經(jīng)全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲隊列。

編寫測試用例

延遲消費(fèi)場景

首先我們編寫用于測試TTL設(shè)置在消息上的測試代碼。

我們借助 spring-rabbit 包下提供的RabbitTemplate類來發(fā)送消息。由于我們添加了 spring-boot-starter-amqp 擴(kuò)展,Spring Boot會在初始化時自動地將RabbitTemplate當(dāng)成bean加載到容器中。

解決了消息的發(fā)送問題,那么又該如何為每個消息設(shè)置TTL呢?這里我們需要借助MessagePostProcessor。MessagePostProcessor通常用來設(shè)置消息的Header以及消息的屬性。我們新建一個ExpirationMessagePostProcessor類來負(fù)責(zé)設(shè)置消息的TTL屬性:

/**
 * 設(shè)置消息的失效時間
 */
public class ExpirationMessagePostProcessorimplements MessagePostProcessor{
    private final Long ttl; // 毫秒

    public ExpirationMessagePostProcessor(Long ttl){
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message)throws AmqpException {
        message.getMessageProperties()
               .setExpiration(ttl.toString()); // 設(shè)置per-message的失效時間
        return message;
    }
}

然后在調(diào)用RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊列中發(fā)送3條消息,過期時間依次為1秒,2秒和3秒。具體的代碼如下所示:

@Test
public void testDelayQueuePerMessageTTL()throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        long expiration = i * 1000;
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
    }
    ProcessReceiver.latch.await();
}

細(xì)心的朋友一定會問,為什么要在代碼中加一個CountDownLatch呢?這是因?yàn)槿绻麤]有l(wèi)atch阻塞住測試方法的話,測試用例會直接結(jié)束,程序退出,我們就看不到消息被延遲消費(fèi)的表現(xiàn)了。

那么類似地,測試TTL設(shè)置在隊列上的代碼如下:

@Test
public void testDelayQueuePerQueueTTL()throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
    }
    ProcessReceiver.latch.await();
}

我們向緩沖隊列中發(fā)送3條消息。理論上這3條消息會在4秒后同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景。

@Test
public void testFailMessage()throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(6);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
    }
    ProcessReceiver.latch.await();
}

我們向delay_process_queue發(fā)送3條會觸發(fā)FAIL的消息,理論上這3條消息會在4秒后自動重試。

查看測試結(jié)果

延遲消費(fèi)場景

延遲消費(fèi)的場景測試我們分為了TTL設(shè)置在消息上和TTL設(shè)置在隊列上兩種。首先,我們先看一下TTL設(shè)置在消息上的測試結(jié)果:

從上圖中我們可以看到,ProcessReceiver分別經(jīng)過1秒、2秒、3秒收到消息。測試結(jié)果表明消息不僅被延遲消費(fèi)了,而且每條消息的延遲時間是可以被個性化設(shè)置的。TTL設(shè)置在消息上的延遲消費(fèi)場景測試成功。

然后,TTL設(shè)置在隊列上的測試結(jié)果如下圖:

從上圖中我們可以看到,ProcessReceiver經(jīng)過了4秒的延遲之后,同時收到了3條消息。測試結(jié)果表明消息不僅被延遲消費(fèi)了,同時也證明了當(dāng)TTL設(shè)置在隊列上的時候,消息的過期時間是固定的。TTL設(shè)置在隊列上的延遲消費(fèi)場景測試成功。

延遲重試場景

接下來,我們再來看一下延遲重試的測試結(jié)果:

ProcessReceiver首先收到了3條會觸發(fā)FAIL的消息,然后將其移動到緩沖隊列之后,過了4秒,又收到了剛才的那3條消息。延遲重試場景測試成功。

總結(jié)

本文首先介紹了延遲隊列的概念以及用途,并且通過代碼詳細(xì)講解了如何通過Spring Boot和RabbitMQ實(shí)現(xiàn)一個延遲隊列。希望本文能夠?qū)Υ蠹移綍r的學(xué)習(xí)和工作能有所啟發(fā)和幫助。

 

 

來自:http://www.kissyu.org/2017/11/18/Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊列/

 

標(biāo)簽: 代碼 網(wǎng)絡(luò)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:寫出優(yōu)質(zhì)Java代碼的4個技巧

下一篇:Google S2 中的四叉樹求 LCA 最近公共祖先