4.3万字详解PHP+RabbitMQ(AMQP协议、通讯架构、6大模式、交换机队列消息持久化、死信队列、延时队列、消息丢失、重复消费、消息应答、消息应答、发布确认、故障转移、不公平分发、优先级、等)
理论(后半部分有实操详解)
哲学思考
- 易经思维:向各国人讲述一种动物叫乌龟,要学很久的各国语言,但是随手画一个乌龟,全世界的人都能看得懂。
- 道家思维:努力没有用(指劳神费心的机械性重复、肢体受累、刻意行为),要用心(深度思考、去感悟、透过现象看本质)才有用。
- 举例:类似中学做不出来的几何题的底层原理:不是不知道xx定理或公式(招式),而是不知道画辅助线的思路(内功)。
- 总结:万事万物、用道家思维思考本质与规律,用易经思维从众多信息中简化模型练出来的内功,叫觉悟(内力),而知识仅仅是外功(招式)。
做研发的,可能距离成功一步之遥,别因为一叶障目而放弃。
消息队列与消息中间件
- 消息队列:一种存储消息的数据结构,消息队列通常是一种先进先出(FIFO)的数据结构,可以确保消息的顺序传递。类比Redis的List,先进先出类比lPush和rPop。
- 消息中间件:是管理消息传递的一种组件。功能包含消息队列。
- 中间件:中间件是指位于客户端(或调用端)和服务端之间的组件,用于协调、存储、接口、管理组件之间的通信,类比卖家与买家之间的快递驿站,不是一定要用,但是最好得有。
AMQP协议
AMQP(Advanced Message Queuing Protocol),是一种用于消息传递协议,类比成HTTP、TCP、UDP这种不同的协议就行。它定义了消息中间件客户端与服务端(买家买家对驿站的沟通)的通信规则(怎么运快递),包括消息格式(什么类型的快递)、消息发布(怎么发快递)、消息订阅(怎么收快递)、队列管理(怎么处理快递)等。
AMQP高效就高效在把通信(物流阶段)能遇到的问题都解决了。
- 异步通信:AMQP支持异步通信,可以让消息发送者和接收者在不同的时间和速度进行消息传递。
- 消息路由:AMQP协议定义了灵活的消息路由规则,可以根据消息内容自动将消息路由到指定的接收者。
- 消息确认:AMQP支持消息确认机制,确保消息被可靠地接收。
- 批量处理:AMQP协议支持批量消息处理,可以同时发送和接收多个消息,减少了网络通信的开销和系统的资源消耗。
RabbitMQ
- 官方文档:https://www.rabbitmq.com/docs
- 极简概括:使用ErLang语言写基于AMQP协议的C/S架构的消息中间件,用于不同组件之间高效的传递数据。
- 解决问题:
- 削峰:卖家一家伙运了几十吨的货(海量请求)、买家没地方存、扛不住了,那就放驿站缓冲一下。
- 解耦:没有驿站中转,快递送过来硬塞给买家,买家不在,这个流程就走不下去(耦合)。
- 异步:卖家只要发货、流程基本走完,剩下的流程交给物流和驿站(中间件),不影响卖家做其它事(非阻塞),买家也一样、快递放驿站、我忙我的。需要时再取(按需订阅)。
- 卷:面试加分项,工作用到减轻负担。只想卷死各位,或者被各位卷死。
- 适用场景
- 流量削峰:当系统抗不出海量的请求的时候,把MQ放置在用户端与业务端之间(强制排队)削去部分峰值流量(Nginx令牌桶和滴水算法、或基于Redis也能实现),这个过程和加锁的缺点差不多,性能会受影响。
- 应用解耦:小型项目用不上,大型项目中,库存、订单、支付、物流等各种模块,为了防止硬关联耦合度大,一节点挂掉其余瘫痪,所以用MQ作为通信的桥梁。
- 异步处理:用的最多,异步发短信,发邮件、导出导出文件、延时任务、自动取消订单、推送通知、回调重试等。
- 助力跳槽涨薪:只想卷死各位,或者被各位卷死。
- 赋能成就感:RabbitMQ都用上了,感觉离架构师不远了(呵呵)。
- 优点:
- 应用场景就是优点,主流。
- 丰富的客户端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
- 缺点:
- 性能下降:流量削峰引起的性能下降。
- 运维成本:多引入一个组件,就要考虑它的运维成本,以及各项配置问题。
- 大数据处理:对于大数据、流数据、日志分析、更适合Kafka,RabbitMQ性能会下降。
- 安装困难:成熟的软件安装不方便,Erlang环境依赖强,其次是官网还把CentOS7的安装去掉了。
- 同类产品:RocketMQ(仅支持Java和C++)、ActiveMQ(后期停止维护)、Kafka(大数据场景)、基于Redis实现的任务队列(轻量级使用)、编程语言框架提供的支持(Laravel Queue)、AWSMQ(亚马逊)、ApsaraMQ(阿里巴巴)、PulsarMQ(Apache)。
工作架构图
- Producer:生产者,类比卖家。
- Connection:客户端与服务端的通信层。
- Channel:这里叫信道,类比要发那个快递,或从那家快递取货。
- Broker:用于接受和分发消息的处理,这是MQ Server要干的事,类比驿站。
- Exchange:交换机(不是switch),指定规则路由到那个队列,类比分配到那个货架的方法。
- Consumer:消费者,类比买家。
- 补充:每次访问RabbitMQ都需要建立一个连接,海量的请求对于这块的开销是绝大的,所以需要在channel与connection内部建立逻辑连接,从而减少性能损耗。
- 多租户设计:每个Block里面可以多个Vhost,Vhost里面,可以有多个Exchange(交换机),每个Exchange可以有多个Queue。
四大概念的通俗理解
- 生产者:顾名思义生产数据的角色,类比卖家发快递。
- 消费者:生产者产出的数据,给消费者使用,类比买家收快递。
- 交换机:,用于接受生产者的消息,通过指定模式(4大模式)和路由键(交换机与队列绑定标识符)分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列,类比快递驿站的分发到那个货架。
- 队列:一种数据结构,存放消息队列,类比快递驿站的货架。
- 流程:生产者(发快递)->交换机(驿站怎么分类)->队列(驿站怎么存)->消费者(拿快递)。
Exchange(交换机)
- 极简概括:位于生产者和队列之间,用于接受生产者的消息,并分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列。
- 解决问题:生产者发一条消息,让所有或指定消费者能够收到(类似广播)。如果没有交换机机制,只会有一个消费者能收到此消息。
- 交换机4大类型:直接(direct)类型、主题(topic)类型、头(headers)类型、扇出(fanout)类型(下文有详解)。
- 补充:在常规模式,工作队列模式(生产者端,basic_publish方法参数2为空字符串),也有一个默认类型交换机,或称之为无名Exchange。
Routing Key(路由键)
- 极简概括:交换机绑定队列的标识符,一个交换机,可以有多个Routing key。
- 解决问题:起个名用于区分,方便对不同队列进行不同的操作,就像MySQL表id作用一样。
死信
无法被消费的消息。
死信队列
- 极简概括:队列中的消息无法被消费,若没有后续的处理,就成了死信队列。
- 解决问题:将消费异常的数据放入死信队列,用于存储消费失败或者异常时的情况,确保失败的消息能够得到适当的处理(重试或由开发者调试查看用)。可以简单的理解为找个地方存失败的消费任务。也可将计就计,利用某些特性作为延时队列使用。
- 产生原因:
- 消息TTL过期。
- 队列满了。
- 消息被拒绝(basic reject 或basic nack),并且requeue为false。
有Redis List去实现消息队列,为什么要RabbitMQ?
- 持久化问题:RabbitMQ支持持久化,Redis虽然也支持持久化,但只要不是每次操作都持久化,那么就有丢失数据的风险。
- 消息应答问题:消息处理成功与失败,Redis用队列无法记录,任务消息只会取一个少一个,而RabbitMQ可以。
- 故障转移问题:Redis哨兵机制、主从复制,是针对缓存高可用,做消息中间件有局限性。RabbitMQ支持消息重新入队。如果某个消费者由于某些原因失去连接,导致消息未发送ACK确认,那么RabbitMQ有让消息重新排队的机制,如果此时其它消费者可以处理,那就让其它消费者处理。
- 支持消息优先级:RabbitMQ支持消息优先级,而Redis不支持。
- 广播支持:RabbitMQ支持广播,等指定队列发送,而Redis不支持。
- 路由转发:RabbitMQ通过交换机机制,支持设定不同的分发队列规则,满足各个场景,而Redis List需要手动实现这块内部机制。
有Redis Sorted Set、或者过期监听去实现延时队列,为什么要RabbitMQ?
RabbitMQ是推模式还是拉模式?
都有,生产者发数据到MQ是推,消费者消费消息是拉。
通信方案选择Push还是Pull?
推或拉是两种通信的方向选择,跟MQ无关,但是类似MQ,顺便提一下。
个人认为:
- 看业务场景,抛开业务场景谈架构都是耍流氓
- 要求实时性的,就选择推送,轮询耗费网络资源,调用端或客户端每次请求,服务端都得执行一次,尤其是并发量大或者响应流程任务重的场景。
- 不需要实时性的,就拉取,减低耦合,服务端就纯粹的产生服务端的数据就行,客户端或调用端谁想拉取让它自己来。
- 看改动开销
- 保证工程质量的前提下,那种方式开销小,技术老大说用哪个,或者同事们习惯那一种,就用哪一种。
- 看调用端可信任性。
- 各种鉴权,验证是一方面,数据传输也是一方面,对于不信任的平台,白推送的数据,与对方直接请求获取。还是有区别的。
RabbitMQ可以直连队列吗?
RabbitMQ内部不可以直连队列,但是操作上可以直连队列。
就算是常规(Hello World)模式,没有声明交换机,也会经过一个默认交换机。
不过这样丧失了交换机灵活的路由分发功能,适用于简单的场景。
实操
安装
Docker安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
浏览器访问:http://192.168.xxx.xxx:15672
普通安装
CentOS7的安装RabbitMQ的教程,已经被官网删除了,支持CentOS8,CentOS需要借助外力。
安装Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang
安装RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server
开启服务,并设置开机自启
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
检查状态
systemctl status rabbitmq-server
启动网页端控制台
rabbitmq-plugins enable rabbitmq_management
开启防火墙
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld
新建网页端登录用户,并配置角色与权限
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虚拟主机的名称,例如/(默认虚拟主机)。
<username>:要为其设置权限的用户名。
<configure_permission>:配置权限的正则表达式。允许用户对队列、交换机等进行配置。
<write_permission>:写权限的正则表达式。允许用户发布消息。
<read_permission>:读权限的正则表达式。允许用户获取消息。
浏览器访问
http://192.168.xxx.xxx:15672
用户名admin,密码12345678
命令行常用命令
systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server开启、关停、重启、状态查看、开机自启
rabbitmq-plugins enable 插件名 # RabbitMQ Server安装插件
rabbitmq-plugins list # 插件列表
rabbitmqctl version # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges # 查看交换机列表
rabbitmqctl list_queues # 查看队列列表
rabbitmqctl list_bindings # 查看绑定列表
PHP实现RabbitMQ Client
- RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
- 官方扩展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
这个扩展官方下载地址有最后一版Windows系统的php_amqp.dll的下载地址,(用Windows是为了方便,在CentOS上还需要编译,改完PHP代码每次需要重新上传,不想费事),但是我使用报错,所以废弃了。 - 官方推荐:composer require php-amqplib/php-amqplib
PHP操作RabbitMQ思路并不复杂,有6种工作模式,翻来覆去就是研究消息怎么发,发到哪里,怎么处理消息的问题。 - Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用现成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq
消费者常驻进程偏好
while (count($channel->callbacks)) {
$channel->wait();
}
//或者
$channel->consume();
常规模式(Hello World!)
- 极简概括:生产者生产消息给消费者。
- 解决问题:跨进程,或跨组件、跨网络通信,适用与两个角色,但是一个PHP进程无法完成的时候用。
- 备注:用MySQL、Redis也能实现。
- 生产者端代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化连接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
参数1:队列名
参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启时或宕机后保留下来。
参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
*/
$channel->queue_declare('hello', false, false, false, false);
//编辑消息
$msg = new AMQPMessage('Hello World!');
//发送消息,交换机用不上,所以留空。这方法没有返回值
$channel->basic_publish($msg, '', 'hello');
//用完了就关闭,释放资源
$channel->close();
$connection->close();
- 消费者端代码
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化连接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
参数1:队列名
参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启后保留下来。
参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
*/
$channel->queue_declare('hello', false, false, false, false);
/*
参数1:队列名称
参数2:这是消费者标签(consumer tag),用于唯一标识消费者。在这里,它是空字符串,表示不为消费者指定任何特定的标签。
参数3:如果设置了无本地字段,则服务器将不会向发布消息的连接发送消息。
参数4:是指定是否自动确认消息(auto-ack)。设置为true则表示消费者在接收到消息后会立即确认消息,RabbitMQ将会将消息标记为已处理并从队列中删除。false表示消费者会手动确认消息,即在处理消息后,通过调用 $channel->basic_ack($deliveryTag) 手动确认消息。
参数5:指定是否独占消费者。如果设置为true,则表示只有当前连接能够使用该消费者。在这里,它是true,表示只有当前连接可以使用这个消费者。
参数6:如果设置了,服务器将不会对该方法作出响应。客户端不应等待答复方法。如果服务器无法完成该方法,它将引发通道或连接异常。
参数7:回调参数,拿到数据怎样处理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
echo $msg->body;
});
//通过死循环持久化当前进程,实时消费
$channel->consume();
工作队列模式(Work Queues)
- 极简概括:类比Redis的lPush、Rpop。但是RabbitMQ可以针对一个队列有多个消费者,但一条消息,只能被一个消费者消费一次,不能多次消费。为了避免消费者处理数据倾斜问题(有的队列处理任务多,有的处理的少),所以使用了轮询的方式,挨个处理任务。
- 解决问题:耗时且不需要串行执行的任务,可以丢给队列,例如发短信、邮件、大量数据导入导出。
- 测试普通用法
生产者代码,在cli模式下,依次输入1~10,执行10次
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消费者1代码,cli模式下运行,依次返回1、3、5、7、9,可见RabbitMQ不管消费节点处理的时间,只会根据消费者数量轮询处理,哪怕其中任意几个队列任务重,其它队列任务轻松。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(5);
echo "成功处理消息\n";
}
);
$channel->consume();
消费者2代码,(与消费者1代码唯一不同的,就是sleep函数的时间),cli模式下运行,依次返回2、4、6、8、10,可见RabbitMQ不管消费节点处理的时间,只会根据消费者数量轮询处理,哪怕其中任意几个队列任务重,其它队列任务轻松。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(10);
echo "成功处理消息\n";
}
);
$channel->consume();
发布订阅模式(Pub/Sub)
- 极简概括:生产者生产数据,所有队列关联的消费者都接收(类似广播),使用交换机的扇出(Fanout)模式实现。
- 解决问题:聊天室的功能,可以由它实现。
生产者代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
/**
参数1:交换机名称。
参数2:交换机类型,这里是扇出。
参数3:当passive参数设置为true时,表示不会实际创建新的交换机或队列,而是用来检查已经存在的交换机或队列是否已经存在。如果存在,则返回成功,如果不存在,则返回失败。passive参数主要用于检查交换机或队列是否存在,而不是实际创建新的实体
参数4:交换机是否持久化,即当RabbitMQ服务器重启时,交换机会不会被重新创建。
参数5:当所有绑定的队列都与交换机解绑后,是否自动删除交换机。
*/
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
$msg = new AMQPMessage('扇出测试');
//发送给指定的交换机
$channel->basic_publish($msg, 'fanout_test');
$channel->close();
$connection->close();
消费者1代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交换机初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//创建临时队列,用于接受队列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//队列绑定指定的交换机
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消费者2代码(同1):
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交换机初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//创建临时队列,用于接受队列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//队列绑定指定的交换机
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
路由模式(Routing)
- 极简概括:通过路由键,将消息发送给指定的(任意数量)消费者,一个消费者可配置多个路由键。
- 解决问题:只发送部分消费者。
生产端代码,只让消费者1消费。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('direct_test', 'direct', false, false, false);
$msg = new AMQPMessage('扇出测试');
//发送给指定的交换机,并指定路由键
$channel->basic_publish($msg, 'direct_test', 'consumer1');
$channel->close();
$connection->close();
消费者1代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交换机初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//创建临时队列,用于接受队列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//队列绑定指定的交换机,并声明路由键
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消费者2代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交换机初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//创建临时队列,用于接受队列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//队列绑定指定的交换机,并声明路由键
$channel->queue_bind($queue_name, 'direct_test', 'consumer2');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
主题模式(Topics)
- 极简概括:生产者获取消息,编写指定的匹配规则,使被匹配的队列的消费者能够消费消息。
- 解决问题:实现更加灵活的,消息分发模式。
- 备注:主题模式下的路由键,多个标识用多个点将其分开(例如aaa.bbb.ccc),最长不超过255个字符。支持通配符,*匹配一个标识(奇葩设计,和#就不应该反过来吗),#匹配任意个标识。
通配符用法*.*.xxx,表示xxx结尾的路由键,*.xxx.*表示包含xxx的路由键。a.#可以匹配a,也可以匹配,a.b。
对应的,当一个队列绑定的键是#,那就类似于fanout模式,如果一个队列中没有任何通配符,那就类似于direct模式。
生产者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明队列类型为主题
$channel->exchange_declare('topic_test', 'topic', false, false, false);
$msg = new AMQPMessage('topic测试数据');
/*
以下路由键可以接受到消息
a.b
a.*.*
a.*.*.*
#.z
z
a.x.y.z
abc.z
*/
$arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z'];
foreach($arr as $v) {
$channel->basic_publish($msg, 'topic_logs', $v);
}
$channel->close();
$connection->close();
消费者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明topic模式的交换机
$channel->exchange_declare('topic_test', 'topic', false, false, false);
//创建临时队列
$queue_name = $channel->queue_declare("", false, false, true, false)[0];
$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
//绑定多个路由键
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
$callback = function ($msg) {
echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
远程过程调用模式(RPC)
- 极简概括:(对于PHP,RPC用的不多,中间件RPC用的更少)使得客户端可以像调用本地方法一样调用远程方法,同时隐藏了底层网络通信细节。
调用端代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient {
private $connection;
private $channel;
private $queue_name;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$this->channel = $this->connection->channel();
$this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0];
$this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse'));
}
public function onResponse($rep) {
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($str) {
$this->response = null;
$this->corr_id = uniqid();
$this->channel->basic_publish(new AMQPMessage($str, [
'correlation_id' => $this->corr_id,
'reply_to' => $this->queue_name
]), '', 'rpc_queue');
while (! $this->response) {
$this->channel->wait();
}
return $this->response;
}
}
$fibonacci_rpc = new RpcClient();
$response = $fibonacci_rpc->call('客户端向服务端发送数据');
echo $response, "\n";
服务端代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
$callback = function ($req) {
echo $req->getBody(), "\n";
$msg = new AMQPMessage('服务端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]);
$req->getChannel()->basic_publish($msg, '', $req->get('reply_to'));
$req->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信队列(TTL过期)
- 极简概括:消息超时未处理,会被放置死信队列中。
- 备注:注释很清楚,这需要先执行这段代码(创建普通与死信,交换机、队列、和路由键),然后再关闭(模拟消费者挂掉,10秒后,此时超时的消息就会存入死信队列,至于死信的队列的数据,是重试,还是给开发者提示,看产品需求)。
- 现象:流程走完后,登录控制台,到Queues选项卡,查看队列列表的Ready列,原先的normal_queue队列由原先的10变成了0(执行一次生产者代码,里面有一个for循环),而dead_queue里面的Ready,由原先的0,变成了10(10个消息全部超时,到了死信队列)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明普通交换机名称
$normal_exchange = 'normal_exchange';
//声明死信交换机名称
$dead_exchange = 'dead_exchange';
//声明普通队列名称
$normal_queue = 'normal_queue';
//声明死信队列名称
$dead_queue = 'dead_queue';
//声明普通路由键
$normal_routing_key = 'normal_routing_key';
//声明死信路由键
$dead_routing_key = 'dead_routing_key';
//声明普通交换
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//声明死信交换机
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。
$payload = new AMQPTable();
//设置消息生存时间为15秒
$payload->set('x-message-ttl', 10000);
//定位普通队列出异常了,要转发的交换机
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//声明死信队列,其实死信队列本身是一个普通队列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//绑定普通交换机与普通队列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//绑定死信交换机与死信队列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常驻进程
$channel->consume();
$channel->close();
$connection->close();
然后再执行生产者代码,模拟发消息。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置过期时间为10秒,让生产者控制过期时间
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
极简死信队列消费示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信队列(队列达到最大长度放不下)
- 极简概括:生产的消息数量超过了消费者端设置了最大长度,剩余的消息会被放入死信队列。
- 注意:本次测试,需要把原先的队列删除,否则队列长度设置不生效。
- 现象:流程走完后,登录控制台,到Queues选项卡,查看队列列表的Ready列,原先的normal_queue队列由原先的0变成了8(执行一次生产者代码,里面有一个for循环),而dead_queue里面的Ready,由原先的0,变成了2(10 - 8)。
消费者端代码,这一步是为了初始化普通和死信交换机、队列、路由键,并且需要执行后Ctrl + C强制停止,保证生产者生产的消息,不被能正常的消费(不然怎么演示不正常现象时的死信队列?)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明普通交换机名称
$normal_exchange = 'normal_exchange';
//声明死信交换机名称
$dead_exchange = 'dead_exchange';
//声明普通队列名称
$normal_queue = 'normal_queue';
//声明死信队列名称
$dead_queue = 'dead_queue';
//声明普通路由键
$normal_routing_key = 'normal_routing_key';
//声明死信路由键
$dead_routing_key = 'dead_routing_key';
//声明普通交换
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//声明死信交换机
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。
$payload = new AMQPTable();
//设置最多存储8条消息
$payload->set('x-max-length', 8);
//定位普通队列出异常了,要转发的交换机
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//声明死信队列,其实死信队列本身是一个普通队列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//绑定普通交换机与普通队列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//绑定死信交换机与死信队列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常驻进程
$channel->consume();
$channel->close();
$connection->close();
生产者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置过期时间为10秒,让生产者控制过期时间
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
极简死信队列消费示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信队列(消息被拒绝,且requeue为false)
- 极简概括:消费者拒绝了消息,并且将requeue为false,的消息,会被放入死信队列。
- 注意:本次测试,需要把原先的队列删除,避免影响。
- 现象:流程走完后,登录控制台,到Queues选项卡,查看队列列表的Ready列,原先的normal_queue队列由原先的0变成了0(执行一次生产者代码,里面有一个for循环),而dead_queue里面的Ready,由原先的0,变成了3(10 - 7,消费者的代码中,有if判断)。
这次让消费者正常消费代码即可,不用Ctrl + C强制中断,正常接受生产者者的数据。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明普通交换机名称
$normal_exchange = 'normal_exchange';
//声明死信交换机名称
$dead_exchange = 'dead_exchange';
//声明普通队列名称
$normal_queue = 'normal_queue';
//声明死信队列名称
$dead_queue = 'dead_queue';
//声明普通路由键
$normal_routing_key = 'normal_routing_key';
//声明死信路由键
$dead_routing_key = 'dead_routing_key';
//声明普通交换
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//声明死信交换机
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通队列出异常了,要转发的交换机
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//声明死信队列,其实死信队列本身是一个普通队列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//绑定普通交换机与普通队列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//绑定死信交换机与死信队列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) {
if($msg->getBody() > 6) {
//手动拒绝消息,不批量,且不让重入队列
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false);
} else {
//手动确认消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
});
//常驻进程
$channel->consume();
$channel->close();
$connection->close();
生产者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置过期时间为10秒,让生产者控制过期时间
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
极简死信队列消费示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延时队列(基于TTL过期式的死信队列,不推荐使用)
- 极简概括:生产出来的队列任务,不会立马消费,而是等到指定时间。
- 解决问题:自动确认订单,自动取消未支付订单,等。
- 原理分析:常规的生产和消息链路配置了死信队列的兜底机制,若普通队列因TTL过期,会自动把消息放入死信队列,利用这个特性,可以做出来延迟队列,延时队列的延迟机制,就是普通队列的TTL过期时间,延时任务的处理机制,就是消费死信队列的代码段。因此可以专门选一个死信队列,作为延时队列来用。
- 注意:
- 例如延迟10秒,并不能保证一定会在第10秒被处理,会有小误差(绝大部分业务场景能接受)。
- 延迟消息是排队处理的,第一个延迟10秒,第二个消息延迟5秒,默认情况下,第二个会延迟15秒,因为消息是排队(队列先进先出)处理的,所以这个情况需要优化,这是RabbitMQ死信队列实现延时队列的巨大缺陷。
扩展:利用Redis的sorted set也可以作为延时队列,key为唯一标识符,score为执行的时间的时间戳,val为要执行的序列化代码,用while(true)起一个常驻进程,用于消费当前时间戳下的任务,如果要添加任务,就网Zset中添加数据。常驻进程到时间了,就会消费。
初始化普通、死信交换机、队列、路由键。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//声明普通交换机名称
$normal_exchange = 'normal_exchange';
//声明死信交换机名称
$dead_exchange = 'dead_exchange';
//声明普通队列名称
$normal_queue = 'normal_queue';
//声明死信队列名称
$dead_queue = 'dead_queue';
//声明普通路由键
$normal_routing_key = 'normal_routing_key';
//声明死信路由键
$dead_routing_key = 'dead_routing_key';
//声明普通交换
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//声明死信交换机
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通队列异常时,转发给死信队列的荷载参数,就指望着这个普通队列有问题了,才会把消息数据转发到死信队列,转发到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通队列出异常了,要转发的交换机
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要转发的交换机还不够,还得知道那个队列,不然交换机不知道路由那个消息到达那个队列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//声明普通队,就是等普通队列出问题了,才把数据丢给死信队列,所以普通(注意是普通队列)队列,要额外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//声明死信队列,其实死信队列本身是一个普通队列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//绑定普通交换机与普通队列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//绑定死信交换机与死信队列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
});
生产者代码,产生延时任务
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0;$i < 10; $i++) {
$msg = new AMQPMessage(microtime(true), [
'expiration' => '3000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
延时任务处理代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '时间处理误差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延时队列(基于RabbitMQ延迟交换机插件,推荐使用)
- 官方插件集合地址:https://www.rabbitmq.com/community-plugins
- rabbitmq-delayed-message-exchange版本地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 版本匹配:
rabbitmqctl version
发现是3.10.0,rabbitmq-delayed-message-exchange release页也有说明:This release has no functional changes but lists RabbitMQ 3.10.x as supported in plugin metadata. - 实测延时误差在5毫秒以内,满足99.9%的应用场景。
- 安装插件
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
发现如下字样,就说明安装成功。
Enabling plugins on node rabbit@lnmp:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@lnmp...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
systemctl restart rabbitmq-server
生产者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$delay_exchange = 'delay_exchange';
$delay_queue = 'delay_queue';
$delay_routing_key = 'delay_routing_key';
//声明交换机类型为direct
$payload = new AMQPTable();
$payload->set('x-delayed-type', 'direct');
//声明延时队列交换机,参数2的类型,是安装插件后才有的,固定值
$channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload);
//声明一个自定义延迟队列
$channel->queue_declare($delay_queue, false, false, false, false);
//队列绑定交换机
$channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key);
//发送延迟消息
$msg = new AMQPMessage(microtime(true), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
//这里配置超时时间,固定格式。
'application_headers' => new AMQPTable(['x-delay' => 5000])
]);
$channel->basic_publish($msg, $delay_exchange, $delay_routing_key);
$channel->close();
$connection->close();
消费者代码
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '时间处理误差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('delay_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延时队列提前执行或延后执行的方案
某些业务场景,可能需要提前触发,或者延期处理,这就需要一些外的操作,才能完成。
队列优先级
- 不同的队列和可以设置不同的优先级。
- 在堆积的情况下才生效,否则生产一个消息处理一个,无阻塞,就不存在多个消息的优先处理的问题。
- 要设置优先级,那么队列和消息都需要设置优先级。
- 优先级越大,越先被处理。
- 支持1到255之间的优先级,强烈建议使用1到5之间的值,更高的优先级值需要更多的CPU和内存资源,因为RabbitMQ需要在内部为每个优先级维护一个子队列,从1到为给定队列配置的最大值。
首先启动生产者代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//声明一个自定义延迟队列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//队列绑定交换机
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
for($i =0; $i < 10; $i++) {
$msg = new AMQPMessage($i, [
//为每个消息设置不同的优先级
'priority' => $i,
]);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
$channel->close();
$connection->close();
消费者代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg . "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
惰性队列
惰性队列用的极少,因为是存储在磁盘中的。在消费者挂掉的情况下,避免RabbitMQ积累了太多的消息,消耗内存去采取的策略。
高可用
性能与高可用权衡的架构思维
加了RabbitMQ组件可以看做是分布式系统,分布式系统有有一个CAP定理,CAP只能同时满足两个。
对于RabbitMQ消息丢失,或者重复消费的问题,若当前需求不能容忍,就需要额外的环节来弥补。如果当前需求(小概率)能容忍,去掉高可用环节,那么在性能上就会提升。
例如支付宝,支付时总是会延迟几秒钟,一是走支付宝安全风控系统,二是要极致要求稳定高可用的环节必然性能下降,三是等支付回调,然而其它接口内容却是瞬间加载。
所以没有最好的架构,只有最合适的架构,所以要结合业务场景,判断业务容忍度下限,与开发复杂度,在高可用与性能之间权衡,这是架构师必备思维。
如何避免RabbitMQ消息丢失?
- 百分99.9%投递策略:事先要处理的消息绑一个唯一标识(让RabbitMQ和MySQL自始至终都能知道是哪一个,避免因业务需求生产两份同样的消息区分不出来),先写入MySQL,并预定一个未消费的缺省状态,直到收到RabbitMQ的确认通知,改变状态。再加一个循环定时任务(定时的周期看业务需求容忍度与组件性能),超时未处理的消息,让定时任务手动触发。
但这种也不是没有弊端,如果RabbitMQ的确认通知由于网络抖动没有发出去,那么定时任务就会让其重复消费。 - 生产消息传输感知:RabbitMQ挂了,生产消息发送给MQ时,就会超时,报错,此时把消息放入MySQL做兜底(可以临时不处理,但是别丢),避免消息丢失。
- RabbitMQ自带的交换机、队列、消息持久化机制。
- RabbitMQ自带的发送确认、消费确认、事务队列机制。
如何防止RabbitMQ重复消费?
- 兜底策略:在数据集层上最好有个兜底策略,如唯一索引,更新数据前的状态机判断等,避免由于程序设计疏忽或者故障等原因导致的重复消费。
- 上游生产防重:消息重复消费,一方面是消息重复发送引起的,一般在接口处理的上游,会有一些防重策略(数据幂等),上游杜绝,也是一种策略。
- 消费端上游判断:首先把要处理的消息绑一个唯一标识(让RabbitMQ和MySQL自始至终都能知道是哪一个,避免因业务需求生产两份同样的消息区分不出来),消费者获取到消息时,再把唯一标识写入具有唯一索引约束的MySQL表,重复消费,MySQL唯一索引就要报错,利用插入失败的特性,阻止重复消费。
RabbitMQ不公平分发
- 不公平分发:推荐操作,避免一个消费者很忙,其它消费者不做任何工作的情况发生,防止压力倾斜。
可在每个消费者端调用basic_consume()方法之前配置,注意这种行为在消息堆积的情况下生效。
被设置不公平分发的消费者中,可从网页端控制台->Channels选项卡->表格->所在行数据的Prefetch中看到。
/*
参数1:预取大小,通常情况下为null,表示不限制消息大小。
参数2:预取数量,表示消费者每次最多接收的消息数量,值为权重比例,可以为任意正整数。
参数3:是否将预取限制应用到channel级别(true)或者消费者级别(false)。
*/
$channel->basic_qos(null, 1, false);
RabbitMQ可靠消息机制(发布确认)
-
极简概括:是用来保证消息不丢失的的重要功能,生产者将消息发送给MQ,MQ把数据保存在磁盘上之后,会返回生产者成功保存的反馈。需要生产者端的队列以及消息持久化的前提,就是为了防止队列或者小写将要持久化的时候RabbitMQ出故障的间隙情况发生。
队列持久化、消息持久化、发布确认3个因素加起来,才能保证消息不丢失。
发布确认模式有3种:- 单个确认:性能最低,生产一条消息确认一个,后面的消息排队。
- 批量确认:性能比单个确认好,但是出问题时,无法定位是那条消息。
- 异步确认:是性能与高可用性权衡的方案(很多服务端组件都有这样的),利用回调函数来达到消息可靠性传递的。
-
单个确认,发送100条数据,耗时0.337秒,大部分场景够用,高并发场景除外。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
//开启发布确认模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
//生产一条数据,发送一条确认消息,参数值为超时时间,单位:秒
$channel->wait_for_pending_acks(10.000);
}
echo microtime(true) - $start; // 0.337秒
$channel->close();
$connection->close();
- 批量确认,发送100条数据,耗时0.048秒,时间节省了很多
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
//开启发布确认模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
}
//生产一条数据,发送一条确认消息,参数值为超时时间,单位:秒
$channel->wait_for_pending_acks(10.000);
echo microtime(true) - $start; // 0.048秒
$channel->close();
$connection->close();
- 异步确认:(推荐使用)生产者会先把消息发送到队列中,MQ会为其自动编号(有了这个编号,方便异步回调时定位失败的消息),并调用broker模块判断是否收到确认回调,由于确认的过程是异步的,所以对性能影响较小。至于回调的内容,怎么操作,重试还是做其它事,根据业务来判定。
异步的操作函数,由set_ack_handle()(消息确认时调用)和set_nack_handler()(消息拒绝确认时调用)函数完成。
本次测试耗时0.012秒。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->set_ack_handler(
function (\PhpAmqpLib\Message\AMQPMessage $message){
// echo $message->getBody();
}
);
$channel->set_nack_handler(
function (\PhpAmqpLib\Message\AMQPMessage $message){
// echo $message->getBody();
}
);
$channel->queue_declare('hello', false, true, false, false);
//开启发布确认模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
}
echo microtime(true) - $start; // 0.012秒
$channel->close();
$connection->close();
RabbitMQ可靠消息机制(事务消息)
- 极简概括:用的相对较少,用于确保消息被可靠的发送,但是会牺牲掉一些性能,有事务,往往意味着是多条消息的一次性处理(不然生产一条消费一条,回滚怎么撤回呢),例如发送10个消息,其中n(0<n<=10)个消息发送故障,或者ack异常,则10条全部回滚。
- 用法:tx_select()类比MySQL的begin,tx_commit类比MySQL的commit(),tx_rollback类比MySQL的commit(),可能出现异常情况,所以用try catch包裹。
- 注意:如果消费端设置了自动确认机制,事务会失效,必须为手动确认。
消费者代码,首先启动
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody() . "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//手动确认
$channel->basic_consume('test_queue', '', false, false, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
先测试生产者,不用事务的情况,看看生产者异常情况下,消息是否会回滚。
命令行提示出异常了,消费者返回012三条消息,说明没有回滚。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//声明一个自定义延迟队列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//队列绑定交换机
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
try {
for($i =0; $i < 10; $i++) {
if($i == 3) {
throw new Exception('测试异常');
}
$msg = new AMQPMessage($i);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
echo '成功执行';
} catch(\Exception $e) {
echo '出异常了';
}
$channel->close();
$connection->close();
再测试生产者,用事务的情况,看看生产者异常情况下,消息是否会回滚。
命令行提示出异常了,消费者返回0条消息,说明生产端异常,也会回滚。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//声明一个自定义延迟队列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//队列绑定交换机
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
try {
$channel->tx_select();
for($i =0; $i < 10; $i++) {
if($i == 3) {
throw new Exception('测试异常');
}
$msg = new AMQPMessage($i);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
echo '成功执行';
//提交事务
$channel->tx_commit();
} catch(\Exception $e) {
echo '出异常了';
//回滚事务
$channel->tx_rollback();
}
$channel->close();
$connection->close();
RabbitMQ可靠消息机制(消息应答)
- 极简概括: 为了弥补事务消息的性能问题。如果某个消费者在消费数据时挂掉了,(此时肯定是已经获取到了这条数据才去消费的,那么工作队列中的数据就会被删除),就可能导致数据的丢失。为了解决因此问题,RabbitMQ添加了消息应答机制,消费端在消息处理完成后,告诉RabbitMQ这条数据已经被成功的处理,因此RabbitMQ才可以删除这条数据,应答机制又分为两种情况。
- 自动应答:不推荐使用,获取到消息确认应答,这不代表后续流程一定会处理成功。
- 手动应答:推荐使用,告诉MQ消息被成功处理或者失败。
测试手动确认或拒绝应答(无论是确认,或者拒绝,RabbitMQ都任务此消息已经消费过了)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
这里只有消费者1,手动拒绝消息
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(5);
echo "拒绝处理\n";
/*
basic_nack和basic_ack参数一致
参数1:string 消息唯一标识符
参数2:bool 是否确认接受或拒绝多个消息
参数3:bool 表示是否重新放入消息队列
*/
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
这里只有消费者1,手动确认消息,basic_nack(拒绝)方法改为basic_ack(同意)方法即可。
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(5);
echo "成功处理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
RabbitMQ的故障转移机制
- 消息重新入队的故障转移机制:如果某个消费者由于某些原因失去连接,导致消息未发送ACK确认(默认超时时间为30分钟),那么RabbitMQ有让消息重新排队的机制,如果此时其它消费者可以处理,那就让其它消费者处理。注意:重新排队的机制只能保证消息不丢失,不能保证失去连接的那个消费者没处理这个消息,所以重新排队的故障转移机制,让其他消费者处理的过程,有重复消费的可能,所以要在代码逻辑中,添加一些兜底的机制(例如数据库唯一索引、分布式锁等)。
测试故障转移,生产者生产出一条消息,消费者1会立即捕获这条消息,但是处理时间为5秒钟,如果在这5秒钟内Ctrl + C强行终止,则RabbitMQ立即会让消费者2去消费这条数据,进而保证消息不丢失。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//获取命令行参数
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消费者1代码如下:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(5);
echo "成功处理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
消费者2代码如下
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,内容为{$msg->getBody()}\n";
sleep(10);
echo "成功处理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
RabbitMQ交换机、队列、消息持久化
以php-amqplib/php-amqplib为例,
-
交换机持久化:exchange_declare()方法参数4设置为true即可。
-
队列持久化:exchange_declare()方法参数3设置为true即可。
-
消息持久化:AMQPMessage()对象参数2添加['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]即可。
-
对于设置为持久化存储的情况:RabbitMQ会开启一个进程,采用追加写的方式,对数据进行实时持久化存储。
-
对于设置为非持久化存储的:RabbitMQ会开启一个进程,当内存不够时,采用追加写的方式,对数据进行实时持久化存储。
-
存储方式:持久化存储时,RabbitMQ会内部维护一张ETS的表,用于记录消息的(id、偏移量、有效数据、左边文件、右边文件的元数据)。
-
注意:如果一个队列已经存在且没有配置持久化,若再次配置持久化,会报错。如果一个队列持久化了,在网页端控制台->queues->队列堆在表格->所在队列的Features项,会显示D。
将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ已经设置为持久化,不能完全保证不会丢失消息,接受了一个消息但还没有保存它时,仍然有一个很短的时间空隙,这个间隙出问题了,还是有丢失的可能。
集成到Laravel
- 方案1:可以集成composer require php-amqplib/php-amqplib到框架,将主机端口用户名密码放入配置,php artisan make:command xxx,然后去运行它,优点是用法更贴近原生,缺点是少了Laravel Queue自带的优雅特性。
- 方案2:换Laravel RabbitMQ的包,这样更方便的使用Laravel优雅的Queue用法,不用关注中间件的底层实现。
亲测非延时队列和延时队列都可正常使用,延时队列不会造成像死信队列实现延时队列的(第一个延时5秒,第二个延迟10秒,结果第一个5秒执行,第二个15秒执行)那样消息堆积的情况。
composer require vladimir-yuldashev/laravel-queue-rabbitmq
vim laravel/config/queue.php,添加以下配置,并修改'driver' => 'rabbitmq'。
'rabbitmq' => [
'driver' => 'rabbitmq',
'hosts' => [
[
'host' => '192.168.0.180',
'port' => '5672',
'user' => 'admin',
'password' => '12345678',
'vhost' => '/',
]
],
'lazy' => false,
'options' => [
'queue' => [
'prioritize_delayed' => false,
'queue_max_priority' => 10,
],
],
],
集群与监控
大部分公司仍旧用的是框架自带的基于Redis实现的队列或延时队列,RabbitMQ都很少用,更别说用到集群。
集群方面的,小公司用不上,大公司有专门的运维。
后期更新……