构建轻量级的边缘流处理软件 Kuiper
如果无法正常显示,请先停止浏览器的去广告插件。
1. www.emqx.io
构建轻量级的边缘流处理软件
eKuiper
黄济泳 · 2021
© 2021 EMQ Technologies Co., Ltd.
2.
3. 目录
边缘流式处理 eKuiper: 基于SQL的边缘流处理引擎 业务实践和案例
边缘流处理的背景,应用场景和面临的
挑战 详细解析 eKuiper 的边缘流式计算能力
以及架构实现 eKuiper的典型应用场景及其实现案例
© 2021 EMQ Technologies Co., Ltd.
4. 边缘流式处理
© 2021 EMQ Technologies Co., Ltd.
5. 流式处理
数据流入时立刻进行处理而非一次性批处理
实时
数据产生时立刻处理,低延迟
无界
数据源源不断流入系统,需要持续
计算
状态
可进行有状态计算,依赖基于时间
的上下文状态
© 2021 EMQ Technologies Co., Ltd.
6. 流式处理
使用场景
实时计算
以低延迟和增量的方式,计算实时
状态
规则引擎
灵活定义规则引擎,实现告警和消
息转发
数据管道
实现边缘与云端不同类型的数据格
式与异构协议之间的灵活转换,实
现IT & OT 融合
© 2021 EMQ Technologies Co., Ltd.
7. 边缘流式计算
为何云端流式计算框架不适用边缘
重量级 时延
当前流行的Apache Flink, Spark
Streaming 等都是为云端设计 需要通过网络传输计算输入和输出,无法适
用于低时延场景
数据安全 带宽
全量数据必须上传云端,有安全风险 边缘端数据量巨大,无效数据多。经过边缘
端处理后,可以极大减少传输量
© 2021 EMQ Technologies Co., Ltd.
8. 典型场景:车联网规则引擎
车型多
部分车型性能有限
车机
数据点位多,更新频繁
云端协同
复杂场景联动,动态更
新
EMQ X 集群
有状态计算,关注变化
不同车型点位不一
SOURCE
Sinks
SQL规则引擎
CAN BUS总线
MQTT 云边协同通道
云边协同管理
UDP Socket server
AI算法
训练框架
file
实时反控车
车身舒适性
C
A
N
典型规则
车辆开动车辆30秒内且窗户未关,关闭窗户
此处,开动车辆需要判断车辆启动状态变化
及其持续时间
动力总成
信息娱乐
底盘
© 2021 EMQ Technologies Co., Ltd.
9. eKuiper: 基于SQL的边缘流处理引擎
© 2021 EMQ Technologies Co., Ltd.
10. 边缘流式计算
3个主要挑战,某种程度下的三元悖论,需要取舍
轻量和高效
需要运行在CPU和内存受限的边缘端
轻量
敏捷与灵活
敏捷
可管理
边缘端量大且分散,需要用更加敏捷的方式实
现业务支持,避免复杂的编码和编译等工作,
实现热更新
易用,易部署和管理
支持非集中和弱网情况下的部署和自治
© 2021 EMQ Technologies Co., Ltd.
11. LF Edge eKuiper
边缘流式分析与计算
流式处理
规则引擎
消息推送
LF Edge 项目
由 EMQ 发起并托管在 LF Edge 基金会
轻代码
使用 SQL 实现数据处理,内置函数快速处理 IoT 数据
结构与非结构化数据处理
结构化物联网数据分析
图片、音频数据,以及 AI/ML 推理算法集成
开源社区边缘计算项目集成
EdgeX Foundry 参考规则引擎实现
KubeEdge, OpenYurt 边缘流式处理深度集成
© 2021 EMQ Technologies Co., Ltd.
12. 使用 eKuiper 3步骤
在边缘端或通过云边通道运行 CLI /REST API
• 创建流 create stream demo '() WITH (FORMAT="JSON", DATASOURCE="$hw/events/device/+/twin/update")'
• 创建规则:持续运行
的数据处理逻辑 {
"sql": "SELECT data->tag1->value AS temperature, data->tag2->value AS humidity FROM demo",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "devices/result",
"qos": 1,
"clientId": "demo_001"
}
}
]
}
• 提交、运行规则
curl -X POST http://$kuiper_server:9081/rules -H 'Content-Type: application/json’
-d '$my_rule'
© 2021 EMQ Technologies Co., Ltd.
13. 用边缘流式处理实现车联网实时规则引擎
详解针对边缘场景的流式计算引擎的设计
1. 2. 3
. 4
.
轻量和高效 敏捷与灵活 实时计算 管理和扩展
适用车载芯片的超小安装包
和资源占用 动态,灵活的文本规则替代
OTA更新方式 内置IO 自定义扩展
丰富内置函数 云边协同管理
灵活格式化方案
© 2021 EMQ Technologies Co., Ltd.
14. 边缘节点
大概率需要考虑资源受限场景
算力千差万别
边缘设备算力较弱,资源受限
车机系统
车型众多,算力/资源有高有低
统一流计算引擎
需要支持最低算力/资源的设备
© 2021 EMQ Technologies Co., Ltd.
15. 轻量化
如何实现和取舍
语法解析和运行时针对编译端自研
安装包 < 10MB
核心部分几乎无第三方依赖,保证footprint较小
支持 SQL 语法适用于用户场景的子集,例如没有
subquery,可用直观的规则流水线代替实现
核心功能占用内存 < 12MB
功能可插拔
通过编译参数,控制编译的功能
提供不同的docker image, 应对不同的使用环境
车机共享总线source情况下,
多规则内存占用
内存复用
数据变换只添加,不复制
数据源共享,连接共享
© 2021 EMQ Technologies Co., Ltd.
16. 用边缘流式处理实现车联网实时规则引擎
详解针对边缘场景的流式计算引擎的设计
1
.
2.
3
. 4
.
轻量和高效 敏捷与灵活 实时计算 管理和扩展
适用车载芯片的超小安装包
和资源占用 动态,灵活的文本规则替代
OTA更新方式 流式计算窗口 自定义扩展
丰富内置IO/函数 云边协同管理
灵活格式化方案
© 2021 EMQ Technologies Co., Ltd.
17. 多变的边缘处理场景
想象OTA更新而被迫停车的窘境?
规则
元数据
连接
个性化定制,千人千面 不同车型,不同 DBC 格式变化
规则随时启停和更改 数据点位DBC变更频繁 目标地址变化
复杂场景联动
© 2021 EMQ Technologies Co., Ltd.
18. 规则动态管理和热更新
管理 接口 规则管理能力 管理API接口 多规则运行 REST API 规则持久化存储
动态增删改查 CLI 可选 sqlite 和 redis
动态启停 K8s tool (文本方式) 自动状态恢复
规则状态监控
存储
存储和迁移
可复制数据文件
© 2021 EMQ Technologies Co., Ltd.
19. 灵活的业务规则
SQL定义流处理
动态解析 规则独立,支持容错 通用
无需停机升级 相互隔离 低代码实现业务逻辑
SQL 解析为算子组成的DAG 运行失败不影响其他规则 代价:性能可能不如专用的程序
© 2021 EMQ Technologies Co., Ltd.
20. 用边缘流式处理实现车联网实时规则引擎
详解针对边缘场景的流式计算引擎的设计
1
. 2. 3. 轻量和高效 敏捷与灵活 实时计算 管理和扩展
适用车载芯片的超小安装包
和资源占用 动态,灵活的文本规则替代
OTA更新方式 内置IO 自定义扩展
丰富内置函数 云边协同管理
4
.
灵活格式化方案
© 2021 EMQ Technologies Co., Ltd.
21. 车机中的实时流计算
行车数据
数据采集与指令
数据处理
场景联动
CAN 总线
1、车况(状态、胎压、电量)
2、POI位置信息(轨迹、周边)
3、行车数据(速度、加速度、刹车、
1、指令下发 1、数据归类、清洗 1、数据应用
2、数据采集、筛选 2、数据聚合 2、规则联动
怠速等)
4、驾驶员行为
5、其他:空调、座椅加热、娱乐数据
© 2021 EMQ Technologies Co., Ltd.
22. eKuiper 中的查询类型
Join 和 Group By 需要运行在窗口中
…
连续查询 窗口式查询
数据流输入即触发查询 划分无界数据为有界的窗口
多为无状态查询 每个窗口可以将上下文状态物化为一个
或多个类似数据库的表,从而应用SQL
语句
SELECT * FROM demo
WHERE temperature > 2
0
SELECT avg(temperature) FROM demo G
ROUP BY TumblingWindow(mi, 1)
窗口N
id ts
… …
… …
…
…
tem hum
p
…
…
…
…
…
窗口N+1
…
…
id ts
… …
… …
…
…
…
tem hum
p
…
…
…
…
© 2021 EMQ Technologies Co., Ltd.
…
…
23. eKuiper支持的窗口
有状态计算
窗口用途 窗口类型 计算
定义触发 时间窗口 定义数据流分割 计数窗口 支持 Join,Group By 等需要多条数据
的SQL 语句
eKuiper中的窗口
写SQL时,每个窗口可类比为一个或多
个数据表。
© 2021 EMQ Technologies Co., Ltd.
24. IO 能力
SOURCE:外部系统输入eKuiper
SINK:eKuiper输出到外部系统
支持的SOURCE类型 支持的SINK类型
内置:Mqtt, edgeX, file, http 内置:Mqtt, edgeX, http, log
插件:zmq,random 插件:file, zmq, tdengine, redis , influx …
输入数据验证 输出格式化等配置
类似数据库建表,配置流格式 配置dataTemplate参数,利用go数据模板格
式化数据
非结构化数据,可不配置列格式
其他配置项 可配置空值是否输出,是否以数组
格式输出等
© 2021 EMQ Technologies Co., Ltd.
25. SQL 分析能力
内置支持
• 函数
• 数学: sin, cos, abs, log, mod etc; 共计 25个函数
• 字符串: concat, substring etc; 共计 19个函数
• 汇聚: avg, count, max, min, sum, collect & deduplicate; 共计 7 个函数
• Conversion/ Encoding & decoding / Hashing / JSON processing / 其它; 共计 18个函数
• 二进制处理:resize, thumbnail; 共计 2 个函数
• GeoHash函数,共计10个函数
• 过滤
•
WHERE / CASE WHEN
• Join (LEFT | RIGHT | FULL | CROSS JOIN )
• Streams: 动态流动数据
• Tables: 静态数据,经常用于关联更多别的数据。如提供用户 id,获取相关的名字
• Window
• Tumbling / Hopping / Sliding / Session / Count
• Group By & Order By
© 2021 EMQ Technologies Co., Ltd.
26. 扩展能力和运用
Go原生插件扩展
REST/gRPC调用扩展
(待发布) portable 插件,支持多语言
私有协议接入
实现各种私有协议的解析,以支持
进一步的处理
预计算
将高频状态通过扩展预先计算,简
化规则编写
指令下发
动态指定指令类型,下发参数等
© 2021 EMQ Technologies Co., Ltd.
27. 高效组合规则流水线
(即将发布)兼容 mqtt 主题格式的高速内存source/sink
© 2021 EMQ Technologies Co., Ltd.
28. 实时聚合分析
使用窗口进行有状态计算
• 计算最近 5s 平均值变化百分比
SELECT (ts - windowStart() < 5000) as firstHalf, AVG(val) as avg FROM demo
GROUP BY HoppingWindow(ss, 10, 5), firstHalf ORDER BY firstHalf
• SQL 输出样例
[{"firstHalf": false, "avg": 2},
{"firstMin":true, "avg":4.5}]
• 数据模板
"{\"percentage\":{{mulf (round (divf
(index . 0).avg (index . 1).avg) 2) 100}}}"
• 最终输出
{"percentage":"225"}
© 2021 EMQ Technologies Co., Ltd.
29. 用边缘流式处理实现车联网实时规则引擎
详解针对边缘场景的流式计算引擎的设计
4.
1
. 2. 3
. 轻量和高效 敏捷与灵活 实时计算 管理
适用车载芯片的超小安装包
和资源占用 动态,灵活的文本规则替代
OTA更新方式 内置IO 部署监控
丰富内置函数 云边协同
扩展
© 2021 EMQ Technologies Co., Ltd.
30. 多种部署方式
应对边缘端复杂多变的软硬件环境
安装包
利用 Go 语言的交叉编译能力,
支持各种软硬件环境
Docker K8S
Alpine, slim 等多种版本 Helm Chart
边缘容器编排整合
© 2021 EMQ Technologies Co., Ltd.
31. 云边协同
基于 Neuron + eKuiper 的边缘计算解决方案
© 2021 EMQ Technologies Co., Ltd.
32. 云边容器编排整合
云边容器编排协同 + 边缘分析
将原生容器化应用编排功能扩展到边缘节点的开源软件
云边容器编排
云边通道支持
© 2021 EMQ Technologies Co., Ltd.
33. 业务实践和案例
© 2021 EMQ Technologies Co., Ltd.
34. 智慧车机
车机显示应用
预计算处理
Pre-defined field
云端协同
总线数据-> 结构化数据
DBC
SOURCE
Speed:30 场景联动
SpeedV:5
MQTT 云边协同通道
Sinks
SQL规则引擎
CAN BUS总线
EMQ X 集群
云边协同管理
UDP Socket server
训练框架
file
反控车
车身舒适性
C
A
N
AI算法
模块化动态场景引擎
软实时、高效边缘计算
云边协同、动态持续更新能力
安全加密管理通道
动力总成
信息娱乐
底盘
© 2021 EMQ Technologies Co., Ltd.
35. 车联网场景引擎实现应用
架构设计
DBC配置文件
Socket
Server
TCP/UDP
EMQ X
边缘流计算
REST
API
云边协同
socket
CAN总线
eKuiper 规则引擎
车机/Tbox
① 车端通过CAN总线进行车端数据采集,同时通过socket的方式将数
据拉流到eKuiper中进行计算
② 基于车机的DBC配置文件解析socket报文,通过规则引擎实现阈值
事件触发能力
③ eKuiper目标调用CAN总线实现车机响应动作的控制指令下发
eKuiper Manager
云端
① 车机的场景引擎规则可在云端进行
配置管理
用户端
① 通过云端的API能
配置,并通过云边协同通道将配置 力,APP可以实现
下发到边缘端。 车主个性化定制参
数的能力。
© 2021 EMQ Technologies Co., Ltd.
36. EdgeX Foundry 整合
eKuiper 为 edgeX foundry 的指定规则引擎实现
应用场景
EdgeX Foundry是Linux基金会支持的边缘计算开
源平台。EdgeX Foundry的定位是通用工业IOT边
缘计算通用框架,部署于路由器和交换机等边缘设
备上,为各种传感器、设备或其他物联网器件提供
即插即用功能并管理它们,进一步收集和分析它们
的数据,或者导出至边缘计算应用或云计算中心做
进一步处理。
业务整合
• 接入EdgeX Foundry 消息总线,进行数据分析
和导出
• 作为EdgeX Foundry 规则引擎,支持场景联
动,按规则控制设备
© 2021 EMQ Technologies Co., Ltd.
37. KubeEdge + eKuiper
中国移动 – 国家工业互联网大数据中心
应用场景
企业端工业大数据来自企业生产和经营的多个环
节,例如设备数据,控制数据和企业经营康数据。
作为企业端工业大数据接入平台,该中心负责企业
大数据实时采集,汇聚和上传云端。同时提供实时
的数据分析和智能化处理。
解决方案
• KubeEdge将原生容器化应用编排功能扩展到
边缘节点
• eKuiper 提供了边缘分析如数据清洗的能力
• eKuiper 增强了边缘数据汇聚和上传云端的能力
© 2021 EMQ Technologies Co., Ltd.
38. 总结
eKuiper: 针对边缘端设计的流式计算框架
轻量化 灵活
安装包10MB级别,支持标准ARM和x86架
构的小型计算节点; 基于SQL的业务规则,支持多流 JOIN 运
算,60+ 包含数学、字符处理和编码等各类
内置函数支持;5类时间窗口支持;
性能:树莓派 3B+,支持 11k/秒 TPS,同
时支持数千条规则运行;
支持丰富的源和目标
易部署和管理 成长中
提供多种部署方式 LF Edge 孵化项目
与主流容器编排工具整合 持续更新中
作为年轻的开源项目,欢迎大家使用,建
议,贡献文档和代码!
© 2021 EMQ Technologies Co., Ltd.
39. © 2021 EMQ Technologies Co., Ltd.
40. THANKS
© 2021 EMQ Technologies Co., Ltd.