应用层协议则主要是运行在传统互联网 TCP/IP 协议之上的设备通讯协议,这类协议通过互联网,支撑设备到云端平台的数据交换及通信,常见的有 HTTP、MQTT、CoAP、LwM2M 以及 XMPP 等协议。

从协议在物联网系统中的应用角度来看,我们可以将协议划分为云端协议和网关协议。

  • 云端协议是建立在 TCP/IP 上的协议,传感器、控制设备等物联网数据通常都需要传输上云,通过云端连通用户并与企业系统进行集成。支持 TCP/IP 的物联网设备,可以通过 WIFI、蜂窝网络以及以太网,使用 HTTP、MQTT、CoAP、LwM2M 以及 XMPP 等应用层协议协议接入云端。
  • 网关协议是适用于短距通信无法直接上云的协议,比如蓝牙、ZigBee、LoRa 等。此类设备需要接入网关转换之后,通过 TCP/IP 协议进行上云。

MQTT 特性

  • 发布-订阅模型:MQTT 基于发布-订阅模型运作,非常适合物联网应用场景。发布者将消息发送至特定主题,而所有订阅该主题的客户端均能接收到该消息。

  • 轻量化设计:得益于其极小的数据包尺寸,MQTT 极为轻量化,适合在网络带宽十分宝贵的情况下使用。

  • 服务质量等级:MQTT 提供三种服务质量(QoS)等级,从“最多一次”,至少一次,到“精确一次”(确保消息准确无误地仅送达一次,严禁重复消息)。

  • 保留消息:MQTT 允许在特定主题上保留消息。对于任何订阅该主题的客户端,都会存储并提供该主题的最新消息。

  • 遗嘱机制:若 MQTT 客户端非正常断开连接,系统将向所有订阅者发送预设的“遗嘱”消息(在连接时在服务端中注册一个遗嘱消息,与普通消息类似,可以设置遗嘱消息的主题、有效载荷等等。当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送此遗嘱消息)。

  • 会话管理能力:MQTT 具备内置的会话管理机制。这意味着即使连接中断,也能够在不丢失任何消息的情况下,重新建立会话。

特点和适用场景:

  • 特别适合于网络代价昂贵、带宽低、不可靠的环境。

  • 能在处理和内存资源有限的嵌入式设备中运行。

  • 使用发布/订阅消息模式,提供一对多的消息发布,从而解除应用程序耦合。

  • 使用TCP/IP提供网络连接,

  • 提供3种消息的QoS(Quality of Service):至多一次、最少一次、只有一次

  • 收发消息都是异步的,发送方不需要等待接收方应答

  • 提供Last Will 和 Testament特性通知有关各方客户端异常中断的机制。

MQTT 协议

mqtt 消息队里遥感勘测传输(MQ Telemetry Transport,简称MQTT)是一个基于TCP协议、发布/订阅模式的应用层传输协议。

MQTT是一种轻量级的消息协议,非常适合智能城市依赖的物联网应用。它提供低带宽消耗、高可靠性和安全的数据传输。

由EMQ开源的跨平台MQTT 5.0**客户端工具 **MQTTX

组成

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、 可变头(Variable header)、 消息体(payload)三部分构成。

  • 固定头(Fixed header),所有数据包中都有固定头,包含数据包类型及数据包的分组标识。

  • 可变头(Variable header),部分数据包类型中有可变头。

  • 内容消息体(Payload),存在于部分数据包类,是客户端收到的具体消息内容。

微消息队列 MQTT 版相当于一个具备无限扩展能力的连接网关,不仅提供了自闭环的消息收发和存储能力,还提供了规则转入转出能力。可以通过配置规则搭配其他产品,例如传统服务端消息中间件(消息队列 RocketMQ)来实现云和端的数据双向互通。

微消息队列 MQTT 版系统采用分布式理念进行设计,无单点瓶颈,各组件之间均可以无限水平扩展,保证容量可以随着您的在线使用量进行调整,并且对用户完全透明。

