博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot RabbitMQ 集成 五 "Topics"
阅读量:7055 次
发布时间:2019-06-28

本文共 6039 字,大约阅读时间需要 20 分钟。

  hot3.png

Topics模式,官方的解释是Receiving messages based on a pattern (topics),它的结构是

消费者各自监控自己的队列;交换机通过一种模式策略确定生产者的消息放入那个队列。

详细介绍请参照:  中的主题模式(Topic Exchange)

一、编写代码

1、编写常量类RabbitTopicConstant 

package com.lvgang.springbootrabbitmq.topic;/** * @author lvgang */public class RabbitTopicConstant {    public  static final  String QUEUQ_A = "Queue_Topic_A";    public  static final  String QUEUQ_B = "Queue_Topic_B";    //*(星号):可以(只能)匹配一个单词    //#(井号):可以匹配多个单词(或者零个)    public  static final  String QUEUQ_KEY_A = "topic.*";    public  static final  String QUEUQ_KEY_B = "topic.#";    public static final String EXCHANGE = "Exchange_Topic";}

2、编写配置类RabbitTopicConfig 

package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author lvgang */@Configurationpublic class RabbitTopicConfig {    private static Logger logger = LoggerFactory.getLogger(RabbitTopicConfig.class);    /**     * Queue 可以有4个参数     *      1.队列名     *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true     *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false     *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false     */    @Bean    public Queue createTopicQueueA() {        logger.info("创建TopicQueueA成功");        return new Queue(RabbitTopicConstant.QUEUQ_A,true);    }    @Bean    public Queue createTopicQueueB() {        logger.info("创建TopicQueueB成功");        return new Queue(RabbitTopicConstant.QUEUQ_B,true);    }    @Bean    public TopicExchange topicExchange() {        //配置广播路由器        logger.info("创建TopicExchange成功");        return new TopicExchange(RabbitTopicConstant.EXCHANGE);    }    @Bean    public Binding bingQueueAToTopicExchange() {        logger.info("绑定TopicQueueA到TopicExchange成功");        return BindingBuilder.bind(createTopicQueueA()).to(topicExchange()).with(RabbitTopicConstant.QUEUQ_KEY_A);    }    @Bean    public Binding bingQueueBToTopicExchange() {        logger.info("绑定TopicQueueB到TopicExchange成功");        return BindingBuilder.bind(createTopicQueueB()).to(topicExchange()).with(RabbitTopicConstant.QUEUQ_KEY_B);    }}

3、编写消息生产者TopicSender 

package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;import java.util.UUID;/** * @author lvgang */@Componentpublic class TopicSender {	private static Logger logger = LoggerFactory.getLogger(TopicSender.class);	@Autowired	private RabbitTemplate rabbitTemplate;	public void send(int i) {		String contentA = "ExchangeTopic A="+ i+"," + new Date() + ", content= " + UUID.randomUUID().toString();		//消息发送,使用void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;		//但不指定routingKey。因为FanoutExchange类型的交换机,routingKey不起作用,它向所有的队列发送广播,只要队列绑定到该交换机即接受消息。		this.rabbitTemplate.convertAndSend(RabbitTopicConstant.EXCHANGE,"topic.n",contentA);		logger.info("Send ok,"+new Date()+","+contentA);		String contentB = "ExchangeTopic B="+ i+"," + new Date() + ", content= " + UUID.randomUUID().toString();		//消息发送,使用void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;		//但不指定routingKey。因为FanoutExchange类型的交换机,routingKey不起作用,它向所有的队列发送广播,只要队列绑定到该交换机即接受消息。		this.rabbitTemplate.convertAndSend(RabbitTopicConstant.EXCHANGE,"topic.n.n",contentB);		logger.info("Send ok,"+new Date()+","+contentB);	}}

4、编写消息消费者A  TopicReceiverA 

package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;/** * @author lvgang */@Component@RabbitListener(queues = RabbitTopicConstant.QUEUQ_A)public class TopicReceiverA {    private static Logger logger = LoggerFactory.getLogger(TopicReceiverA.class);    @RabbitHandler    public void process(String message) {        logger.info("ReceiverA : " + message +","+ new Date());    }}

5、编写消息消费者B  TopicReceiverB 

package com.lvgang.springbootrabbitmq.topic;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;/** * @author lvgang */@Component@RabbitListener(queues = RabbitTopicConstant.QUEUQ_B)public class TopicReceiverB {    private static Logger logger = LoggerFactory.getLogger(TopicReceiverB.class);    @RabbitHandler    public void process(String message) {        logger.info("ReceiverB : " + message +","+ new Date());    }}

二、测试结果

1、编写测试类TopicTests 

package com.lvgang.springbootrabbitmq.topicTests;import com.lvgang.springbootrabbitmq.topic.TopicSender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic class TopicTests {    @Autowired    private TopicSender topicSender;    @Test    public void hello() throws Exception {        int i=1;        while (true) {            try {                if(i<=1) {                    topicSender.send(i);                }                i++;                Thread.sleep(1000);            } catch (Exception e) {                ;            }        }    }}

2、执行测试类,并查看结果

bc1fdd99c56fa395698f6e735937d7c7f9b.jpg

通过执行测类,查看到了消息消费的情况,生产者共计生产了2个消息,其它一条消息被消费者A、 B各消费了一次,另一条消息被消费者B消费了一次。

转载于:https://my.oschina.net/sdlvzg/blog/3044472

你可能感兴趣的文章
PHP下载断点续传 转
查看>>
【新手】【转】如何学习java程序设计
查看>>
企业邮箱发送不到sina、hotmail等解决
查看>>
如果finalize()抛出异常会怎样?
查看>>
自己的微博
查看>>
php取整函数ceil,floor,round,intval函数的区别
查看>>
NH5-Nhibernate映射中的三种关系
查看>>
SpringBoot项目启动时自动执行指定方法
查看>>
设计模式(行为型模式)——命令模式(Command)
查看>>
vi常用命令
查看>>
Office 2013 兼容性测试(四)——部署遥测处理器
查看>>
Struts1和Struts2的区别和对比
查看>>
Forms开发中触发器的执行顺序
查看>>
SEO博客三个月没更新排行骤步康复
查看>>
JQuery 插件开发的入门介绍
查看>>
马哥2016全新Linux+Python高端运维班第五周作业
查看>>
联想扬天A4680R台式电脑增加内存不识别的解决方案
查看>>
(5)Powershell别名(Alias)
查看>>
我的友情链接
查看>>
我的友情链接
查看>>