EMQ

1.EMQ介绍

EMQ官网文档

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。

Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。

MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

  1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
  2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
  3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
  4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

EMQ X 开源版、企业版和专业版的对比

2.EMQ技术

2.1 MQTT协议

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。

MQTT官网: http://mqtt.org

MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

2.1.1 特点

  1. 开放消息协议,简单易实现
  2. 发布订阅模式,一对多消息发布
  3. 基于TCP/IP网络连接
  4. 1字节固定报头,2字节心跳报文,报文结构紧凑
  5. 消息QoS支持,可靠传输保证

2.1.2 应用场景

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  1. 物联网M2M通信,物联网大数据采集
  2. Android消息推送,WEB消息推送
  3. 移动即时消息,例如Facebook Messenger
  4. 智能硬件、智能家具、智能电器
  5. 车联网通信,电动车站桩采集
  6. 智慧城市、远程医疗、远程教育
  7. 电力、石油与能源等行业市场

2.1.3 基于主题的消息路由

MQTT协议基于主题(Topic)进行消息路由,主题(Topic)类似URL路径,例如:

chat/room/1

sensor/10/temperature

sensor/+/temperature

$SYS/broker/metrics/packets/received

$SYS/broker/metrics/#

主题(Topic)通过’/’分割层级,支持’+’, ‘#’通配符:

'+': 表示通配一个层级,例如a/+,匹配a/x, a/y

'#': 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d

订阅者与发布者之间通过主题路由消息进行通信,例如采用mosquitto命令行发布订阅消息:

mosquitto_sub -t a/b/+ -q 1

mosquitto_pub -t a/b/c -m hello -q 1

注解

订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

2.1.4 MQTT V3.1.1协议报文

报文结构

固定报头(Fixed header)
可变报头(Variable header)
报文有效载荷(Payload)

固定报头

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Packet type Flags            
byte2… Remaining Length              

报文类型

类型名称 类型值 报文说明
CONNECT 1 发起连接
CONNACK 2 连接回执
PUBLISH 3 发布消息
PUBACK 4 发布回执
PUBREC 5 QoS2消息回执
PUBREL 6 QoS2消息释放
PUBCOMP 7 QoS2消息完成
SUBSCRIBE 8 订阅主题
SUBACK 9 订阅回执
UNSUBSCRIBE 10 取消订阅
UNSUBACK 11 取消订阅回执
PINGREQ 12 PING请求
PINGRESP 13 PING响应
DISCONNECT 14 断开连接

PUBLISH发布消息

PUBLISH报文承载客户端与服务器间双向的发布消息。 PUBACK报文用于接收端确认QoS1报文,PUBREC/PUBREL/PUBCOMP报文用于QoS2消息流程。

PINGREQ/PINGRESP心跳

客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送PINGREQ心跳报文,服务端响应PINGRESP报文。PINGREQ/PINGRESP报文均2个字节。

MQTT消息QoS

MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。

发布消息的QoS 主题订阅的QoS 接收消息的QoS
0 0 0
0 1 0
0 2 0
1 0 0
1 1 1
1 2 1
2 0 0
2 1 1
2 2 2

Qos0消息发布订阅

Qos0消息发布订阅

Qos1消息发布订阅

Qos1消息发布订阅

Qos2消息发布订阅

Qos2消息发布订阅

3.EMQ使用

3.1 安装

安装流程

3.2 启动

# 启动
## 直接启动
$ emqx start
EMQ X v4.0.0 is started successfully!
## systemctl 启动
$ sudo systemctl start emqx
EMQ X v4.0.0 is started successfully!
## service 启动
$ sudo service emqx start
EMQ X v4.0.0 is started successfully!
# 查看启动状态
$ emqx_ctl status

3.3 操作

# 后台启动 EMQ X Broker;
$ emqx start
# 关闭 EMQ X Broker;
$ emqx stop
# 重启 EMQ X Broker;
$ emqx restart
# 使用控制台启动 EMQ X Broker;
% emqx console
# 使用控制台启动 EMQ X Broker,与 emqx console 不同,emqx foreground 不支持输入 Erlang 命令;
emqx foreground
# Ping EMQ X Broker。
emqx ping

其他指令

3.4 目录结构

描述 使用 ZIP 压缩包安装 使用二进制包安装
可执行文件目录 ./bin /usr/lib/emqx/bin
数据文件 ./data /var/lib/emqx/data
Erlang 虚拟机文件 ./erts-* /usr/lib/emqx/erts-*
配置文件目录 ./etc /etc/emqx/etc
依赖项目录 ./lib /usr/lib/emqx/lib
日志文件 ./log /var/log/emqx
启动相关的脚本、schema 文件 ./releases /usr/lib/emqx/releases

3.4.1 bin 目录

emqx、emqx.cmd

EMQ X 的可执行文件,具体使用可以查看 基本命令

emqx_ctl、emqx_ctl.cmd

EMQ X 管理命令的可执行文件,具体使用可以查看 管理命令 CLI

