RabbitMQ 队列使用基础教程

实践环境

JDK 1.8.0_121

amqp-client 5.16.0

附:查看不同版本的amqp-client客户端支持的Java JDK版本

https://www.rabbitmq.com/client-libraries/java-versions

mavn settings.xml

<?xml version="1.0" encoding="UTF-8" ?>
<settings xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.1.0 http://maven.apache.org/xsd/settings-1.1.0.xsd" xmlns="http://maven.apache.org/SETTINGS/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <localRepository>D:\maven-repo</localRepository>
    <mirrors>
        <mirror>
            <id>aliyunmaven</id>
            <name>阿里云公共仓库</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <mirrorOf>*</mirrorOf>
        </mirror>
    </mirrors>
    <profiles>
        <profile>
            <repositories>                
            </repositories>
            <pluginRepositories>
            </pluginRepositories>
            <id>artifactory</id>
        </profile>
        <profile>
            <id>jdk-1.8</id>
            <activation>
                <activeByDefault>true</activeByDefault>
                <jdk>1.8</jdk>
            </activation>
            <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
                <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
            </properties>
        </profile>
    </profiles>
    <activeProfiles>
        <activeProfile>artifactory</activeProfile>
    </activeProfiles>
</settings>

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ore.example</groupId>
    <artifactId>rabbitMQStudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>3.5.4</maven.compiler.source>
        <maven.compiler.target>3.5.4</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <amqp.client.version>5.16.0</amqp.client.version>
        <slf4j.api.version>1.7.36</slf4j.api.version>
    </properties>
    <dependencies>
        <!-- 添加 RabbitMQ 客户端依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>${amqp.client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.api.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.api.version}</version>
        </dependency>
    </dependencies>
</project>

Hello World

场景:生产者 -> hello队列 -> 消费者

Sent.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;

public class Sent {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128"); // 设置RabbitMQ服务器
        factory.setPort(5672);  // 默认端口为 5672
        factory.setUsername("testacc"); // 设置连接登录用户
        factory.setPassword("test1234"); // 设置用户访问密码
        factory.setAutomaticRecoveryEnabled(true);  // 开启Connection自动恢复功能,这意味着如果连接丢失,客户端将尝试重新连接到 RabbitMQ 服务器。
        factory.setNetworkRecoveryInterval(5000); // 尝试重连时间间隔 // 设置为 5000:如果RabbitMQ客户端失去连接后,每5秒自动尝试重连一次
        factory.setVirtualHost("/"); // 设置虚拟主机,默认 /
        factory.setConnectionTimeout(30 * 1000); // 设置TCP连接超时时间 默认 60000(60秒)
        factory.setHandshakeTimeout(30 * 1000); // 设置SSL握手超时时间 默认 10000(10秒)
        factory.setShutdownTimeout(0); // 设置客户端关闭前等待操作完成的最大时间 默认 10000(10秒)

