首先确保安装好了Rabbitmq服务器。
1.新建一个空白php项目,安装php客户端库:
composer require php-amqplib/php-amqplib
2.生产者
然后添加生产者代码 (producer.php)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建到RabbitMQ服务器的连接[1,2](@ref)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明一个队列,如果不存在则创建[1,2](@ref)
$queueName = 'hello';
$channel->queue_declare($queueName, false, false, false, false);
// 创建消息内容
$data = "Hello RabbitMQ! 时间: " . date('Y-m-d H:i:s');
$msg = new AMQPMessage($data);
// 发送消息到队列[1,2](@ref)
$channel->basic_publish($msg, '', $queueName);
echo " [x] 发送消息: '$data'\n";
// 关闭连接[1,2](@ref)
$channel->close();
$connection->close();
运行生产者:php producer.php
3.消费者
添加consumer.php。
消费者监听并处理队列中的消息:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 创建到RabbitMQ服务器的连接[1,2](@ref)
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明同一个队列[1,2](@ref)
$queueName = 'hello';
$channel->queue_declare($queueName, false, false, false, false);
echo " [*] 等待消息中. 按 CTRL+C 退出\n";
// 定义处理消息的回调函数[1,2](@ref)
$callback = function ($msg) {
echo " [x] 收到消息: ", $msg->body, "\n";
};
// 开始消费队列中的消息[1,2](@ref)
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
// 持续监听消息[1,2](@ref)
while ($channel->is_consuming()) {
$channel->wait();
}
// 关闭连接(通常不会执行到这里)[1,2](@ref)
$channel->close();
$connection->close();
运行消费者:php consumer.php
4. 运行说明
- 确保 RabbitMQ 服务已启动
- 先运行消费者:在终端执行
php consumer.php
,它会持续运行并等待消息 - 再运行生产者:另开终端执行
php producer.php
,发送消息 - 观察消费者终端,会立即显示收到的消息
也可以在rabbitmq管理页面发送消息:
如果没有消费者,就是不运行consumer.php, 发布消息之后,可以获取到消息 Get Message(s),
否则就是 Queue is empty。
5. 关键点说明
- •队列声明:生产者和消费者都要声明相同的队列
- •消息确认:本例使用自动确认模式(
true
参数),消息被接收后自动从队列删除 - •持久化:如需消息持久化,需设置队列和消息的
durable
属性 - •连接参数:根据实际修改主机、端口、用户名和密码
这是一个最基础的 RabbitMQ 使用示例。在实际项目中,你可能需要添加错误处理、消息持久化、手动确认等更多功能。