再看rabbitmq的交換器和隊列的關係

最近又要用到rabbitmq,業務上要求服務器只發一次消息,需要多個客戶端都去單獨消費。但我們知道rabbitmq的機制里,每個隊列里的消息只能消費一次,所以客戶端要單獨消費信息,就必須得每個客戶端單獨監聽一個queue。所以我最終想實現的是服務端只聲明exchange,客戶端來創建queue和綁定exchange。但是在看各種rabbitmq博文和討論的時候,我覺得對exchange的模式和queue間的關係講的都不是很清楚。所以我決定自己驗證一下

fanout模式和direct模式

本文主要驗證fanout模式和direct模式下以上猜想是否可行。fanout模式就是大名鼎鼎的廣播模式了,只要queue綁定了fanout的交換器,就可以直接的收到消息,無需routingkey的參与。而direct模式就是通過routing key直接發送到綁定了同樣routing key的隊列中。那麼,在這兩種exchange的模式下,是否都可以實現服務端僅創建exchange,客戶端創建queue並綁定exchange呢?

Direct模式驗證

我們先把交換器、routingkey、隊列的名稱定義好:

  1. 交換器為directTest
  2. routingkey為direct_routing_key
  3. 隊列測試3個,首先測試Direct_test_queue_1,再行測試Direct_test_queue_2,再行測試Direct_test_queue_3

代碼使用spring boot框架快速搭建。我們先規劃好需要幾個類來完成這個事情:

  1. 針對生產者,需要RabbitmqConfig,用來配置exchange的
  2. 針對生產者,需要DirectRabbitSender,用來實現Direct模式的消息發送
  3. 針對消費者,需要DirectConsumerOne,來測試第一個隊列Direct_test_queue_1生成和消息接收
  4. 針對消費者,需要DirectConsumerTwo,來測試第二個隊列Direct_test_queue_2生成和消息接收
  5. 針對消費者,需要DirectConsumerThree,來測試第三個隊列Direct_test_queue_3生成和消息接收
  6. 我們還需要一個測試類RabbitmqApplicationTests,用於測試消息的發送和接收

rabbitmq先配置一個DirectExchange

@Bean
DirectExchange directExchange(){
    return new DirectExchange("directTest", true, false);
}

我們可以看到Direct交換器的名稱定義為了directTest,這時候還未綁定任何的隊列。啟動程序,若我們的設想沒錯,則rabbitmq中應該已經生成了directTest的exchange。

Bingo!directTest交換器成功創建。接下來,我們去編寫DirectRabbitSender的代碼

@Component
public class DirectRabbitSender{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final String EXCHANGE_NAME = "directTest";
    private final String ROUTING_KEY = "direct_routing_key";

    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }

}

我們可以看到代碼中,通過rabbitTemplate發送消息到了交換器為directTest,routingkey為direct_routing_key的地方。但這時候我們沒有任何隊列了,自然接不到消息。現在我們去編寫第一個消費者DirectConsumerOne來接受消息。

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "Direct_test_queue_1", durable = "true"),
        exchange = @Exchange(value = "directTest"),
        key = "direct_routing_key"
))
public class DirectConsumerOne {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列Direct_test_queue_1接到消息" + message);
    }

}

通過代碼可以看到,我們通過@QueueBinding把Direct_test_queue_1隊列綁定到了directTest和direct_routing_key上。Direct_test_queue_1並沒有在rabbitmq創建,這並沒有關係。一般來說,@RabbitListener會自動去創建隊列。啟動程序,我們去看一下rabbitmq里隊列是不是創建了。

Bingo!再次驗證成功。我們去看看綁定關係是不是正確。這時候Direct_test_queue_1應該綁定到了名為directTest的交換器,而綁定的routingkey為direct_routing_key

biubiubiu!綁定關係完全正確。到了這裏,我們進行最後一步,寫了單元測試去發送消息,查看控制台中消費者是否成功收到消息。RabbitmqApplicationTests的代碼如下:

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private DirectRabbitSender directRabbitSender;

    @Test
    void contextLoads() {
    }

    @Test
    public void directSendTest(){
        directRabbitSender.send("direct-sender");
        directRabbitSender.send("direct-sender_test");
    }

}

啟動測試類,然後去查看控制台。

沒錯,這就是我們想要達到的效果!基本可以宣布Direct模式驗證成功。服務端生成exchange,客戶端去生成隊列綁定的方式在direct模式下完全可行。為了保險起見,再驗證一下生成多個消費者綁定到同一個隊列是否可行。

DirectConsumerTwo代碼如下:

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "Direct_test_queue_2", durable = "true"),
        exchange = @Exchange(value = "directTest"),
        key = "direct_routing_key"
))
public class DirectConsumerTwo {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列Direct_test_queue_2接到消息" + message);
    }

}