        // 因为Connection和Channel都实现了java.lang.AutoCloseable,使用try-with-resources语句,可以在代码中显示关闭连接和信道
        try (Connection connection = factory.newConnection(); // 创建连接
             Channel channel = connection.createChannel()) { // 创建信道
            channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明一个队列(等幂操作),如果队列不存在,自动创建,Routing Key: hello 交换机:(AMQP default)
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

说明:

basicPublish函数说明

void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;
void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;
  • var1 : 指定要发送的交换机的名称, 如果设置为空字符串, 那么消息会被发送到RabbitMQ的默认交换机.
  • var2 : 路由键, 用于指定消息要路由到的队列
  • var3:是否强制路由,如果设置为true,并且消息无法路由到任何队列(没有匹配的绑定),那么RabbitMQ会返回一个错误给生产者。如果设置为false,则消息将被丢弃
  • var4:是否立即发布(immediate flag)。如果设置为true,并且消息无法路由到任何消费者(没有匹配的队列或消费者不在线),那么RabbitMQ会返回一个错误给生产者。如果设置为false,消息将被存储在队列中等待消费者。
  • BasicProperties 可以使用PERSISTENT_TEXT_PLAIN表示发送的是需要持久化的消息,其实也就是将BasicProperties中的deliveryMode设置为2
  • props : 消息的属性, 这是一个可选参数, 里面有: 消息类型, 格式, 优先级, 过期时间等等
  • body : 消息体, 也就是要发送的消息本身
  • var5 : 消息属性,同props
  • var6:消息体,同body

queueDeclare函数说明

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;
com.rabbitmq.client.AMQP.Queue.DeclareOk (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;

说明:

  • 当调用第一个不带参数的queueDeclare()时,RabbitMQ 会自动创建一个新的队列,该队列的名称将由 RabbitMQ 自动生成,并且这个队列是非持久的、排他的、自动删除的,且不带任何额外的参数。

    由于没有指定队列名称,你通常无法预先知道队列的确切名称,这可能会在某些场景下造成不便,比如当你需要多个消费者共享同一个队列时。此外,由于队列是非持久的,如果 RabbitMQ 服务器重启,这个队列将会丢失,所有在队列中的消息也会丢失。

    该方法适用于那些不需要复杂队列配置的场景,比如临时测试或简单应用,可能不适用于需要持久化存储或明确指定队列名称的场景。

  • 第二个方法允许更细致地配置队列的属性,参数说明如下:

    • var1:队列的名称,不能为空,且要求在 RabbitMQ 服务器上是唯一的。
    • var2:是否持久化队列。true -- 持久化,即 RabbitMQ 服务器重启后依然存在。false,非持久化的,服务器重启后队列将不存在。
    • var3:是否排他。true--是,队列只能被声明它的连接使用,并且当连接关闭时,队列会被自动删除。这通常用于临时队列。false -- 否
    • var4:是否自动删除。true,当最后一个消费者断开连接后,队列会自动删除。如果设置为 false,则不会自动删除队列。
    • var5:一组额外的队列参数,可以用来设置队列的更多高级特性。例如,队列的最大长度、消息生存时间等。

    该方法适用于那些需要复杂队列配置和高级特性的场景。

  • 当调用第二个方法时,RabbitMQ会检查是否已经存在具有相同名称的队列,如果如果队列不存在,则根据提供的参数创建一个新的队列。如果已存在,则不再创建

basicConsume 函数说明

basicConsume 有20个重载函数,这里就不一一列出了,常用方法如下:

String basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) throws IOException;

说明:

  • var1:消费者要从中接收消息的队列名称

  • var2:设置是否自动确认消息。

    true 自动确认消息--一旦消息被交付给消费者,RabbitMQ 会自动将其标记为已确认,消息就从队列中移除,即使消费者还没有实际处理完这条消息。这种模式下,如果消费者在处理消息时崩溃或发生错误,那么这条消息就会丢失,因为 RabbitMQ 认为它已经被成功处理了。

    false 不启动确认消息。消费者需要显式地调用 basicAck 方法来确认消息已被成功处理。这样,如果消费者在处理消息时崩溃,RabbitMQ 会重新将这条消息放回队列中,等待其他消费者处理,从而保证了消息的可靠性。

  • var3:一个回调函数,当 RabbitMQ 向消费者发送消息,切消费被消费者成功消息后,会自动调用这个回调。开发者可以在该回调函数中处理接收到的消息,比如打印消息内容或者进行其他业务逻辑。

  • var4:可选的回调函数,当消费者取消订阅时会自动调用这个回调。这个回调可以用于执行清理工作,比如释放资源、记录日志等。

Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128"); // 设置RabbitMQ服务器IP
        factory.setUsername("testacc"); // 设置连接登录用户
        factory.setPassword("test1234"); // 设置用户访问密码

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();  // 创建信道
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明需要消费的队列,如果队列不存在则创建
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 定义一个回调函数用于缓冲服务器推送的消息
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); // 启动一个消费者,并返回服务端生成的消费者标识

    }
}

说明:这里为啥不用类似 生产者代码中的try-with-resources语句,因为这里想让消费者持续异步监听队列消息,而不是消费完一条消息后马上退出。

运行测试

先运行Recv,控制台输出:

 [*] Waiting for messages. To exit press CTRL+C

再运行Sent,控制台输出:

 [x] Sent 'Hello World!'

运行Recv的控制台输出:

 [x] Received 'Hello World!'

此外,运行Recv后, 查看RabbitMQ管理界面,可以看到Channels Tab页新增显示一条信道,Connections Tab页新增显示一条连接,Queues界面新增一个名为hello的队列

参考链接:

https://www.rabbitmq.com/tutorials/tutorial-one-java

工作队列(任务队列)

在本节中,将创建一个工作队列,用于在多个woker之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并等待其完成。而是把任务安排在以后完成。将任务封装为消息并将其发送到队列。在后台运行的工作进程将pop出任务并最终执行作业。当你运行多个worker时,任务将在它们之间共享。
这个概念在web应用程序中特别有用,因为在短HTTP请求窗口内无法处理复杂的任务。

场景--轮询(round-robin)

说明:P 代表生产者,Queue为队列, C 代表 消费者

