专注Java教育14年 全国咨询/投诉热线:444-1124-454
赢咖4LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 Rabbitmq的几种模式介绍

Rabbitmq的几种模式介绍

更新时间:2022-04-14 09:25:46 来源:赢咖4 浏览1089次

SpringBoot RabbitMQ 七种工作模式介绍

简单模式

很简单:生产者、队列和消费者。生产者向队列发送消息,消费者监听队列并消费消息

工作模式

工作:一个生产者,一个队列和多个消费者。生产者向队列发送消息,多个消费者监听同一个队列的消费消息

发布/订阅模式

发布/订阅:发布/订阅模式包括一个生产者、一个交换机、多个队列和多个消费者。交换机(Exchange)直接绑定到队列。生产者通过交换机(Exchange)将消息存储在绑定到交换机的队列中,消费者监听队列并消费

路由模式

路由:路由模式可以根据路由键将消息发送到指定队列。Exchange 和队列通过路由键绑定。生产者通过 Exchange 和路由键将消息准确地发送到队列。消费者监听队列并消费消息

主题模式

主题:主题模式支持在路由模式的基础上进行通配符操作。交换机会根据通配符将消息存入匹配队列,消费者监听队列并消费

标头模式

Header:header模式取消了路由key,而是使用header中的key/value对来匹配。匹配成功后,会通过交换机将消息发送到队列中,由消息制造者获取和消费

RPC 模式

RPC:RPC方式主要用于获取消费者的处理结果。通常,生产者将消息发送给消费者。消费者收到消息并消费后,将处理结果返回给生产者

SpringBoot 集成 RabbitMQ

首先,搭建SpringBoot项目,在POM XML文件中添加如下依赖

<依赖>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</依赖>
<依赖>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-web</artifactid>
</依赖>

修改配置文件,添加如下RabbitMQ配置

服务器:
  port: 8888 # 设置端口号
Spring:
  rabbitMQ:
    host: 127.0.0.1 # 设置 RabbitMQ 的主机
    port: 5672 # 设置 RabbitMQ 服务端口
    username: guest # 设置 RabbitMQ 用户名
    password: guest # 设置 RabbitMQ 密码

新的公共常量类

public interface RabbitConstant { 
    /** 
     * 简单模式
     */ 
    String SIMPLE_QUEUE_NAME = "simple_queue"; 
    /** 
     * 工作模式
     */ 
    String WORK_QUEUE_NAME = "work_queue"; 
    /** 
     * 发布/订阅模式
     */ 
    String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange"; 
    字符串 PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue"; 
    字符串 PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue"; 
    /** 
     * 路由模式
     */ 
    String ROUTING_EXCHANGE_NAME = "routing_exchange";
    字符串 ROUTING_FIRST_QUEUE_NAME = "routing_first_queue"; 
    字符串 ROUTING_SECOND_QUEUE_NAME = "routing_second_queue"; 
    字符串 ROUTING_THIRD_QUEUE_NAME = "routing_third_queue"; 
    字符串 ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key"; 
    字符串 ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key"; 
    字符串 ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key"; 
    /** 
     * 主题模式
     */ 
    String TOPICS_EXCHANGE_NAME = "topics_exchange"; 
    字符串 TOPICS_FIRST_QUEUE_NAME = "topics_first_queue"; 
    字符串 TOPICS_SECOND_QUEUE_NAME = "
    字符串 TOPICS_THIRD_QUEUE_NAME = "topics_third_queue"; 
    String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key"; 
    String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key"; 
    String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key"; 
    字符串 TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#"; 
    字符串 TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#"; 
    字符串 TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*"; 
    /** 
     * 标题模式
     */ 
    String HEADER_EXCHANGE_NAME = "header_exchange"; 
    字符串 HEADER_FIRST_QUEUE_NAME = "header_first_queue"; 
    字符串 HEADER_SECOND_QUEUE_NAME = "header_second_queue"; 
    /**
     * rpc 模式
     */ 
    String RPC_QUEUE_NAME = "rpc_queue"; 
}

