RabbitMQ消息的交换

目录1.简介在前面的例子中,每个消息都只对应一个消费者,即使有多个消费者在线,也只会有一个消费者接收并处理一条消息,这是消息中间件的一种常用方式。另外一种方式,生产者生产一条消息,广播给一个或多个队列,所有订阅了这个队列的消费者,都可以消费这条消息,这就是消息订阅。官方教程列举了这样一个场景,生产者发出一条记录日志的消息,消费者1接收到后写日志到硬盘,消费者2接收到后打印日志到屏幕。工作中还有很多这样的场景有待发掘,适当的使用消息订阅后可以成倍的增加效率。2.RabbitMQ的交换中心(Exchange)在前两章的例子中,我们涉及到了三个概念
  • 生产者
  • 队列
  • 消费者
  • 这不禁让我们以为,生产者生产消息后直接到发送到队列,消费者从队列中获取消息,再消费掉。其实这是错误的,在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。正确的概念是,生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期--发送给某些队列,或者直接丢弃掉。这个概念在官方文档中被称作RabbitMQ消息模型的核心思想(core idea)如下图,其中X代表的是Exchange。imageRabbitMQ中,有4种类型的Exchange
  • direct    通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
    • 默认类型   本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
  • topic    话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
  • headers    当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
  • fanout    分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景
  • 更详细的介绍,请看官方文档3.临时队列可以对一个队列命名是十分重要的,在消费者消费消息时,要指明消费哪个队列的消息(下面的queue),这样就可以让多个消费者同时分享一个队列

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

    上述记录日志的场景中,有以下几个特点
  • 所有消费者都需要监听所有的日志消息,因此每个消费者都需要一个单独的队列,不需要和别人分享
  • 消费者只关心最新的消息,连接到RabbitMQ之前的消息不需要关心,因此,每次连接时需要创建一个队列,绑定到相应的exchange上,连接断开后,删除该队列
  • 自己声明队列是比较麻烦的,因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
    String queueName = channel.queueDeclare().getQueue();
    这行代码会获取一个名字类似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的临时队列4.绑定再次注意,在RabbitMQ中,消息是发送到Exchange的,不是直接发送的Queue。因此,需要把Queue和Exchange进行绑定,告诉RabbitMQ把指定的Exchange上的消息发送的这个队列上来绑定队列使用此方法

    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

    其中,queue是队列名,exchange是要绑定的交换中心,routingKey就是这个queue的routingKey5.实践下面来实现上述场景,生产者发送日志消息,消费者1记录日志,消费者2打印日志下面的代码中,把连接工厂等方法放到了构造函数中,也就是说,每new一个对象,都会创建一个连接,在生产环境这样做是很浪费性能的,每次创建一个connection都会建立一次TCP连接,生产环境应使用连接池。而Channel又不一样,多个Channel是共用一个TCP连接的,因此可以放心的获取Channel(本结论出自官方文档对Channel的定义)

    AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".

    For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

    日志消息发送类 LogSender
    复制代码
     1 import java.io.IOException;
     2 import java.util.concurrent.TimeoutException;
     3 
     4 import org.slf4j.Logger;
     5 import org.slf4j.LoggerFactory;
     6 
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 public class LogSender {
    12 
    13     private Logger logger = LoggerFactory.getLogger(LogSender.class);
    14     private  ConnectionFactory factory;
    15     private  Connection connection;
    16     private  Channel channel;
    17     
    18     /**
    19      * 在构造函数中获取连接
    20      */
    21     public LogSender(){
    22         super();
    23         try {
    24             factory = new ConnectionFactory();
    25             factory.setHost("127.0.0.1");
    26             connection = factory.newConnection();
    27             channel = connection.createChannel();
    28         } catch (Exception e) {
    29             logger.error(" [X] INIT ERROR!",e);
    30         }
    31     }
    32     /**
    33      * 提供个关闭方法,现在并没有什么卵用
    34      * @return
    35      */
    36     public boolean closeAll(){
    37         try {
    38             this.channel.close();
    39             this.connection.close();
    40         } catch (IOException | TimeoutException e) {
    41             logger.error(" [X] CLOSE ERROR!",e);
    42             return false;
    43         }
    44         return true;
    45     }
    46     
    47     /**
    48      * 我们更加关心的业务方法
    49      * @param message
    50      */
    51     public void sendMessage(String message) {
    52             try {
    53                 //声明一个exchange,命名为logs,类型为fanout
    54                 channel.exchangeDeclare("logs", "fanout");
    55                 //exchange是logs,表示发送到此Exchange上
    56                 //fanout类型的exchange,忽略routingKey,所以第二个参数为空
    57                 channel.basicPublish("logs", "", null, message.getBytes());
    58                 logger.debug(" [D] message sent:"+message);
    59             } catch (IOException e) {
    60                 e.printStackTrace();
    61             }
    62     }
    63 }
    复制代码
    在LogSender中,和之前的例子不一样的地方是,我们没有直接声明一个Queue,取而代之的是声明了一个exchange发布消息时,第一个参数填了我们声明的exchange名字,routingKey留空,因为fanout类型忽略它。在前面的例子中,我们routingKey填的是队列名,因为默认的exchange(exchange位空字符串时使用默认交换中心)会把队列的routingKey设置为queueName(声明队列的时候设置的,不是发送消息的时候),又是direct类型,所以可以通过queueName当做routingKey找到队列。消费类 LogConsumer
    复制代码
     1 package com.liyang.ticktock.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 import com.rabbitmq.client.AMQP;
    10 import com.rabbitmq.client.Channel;
    11 import com.rabbitmq.client.Connection;
    12 import com.rabbitmq.client.ConnectionFactory;
    13 import com.rabbitmq.client.Consumer;
    14 import com.rabbitmq.client.DefaultConsumer;
    15 import com.rabbitmq.client.Envelope;
    16 
    17 public class LogConsumer {
    18 
    19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
    20     private ConnectionFactory factory;
    21     private Connection connection;
    22     private Channel channel;
    23 
    24     /**
    25      * 在构造函数中获取连接
    26      */
    27     public LogConsumer() {
    28         super();
    29         try {
    30             factory = new ConnectionFactory();
    31             factory.setHost("127.0.0.1");
    32             connection = factory.newConnection();
    33             channel = connection.createChannel();
    34             // 声明exchange,防止生产者没启动,exchange不存在
    35             channel.exchangeDeclare("logs","fanout");
    36         } catch (Exception e) {
    37             logger.error(" [X] INIT ERROR!", e);
    38         }
    39     }
    40 
    41     /**
    42      * 提供个关闭方法,现在并没有什么卵用
    43      * 
    44      * @return
    45      */
    46     public boolean closeAll() {
    47         try {
    48             this.channel.close();
    49             this.connection.close();
    50         } catch (IOException | TimeoutException e) {
    51             logger.error(" [X] CLOSE ERROR!", e);
    52             return false;
    53         }
    54         return true;
    55     }
    56 
    57     /**
    58      * 我们更加关心的业务方法
    59      */
    60     public void consume() {
    61         try {
    62             // 获取一个临时队列
    63             String queueName = channel.queueDeclare().getQueue();
    64             // 把刚刚获取的队列绑定到logs这个交换中心上,fanout类型忽略routingKey,所以第三个参数为空
    65             channel.queueBind(queueName, "logs", "");
    66             //定义一个Consumer,消费Log消息
    67             Consumer consumer = new DefaultConsumer(channel) {
    68                 @Override
    69                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    70                         byte[] body) throws IOException {
    71                     String message = new String(body, "UTF-8");
    72                     logger.debug(" [D] 我是来打印日志的:"+message);
    73                 }
    74             };
    75             //这里自动确认为true,接收到消息后该消息就销毁了
    76             channel.basicConsume(queueName, true, consumer);
    77         } catch (IOException e) {
    78             e.printStackTrace();
    79         }
    80     }
    81 }
    复制代码
    复制一个项目,把72行改为如下代码,代表两个做不同工作的消费者
    1 logger.debug(" [D] 我已经把消息写到硬盘了:"+message);
    消费者App
    复制代码
    1 public class App 
    2 {
    3     public static void main( String[] args )
    4     {
    5         LogConsumer consumer = new LogConsumer();
    6         consumer.consume();
    7     }
    8 }
    复制代码
    生产者App
    复制代码
    1 public class App {
    2     public static void main( String[] args ) throws InterruptedException{
    3         LogSender sender = new LogSender();
    4         while(true){
    5             sender.sendMessage(System.nanoTime()+"");
    6             Thread.sleep(1000);
    7         }
    8     }
    9 }
    复制代码
    把消费者打包成两个可执行的jar包,方便观察控制台用java -jar 命令执行,结果如下6.结束语本章介绍了RabbitMQ中消息的交换,再次强调,RabbitMQ中,消息是通过交换中心转发到队列的,不要被默认的exchange混淆,默认的exchange会自动把queue的名字设置为它的routingKey,所以消息发布时,才能通过queueName找到该队列,其实此时queueName扮演的角色就是routingKey。

    相关内容推荐