3.4.2 etc 目录

EMQ X 通过 etc 目录下配置文件进行设置,主要配置文件包括:

配置文件 说明
emqx.conf EMQ X 配置文件
acl.conf EMQ X 默认 ACL 规则配置文件
plugins/*.conf EMQ X 各类插件配置文件
certs EMQ X SSL 证书文件
emqx.lic License 文件仅限 EMQ X Enterprise

EMQ X 具体的配置内容可以查看 配置项

3.4.3 data 目录

EMQ X 将运行数据存储在 data 目录下,主要的文件包括:

configs/app.\*.config

EMQ X 读取 etc/emqx.confetc/plugins/*.conf 中的配置后,转换为 Erlang 原生配置文件格式,并在运行时读取其中的配置。

loaded_plugins

loaded_plugins 文件记录了 EMQ X 默认启动的插件列表,可以修改此文件以增删默认启动的插件。loaded_plugins 中启动项格式为 {, }.字段为布尔类型,EMQ X 会在启动时根据 的值判断是否需要启动该插件。关于插件的更多内容,请查看 插件

$ cat loaded_plugins
{emqx_management,true}.
{emqx_recon,true}.
{emqx_retainer,true}.
{emqx_dashboard,true}.
{emqx_rule_engine,true}.
{emqx_bridge_mqtt,false}.

mnesia

Mnesia 数据库是 Erlang 内置的一个分布式 DBMS,可以直接存储 Erlang 的各种数据结构。

EMQ X 使用 Mnesia 数据库存储自身运行数据,例如告警记录、规则引擎已创建的资源和规则、Dashbaord 用户信息等数据,这些数据都将被存储在 mnesia 目录下,因此一旦删除该目录,将导致 EMQ X 丢失所有业务数据。

可以通过 emqx_ctl mnesia 命令查询 EMQ X 中 Mnesia 数据库的系统信息,具体请查看 管理命令 CLI

3.4.4 log 目录

emqx.log.\*

EMQ X 运行时产生的日志文件,具体请查看 日志与追踪

crash.dump

EMQ X 的崩溃转储文件,可以通过 etc/emqx.conf 修改配置,具体内容可以查看 配置项

erlang.log.\*

emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。

3.5 配置说明

3.5.1 配置文件

配置文件 说明
etc/emqx.conf EMQ X 配置文件
etc/acl.conf EMQ X 默认 ACL 规则配置文件
etc/plugins/*.conf EMQ X 扩展插件配置文件
  • 监听器
监听器 说明
TCP Listener A listener for MQTT which uses TCP
SSL Listener A secure listener for MQTT which uses TLS
Websocket Listener A listener for MQTT over WebSockets
Secure Websocket Listener A secure listener for MQTT over secure WebSockets (TLS)
端口 说明
1883 MQTT/TCP 协议端口
11883 MQTT/TCP 协议内部端口,仅用于本机客户端连接
8883 MQTT/SSL 协议端口
8083 MQTT/WS 协议端口
8084 MQTT/WSS 协议端口

Listener 配置项的命名规则为 listener...xxx<Protocol>即 Listener 使用的协议,目前支持 tcp, ssl, ws, wss

一个 Zone 定义了一组配置项 (比如最大连接数等),Listener 可以通过配置项 listener...zone 指定使用某个 Zone,以使用该 Zone 下的所有配置。多个 Listener 可以共享同一个 Zone。Zone 的命名规则为 zone..xxxZone Name 可以随意命名,但同样建议是全小写的英文单词,xxx 是具体的配置项,你可以在 配置项 中查看 Zone 支持的所有配置项。

4.其他

4.1 paho-mqtt安装

4.1.1 Python

paho-mqtt-python存放在github

测试代码:

import paho.mqtt.client as mqtt
import json
import random

# 更换为自己的IP或者域名
Ip = "XXX.XXX.XXX.XXX"

def success_connect(*args):
    print('连接成功')

def on_message(client, other, message: mqtt.MQTTMessage):
    print(message.topic)
    print(message.payload)
    print(json.loads(message.payload))
    data = json.loads(message.payload)
    request_id = data.get('request_id', 'none')
    data = {
            'request_id': request_id,
            'errcode': 2002,
            'errmsg': 'params incorrect',
            'data': {
            'ok': 'success'
            }
        }
    client.publish('/pub/product/1998/1222222222/set', json.dumps(data, ensure_ascii=False))
    print('发送成功')

def on_subscribe(*args):
    print('订阅成功')

client = mqtt.Client(client_id='test_' + str(random.randint(1, 1000000000000)))
client.on_connect = success_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
# client.username_pw_set('admin', 'admin')
client.connect(Ip, 1883, 60)
client.subscribe('/sub/product/1998/1222222222/set')
# while 1:
# time.sleep(1)
# client.publish('/sub/product/1/1222222222/set', "sdsdsd")
client.loop_forever()


欢迎关注我的微信公众号

互联网矿工

funpeefun

Search

    Post Directory