Topics模式,官方的解释是Receiving messages based on a pattern (topics),它的结构是
消费者各自监控自己的队列;交换机通过一种模式策略确定生产者的消息放入哪个队列。
详细介绍请参照: 中的主题模式(Topic Exchange)
一、编写代码
1、编写常量类RabbitTopicConstant
package com.lvgang.springbootrabbitmq.topic;

/**
 * Queue names, binding keys and exchange name used by the topic-exchange example.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 *
 * @author lvgang
 */
public final class RabbitTopicConstant {

    /** Name of the first example queue. */
    public static final String QUEUQ_A = "Queue_Topic_A";

    /** Name of the second example queue. */
    public static final String QUEUQ_B = "Queue_Topic_B";

    // Binding-key wildcards:
    //   * (star) matches exactly one word
    //   # (hash) matches several words (or zero)

    /** Binding key that matches {@code topic.} followed by exactly one word. */
    public static final String QUEUQ_KEY_A = "topic.*";

    /** Binding key that matches {@code topic} followed by zero or more words. */
    public static final String QUEUQ_KEY_B = "topic.#";

    /** Name of the exchange used by the example. */
    public static final String EXCHANGE = "Exchange_Topic";

    private RabbitTopicConstant() {
        // Constants holder; not meant to be instantiated.
    }
}
2、编写配置类RabbitTopicConfig
package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author lvgang */@Configurationpublic class RabbitTopicConfig { private static Logger logger = LoggerFactory.getLogger(RabbitTopicConfig.class); /** * Queue 可以有4个参数 * 1.队列名 * 2.durable 持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true * 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false * 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false */ @Bean public Queue createTopicQueueA() { logger.info("创建TopicQueueA成功"); return new Queue(RabbitTopicConstant.QUEUQ_A,true); } @Bean public Queue createTopicQueueB() { logger.info("创建TopicQueueB成功"); return new Queue(RabbitTopicConstant.QUEUQ_B,true); } @Bean public TopicExchange topicExchange() { //配置广播路由器 logger.info("创建TopicExchange成功"); return new TopicExchange(RabbitTopicConstant.EXCHANGE); } @Bean public Binding bingQueueAToTopicExchange() { logger.info("绑定TopicQueueA到TopicExchange成功"); return BindingBuilder.bind(createTopicQueueA()).to(topicExchange()).with(RabbitTopicConstant.QUEUQ_KEY_A); } @Bean public Binding bingQueueBToTopicExchange() { logger.info("绑定TopicQueueB到TopicExchange成功"); return BindingBuilder.bind(createTopicQueueB()).to(topicExchange()).with(RabbitTopicConstant.QUEUQ_KEY_B); }}
3、编写消息生产者TopicSender
package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;import java.util.UUID;/** * @author lvgang */@Componentpublic class TopicSender { private static Logger logger = LoggerFactory.getLogger(TopicSender.class); @Autowired private RabbitTemplate rabbitTemplate; public void send(int i) { String contentA = "ExchangeTopic A="+ i+"," + new Date() + ", content= " + UUID.randomUUID().toString(); //消息发送,使用void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; //但不指定routingKey。因为FanoutExchange类型的交换机,routingKey不起作用,它向所有的队列发送广播,只要队列绑定到该交换机即接受消息。 this.rabbitTemplate.convertAndSend(RabbitTopicConstant.EXCHANGE,"topic.n",contentA); logger.info("Send ok,"+new Date()+","+contentA); String contentB = "ExchangeTopic B="+ i+"," + new Date() + ", content= " + UUID.randomUUID().toString(); //消息发送,使用void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; //但不指定routingKey。因为FanoutExchange类型的交换机,routingKey不起作用,它向所有的队列发送广播,只要队列绑定到该交换机即接受消息。 this.rabbitTemplate.convertAndSend(RabbitTopicConstant.EXCHANGE,"topic.n.n",contentB); logger.info("Send ok,"+new Date()+","+contentB); }}
4、编写消息消费者A TopicReceiverA
package com.lvgang.springbootrabbitmq.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Listener on the queue named {@link RabbitTopicConstant#QUEUQ_A}; logs every text
 * message it is handed.
 *
 * @author lvgang
 */
@Component
@RabbitListener(queues = RabbitTopicConstant.QUEUQ_A)
public class TopicReceiverA {

    private static final Logger LOG = LoggerFactory.getLogger(TopicReceiverA.class);

    /**
     * Logs the received message followed by the current time.
     *
     * @param message text payload of the received message
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "ReceiverA : " + message + "," + new Date();
        LOG.info(line);
    }
}
5、编写消息消费者B TopicReceiverB
package com.lvgang.springbootrabbitmq.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Listener on the queue named {@link RabbitTopicConstant#QUEUQ_B}; logs every text
 * message it is handed.
 *
 * @author lvgang
 */
@Component
@RabbitListener(queues = RabbitTopicConstant.QUEUQ_B)
public class TopicReceiverB {

    private static final Logger LOG = LoggerFactory.getLogger(TopicReceiverB.class);

    /**
     * Logs the received message followed by the current time.
     *
     * @param message text payload of the received message
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "ReceiverB : " + message + "," + new Date();
        LOG.info(line);
    }
}
二、测试结果
1、编写测试类TopicTests
package com.lvgang.springbootrabbitmq.topicTests;

import com.lvgang.springbootrabbitmq.topic.TopicSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Sends one round of topic messages and keeps the test alive briefly so the result
 * can be observed in the log output.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TopicTests {

    /** How long the test stays alive after sending, so the listeners have time to log. */
    private static final long CONSUME_WAIT_MILLIS = 5000L;

    @Autowired
    private TopicSender topicSender;

    /**
     * Sends a single round of messages, then waits a bounded time before finishing.
     *
     * <p>The wait is bounded so the test terminates on its own, and failures from
     * {@code send} are propagated instead of being swallowed.
     *
     * @throws Exception if sending fails or the wait is interrupted
     */
    @Test
    public void hello() throws Exception {
        topicSender.send(1);
        Thread.sleep(CONSUME_WAIT_MILLIS);
    }
}
2、执行测试类,并查看结果
通过执行测试类,可以查看消息的消费情况:生产者共生产了2条消息,其中一条(路由键为topic.n,同时匹配topic.*和topic.#)被消费者A、B各消费了一次,另一条(路由键为topic.n.n,只匹配topic.#)只被消费者B消费了一次。