NewTask.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");
        factory.setShutdownTimeout(0);


        try (Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String[] msgs = {"First message...", "Second message", "Third message...", "Fourth message", "Fifth message..."};
            for (int i = 0; i < msgs.length; i++) {
                channel.basicPublish("", TASK_QUEUE_NAME, // 第一个参数代表交换机名称,设置为空,表示使用默认交换机
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        msgs[i].getBytes("UTF-8"));

                System.out.println(" [x] Sent '" + msgs[i] + "'");
            }

        }
    }

}

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 设置第2个参数为True,设置队列为持久化队列
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将第二个参数设置为false,即不自动应答,保证消息处理的可靠性
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) {
        /*模拟执行任务耗时*/
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

先运行Worker,开启两个Worker进程,然后运行NewTask 五次,查看控制台输出

NewTask运行输出

 [x] Sent 'First message...'
 [x] Sent 'Second message'
 [x] Sent 'Third message...'
 [x] Sent 'Fourth message'
 [x] Sent 'Fifth message...'

第一Worker输出

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message...'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message...'
 [x] Done

第二个Worker输出

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message'
 [x] Done
 [x] Received 'Fourth message'
 [x] Done

说明:

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

场景-公平分发(Fair dispatch)

你可能已经注意到,分发仍然没有完全按照我们的要求工作。例如,在有两个worker的情况下,当所有奇数消息都很重(消息处理比较耗时),偶数消息都很轻时(消息处理比较简单,不怎么耗时),一个worker会一直很忙,另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。
这是因为RabbitMQ只是在消息进入队列时分发消息。它不考虑消费者未确认的消息数量。它只是盲目地将每第n条消息分派给第n个消费者

为了克服这一点,可使用带参数prefetchCount = 1basicQos方法。这告诉RabbitMQ一次不要给一个worker发送多条消息。或者,换句话说,在处理完并确认前一条消息之前,不要向worker发送新消息。取而代之,将消息发送给下一个不忙的worker

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小
如果所有的worker都很忙,队列可能会排满。需要密切关注这一点,也许可以增加更多的worker,或者采取其他策略。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); // 设置第2个参数为True,设置队列为持久化队列
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 将第二个参数设置为false,即不自动应答,保证消息处理的可靠性
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) {
        /*模拟执行任务耗时*/
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

先运行Worker,开启两个Worker进程,然后运行NewTask,查看控制台输出

第一Worker输出

 [x] Received 'First message...'
 [x] Done
 [x] Received 'Fourth message'
 [x] Done

第二个Worker输出

 [x] Received 'Second message'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message...'
 [x] Done

参考链接:

https://www.rabbitmq.com/tutorials/tutorial-two-java

发布和订阅

上节示例中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个工作者。本例将实现向多个消费者传递一个信息。这种模式被称为“发布/订阅”。

场景:

说明:P 代表生产者,X 代表 交换机,Q 代表队列

EmitLog.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明名为 logs,类型为 fanout交换机

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

ReceiveLogs.java

import com.rabbitmq.client.*;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 假设需求是:1,无论何时,消费者连接到RabbitMQ,都需要一个新的空的队列
        // 2,端开消费者时在,自动删除队列
        String queueName = channel.queueDeclare().getQueue(); // channel.queueDeclare() 定义一个非持久,排他的,自动删除的,名称随机生成且保持唯一的队列
        channel.queueBind(queueName, EXCHANGE_NAME, ""); // 绑定队列和交换机,以告知交换机需要发送消息到哪个队列

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

先运行 ReceiveLogs(开两个进程),再运行 EmitLog(不带参数运行)

EmitLog 运行输出:

 [x] Sent 'info: Hello World!'

ReceiveLogs运行控制台输出:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'info: Hello World!'

参考链接:

https://www.rabbitmq.com/tutorials/tutorial-three-java

路由

上节示例中,实现了多个消费者订阅所有队列消息,本节示例中,将实现仅订阅消息子集,即订阅部分消息。

直接交换机(Direct exchange)

The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.

上一节示例中实现了将所有消息广播给所有消费者,本节希望在此基础上,以允许根据消息的严重性对其进行过滤,实现不同消费者接收不同级别的日志

上节使用的扇出交换机(fanout),没有太多的灵活性——它只能进行无意识的广播。所以,本节示例中将使用直接交换机(direct)。直接交换机背后的路由算法很简单——队列的绑定键和消息的路由键完全匹配,则将消息进入到该队列。

为了说明这一点,假设有以下设置:

这里,我们可以看到直接交换机X绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定键black,另一个绑定键 green
在这种设置下,使用orange 路由键发布到交换机的消息将被路由到队列Q1。使用路由键为blackgreen的发布的消息件将路由到Q2。所有其他消息都将被丢弃。

多个绑定

使用相同的绑定键绑定多个队列是完全合法的。以下示例中,使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换机将表现得像fanout交换机,将消息广播到所有匹配的队列。拥有路由键为black的消息将同时发送到Q1Q2

本节示例中实现的场景:

EmitLogDirect.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

ReceiveLogsDirect.java

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity); // 第三个参数为绑定键
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