添加一个Controller请求类(用于验证结果,最后可以添加)

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Message;
导入 org.springframework.amqp.core.MessageProperties;
导入 org.springframework.amqp.rabbit.core.RabbitTemplate;
导入 org.springframework.beans.factory.annotation.Autowired;
导入 org.springframework.web.bind.annotation.GetMapping;
导入 org.springframework.web.bind.annotation.RestController;
导入 java.nio.charset.StandardCharsets;
@RestController 
public class RabbitController { 
    @Autowired 
    private RabbitTemplate rabbitTemplate; 
    @GetMapping(value = "/simple") 
    public void simple() {
        rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "你好世界!"); 
    } 
    @GetMapping(value = "/work") 
    public void work() { 
        rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!"); 
    } 
    @GetMapping(value = "/pubsub") 
    public void pubsub() { 
        rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "发布/订阅你好"); 
    } 
    @GetMapping(value = "/routing") 
    public void routing() { 
        // 向第一个队列发送消息
        rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "路由你好"); 
    } 
    @GetMapping(value = "/topics") 
    public void topics() { 
        // 向第一个队列发送消息。这时候队列可以接收到消息,因为队列的通配符是#first.#,而routing_key是topics first。路由。键,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
        // 向第二个队列发送消息。这时候队列也能收到消息了,因为队列的通配符是*秒#,而routing_key是topic秒。路由。键,匹配成功
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello"); 
        // 向第三个队列发送消息。此时队列无法接受消息,因为队列通配符是*第三个*,而routing_key是topics第三个。路由。键,匹配失败
        rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); 
    } 
    @GetMapping(value = "/header")
    public void header() { 
        // 这个消息应该被两个队列接收。第一个队列全部匹配成功,第二个队列 Hello 值任意匹配成功
        MessageProperties messageProperties = new MessageProperties(); 
        messageProperties.setHeader("matchAll", "YES"); 
        messageProperties.setHeader("你好", "world"); 
        Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties); 
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message); 
        // 这个消息应该只被第二个队列接受。第一个队列全部匹配失败,
        MessageProperties messagePropertiesSecond = new MessageProperties(); 
        messagePropertiesSecond.setHeader("matchAll", "NO"); 
        Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond); 
        rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond); 
    } 
    @GetMapping(value = "/rpc") 
    public void rpc() { 
        Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!"); 
        System.out.println("rabbit rpc 响应消息:" + responseMsg); 
    } 
}

Springboot RabbitMQ 简单模式

生产者声明队列并向队列发送消息

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration 
public class RabbitSimpleProvider { 
    @Bean 
    public Queue simpleQueue() { 
        return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME); 
    } 
}

消费者监听队列并消费消息

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component 
public class RabbitSimpleConsumer { 
    @RabbitHandler 
    @RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME) 
    public void simpleListener(String context) { 
        System.out.println("rabbit receiver: " + context); 
    } 
}

单元测试

@Test 
public void simple() { 
    rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!"); 
}

响应结果

SpringBoot RabbitMQ 工作模式

生产者声明队列并向队列生成消息

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
公共类 RabbitWorkProvider { 
    @Bean
    公共队列 workQueue() { 
        return new Queue(RabbitConstant.WORK_QUEUE_NAME); 
    } 
}

消费者监听队列并消费消息(有两个消费者监听同一个队列)

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component 
public class RabbitWorkConsumer { 
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME) 
    @RabbitHandler 
    public void workQueueListenerFirst(String context) { 
        System.out.println("rabbit workQueue listener first receiver:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME) 
    @RabbitHandler 
    public void workQueueListenerSecond(String context) {
        System.out.println("rabbit workQueue listener 第二个接收者:" + context); 
    } 
}

单元测试

@Test 
public void work() { 
    rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "工作你好!"); 
}

响应结果(由于有两个消费者监听同一个队列,所以消息只能被其中一个消费者消费。默认情况下,消息是负载均衡发送给所有消费者的)

Springboot rabbit MQ 发布/订阅模式

生产者声明两个队列和一个扇出交换机,并将两个队列绑定到交换机

开关有四种类型:fanout、direct、topic和header(文末有介绍)

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.*;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
公共类 RabbitPublishSubscribeProvider { 
    @Bean
    公共队列 pubsubQueueFirst() { 
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME); 
    } 
    @Bean
    公共队列 pubsubQueueSecond() { 
        return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME); 
    } 
    @Bean
    公共 FanoutExchange fanoutExchange() {
        // 创建扇出类型开关,表示交换机会向所有绑定队列发送消息
        return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME); 
    } 
    @Bean 
    public Binding pubsubQueueFirstBindFanoutExchange() { 
        // 队列绑定开关
        return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange()); 
    } 
    @Bean 
    public Binding pubsubQueueSecondBindFanoutExchange() { 
        // 队列二绑定开关
        return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange()); 
    } 
}

消费者监听队列并消费

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component 
public class RabbitPublishSubscribeConsumer { 
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME) 
    @RabbitHandler 
    public void pubsubQueueFirst(String context) { 
        System.out.println("rabbit pubsub queue first receiver:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME) 
    @RabbitHandler 
    public void pubsubQueueSecond(String context) {
        System.out.println("rabbit pubsub 队列第二个接收者:" + context); 
    } 
}

单元测试

@Test 
public void pubsub() { 
    rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "发布/订阅你好"); 
}

响应结果

SpringBoot RabbitMQ 路由方式