通信过程

  • MQTT连接服务器

    客户端到服务器的网络连接建立后,客户端发送给服务器的第一个报文必须是CONNECT报文

    在一个网络连接上,客户端只能发送一次CONNECT报文,如果出现第二个CONNECT 报文,按照协议标准,服务器会将第二个CONNECT报文当作协议违规处理并断开客户端的连接。

    对于正常的连接请求,服务器必须产生应答报文,如果无法建立会话,服务器应该在应答报文中报告对应的错误代码。

    mqtt007
  • MQTT订阅主题

    客户端向服务器发送SUBSCRIBE报文用于创建一个或多个订阅。

    在服务器中,会记录这个客户关注的一个或者多个主题,当服务器收到这些主题的PUBLISH报文的时候,将分发应用消息到与之匹配的客户端中。

    SUBSCRIBE报文支持通配符,也为每个订阅指定了最大的QoS等级,服务器根据这些信息分发应用消息给客户端。

    SUBSCRIBE报文拥有固定报头、可变报头、有效载荷。

    当服务器收到客户端发送的一个SUBSCRIBE报文时,必须向客户端发送一个SUBACK报文响应,同时SUBACK报文必须和等待确认的SUBSCRIBE报文有相同的报文标识符。

    如果服务器收到一个SUBSCRIBE报文,报文的主题过滤器与一个现存订阅的主题过滤器相同,那么必须使用新的订阅彻底替换现存的订阅。新订阅的主题过滤器和之前订阅的相同,但是它的最大QoS值可以不同。与这个主题过滤器匹配的任何现存的保留消息必须被重发,但是发布流程不能中断。

    image-20250715182842243
  • MQTT发布消息

    PUBLISH报文是指从客户端向服务器或者服务器向客户端发送一个应用消息。

    其实从服务器分发的报文给订阅者,也是属于PUBLISH控制报文。

    MQTT按照服务质量 (QoS) 等级分发应用消息。分发应用消息给多个客户端(订阅者)时,每个客户端独立处理。从发布者发布消息到接受者,分发的消息服务质量可能是不同的,这取决于订阅者订阅主题时指定的服务质量等级。而对于发布者而言,发布消息时就指定了服务质量等级。

    • QoS=0(最多分发一次:不响应不重试)

      消息的分发依赖于底层网络的能力。发布者必须发送QoS等于0,DUP等于0的PUBLISH报文。

      服务器不会发送响应,发布者也不会重试,它在发出这个消息的时候就立马将消息丢弃,这个消息可能送达一次也可能没送达。

      服务器接受PUBLISH报文时要将消息分发给订阅该主题的订阅者。

    • QoS=1(至少分发一次:一次响应)

      确保消息至少送达一次,可能被多次处理。

      发送的PUBLISH报文的报头中必须包含一个报文标识符,且QoS等于1,DUP等于0。

      1. 新的应用消息都必须分配一个未使用的报文标识符,在发布消息的同时将消息存储起来,等待服务器的应答。

      2. 服务器回应 PUBACK报文,必须包含发布者的报文标识符。

      发布者收到来自服务器的PUBACK报文后,这个报文标识符就可重复使用

      在发送了PUBACK报文之后,接收者必须将任何包含相同报文标识符的PUBLISH报文当作一个新的消息,并忽略DUP标志的值。

      image-20250715184334437
    • QoS=2(仅分发一次:两次确认并存储)

      最高等级的服务质量,这个服务质量等级会有额外的开销。消息可变报头中有报文标识符。

      接收者使用两步确认过程来确认收到。发送者必须给消息分配一个未使用的报文标识符,且报文的QoS等于2,DUP等于0。

      1. 发布者发送一个未确认的报文消息,并将消息存储。

      2. 服务器响应PUBREC报文,并存储报文标识符和消息。

      3. 发布者丢弃消息,存储报文标识符,并发送一个PUBREL报文。必须包含与原始PUBLISH报文相同的报文标识符。

      4. 服务器发送对应的PUBCOMP报文(一旦发送了对应的PUBREL报文就不能重发这个PUBLISH报文)。

      5. 发布者丢弃存储的报文标识符。

      image-20250715185705230
  • 取消订阅:客户端发送UNSUBSCRIBE报文给服务器,用于取消订阅主题(主题存在且被订阅,需保证已存在的消息分发完成)。

    UNSUBSCRIBE报文固定报头的第3,2,1,0位是保留位且必须分别设置为0,0,1,0。否则服务器必须认为任何其它的值都是不合法的并关闭网络连。具体的描述可以看协议文档。

    即使没有删除任何主题订阅(客户端取消订阅的主题未被订阅),服务器也必须发送一个UNSUBACK响应。

  • 断开连接:DISCONNECT报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。

    DISCONNECT报文的固定报头保留位必须全为0。必须丢弃任何与当前连接关联的未发布的遗嘱消息。

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class MqttDemo {
// MQTT 服务器地址
private static final String MQTT_BROKER = "tcp://iot.modbus.cn:1883";

// MQTT 客户端 ID
private static final String MQTT_CLIENT_ID = "4QR8TZ9ThuL4G";

// MQTT 用户名
private static final String MQTT_USERNAME = "ceshi";

// MQTT 密码
private static final String MQTT_PASSWORD = "Abc123456";

// 订阅的主题
private static final String MQTT_TOPIC_SUBSCRIBE = "/server/coo/4QR8TZ9ThuL4G";

// 发布的主题
private static final String MQTT_TOPIC_PUBLISH = "/dev/coo/4QR8TZ9ThuL4G";

// 心跳间隔,单位为秒
private static final int MQTT_KEEP_ALIVE_INTERVAL = 60;

public static void main(String[] args) throws MqttException {
// 创建 MQTT 客户端
MqttClient mqttClient = new MqttClient(MQTT_BROKER, MQTT_CLIENT_ID, new MemoryPersistence());

// 设置 MQTT 连接选项
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(MQTT_USERNAME);
options.setPassword(MQTT_PASSWORD.toCharArray());
options.setCleanSession(true);
options.setKeepAliveInterval(MQTT_KEEP_ALIVE_INTERVAL);

// 设置回调
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("连接断开");
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("收到消息。主题: " + topic);
System.out.println("消息: " + new String(message.getPayload()));

// 将消息转换为字符串
String msg = new String(message.getPayload());

// 使用 JSON 工具解析字符串
JSONArray jsonArray = new JSONArray(msg);

for (int i = 0; i < jsonArray.length(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
int device_id = jsonObject.getInt("sensor_device_id");
int port_id = jsonObject.getInt("port_id");
int sdata = jsonObject.getInt("sdata");

System.out.println("设备ID: " + device_id + ", 端口ID: " + port_id + ", 数据: " + sdata);
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("消息发布完成");
}
});

// 连接到 MQTT 服务器
mqttClient.connect(options);

// 订阅主题
mqttClient.subscribe(MQTT_TOPIC_SUBSCRIBE);

// 创建消息
MqttMessage message = new MqttMessage();
message.setPayload("[{\"sensor_device_id\": 1, \"port_id\": 1, \"sdata\": 98.633}]".getBytes());

// 发布消息
mqttClient.publish(MQTT_TOPIC_PUBLISH, message);
}
}

