Android 消息队列之MQTT的使用:物联网通讯,HTTP太重了,使用MQTT;断网重连、注册、订阅、发送数据和接受数据,实现双向通讯。

发布于:2024-12-09 ⋅ 阅读:(132) ⋅ 点赞:(0)

目录:

  1. 问题
  2. MQTT是什么以及为什么使用
  3. 如何使用:第一阶段、基础功能
  4. 如何使用:第二阶段、增加断网重连
  5. 如何使用:第三阶段、封装

在这里插入图片描述


一、问题

在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:

  1. 开发成本:需要后台创建接口,前台去请求。
  2. 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
  3. 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
  4. 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。

基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。


二、MQTT是什么以及为什么使用

MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。

你会发现传递的数据量是根据你的内容来决定。

能干吗:

1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。


三、如何使用:第一阶段、基础功能

  1. 如何连接:init方法
  2. 连接后如何订阅:subscribe方法
  3. 如何发送数据,如何接受数据:subscribe方法
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


}


四、如何使用:第二阶段、断网重连

  1. 即使短暂断网,后面自己也还是可以重连恢复。
  2. 如果第一次没有连接上,增加第一次的断网重连
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var context: Context? = null
    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        this.context = context
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。 Eventbus
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        val intentFilter = IntentFilter()
        intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)
        intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)
        context?.registerReceiver(netWorkBroadCastReciver,intentFilter)
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    /**
     * @param topic 主题 给这个主题发送消息
     *  @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次
     *  @param retained 发布后是否保留,即重新链接时会存在
     *  @param msg 消息
     */
    fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained //发布后是否保留,即重新链接时会存在
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
        context?.unregisterReceiver(netWorkBroadCastReciver)
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


    private var networkState = 100
    //断网重连查询
    fun isNetConnected(context: Context): Boolean {
        Log.d(TAG, "isNetConnected: ")
        val connectivity =
            context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
        if (connectivity != null) {
            val info = connectivity.activeNetworkInfo
            if (info != null) {
                if (info.type == networkState) {
                    return false
                }
                networkState = info.type
                if (info.type == (ConnectivityManager.TYPE_WIFI)) {
                    if (!isMqConnected) {

                        connect()
                    }
                    return true
                } else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {
                    if (!isMqConnected) {
                        connect()
                    }
                    return true
                }
            }
        }

        return false
    }
    
    var netWorkBroadCastReciver = object :BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent?) {
            isNetConnected(context)
            log( "NetWorkBroadCastReciver: ")
        }
    }


}

五、如何使用:第三阶段、封装

  1. 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/**
 * 对Mqtt操作的进一步封装
 */
@Singleton
class MqttHelper @Inject constructor() {

    @Inject
    lateinit var mqtt: ManageMqtt


    /**
     * 注册
     */
    fun register(context: Context?){
        mqtt.init(context)
    }

    /**
     * 发送数据
     */
    fun sendData(data :String){
        Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }
    }

    /**
     * 接收数据
     */
    fun data(kind:String,data:String){
        //待定,一般都是通过eventbus来解决。
    }
}

好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。


网站公告

今日签到

点亮在社区的每一天
去签到