DirectConsumerThree代碼如下:

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "Direct_test_queue_3", durable = "true"),
        exchange = @Exchange(value = "directTest"),
        key = "direct_routing_key"
))
public class DirectConsumerThree {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列Direct_test_queue_3接到消息" + message);
    }

}

啟動測試類,我們去看兩個地方:

  1. rabbitmq是否創建了客戶端綁定的三個隊列Direct_test_queue_1、Direct_test_queue_2、Direct_test_queue_3
  2. 消費者應該各自收到2條消息(Test中發送了兩條,參看上面 RabbitmqApplicationTests 的代碼)。那3個隊列,控制台中應該打印了6條消息。

hohohoho!創建成功,並且綁定關係我看了也全都正確。我們去看控制台

6條!沒有任何毛病,至此,可以宣布Direct模式下,完全支持我們最初的想法:服務端生成exchange,客戶端去生成隊列綁定的方式在direct模式下完全可行。

fanout模式驗證

接下來我們驗證一下fanout的方式,基本操作流程和Direct模式一致。代碼的結構也差不多:

  1. 針對生產者,需要RabbitmqConfig,直接在Direct模式下的rabbitmqConfig里直接添加Fanout的交換器配置
  2. 針對生產者,需要FanoutRabbitSender,用來實現Fanout模式的消息發送
  3. 針對消費者,需要FanoutConsumerOne,來測試第一個隊列Fanout_test_queue_1生成和消息接收
  4. 針對消費者,需要FanoutConsumerTwo,來測試第二個隊列Fanout_test_queue_2生成和消息接收
  5. 針對消費者,需要FanoutConsumerThree,來測試第三個隊列Fanout_test_queue_3生成和消息接收
  6. 測試類RabbitmqApplicationTests也直接復用Direact模式下測試的類

我就不多BB,直接上代碼了。

RabbitmqConfig代碼如下

@Configuration
public class RabbitmqConfig {

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("directTest", true, false);
    }

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutTest", true, false);
    }

}

FanoutRabbitSender的代碼如下,此處和direct模式的區別是Fanout中沒有routingkey,所以代碼里也沒定義routingkey:

@Component
public class FanoutRabbitSender{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final String EXCHANGE_NAME = "fanoutTest";

    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, null, message);
    }

}

我們到這裏先啟動程序試試,看看fanoutTest的交換器在沒有綁定隊列的情況下是否生成了。

棒棒棒!和我們想的一樣,那接下來去寫完所有的消費者,這裏和Direct模式最重要的區別是@Exchange中必須要指定type為fanout。direct模式的代碼里沒指定是因為@Exchange的type默認值就是direct。我直接上代碼了:

/**
 * 監聽器主動去聲明queue=fanout_test_queue_1,並綁定到fanoutTest交換器
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "fanout_test_queue_1", durable = "true"),
        exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
))
public class FanoutConsumerOne {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列fanout_test_queue_1接到消息" + message);
    }

}

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "fanout_test_queue_2", durable = "true"),
        exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
))
public class FanoutConsumerTwo {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列fanout_test_queue_2接到消息" + message);
    }

}

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "fanout_test_queue_3", durable = "true"),
        exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT)
))
public class FanoutConsumerThree {

    @RabbitHandler
    private void onMessage(String message){
        System.out.println("監聽隊列fanout_test_queue_3接到消息" + message);
    }

}

接着去測試類RabbitmqApplicationTests中加上fanout的發送測試,然後註釋掉direct的單元測試,以便一會造成干擾

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private DirectRabbitSender directRabbitSender;

    @Autowired
    private FanoutRabbitSender fanoutRabbitSender;

    @Test
    void contextLoads() {
    }

//    @Test
//    public void directSendTest(){
//        directRabbitSender.send("direct-sender");
//        directRabbitSender.send("direct-sender_test");
//    }

    @Test
    public void fanoutSendTest(){
        fanoutRabbitSender.send("fanout-sender_1");
        fanoutRabbitSender.send("fanout-sender_2");
    }

}

代碼都完成了,現在我們啟動測試類,看看控制台是否正常收到了消息

看圖看圖,fanout模式下也完全認證成功!!!那我們可以宣布,文章開頭的猜想完全可以實現。

總結

服務端只聲明exchange,客戶端來創建queue和綁定exchange的方式完全可行。並且在Direct和Fanout模式下都可行。

那我們可以推測在Header模式的交換器和Topic模式的交換器下應該也大差不差。具體各位可自行驗證,基本流程和上面direct和fanout的流程差不多。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?