RabbitMQ 支持

RabbitMQ 是基于 AMQP 0.9.1 协议实现的广泛使用的开源消息队列产品,RabbitMQ 以插件的形式支持了 MQTT 协议,可以在 RabbitMQ 集群上方便的支持 MQTT 协议,实现对物联网等业务场景的支持

通过在集群节点执行以下命令开启MQTT插件:

1
2
3
4
5
6
7
8
9
10
11
rabbitmq-plugins enable rabbitmq_mqtt

<!--mqtt依赖包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
img

AMQP 端口(默认端口:5672): 这是 RabbitMQ 使用的端口,用于 AMQP 协议。确保你的安全组允许来自需要访问 RabbitMQ 的客户端的流量通过这个端口。

管理界面端口(默认端口:15672): 如果你需要访问 RabbitMQ 的 Web 管理界面,你需要开放这个端口。

MQTT 端口(默认端口:1883): 这是 MQTT 服务使用的端口。确保这个端口也被安全组允许通过,以便 MQTT 客户端可以连接到 RabbitMQ 的 MQTT 服务。

验证 RabbitMQ 和 MQTT 下行消息的互通性,可以 RabbitMQ 发送消息,MQTT 订阅到消息。

RabbitMQ 的 MQTT 插件,可以支持正常的 MQTT 消息收发,也可以支持 MQTT 上行消息到应用,也可以支持应用发送 MQTT 消息下行消息到订阅端,并且有完善的监控。

