MQTT¶
约 360 个字 48 行代码 3 张图片 预计阅读时间 2 分钟
简介 ¶
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,广泛用于物联网(IoT)设备之间的通信。它基于发布 / 订阅模型,支持低带宽、高延迟或不稳定网络环境下的通信。
- 发布者(Publisher):将消息发送到特定主题(Topic
) 。 - 订阅者(Subscriber):订阅某个主题,并接收与该主题相关的消息。
 - 代理(Broker):负责管理所有的消息发布和订阅。它是 MQTT 网络的核心。
 
环境安装 ¶
Docker 环境 ¶
1.docker 安装:过程略 2.下载 EMQX 开源版
安装 EMQX
docker pull emqx/emqx:5.8.3
启动 EMQX
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.8.3

EMQX 使用 ¶

账号相关 ¶
添加 Dashboard 用户
emqx ctl admins add <Username> <Password> <Description>	
重置指定用户的密码
emqx ctl admins passwd <Username> <Password>	
删除指定用户
emqx ctl admins del <Username>	
使用 ¶

Python 编程 ¶
Python 中常用的 MQTT 客户端库是 paho-mqtt,它是 Eclipse Paho 项目的一部分。
pip install paho-mqtt
常见指令 ¶
| 方法 | 描述 | 
|---|---|
mqtt.Client("client_id") | 
创建一个 MQTT 客户端实例 | 
client.connect(broker, port, keepalive) | 
连接到 MQTT 代理服务器 | 
client.subscribe(topic) | 
订阅一个主题 | 
client.publish(topic, payload) | 
发布消息到指定的主题 | 
client.on_connect | 
设置连接回调函数(连接成功后执行的函数) | 
client.on_message | 
设置消息接收回调函数(当接收到订阅主题的消息时执行的函数) | 
client.loop_start() | 
启动客户端并在后台运行(非阻塞) | 
client.loop_forever() | 
启动客户端并在当前线程运行(阻塞) | 
client.disconnect() | 
断开与代理的连接 | 
实例 ¶
publisher 程序
import paho.mqtt.client as mqtt
import time
# MQTT 配置
BROKER = "localhost"  # 替换为你的 MQTT Broker 地址
PORT = 1883
TOPIC = "topic"  # 主题
# 设置 MQTT 客户端
client = mqtt.Client()
client.username_pw_set("user id", "passwd")  # 设置用户名和密码
client.connect(BROKER, PORT, 60)
client.publish(TOPIC, message)
print(f"已发送消息: {message}")
time.sleep(1)  # 可选的延时,防止过快发送
client.disconnect()
subscriber 程序
import paho.mqtt.client as mqtt
import os
BROKER = "localhost"  # 替换为你的 MQTT Broker 地址
PORT = 1883
TOPIC = "topic"  # 主题,用于接收开关控制消息
import time
# 回调函数,当接收到 MQTT 消息时触发
def on_message(client, userdata, msg):
    payload = msg.payload.decode("utf-8")
    print(payload)
    # 执行操作
# 设置 MQTT 客户端
client = mqtt.Client()
client.on_message = on_message
client.username_pw_set("user id", "passwd")  # 设置用户名和密码
client.connect(BROKER, PORT, 60)
client.subscribe(TOPIC)
print("已经连接,等待接受信息")
client.loop_forever()