SpringBoot+RabbitMQ学习笔记(四)使用RabbitMQ的三种交换器之Fanout

一丶简介 Fanout Exchange    不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主...

一丶简介

Fanout Exchange 

  不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

业务场景:

1.订单服务需要同时向短信服务和push服务发送,两个服务都有各自的消息队列。

2.使用Fanout交换器。

二丶配置文件

同样的创建了两个项目,一个作为生产者,一个作为消费者。

生产者配置:

server.port=8883

spring.application.name=hello-world
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器名称
mq.config.exchange=order.fanout
View Code

消费者配置:

server.port=8884

spring.application.name=lesson1

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器名称
mq.config.exchange=order.fanout
#短信消息服务队列名称
mq.config.queue.sms=order.sms
#push消息服务队列名称
mq.config.queue.push=order.push
#log消息服务队列名称
mq.config.queue.log=order.log
View Code

注:本是要配置两个消息队列,但是为了测试fanout交换器是否能够将消息发送到所有消息队列(准确的说是配置了路由键的队列和没有配置路由键的队列)多创建的一个。

三丶编写生产者

package com.example.amqpfanoutprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:发送消息
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange 交换器
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数1:交换器名称
        //参数2:路由键,广播模式时(fanout交换器)没有路由键使用""空字符串代替
        //参数3:消息
        this.amqpTemplate.convertAndSend(exChange,"",msg);

    }
}
View Code

四丶编写消费者

短信服务类:

package com.ant.amqpfanoutconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 *                key:路由键(广播模式时不需要路由键,所以不写)
 * @Queue : value:配置队列名称
 *          autoDelete:是否是一个可删除的临时队列
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class SmsReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("sms-receiver:"+msg);
    }
}
View Code

push服务类:

package com.ant.amqpfanoutconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 *                key:路由键(广播模式时不需要路由键,所以不写)
 * @Queue : value:配置队列名称
 *          autoDelete:是否是一个可删除的临时队列
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.push}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class PushReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("push-receiver:"+msg);
    }
}
View Code

log服务类:该类是为了测试配置了路由键的消息队列和没配置路由键的消息队列是否都能接收到fanout交换器发送的消息。

package com.ant.amqpfanoutconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 *                key:路由键(广播模式时不需要路由键,所以不写)注:消息队列配置了路由键同样能接收到fanout交换器传过来的消息。
 * @Queue : value:配置队列名称
 *          autoDelete:是否是一个可删除的临时队列
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.log}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT),
                key = "user.log.info"
        )
)
public class LogReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("log-receiver:"+msg);
    }
}
View Code

五丶测试一发

测试类:

package com.example.amqp;

import com.example.amqpfanoutprovider.FanoutSender;
import com.example.helloworld.HelloworldApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloworldApplication.class)
public class QueueTest {

    @Autowired
    private FanoutSender fanoutSender;

    /**
     * 测试消息队列
     */
    @Test
    public void test1() throws InterruptedException {

            fanoutSender.send("hello");


    }
}
View Code

OK,看控制台输出得出,配置了路由键的消息队列和没配置路由键的消息队列都能接收到fanout交换器发送的消息!

如有不足之处欢迎指正!

  • 发表于 2020-04-29 10:40
  • 阅读 ( 167 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除