注意事项:

  • MQTT 协议的 topic 使用"/"分割 topic, AMQP 协议的 Topic(Routingkey)使用 "."分割 topic,所以在协议转换的时候会自动转换,应用使用的时候要注意这个差别。

  • 不推荐 MQTT 使用匿名连接或“no login credentials”,因为 AMQP 协议会自动转换为默认用户 guest 或 mqtt.default_user,不方便做权限管控。

  • 关于订阅持久性,注意 MQTT 和 AMQP 队列持久性的映射。

    • Transient clients that use transient (non-persistent) messages
    • Stateful clients that use durable subscriptions (non-clean sessions, QoS1)。
  • 优先使用镜像队列,不要使用 Quorum Queues 特性,因为 Quorum 要求至少三节点,并且新特性稳定性待验证,不推荐用。

RocketMQ 支持

MQTT协议定义的是一个Pub/Sub的通信模型,这个与RocketMQ是类似的,不过其在订阅方式上比较灵活,可以支持多级Topic订阅(如 “/t/t1/t2”),甚至可以支持通配符订阅(如 “/t/t1/+”)。

架构模型:

image

队列存储模型:

image

由于RocketMQ-MQTT项目依赖RocketMQ底层的多队列分发,RocketMQ从4.9.3版本开始支持这一特性,因此您需要确认RocketMQ的版本升级到4.9.3或更高版本,并且确保以下配置项已开启:

1
2
enableLmq = true 
enableMultiDispatch = true

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class RocketMQProducer {
private static DefaultMQProducer producer;
private static String firstTopic = System.getenv("firstTopic");
private static String recvClientId = "recv01";

public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
producer = new DefaultMQProducer("PID_TEST");
// Specify name server addresses.
producer.setNamesrvAddr(System.getenv("namesrv"));
//Launch the instance.
producer.start();

for (int i = 0; i < 1000; i++) {
try {
sendMessage(i);
Thread.sleep(1000);
sendWithWildcardMessage(i);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}

private static void setLmq(Message msg, Set<String> queues) {
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(
queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
}

private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message msg = new Message(firstTopic,
"MQ2MQTT",
("MQ_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
String secondTopic = "/r1";
setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
SendResult sendResult = producer.send(msg);
System.out.println("sendMessage: " + new String(msg.getBody()));
}

private static void sendWithWildcardMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message msg = new Message(firstTopic,
"MQ2MQTT",
("MQwc_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
String secondTopic = "/r/wc";
Set<String> lmqSet = new HashSet<>();
lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
setLmq(msg, lmqSet);
SendResult sendResult = producer.send(msg);
System.out.println(() + "sendWcMessage: " + new String(msg.getBody()));
}

private static Set<String> mapWildCardLmq(String firstTopic, String secondTopic) {
// todo by yourself
return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, "/r/+")));
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RocketMQConsumer {

public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_test01");

// Specify name server addresses.
consumer.setNamesrvAddr(System.getenv("namesrv"));

// Subscribe one more more topics to consume.
String firstTopic = System.getenv("firstTopic");
consumer.subscribe(firstTopic, Constants.MQTT_TAG);
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println("Receive: " + new String(messageExt.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

EMQX

EMQX 提供了高效可靠海量物联网设备连接,能够高性能实时移动与处理消息和事件流数据,以弹性伸缩、安全可靠的方式连接数以亿计的物联网设备

  • 海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。

  • 高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。

  • 数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。

  • 多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。

  • 全面支持 MQTT 5.0 标准:100% 符合 MQTT 5.0 和 3.x 标准,具有更好的可扩展性、安全性和可靠性。

  • 高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。

  • 易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。

  • 云原生:通过 Kubernetes Operator 和 Terraform,可以轻松地在企业内部和公共云中进行部署。

原理

  • MQTT 客户端

    任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

  • MQTT Broker

    MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

  • 发布-订阅模式

    发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

    下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。

    MQTT 发布-订阅模式
  • 主题

    MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:

    1
    2
    3
    4
    5
    chat/room/1

    sensor/10/temperature

    sensor/+/temperature

    MQTT 主题支持以下两种通配符:+#

    • +:表示单层通配符,例如 a/+ 匹配 a/xa/y

    • #:表示多层通配符,例如 a/# 匹配 a/xa/b/c/d

    注意:通配符主题只能用于订阅,不能用于发布。

    QoS

    MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

    • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。

    • QoS 1:消息至少传送一次。

    • QoS 2:消息只传送一次。

发布/订阅

适用于复杂和高性能的消息应用程序。这些特性包括支持通配符主题、基于主题的消息过滤、消息持久化和消息质量等级(QoS)设置。

发布功能允许连接到 EMQX 的设备向特定主题发送消息。消息可以包含任何类型的数据,例如传感器读数、状态更新或命令。当设备发布消息到一个主题时,EMQX 接收该消息并将其转发给所有订阅了该主题的设备。

EMQX 中的订阅功能允许设备从特定主题接收消息。设备可以订阅一个或多个主题,并接收在这些主题上发布的所有消息。这使得设备能够实时监控特定事件或数据流,而无需不断轮询更新。

延迟发布

延迟发布是 EMQX 支持的 MQTT 扩展功能。当客户端使用特殊主题前缀 $delayed/{DelayInteval} 发布消息时,将触发延迟发布功能,可以实现按照用户配置的时间间隔延迟发布消息。

延迟发布主题的具体格式如下:

1
$delayed/{DelayInterval}/{TopicName}
  • $delayed:使用 $delay 作为主题前缀的消息都将被视为需要延迟发布的消息。延迟间隔由下一主题层级中的内容决定。

  • {DelayInterval}:指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。如果 {DelayInterval} 无法被解析为一个整型数字,EMQX 将丢弃该消息,客户端不会收到任何信息。

  • {TopicName}:MQTT 消息的主题名称。

例如:

  • $delayed/15/x/y:15 秒后将 MQTT 消息发布到主题 x/y

  • $delayed/60/a/b:1 分钟后将 MQTT 消息发布到 a/b

  • $delayed/3600/$SYS/topic:1 小时后将 MQTT 消息发布到 $SYS/topic

排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

要进行排它订阅,您需要为主题名称添加前缀,如以下表格中的示例:

示例 前缀 真实主题名
$exclusive/t/1 $exclusive/ t/1

当某个客户端 A 订阅 $exclusive/t/1 后,其他客户端再订阅 $exclusive/t/1 时都会失败,直到 A 取消了对 $exclusive/t/1 的订阅为止。

注意: 排它订阅必须使用 $exclusive/ 前缀,在上面的示例中,其他客户端依然可以通过 t/1 成功进行订阅。

持久化

可以将会话和消息持久化存储到磁盘,并提供高可用副本以保证数据的冗余和一致性。通过会话持久化功能,可以实现有效的故障转移和恢复机制,确保服务的连续性和可用性,从而提高系统的可靠性。

集成 RabbitMQ

通过内置的规则引擎组件,该集成简化了 EMQX 与 RabbitMQ 之间的数据摄取过程,无需复杂编码。

  • 创建 RabbitMQ Sink 规则,通过配置的 Sink 将处理后的结果传输到 RabbitMQ 的队列 (MQTT 主题 t/#

    • 规则 SQL:SELECT payload as data , now_timestamp() as timestamp FROM “t/#”

    • 配置信息

      • 交换机: 输入之前创建的 test_exchange, 消息将被发送到该交换机。

      • 路由键: 输入之前创建的 test_routing_key,用于将消息路由到 RabbitMQ 交换中的正确队列。

      • 消息传递模式下拉框中选择 non_persistentpersistent

        • non_persistent (默认选项):消息不会持久化到磁盘,如果 RabbitMQ 重新启动或崩溃,消息可能会丢失。

        • persistent:消息被持久化到磁盘,以确保在 RabbitMQ 重新启动或崩溃时的数据持久性。

          TIP

          如果需要在 RabbitMQ 重新启动时防止消息丢失,您可能还需要将 queue 和 exchange 设置为 durable。有关更多信息,请参阅 RabbitMQ 的文档

      • 有效载荷模版: 默认值为空字符串,意味着将被转发至 RabbitMQ 服务器的消息载荷是未经任何修改的 JSON 格式文本。

      • 等待发布确认: 默认开启以保证消息被成功发送至 RabbitMQ。

  • 创建 RabbitMQ Source 规则,将 RabbitMQ 队列中的数据转发至EMQX本地

    • 规则 SQL:SELECT * FROM “$bridges/rabbitmq:my-rabbitmq-source”

    • 配置信息

      • 队列:RabbitMQ 中创建好的队列名称。
      • No Ack:根据情况选择,指定从 RabbitMQ 消费消息时是否使用 no_ack 模式。使用 no_ack 模式表示消费者接收消息后不需要发送确认,消息会立刻从队列中移除。
      • 等待发布确认:使用消息发布者确认时,是否要等待 RabbitMQ 确认消息。
    • 创建消息重发布动作

      • 主题:发布到 MQTT 的主题,此处填写 t/1

      • QoS:选择 012、或 ${qos},也可以输入占位符从其他字段中设置 QoS,此处选择 ${qos} 表示跟随原始消息的 QoS。

      • Retain:选择 truefalse

        确认是否以保留消息方式发布消息,也可以输入占位符从其他字段中设置保留消息标志位,此处我们选择 false

      • 消息模板:用于生成转发消息 Payload 的模板,默认留空表示转发规则输出结果。此处可以输入 ${payload} 表示仅转发 Payload。

      • MQTT 5.0 消息属性:默认禁用

  • 创建一个连接器,将 RabbitMQ Sink 或 Source 连接到 RabbitMQ 服务器(Dashboard -> 创建连接器 -> 选择rmq,输入连接信息)。

下载 EMQX 开源版

1
2
3
4
5
6
7
8
9
10
11
# https://docker.aityp.com/image/docker.io/emqx/emqx:5.8.1

docker pull swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/emqx/emqx:5.8.1
# 或者
docker tag swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/emqx/emqx:5.8.1 docker.io/emqx/emqx:5.8.1
# 修改镜像名称
docker tag 0526464d2e3a emqx/emqx
# 启动
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx
# 访问emqx(默认账号admin / public)
http://localhost:18083/

java客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public class MyMqttClient {

public static MqttClient mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
private static String ClientName = "test-mqtt"; //待填 将在服务端出现的名字
private static String IP = "127.0.0.1"; //待填 服务器IP

public static void start(String clientId) {
//初始化连接设置对象
mqttConnectOptions = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
//这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
//设置连接超时时间,单位是秒
mqttConnectOptions.setConnectionTimeout(10);
//设置持久化方式
memoryPersistence = new MemoryPersistence();
if (null != clientId) {
try {
mqttClient = new MqttClient("tcp://" + IP + ":1883", clientId, memoryPersistence);
} catch (MqttException e) {
e.printStackTrace();
}
}
//设置连接和回调
if (null != mqttClient) {
if (!mqttClient.isConnected()) {
//创建回调函数对象
MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
//客户端添加回调函数
mqttClient.setCallback(MQTTReceiveCallback);
//创建连接
try {
System.out.println("创建连接");
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
e.printStackTrace();
}

}
} else {
System.out.println("mqttClient为空");
}
System.out.println("连接状态" + mqttClient.isConnected());
}

// 关闭连接
public void closeConnect() {
//关闭存储方式
if (null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
e.printStackTrace();
}
} else {
System.out.println("memoryPersistence is null");
}

//关闭连接
if (null != mqttClient) {
if (mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("mqttClient is not connect");
}
} else {
System.out.println("mqttClient is null");
}
}

// 发布消息
public static void publishMessage(String pubTopic, String message, int qos) {
if (null != mqttClient && mqttClient.isConnected()) {
System.out.println("发布消息,id:" + mqttClient.getClientId());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(message.getBytes());

MqttTopic topic = mqttClient.getTopic(pubTopic);

if (null != topic) {
try {
MqttDeliveryToken publish = topic.publish(mqttMessage);
if (!publish.isComplete()) {
System.out.println("消息发布成功");
}
} catch (MqttException e) {
e.printStackTrace();
}
}
} else {
reConnect();

publishMessage(pubTopic, message, qos);
}
}

// 重新连接
public static void reConnect() {
if (null != mqttClient) {
if (!mqttClient.isConnected()) {
if (null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("mqttConnectOptions is null");
}
} else {
System.out.println("mqttClient is null or connect");
}
} else {
start(ClientName);
}
}

// 订阅主题
public static void subTopic(String topic) {
if (null != mqttClient && mqttClient.isConnected()) {
try {
mqttClient.subscribe(topic, 1);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("mqttClient is error");
}
}

// 清空主题
public void cleanTopic(String topic) {
if (null != mqttClient && !mqttClient.isConnected()) {
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("mqttClient is error");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 发布消息的回调类
public class MQTTReceiveCallback implements MqttCallback {
// 连接丢失后,一般在这里面进行重连
public void connectionLost(Throwable cause) {
System.out.println("连接断开,可以做重连");
}

// 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。由 MqttClient.connect 激活此回调。
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
// subscribe后得到的消息会执行到这里面
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}

HiveMQ

HiveMQ通过提供强大的安全协议、可扩展架构和事件驱动通信等功能,增强了MQTT的能力,使其成为智能城市中关键基础设施和服务的理想选择。

与 MQ 的关联和区别

在物联网 IoT 场景中,成千上万(甚至数百万)规模的设备传感器可使用微消息队列 MQTT 版上传数据,需做数据分析的服务端(即部署在服务器上的应用)则可以通过消息队列 RocketMQ 版完成数据的分析与处理

产品名 适用场景
微消息队列 MQTT 版 面向移动端场景,移动端场景一般都具备海量设备,单设备数据较少的特点。
微消息队列 MQTT 适用于拥有大量在线客户端(过万,甚至上百万),但每个客户端消息较少的场景。
消息队列 RocketMQ 版 面向服务端的消息引擎,主要用于服务组件之间的解耦、异步通知、削峰填谷等,服务器规模较小,但需要大量的消息处理,吞吐量要求高。
消息队列 RocketMQ 版适用于服务端进行大批量的数据处理和分析的场景。

功能对比:

功能特性 微消息队列 MQTT 版 消息队列 RocketMQ 版
客户端连接数 客户端规模庞大,百万甚至千万级。 一般服务器规模较小,极少数万级。
单客户端消息量 单个客户端需要处理的消息少,一般定时收发消息。 单个客户端处理消息量大,注重吞吐量。
部署场景 移动设备、App 软件、H5 页面等。 服务端应用。
消费模式 支持广播模式。 支持集群消费和广播消费。
顺序支持 只支持上行顺序,不支持下行顺序(后续开放)。 支持上行和下行顺序。
多语言/系统支持(TCP 协议) 支持 Java、C、C++、.NET、Android、iOS、Python、JS、Go 等多种语言和系统。 支持 Java、C++、.NET。
访问凭证 支持 RAM 主子账号授权和 MQTT Token 的临时访问模式,详情请参见鉴权概述。 支持 RAM 主子账号授权和 跨云账号授权(STS 临时授权访问)。

选型推荐:

场景 部署端 微消息队列 MQTT 版 消息队列 RocketMQ 版
设备端上报状态数据 移动终端 ×
接收并处理分析设备的上报数据 移动终端 ×
对多个设备下发控制指令 服务器 ×
直播、弹幕、聊天 App 收发消息 应用 ×
服务端接收并分析聊天消息 服务器 ×

MQTT协议广泛应用场景也是非常非常多,下边列举一些:

  • 物联网M2M通信,物联网大数据采集

  • Android消息推送,WEB消息推送

  • 移动即时消息,例如Facebook Messenger

  • 智能硬件、智能家具、智能电器

  • 车联网通信,电动车站桩采集

  • 智慧城市、远程医疗、远程教育

  • 电力、石油与能源等行业市场

如何关联:将 MQTT 数据传输到 RabbitMQ

RabbitMQ 数据集成是 EMQX 中的开箱即用功能,结合了 EMQX 的设备接入、消息传输能力与 RabbitMQ 强大的消息队列处理能力。

EMQX-RabbitMQ 集成

EMQX 支持与 RabbitMQ 的数据集成,能够让您将 MQTT 消息和事件转发至 RabbitMQ,还能够实现从 RabbitMQ Server 中消费数据,并发布到 EMQX 特定主题中,实现 RabbitMQ 到 MQTT 的消息下发。

MQTT 数据摄取到 RabbitMQ 的工作流程如下:

  1. 消息发布和接收:工业物联网设备通过 MQTT 协议与 EMQX 建立成功连接,并向 EMQX 发布实时 MQTT 数据。EMQX 收到这些消息后,将启动其规则引擎中的匹配过程。

  2. 消息数据处理:消息到达后,它将通过规则引擎进行处理,然后由 EMQX 中定义的规则处理。根据预定义的标准,规则将决定哪些消息需要路由到 RabbitMQ。如果任何规则指定了载荷转换,则将应用这些转换,例如转换数据格式、过滤特定信息或用额外的上下文丰富载荷。

  3. 消息传入到 RabbitMQ:规则处理完消息后,它将触发一个动作,将消息转发到 RabbitMQ。处理过的消息将无缝写入 RabbitMQ。

  4. 数据持久化和利用:RabbitMQ 将消息存储在队列中,并将它们传递给适当的消费者。消息可以被其他应用程序或服务消费以进行进一步处理,如数据分析、可视化和存储。

对比 CoAP 协议

CoAP 的关键特性包括

  • 基于 UDP:不同于运行在 TCP 之上的 MQTT,CoAP 设计为使用 UDP,更适合于网络和资源受限的环境。

  • 类 HTTP 语义:CoAP 采用了类似 HTTP 的语义,使用 GET、POST、PUT 和 DELETE 等方法进行交云。这使得熟悉 HTTP 的开发者可以轻松地采用 CoAP。

  • 确认消息:CoAP 提供了确保消息送达接收方的确认消息机制。如果没有收到确认回复,消息将会被重新发送。

  • 资源观察:CoAP 允许客户端“观察”资源,这样当资源状态发生变化时,客户端可以自动接收更新。

  • 分块传输:CoAP 支持将大型负载分割成小块进行传输,这对数据包大小敏感的受限网络来说非常有用。

特性 MQTT CoAP
传输层 TCP(开销大) UDP(开销小)
头部大小 2 字节(灵活头部) 4 字节(固定头部)
资源开销 非常低
消息模型 发布/订阅 请求/响应 RESTful
消息可靠性 非常高 较低
功能多样性 丰富 较少
扩展性 高度的可扩展性 适合资源受限的环境,高并发场景下可能会遇到挑战
安全性 SSL/TLS DTLS(数据报传输层安全)
应用场景 油气管道监控、工业自动化、车联网、远程医疗 低功耗传感器、农业、环境检测、智能计量

在决定选用 MQTT 还是 CoAP 时,应考虑以下因素:

  • 网络环境:如果您的网络不稳定或带宽受限,MQTT 凭借其轻量级设计和处理高延迟的能力,可能是更佳的选择。

  • 设备能力:如果您的设备处理能力或存储空间有限,CoAP 的低开销特性可能使其成为更适合的选项。

  • 消息传递保证:如果您需要确保消息只传递一次,MQTT 提供的高级服务质量(QoS)可能更加有利。

  • 安全需求:MQTT 和 CoAP 都提供了安全特性,但它们的实现方式各不相同。根据您的具体安全需求选择合适的协议。