函数说明:

com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;
com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;

说明:

  • var1:队列名称

  • var2: 交换机名称

  • var3:用于绑定交换机和队列的路由键,为了同basic_publish routingKey参数混淆,称之为 绑定建(bindingKey)

  • var4:一些额外参数

先运行ReceiveLogsDirect(开两个进程,分别携带info warning errorwarning error参数运行),再运行EmitLogDirect,查看控制台输出。

第一次运行EmitLogDirect时携带以下参数

error "Run. Run. Or it will explode."

运行ReceiveLogsDirect的两个控制台都输出以下内容

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'error':'Run. Run. Or it will explode.'

第2次,去掉运行参数,直接运行运行EmitLogDirect,结果仅带info warning error参数运行ReceiveLogsDirect的控制台增加输出以下内容:

 [x] Received 'info':'Hello World!'

参考链接:

https://www.rabbitmq.com/tutorials/tutorial-four-java

主题

上节示例中,采用了direct交换机,实现了选择性接收消息,虽然有所改进,单仍然有局限性,不能基于多个标准进行路由,比如纪要根据日志严重级别来订阅日志,同时还要根据日志消息产生源订阅日志,为此还需要了解更复杂的主题交换机。

主题交换机(Topic exchange)

发送到主题交换机的消息不能是任意的路由键——它必须是一个由点分隔的单词列表。单词可以是任意的,通常是与消息相关的一些特征。几个有效的路由键示例:"stock.usd.nyse"、“nyse.vmw”、“quick.ornge.rabbit”。如你喜欢,路由键可以包含任意多个单词,但是最多不能超过255个字节。

绑定键也必须采用相同的形式。主题交换机背后的逻辑类似于直接交换机——使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。但是,对绑定键来说,有两个重要的特殊情况:

  1. * 可以匹配一个单词。
  2. # 开匹配0个或更多个单次。

用一个例子来解释这一点:

在这个例子中,我们将发送描述动物的消息。消息将使用由三个单词(两点)组成的路由键进行发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:“..”。

我们创建了三个绑定:Q1用绑定键“*.ornge.*”绑定,Q2用“*.*.rabit”和“lazy.#”绑定。

这些绑定可以概括为:

  1. Q1对所有橙色的动物都感兴趣。
  2. Q2想听听关于兔子的一切,以及关于懒惰动物的一切。

消息路由示例:

  • 带有路由键“quick.ornge.robit”、 "lazy.orange.elephant"的消息将会被发送给所有队列。

  • 带有路由键“quick.orange.fox”的消息将仅被投放入Q1队列中。

  • 带有路由键"lazy.pink.rabbit"的消息的将仅被投放入Q2队列中,且只会放入一次,虽然匹配两个绑定键。

  • 带有路由键"quick.brown.fox orangequick.orange.new.rabbit的消息将不会被投放入任何队列中,会被丢弃。

  • 带有路由键lazy.orange.new.rabbit消息将被投放入Q2队列中

说明:
主题交换j机器功能强大,可以像其他交换一样运行。

  • 当队列使用“#” 绑定键绑定时,它将接收所有消息,而不管路由键如何,就像fanout交换机一样。
  • 当绑定中不使用特殊字符“*”和“#”时,主题交换机的行为就像direct交换机一样。

EmitLogTopic

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

ReceiveLogsTopic

import com.rabbitmq.client.*;

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.88.128");
        factory.setPort(5672);
        factory.setUsername("testacc");
        factory.setPassword("test1234");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

分别使用以下参数先运行ReceiveLogsTopic(开4个进程),再运行EmitLogTopic(带参数"kern.critical" "A critical kernel error"运行),查看控制台输出。

"#"
"kern.*"
"*.critical"
"kern.*" "*.critical"

结果,运行ReceiveLogsTopic的四个控制台输出:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'kern.critical':'A critical kernel error'

运行EmitLogTopic的控制台输出:

 [x] Sent 'kern.critical':'A critical kernel error'

参考链接:https://www.rabbitmq.com/tutorials/tutorial-five-java

参考链接

https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/java