一、MQtt简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是IBM开发的一个物联网通讯协议,OASIS(结构化信息标准促进组织)已宣布MQTT协议作为其新兴的物联网消息传递协议的首选。在MQTT的官方网站上,定义MQTT是一种machine-to-machine (M2M)设备之间通信的物联网互联协议,是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是MQTT服务器,消息发布者可以同时是订阅者。
MQTT 的工作原理
MQTT 是基于发布-订阅模式的通信协议,由 MQTT 客户端通过主题(Topic)发布或订阅消息,通过 MQTT Broker 集中管理消息路由,并依据预设的服务质量等级(QoS)确保端到端消息传递可靠性。
MQTT 客户端
任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。
MQTT Broker
MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。
发布-订阅模式
发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。
下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。

主题
MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:
chat/room/1
sensor/10/temperature
sensor/+/temperature
MQTT 主题支持以下两种通配符:+ 和 #。
+:表示单层通配符,例如a/+匹配a/x或a/y。#:表示多层通配符,例如a/#匹配a/x、a/b/c/d。
注意:通配符主题只能用于订阅,不能用于发布。
QoS
MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。
- QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
- QoS 1:消息至少传送一次。
- QoS 2:消息只传送一次。
MQTT 的工作流程
在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:
- 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
- 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
- MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。
开始使用 MQTT
本次教程实现java订阅者这个角色。
引入依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<grpc.version>1.6.1</grpc.version>
<protobuf.version>3.21.12</protobuf.version>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version> <!-- 建议使用较新版本 -->
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.21.12</version> <!-- 版本号应与你的 protobuf-java 保持一致 -->
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
创建实体
开发者 MQTT 客户端在经过 proto buffer 反序列化步骤之后将得到完整的 JSON。
所以我们要通过 proto文件创建java实体类
.proto文件语法高亮显示
需要安装Protobuf Support插件
依次点击Intellij中的“File”-->"Settings"-->"Plugins",如下所示:




安装完后,重启Intellij IDEA,查看.proto文件,会发现已经支持语法高亮显示。
将.proto文件转成Java类
一般的做法,是执行protoc命令,依次将.proto文件转成Java类:
protoc.exe -I=d:/tmp --java_out=d:/tmp d:/tmp/monitor_data.proto
不过gRPC官方推荐了一种更优雅的使用姿势,可以通过maven轻松搞定。
使用maven的编译命令,即可在target中看到根据.proto文件生成的Java类。
需要引入上面的pom依赖。
新建一个包 proto,包里新建sensor.proto文件。
通过maven编译命令后,在target目录下就生成了java实体类,再把它移动到src目录下。

订阅主题
public class MqttClientExample {
// --- 请根据你的实际情况修改以下配置 ---
private static final String BROKER_URL = "ssl://ip:2883";
private static final String USERNAME = "xxxx";
private static final String PASSWORD = "xxxx";
private static final String CA_CERT_PATH = "C:\\Users\\0000\\Downloads\\caCert.pem"; // RisingHF提供的CA证书
private static final String CLIENT_ID = "JavaClient-" + System.currentTimeMillis(); // 确保客户端ID唯一
private static final String TOPIC_TO_SUBSCRIBE = "user/500/device/8cf95720001797f6/uplink";
private static final String TOPIC_TO_PUBLISH = "actuator/command";
// ------------------------------------
private static MqttClient mqttClient;
public static void main(String[] args) {
try {
// 1. 创建 MQTT 客户端实例
mqttClient = new MqttClient(BROKER_URL, CLIENT_ID, new MemoryPersistence());
// 2. 设置连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(USERNAME);
connOpts.setPassword(PASSWORD.toCharArray());
// 设置是否在断开连接后清除会话
connOpts.setCleanSession(true);
// 如果是 SSL/TLS 连接 (broker URL 以 ssl:// 开头)
if (BROKER_URL.startsWith("ssl")) {
// 加载 CA 证书
SSLContext sslContext = createSslContext(CA_CERT_PATH);
connOpts.setSocketFactory(sslContext.getSocketFactory());
System.out.println("SSL/TLS 已配置");
}
// 3. 设置回调函数
// 这个回调函数将处理接收到的消息、连接状态变化等
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
// 连接丢失时调用
System.out.println("连接丢失: " + cause.getMessage());
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息时调用
System.out.println("\n收到消息:");
System.out.println(" 主题: " + topic);
byte[] payload = message.getPayload();
com.syjdly.Sensor.DeviceUplink sensorData = deserializeProtobuf(payload);
System.out.println(" 内容: " + JsonFormat.printer().print(sensorData));
System.out.println(" QoS: " + message.getQos());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发布完成且收到确认时调用
try {
System.out.println("\n消息发布成功: " + token.getMessage());
} catch (MqttException e) {
e.printStackTrace();
}
}
});
// 4. 连接到 Broker
System.out.println("正在连接到: " + BROKER_URL);
mqttClient.connect(connOpts);
System.out.println("连接成功!");
// 5. 订阅主题
System.out.println("正在订阅主题: " + TOPIC_TO_SUBSCRIBE);
// QoS 0: 最多一次
// QoS 1: 至少一次
// QoS 2: 恰好一次
mqttClient.subscribe(TOPIC_TO_SUBSCRIBE, 0);
System.out.println("订阅成功!");
// 保持主线程运行,以便接收消息
// 在实际应用中,你可能会有一个更复杂的事件循环
System.out.println("\n客户端正在运行,按 Enter 键退出...");
System.in.read();
// 7. 断开连接
mqttClient.disconnect();
System.out.println("已断开连接");
} catch (MqttException me) {
// 处理 MQTT 相关异常
System.out.println("MQTT 异常:");
System.out.println(" 原因代码: " + me.getReasonCode());
System.out.println(" 消息: " + me.getMessage());
System.out.println(" 本地消息: " + me.getLocalizedMessage());
System.out.println(" 原因: " + me.getCause());
me.printStackTrace();
} catch (Exception e) {
// 处理其他异常,如证书加载失败等
e.printStackTrace();
}
}
private static com.demo.Sensor.DeviceUplink deserializeProtobuf(byte[] payload) throws InvalidProtocolBufferException {
return Sensor.DeviceUplink.parseFrom(payload); // 调用生成的 parseFrom 方法
}
/**
* 创建并配置 SSLContext,用于加载 CA 证书
*/
private static SSLContext createSslContext(String caCertPath) throws Exception {
CertificateFactory cf = CertificateFactory.getInstance("X.509");
X509Certificate caCert = (X509Certificate) cf.generateCertificate(new FileInputStream(caCertPath));
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null);
keyStore.setCertificateEntry("caCert", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext;
}

这样就完成了对mqtt broker 的订阅。