生产者声明三个队列和一个直接交换机,将三个队列绑定到交换机,并设置交换机和队列之间的路由

import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitRoutingProvider {
    @Bean
    public Queue rabbitRoutingFirstQueue() {
        return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
    }
    @Bean
    public Queue rabbitRoutingSecondQueue() {
        return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
    }
    @Bean
    public Queue rabbitRoutingThirdQueue() {
        return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
    }
    @Bean
    public DirectExchange directExchange() {
        // Create a direct switch, indicating that the exchange opportunity sends messages to routing_ Queue with identical key
        return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
    }
    @Bean
    public Binding routingFirstQueueBindDirectExchange() {
        // Bind the direct switch to the queue and set the routing_key is routing_first_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
    }
    @Bean
    public Binding routingSecondQueueBindDirectExchange() {
        // Queue 2 binds the direct switch and sets routing_key is routing_second_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
    }
    @Bean
    public Binding routingThirdQueueBindDirectExchange() {
        // The queue 3 is bound to the direct switch and the routing is set_ Key is routing_third_queue_routing_key
        return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME); 
    } 
}

消费者监听队列并消费

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component 
public class RabbitRoutingConsumer { 
    @RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME) 
    @RabbitHandler 
    public void routingFirstQueueListener(String context) { 
        System.out.println("rabbit routing queue first receiver:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME) 
    @RabbitHandler 
    public void routingSecondQueueListener(String context) {
        System.out.println("rabbit pubsub 队列第二个接收者:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME) 
    @RabbitHandler 
    public void routingThirdQueueListener(String context) { 
        System.out.println("rabbit pubsub 队列第三个接收者:" + context); 
    } 
}

单元测试

@Test 
public void routing() { 
    // 向第一个队列发送消息
    rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello"); 
}

响应结果

Springboot RabbitMQ 主题模式

生产者声明了三个队列和一个主题切换。队列分别与主题交换机绑定,并设置了路由键统一字符。如果路由键满足交换机和队列之间的通配符要求,则将消息存储在队列中

#通配符可以匹配一个或多个单词,*通配符可以匹配一个单词;如果Exchange和队列之间的routing key通配符是#hello.#,则表示中间所有带hello的routing key都满足条件,消息会被存入队列

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Binding;
导入 org.springframework.amqp.core.BindingBuilder;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.amqp.core.TopicExchange;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration 
public class RabbitTopicProvider { 
    @Bean 
    public Queue topicFirstQueue() { 
        return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME); 
    } 
    @Bean
    公共队列 topicSecondQueue() { 
        return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
    } 
    @Bean
    公共队列 topicThirdQueue() { 
        return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME); 
    } 
    @Bean 
    public TopicExchange topicExchange() { 
        // 创建一个主题类型切换,表示交换机会发送消息到 routing_key 通配符匹配队列成功
        return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME); 
    } 
    @Bean 
    public Binding topicFirstQueueBindExchange() { 
        // 绑定topic类型切换到队列1并设置routing_key通配符#first.#
        return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD); 
    } 
    @Bean 
    public Binding topicSecondQueueBindExchange() { 
        //第二个队列绑定主题类型切换,设置路由_key通配符为* second.# 
        return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()) .with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD); 
    } 
    @Bean 
    public Binding topicThirdQueueBindExchange() { 
        // 三个队列绑定主题切换,设置routing_key通配符为*third.*
        return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD); 
    } 
}

消费者监听队列并消费

导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component 
public class RabbitTopicsConsumer { 
    @RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME) 
    @RabbitHandler 
    public void topicFirstQueue(String context) { 
        System.out.println("rabbit topics queue first receiver:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME) 
    @RabbitHandler 
    public void topicSecondQueue(String context) {
        System.out.println("兔子主题队列第二个接收者:" + context); 
    } 
    @RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME) 
    @RabbitHandler 
    public void topicThirdQueue(String context) { 
        System.out.println("rabbit 主题队列第三个接收者:" + context); 
    } 
}

单元测试

@Test 
public void topics() { 
    // 向第一个队列发送消息。这时候队列可以接收到消息,因为队列的通配符是#first.#,而routing_key是topics first。路由。键,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello"); 
    // 向第二个队列发送消息。这时候队列也能收到消息了,因为队列的通配符是*秒#,而routing_key是topic秒。路由。键,匹配成功
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
    // 向第三个队列发送消息。此时队列无法接受消息,因为队列通配符是*第三个*,而routing_key是topics第三个。路由。键,匹配失败
    rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello"); 
    }

响应结果

以上就是关于“Rabbitmq的几种模式介绍”,如果大家想了解更多相关知识,可以关注一下赢咖4的RabbitMQ教程,里面的课程内容细致全面,通俗易懂,适合小白学习,希望对大家能够有所帮助。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>