php-mqtt/client 发布、订阅

发布于:2025-07-02 ⋅ 阅读:(38) ⋅ 点赞:(0)

一、环境要求

        php8.1 + composer


二、安装扩展

composer require php-mqtt/client

三、php-mqtt/client 与 Bluerhinos/phpMQTT 对比

特性 php-mqtt/client Bluerhinos/phpMQTT
开发语言 PHP 原生实现 PHP 原生实现
依赖扩展 需 PHP 7.4.21+,推荐 Swoole/Workerman 仅需 PHP socket 扩展
协议支持 MQTT 3.1/3.1.1 MQTT 3.1
异步支持 需配合 Swoole/Workerman 实现异步 同步阻塞模式
  • php-mqtt/client

    • 提供更现代的 API 设计,支持 Composer 一键安装。
    • 适合复杂场景(如物联网应用),支持 QoS 等级和遗嘱消息。
    • 文档详细,社区活跃度较高。
  • Bluerhinos/phpMQTT

    • 轻量级,代码简洁,适合快速集成。
    • 仅支持基础发布/订阅功能,缺少高级特性(如自动重连)。
    • 需手动管理连接和消息循环。

四、使用与demo

        导入 composer autoload 文件和 php-mqtt/client
require('vendor/autoload.php');

use \PhpMqtt\Client\MqttClient;

这里写一个简单的demo提供参考


<?php
require __DIR__ . '/vendor/autoload.php';
use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;

class MqttService {
    private static $instance;
    private $mqtt;
    private $config = [
        'host' => 'broker.emqx.io',
        'port' => 1883,
        'client_id' => 'global_mqtt_client',
        'username' => null,
        'password' => null,
        'keepalive' => 60
    ];

    private function __construct() {
        $this->initConnection();
    }

    public static function getInstance(): self {
        if (!isset(self::$instance)) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    private function initConnection(): void {
        $this->mqtt = new MqttClient(
            $this->config['host'],
            $this->config['port'],
            $this->config['client_id']
        );

        $settings = (new ConnectionSettings())
            ->setUsername($this->config['username'])
            ->setPassword($this->config['password'])
            ->setKeepAliveInterval($this->config['keepalive']);

        $this->mqtt->connect($settings, true);
    }

    public function publish(string $topic, string $message, int $qos = 0): void {
        $this->mqtt->publish($topic, $message, $qos);
    }

    public function subscribe(string $topic, callable $callback, int $qos = 0): void {
        $this->mqtt->subscribe($topic, $callback, $qos);
        $this->mqtt->loop(false);
    }

    public function __destruct() {
        if ($this->mqtt->isConnected()) {
            $this->mqtt->disconnect();
        }
    }
}

该方案通过单例模式确保全局唯一连接,支持自动重连和连接池管理


require 'MqttService.php';

// 全局调用示例
$mqtt = MqttService::getInstance();

// 发布消息
$mqtt->publish('global/topic', 'Global message');

// 订阅消息(需保持进程运行)
$mqtt->subscribe('global/topic', function($topic, $msg) {
    echo "Received: $msg\n";
});

打完收工。  


网站公告

今日签到

点亮在社区的每一天
去签到