0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何在Python中使用MQTT

瑞科慧聯(lián)(RAK) ? 2022-12-22 10:41 ? 次閱讀

Python 是一種跨平臺(tái)的計(jì)算機(jī)程序設(shè)計(jì)語(yǔ)言,是ABC 語(yǔ)言的替代品,屬于面向?qū)ο蟮膭?dòng)態(tài)類型語(yǔ)言。它最初被設(shè)計(jì)用于編寫自動(dòng)化腳本,隨著版本的不斷更新和語(yǔ)言新功能的添加,越來(lái)越多被用于獨(dú)立的、大型項(xiàng)目的開發(fā)。

MQTT 是一個(gè)物聯(lián)網(wǎng)傳輸協(xié)議,用于輕量級(jí)的發(fā)布/訂閱式消息傳輸,旨在為低帶寬和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境中的物聯(lián)網(wǎng)設(shè)備提供可靠的網(wǎng)絡(luò)服務(wù)。其輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)等特點(diǎn),使得它適用范圍更加廣泛。

本文主要介紹如何在 Python 項(xiàng)目中使用paho-mqtt客戶端庫(kù) ,實(shí)現(xiàn)客戶端與MQTT服務(wù)器的連接、訂閱、取消訂閱、收發(fā)消息等功能。

一、項(xiàng)目準(zhǔn)備

本項(xiàng)目使用 Python 3.10進(jìn)行開發(fā)測(cè)試。

用戶可用以下命令來(lái)確認(rèn) Python的版本:

python3 --version

Python 3.10.9

測(cè)試設(shè)備:

瑞科慧聯(lián)(RAK)網(wǎng)關(guān)RAK7268 V2、帶溫濕度傳感器的數(shù)據(jù)采集器Sensor Hub

二、選擇 MQTT 客戶端庫(kù)

paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫(kù)。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對(duì) MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發(fā)布到 MQTT 服務(wù)器變得更簡(jiǎn)單。

三、Pip 安裝 Paho MQTT 客戶端

Pip 是 Python 包管理工具。該工具提供了對(duì) Python 包的查找、下載、安裝、卸載的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、連接 MQTT 服務(wù)器

本文將使用瑞科慧聯(lián)LoRaWAN?網(wǎng)關(guān)提供的內(nèi)置 MQTT服務(wù),該服務(wù)基于 Mosquitto的開源消息代理。服務(wù)器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、導(dǎo)入 Paho MQTT客戶端

from paho.mqtt import client as mqtt

3、設(shè)置 MQTT Broker 連接參數(shù)

設(shè)置 MQTT Broker 連接地址,端口以及 topic,同時(shí)調(diào)用 Pythonrandom.randint函數(shù)隨機(jī)生成 MQTT 客戶端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、編寫 MQTT 連接函數(shù)

編寫連接回調(diào)函數(shù) on_connect,該函數(shù)將在客戶端連接后會(huì)被調(diào)用。在該函數(shù)中可以依據(jù)rc來(lái)判斷客戶端是否連接成功。同時(shí)可創(chuàng)建一個(gè) MQTT 客戶端連接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

5、發(fā)布消息

定義一個(gè) while 循環(huán)語(yǔ)句,在循環(huán)中設(shè)置每秒調(diào)用 MQTT 客戶端publish函數(shù)向/python/mqtt主題發(fā)送消息。

ddefon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 發(fā)布的主題,訂閱時(shí)需要使用這個(gè)主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

6、訂閱消息

編寫消息回調(diào)函數(shù) on_message,函數(shù)將在客戶端從 MQTT Broker 收到消息后被調(diào)用,并打印出訂閱的 topic 名稱以及接收到的消息內(nèi)容。

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代碼

消息訂閱代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯(cuò)誤","無(wú)效的客戶端標(biāo)識(shí)","服務(wù)器無(wú)法使用","用戶名或密碼錯(cuò)誤","無(wú)授權(quán)"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦訂閱到消息, 回調(diào)此方法"""

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解碼前的data為: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解碼后的data為: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("溫度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("濕度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("濕度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    mqttClient.on_message=on_message # 返回訂閱消息回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息發(fā)布代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網(wǎng)關(guān)通過mqtt發(fā)出數(shù)據(jù)

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調(diào)此方法"""

    rc_status= ["連接成功","協(xié)議版本錯(cuò)誤","無(wú)效的客戶端標(biāo)識(shí)","服務(wù)器無(wú)法使用","用戶名或密碼錯(cuò)誤","無(wú)授權(quán)"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務(wù)器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態(tài)的回調(diào)函數(shù)

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務(wù)器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務(wù)器賬號(hào)密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_publish():

    # 發(fā)布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 發(fā)布的主題,訂閱時(shí)需要使用這個(gè)主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發(fā)布的消息內(nèi)容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發(fā)送成功'.format(msg_count))

        else:

            print('第{}條消息發(fā)送失敗'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

測(cè)試

消息發(fā)布

運(yùn)行 MQTT消息發(fā)布代碼,將看到客戶端連接成功,并且成功將消息發(fā)布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息訂閱

通過瑞科慧聯(lián)帶溫濕度傳感器的 Sensor hub進(jìn)行數(shù)據(jù)傳輸,訂閱并解析數(shù)據(jù)結(jié)果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、總結(jié)

至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網(wǎng)關(guān)內(nèi)置 MQTT服務(wù)器,并實(shí)現(xiàn)了測(cè)試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱并解析。

與 C ++ 或 Java 之類的高級(jí)語(yǔ)言不同,Python 比較適合設(shè)備側(cè)的業(yè)務(wù)邏輯實(shí)現(xiàn)。使用 Python 可以減少代碼上的邏輯復(fù)雜度,降低與設(shè)備的交互成本。未來(lái),我們相信在物聯(lián)網(wǎng)領(lǐng)域 Python 將會(huì)有更廣泛的應(yīng)用!

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 物聯(lián)網(wǎng)
    +關(guān)注

    關(guān)注

    2894

    文章

    43301

    瀏覽量

    366348
  • python
    +關(guān)注

    關(guān)注

    53

    文章

    4753

    瀏覽量

    84068
  • MQTT
    +關(guān)注

    關(guān)注

    5

    文章

    629

    瀏覽量

    22285
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    何在MSP430?MCU中使用智能模擬組合

    電子發(fā)燒友網(wǎng)站提供《如何在MSP430?MCU中使用智能模擬組合.pdf》資料免費(fèi)下載
    發(fā)表于 09-14 10:19 ?0次下載
    如<b class='flag-5'>何在</b>MSP430?MCU<b class='flag-5'>中使</b>用智能模擬組合

    何在反向降壓-升壓拓?fù)?b class='flag-5'>中使用TPS6290x

    電子發(fā)燒友網(wǎng)站提供《如何在反向降壓-升壓拓?fù)?b class='flag-5'>中使用TPS6290x.pdf》資料免費(fèi)下載
    發(fā)表于 09-13 10:07 ?0次下載
    如<b class='flag-5'>何在</b>反向降壓-升壓拓?fù)?b class='flag-5'>中使</b>用TPS6290x

    何在汽車CAN應(yīng)用中使用負(fù)邊緣觸發(fā)觸發(fā)器節(jié)省電力

    電子發(fā)燒友網(wǎng)站提供《如何在汽車CAN應(yīng)用中使用負(fù)邊緣觸發(fā)觸發(fā)器節(jié)省電力.pdf》資料免費(fèi)下載
    發(fā)表于 09-13 10:06 ?0次下載
    如<b class='flag-5'>何在</b>汽車CAN應(yīng)用<b class='flag-5'>中使</b>用負(fù)邊緣觸發(fā)觸發(fā)器節(jié)省電力

    何在RTOS中使用spi_interface.c?

    何在 RTOS 中使用 spi_interface.c?
    發(fā)表于 07-10 06:29

    求助,請(qǐng)問如何在RTOS SDK 1.5的PlatformIO IDE ESP8266實(shí)現(xiàn)MQTT?

    ESP8266設(shè)備連接到 mqtt 代理。但 PlatformIO IDE 內(nèi)置的 RTOS SDK 1.5 版本不支持 mqtt。此 SDK 沒有 mqtt 示例。所以你能不能讓我知道我如
    發(fā)表于 07-08 06:22

    請(qǐng)問cmakelists中的變量如何在程序中使用?

    大家好, 我有個(gè)問題請(qǐng)教,cmakelists.txt中的變量如何在程序中使用?比如以下cmakelists.txt文件中的PROJECT_VER變量,我如何在c程序中使用?試了很多辦
    發(fā)表于 06-11 07:34

    工業(yè)計(jì)算機(jī)是什么?如何在不同行業(yè)中使用?

    工業(yè)電腦是專為在工業(yè)環(huán)境中使用而設(shè)計(jì)的計(jì)算機(jī)。它們可用于各個(gè)行業(yè),包括制造、運(yùn) 輸和能源。它們通常比普通計(jì)算機(jī)更強(qiáng)大,并且能夠在大多數(shù)計(jì)算機(jī)無(wú)法運(yùn)行的環(huán)境中運(yùn)行。在本文中,我們將更深入地了解什么是工業(yè)計(jì)算機(jī)以及它們?nèi)?b class='flag-5'>何在不同行業(yè)中使
    的頭像 發(fā)表于 04-01 15:45 ?614次閱讀
    工業(yè)計(jì)算機(jī)是什么?如<b class='flag-5'>何在</b>不同行業(yè)<b class='flag-5'>中使</b>用?

    Raspberry Pi樹莓派使用Python實(shí)現(xiàn)MQTT通信設(shè)計(jì)

    這次的例子,主要講述如何基于PYTHONMQTT 客戶端的使用方法
    的頭像 發(fā)表于 03-14 11:45 ?631次閱讀
    Raspberry Pi樹莓派使用<b class='flag-5'>Python</b>實(shí)現(xiàn)<b class='flag-5'>MQTT</b>通信設(shè)計(jì)

    何在測(cè)試中使用ChatGPT

    Dimitar Panayotov 在 2023 年 QA Challenge Accepted 大會(huì) 上分享了他如何在測(cè)試中使用 ChatGPT。
    的頭像 發(fā)表于 02-20 13:57 ?642次閱讀

    如何使用linux下gdb來(lái)調(diào)試python程序

    中,我們將介紹如何在Linux中使用GDB來(lái)調(diào)試Python程序。 一、安裝GDB和Python調(diào)試符號(hào) 在使用GDB調(diào)試Python程序之
    的頭像 發(fā)表于 01-31 10:41 ?1943次閱讀

    何在DAVE IDE中使用XMC7200?

    能否在 DAVE IDE 中為 XMC 7200 EVK KIT 構(gòu)建應(yīng)用程序。我嘗試打開一個(gè)項(xiàng)目但它最多只能顯示 XMC48000。如何在 DAVE IDE 中使用 XMC7200 請(qǐng)幫忙。
    發(fā)表于 01-26 06:32

    何在Linux中使用htop命令

    本文介紹如何在 Linux 中使用 htop 命令。
    的頭像 發(fā)表于 12-04 14:45 ?1389次閱讀
    如<b class='flag-5'>何在</b>Linux<b class='flag-5'>中使</b>用htop命令

    何在Python中使用Scapy進(jìn)行抓包操作

    文章將介紹如何使用 Python 來(lái)進(jìn)行簡(jiǎn)單的抓包操作。 2. Python 中的抓包庫(kù) 在 Python 中,有很多優(yōu)秀的抓包庫(kù),例如 Scapy、dpkt、pcapy 等等。在本文中,我們將以
    的頭像 發(fā)表于 11-01 14:47 ?3527次閱讀

    何在Windows下使用 Supervisor 重新拉起崩潰的Python程序

    我們用Python定時(shí)跑一些自動(dòng)化程序的時(shí)候會(huì)出現(xiàn)程序崩潰的情況。此時(shí)如果你本人不在電腦面前,或者沒有留意到程序的崩潰,沒有及時(shí)重新拉起程序,會(huì)造成或大或小的損失。 本文將教你如何在 Windows
    的頭像 發(fā)表于 10-21 11:23 ?2329次閱讀
    如<b class='flag-5'>何在</b>Windows下使用 Supervisor 重新拉起崩潰的<b class='flag-5'>Python</b>程序

    請(qǐng)問如何在單片機(jī)里實(shí)現(xiàn)MQTT協(xié)議?

    何在單片機(jī)里實(shí)現(xiàn)MQTT協(xié)議?
    發(fā)表于 10-19 07:30