docs: IoT 模块 v2.0 升级设计方案(9 篇)+ 工程评审决议

基于 JetLinks Community + ThingsBoard 两大开源平台深度分析,
输出 IoT 模块全面升级改造设计方案,覆盖:
- 整体架构设计(分层、模块划分、迁移策略)
- 子系统与设备归属模型(租户→项目→子系统→设备)
- 物模型规范 v2(属性三元分类、派生物模型)
- 规则引擎方案(DAG 编排、SPI Provider、Aviator 脚本)
- 告警体系设计(两级存储、4 状态机、传播机制)
- 设备影子与 RPC(Shared 属性同步、持久化 RPC 状态机)
- 数据存储方案(策略插件化、写入缓冲、Micrometer 埋点)
- 协议与编解码扩展(Codec SPI、JAR 热加载、脚本编解码)

工程评审决议(8 项)已同步更新到各文档:
- 规则引擎独立 Maven 模块(gateway 不依赖)
- 链级 try-catch 故障隔离
- Aviator 完整沙箱(超时+循环限制+黑名单)
- 规则链全量缓存 + 变更驱逐
- 告警 BIGINT ID + UK 幂等(非 VARCHAR 主键)
- subsystem.code NOT NULL
- RPC 上线补发限速(5条/秒)
- 删除 EntityRelation 图关系设计

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-10 12:50:41 +08:00
parent c71e8d31cf
commit 87dc7f7aba
9 changed files with 1871 additions and 0 deletions

View File

@@ -0,0 +1,96 @@
# IoT 模块 v2.0 升级设计总览
> 设计日期2026-04-10 | 参考项目JetLinks Community 2.11 + ThingsBoard 4.3
> 基于 viewsh-module-iot 现有 419 Java 文件源码分析 + 两大开源平台深度对比
---
## 一、升级目标
将 viewsh-module-iot 从"能用"升级为"好用、可扩展、可运维"的企业级 IoT 平台,核心目标:
1. **规则引擎通用化** — 从三条硬编码链路进化为可视化 DAG 编排 + 脚本节点
2. **子系统模型** — 引入空间/资产层级,设备归属子系统,支持任意业务拓扑
3. **设备影子** — 属性三元分类 + 持久化 RPC解决离线指令可靠投递
4. **物模型增强** — 设备级派生物模型,支持个性化覆盖产品定义
5. **告警体系重构** — 两级存储 + 完整状态机 + 关系传播
6. **基础设施补齐** — 可观测性、写入缓冲、协议热加载、存储策略插件化
---
## 二、设计原则
| 原则 | 说明 |
|------|------|
| **南向屏蔽,北向统一** | 协议层允许混乱,业务层必须干净(继承现有铁律) |
| **JetLinks 为主骨架** | 规则引擎、条件引擎、SPI 扩展采用 JetLinks 模式 |
| **ThingsBoard 补关键设计** | 属性三元分类、RPC 状态机、EntityRelation、告警状态机 |
| **保留现有消息总线** | Local/Redis/RocketMQ 三实现不动,消费端接入新引擎 |
| **渐进式迁移** | 新旧规则可并行运行,不要求一次性切换 |
---
## 三、文档索引
| 编号 | 文档 | 内容 |
|------|------|------|
| 01 | [[01-整体架构设计]] | 模块划分、分层架构、依赖关系、部署拓扑 |
| 02 | [[02-子系统与设备归属模型]] | 租户→项目→子系统→设备层级、设备归属管理 |
| 03 | [[03-物模型规范v2]] | 属性三元分类、派生物模型、数据类型增强 |
| 04 | [[04-规则引擎方案]] | DAG 编排、SPI Provider、脚本节点、抖动抑制 |
| 05 | [[05-告警体系设计]] | 两级存储、状态机、传播机制、通知集成 |
| 06 | [[06-设备影子与RPC]] | Shared 属性同步、持久化 RPC 状态机、离线指令队列 |
| 07 | [[07-数据存储方案]] | 存储策略插件化、写入缓冲、Key 压缩、TDengine 优化 |
| 08 | [[08-协议与编解码扩展]] | 协议包热加载、Codec SPI、透传编解码 |
---
## 四、融合策略速查表
| 设计点 | 取自 | 说明 |
| ------ | -------------------------- | ------------------------------------------ |
| 规则引擎框架 | JetLinks SceneRule DAG | 串行/并行/分支编排 |
| 条件引擎 | JetLinks ReactorQL 思路 | 编译期过滤器,支持指标对比 |
| 脚本节点 | TB TBEL 思路 | Aviator/QLExpress 实现Action Provider 注册 |
| 抖动抑制 | JetLinks ShakeLimit | 7 参数模型,参数化配置 |
| 消息总线 | 保留现有 | Local/Redis/RocketMQ 三实现 |
| 子系统模型 | 简化层级:租户→项目→子系统→设备 | 设备 FK 归属子系统,本次只做设备-子系统关系 |
| 设备影子 | TB 属性三元分类 + RPC | Client/Server/Shared + 持久化状态机 |
| 告警系统 | JetLinks 两级存储 + TB 状态机 | Record + History + Create→Update→Clear→ACK |
| 物模型继承 | JetLinks 派生物模型 | 设备级覆盖产品定义 |
| 存储策略 | JetLinks 策略模式 | 产品级配置存储后端 |
| 协议扩展 | JetLinks 协议热加载 | JAR + ClassLoader 隔离 |
| SPI 扩展 | JetLinks Provider + TB 注解 | 动态注册 + 元数据自描述 |
| 可观测性 | JetLinks + TB Micrometer | 连接数/吞吐量/规则耗时 |
| 写入缓冲 | JetLinks PersistenceBuffer | 内存+文件双层批量写入 |
---
## 五、不引入的设计
| 设计 | 来源 | 不引入原因 |
| ------------ | -------- | -------------------------------- |
| Actor 系统 | TB | 过重,当前规模不需要设备级 Actor 隔离 |
| 多租户分区路由 | TB | 单租户内部系统,纯开销 |
| Edge 同步 | TB | 无边缘计算需求 |
| 完整 87 节点 | TB | 20-25 个覆盖核心场景 |
| 全栈响应式 | JetLinks | 现有 Spring MVC 生态稳定,不做全量迁移 |
| ReactorQL 原版 | JetLinks | 依赖 Project Reactor改用表达式引擎替代核心思路 |
| EntityRelation 图关系 | TB | 过度设计,改用简洁的 subsystem_id FK 归属 |
---
## 六、工程评审决议2026-04-10
以下决议来自 `/plan-eng-review`,已更新到各设计文档中:
| # | 决议 | 影响文档 |
|---|------|---------|
| 1 | 分层图删除"实体关系",替换为"子系统服务" | 01-整体架构 |
| 2 | 规则引擎拆为独立 Maven 模块 `viewsh-module-iot-rule`gateway 不依赖 | 01-整体架构 |
| 3 | 规则链执行器加链级 try-catch 隔离,单链异常不影响其他链 | 04-规则引擎 |
| 4 | Aviator 脚本完整沙箱:超时 3s + MAX_LOOP_COUNT 1000 + 连续失败黑名单 | 04-规则引擎 |
| 5 | `iot_subsystem.code` 改为 NOT NULL确保唯一索引有效 | 02-子系统 |
| 6 | `iot_alarm_record` 保持 BIGINT 自增 ID + `record_key` UK 幂等 | 05-告警 |
| 7 | 规则链全量缓存(启动加载)+ 变更时驱逐重载,不用延迟加载 | 04-规则引擎 |
| 8 | RPC 上线补发加限速5 条/秒),防止设备过载 | 06-设备影子 |

View File

@@ -0,0 +1,170 @@
# 01-整体架构设计
> IoT 模块 v2.0 | 基于 JetLinks + ThingsBoard 融合方案
---
## 一、目标架构分层
```
┌─────────────────────────────────────────────────────┐
│ 前端 / API 层 │
│ Admin UIVue │ Open APIREST │ WebSocket │
├─────────────────────────────────────────────────────┤
│ 业务管理层Manager
│ 产品管理 │ 设备管理 │ 子系统管理 │ 规则管理 │ 告警管理 │
│ OTA管理 │ 统计分析 │ 通知管理 │ 用户权限 │ │
├─────────────────────────────────────────────────────┤
│ 规则引擎层Rule Engine
│ DAG 编排器 │ 条件评估器 │ 脚本引擎 │ 抖动抑制 │
│ 触发器 SPI │ 动作 SPI │ 数据富化 │ 告警处理器 │
├─────────────────────────────────────────────────────┤
│ 核心服务层Core Services
│ 设备影子 │ 属性服务 │ 时序服务 │ RPC 服务 │
│ 物模型服务 │ 子系统服务 │ 事件总线 │ 通知服务 │
├─────────────────────────────────────────────────────┤
│ 消息总线Message Bus
│ Local │ Redis Stream │ RocketMQ │
├─────────────────────────────────────────────────────┤
│ 设备接入层Gateway
│ MQTT Broker │ EMQX Bridge │ HTTP Server │ TCP Server │
│ Codec SPIAlink/JT808/Camera3D11/...
├─────────────────────────────────────────────────────┤
│ 存储层Storage
│ MySQL业务实体 │ TDengine时序数据 │ Redis缓存
└─────────────────────────────────────────────────────┘
```
---
## 二、模块划分v2.0
```
viewsh-module-iot/
├── viewsh-module-iot-api/ # [契约层] 不变
│ DTO、枚举、Feign 接口
├── viewsh-module-iot-core/ # [核心层] 增强
│ ├── messagebus/ # 消息总线(保留现有三实现)
│ ├── integration/ # 集成事件(保留)
│ ├── shadow/ # 【新增】设备影子核心接口
│ │ ├── AttributeScope.java # 属性三元分类枚举
│ │ ├── DeviceShadow.java # 影子服务接口
│ │ └── RpcCommand.java # RPC 指令模型
│ └── util/ # 工具类(保留+增强)
├── viewsh-module-iot-rule/ # 【新增·独立模块】规则引擎
│ ├── model/ # RuleModel、RuleNode、RuleLink
│ ├── spi/ # TriggerProvider、ActionProvider
│ ├── engine/ # RuleEngine 执行器(链级 try-catch 隔离)
│ ├── script/ # 脚本引擎Aviator + 完整沙箱)
│ └── shakelimit/ # 抖动抑制
├── viewsh-module-iot-server/ # [业务层] 重构
│ ├── controller/ # REST API
│ ├── service/
│ │ ├── product/ # 产品管理(保留)
│ │ ├── device/ # 设备管理(增强:影子集成)
│ │ ├── subsystem/ # 【新增】子系统管理
│ │ ├── thingmodel/ # 物模型(增强:派生物模型)
│ │ ├── shadow/ # 【新增】设备影子实现
│ │ ├── rule/ # 规则引擎(重构)
│ │ │ ├── scene/ # 场景联动DAG 编排)
│ │ │ ├── trigger/ # 触发器实现
│ │ │ ├── action/ # 动作实现
│ │ │ ├── condition/ # 条件评估器
│ │ │ ├── script/ # 脚本引擎实现
│ │ │ └── shakelimit/ # 抖动抑制
│ │ ├── alert/ # 告警(重构:两级+状态机)
│ │ ├── ota/ # OTA保留+完善)
│ │ ├── notify/ # 【新增】通知服务
│ │ └── statistics/ # 统计(增强)
│ ├── dal/ # 数据访问
│ │ ├── mysql/ # 关系库
│ │ ├── tdengine/ # 时序库(策略模式增强)
│ │ └── redis/ # 缓存
│ └── framework/ # 框架层
│ ├── observe/ # 【新增】可观测性Micrometer
│ └── buffer/ # 【新增】写入缓冲PersistenceBuffer
└── viewsh-module-iot-gateway/ # [接入层] 增强
├── protocol/ # 协议层(保留)
├── codec/ # 编解码增强SPI 热加载)
└── service/ # 网关服务
```
---
## 三、核心依赖关系v2.0
```
gateway ──────┐
├── core消息总线、影子接口、工具
server ───────┤ └── apiDTO、枚举、Feign
└── rule独立模块仅 server 依赖gateway 不依赖)
└── aviator脚本引擎
server 额外依赖:
├── viewsh-module-iot-rule规则引擎
├── viewsh-module-system-api用户、租户
├── viewsh-framework框架层
└── micrometer可观测性
```
---
## 四、新增实体一览
| 实体 | 表名 | 说明 |
|------|------|------|
| **IotProjectDO** | `iot_project` | 项目(架构预留,本次不开放 API |
| **IotSubsystemDO** | `iot_subsystem` | 子系统(设备归属单元) |
| **IotAlarmHistoryDO** | TDengine `alarm_history` | 告警时序归档 |
| **IotDeviceRpcDO** | `iot_device_rpc` | 持久化 RPC 指令 |
| **IotRuleChainDO** | `iot_rule_chain` | 规则链DAG 图定义) |
| **IotRuleNodeDO** | `iot_rule_node` | 规则节点 |
---
## 五、关键技术选型变更
| 领域 | v1.0(现有) | v2.0(目标) |
|------|-------------|-------------|
| 规则引擎条件 | SpEL 字符串表达式 | Aviator 表达式引擎(编译缓存+类型安全) |
| 脚本执行 | 无 | Aviator/QLExpress轻量+沙箱) |
| 规则编排 | 平铺动作列表 | DAGRuleModel参考 JetLinks |
| 告警存储 | 单表 MySQL | MySQLAlarmRecord 当前)+ TDengineAlarmHistory 时序) |
| 设备属性 | Redis Hash无分类 | Redis Hash + AttributeScope 三元分类 |
| RPC | 单次下发,丢失不重试 | 持久化 RPC + 状态机QUEUED→SENT→DELIVERED→SUCCESS |
| 设备组织 | groupIds JSON 数组 | 子系统归属subsystem_id FK+ 分组保留并存 |
| 监控指标 | 无 | Micrometer连接数/吞吐量/规则耗时/队列深度) |
---
## 六、迁移策略
### 阶段一基础设施1-2 天)
- 新增 `iot_project`(预留)、`iot_subsystem` 表,设备表增加 `subsystem_id`
- 新增 `iot_device_rpc`
- 引入 Micrometer 依赖,添加核心埋点
- 引入 Aviator 依赖
### 阶段二核心服务2-3 天)
- 实现子系统 CRUD + 设备归属绑定
- 实现设备影子服务(属性三元分类 + RPC 状态机)
- 物模型增强(派生物模型)
- 告警重构(两级存储 + 状态机)
### 阶段三规则引擎3-5 天)
- 实现 RuleModel DAG 编排框架
- 实现 TriggerProvider/ActionProvider SPI
- 迁移现有触发器(设备状态/属性/事件/定时)
- 迁移现有动作(设备控制/服务调用/告警/数据转发)
- 新增脚本节点、数据富化节点
- 新增抖动抑制
### 阶段四兼容迁移1-2 天)
- 现有 SceneRule/DataRule 配置数据迁移工具
- 旧规则 → 新 DAG 转换器
- 新旧规则并行运行验证

View File

@@ -0,0 +1,261 @@
# 02-子系统与设备归属模型
---
## 一、组织层级架构
```
租户Tenant ← 已有TenantBaseDO
└── 项目Project ← 架构预留,本次不实现
└── 子系统Subsystem ← 本次实现
└── 设备Device ← 本次实现归属关系
```
### 1.1 本次升级范围
- **实现**子系统Subsystem实体 + 设备归属子系统
- **预留**项目Project层级表结构和字段预留但不开放 API
- **不做**:通用 EntityRelation 图关系(过度设计,当前不需要)
### 1.2 设计原则
- 一个设备**必须且只能**归属一个子系统(`subsystem_id` NOT NULL FK
- 一个子系统属于一个项目(`project_id` FK本次可为空
- 子系统和项目都在租户隔离范围内
- 产品Product是设备的"型号模板",与子系统无关,维持现有设计不变
---
## 二、数据模型
### 2.1 项目表(架构预留)
```sql
CREATE TABLE iot_project (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(128) NOT NULL COMMENT '项目名称',
description TEXT COMMENT '项目描述',
icon VARCHAR(256) COMMENT '项目图标',
status TINYINT NOT NULL DEFAULT 1 COMMENT '状态0=禁用 1=启用)',
sort INT DEFAULT 0 COMMENT '排序',
-- 租户 + 审计
tenant_id BIGINT NOT NULL,
creator VARCHAR(64),
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64),
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted BIT DEFAULT 0,
UNIQUE KEY uk_name_tenant (name, tenant_id, deleted)
) COMMENT '项目';
```
### 2.2 子系统表(本次实现)
```sql
CREATE TABLE iot_subsystem (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(128) NOT NULL COMMENT '子系统名称',
code VARCHAR(64) NOT NULL COMMENT '子系统编码(业务标识,如 security / energy / clean',
description TEXT COMMENT '子系统描述',
icon VARCHAR(256) COMMENT '子系统图标',
status TINYINT NOT NULL DEFAULT 1 COMMENT '状态0=禁用 1=启用)',
sort INT DEFAULT 0 COMMENT '排序',
project_id BIGINT COMMENT '所属项目 ID预留本次可为空',
config JSON COMMENT '子系统配置(扩展 JSON',
-- 租户 + 审计
tenant_id BIGINT NOT NULL,
creator VARCHAR(64),
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64),
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted BIT DEFAULT 0,
UNIQUE KEY uk_name_tenant (name, tenant_id, deleted),
UNIQUE KEY uk_code_tenant (code, tenant_id, deleted)
) COMMENT '子系统';
```
### 2.3 设备表变更
```sql
-- 设备新增子系统归属字段
ALTER TABLE iot_device ADD COLUMN subsystem_id BIGINT COMMENT '所属子系统 ID';
-- 索引
ALTER TABLE iot_device ADD INDEX idx_subsystem_id (subsystem_id);
```
**字段说明**
- `subsystem_id`:设备归属的子系统 IDFK 关联 `iot_subsystem.id`
- 新设备创建时必须指定子系统(或后续批量分配)
- 存量设备的 `subsystem_id` 为 NULL通过迁移工具或管理后台补录
---
## 三、实体关系总览
```
┌─────────────────────────────────────────────┐
│ 租户Tenant
│ │
│ ┌─── 项目A ──────────────────────────────┐ │
│ │ │ │
│ │ ┌── 安防子系统 ─────┐ ┌── 能耗子系统 ──┐│ │
│ │ │ 摄像头001 │ │ 电表001 ││ │
│ │ │ 门禁001 │ │ 水表001 ││ │
│ │ └─────────────────┘ └──────────────┘│ │
│ └───────────────────────────────────────┘ │
│ │
│ ┌─── 项目B ──────────────────────────────┐ │
│ │ ┌── 保洁子系统 ─────┐ │ │
│ │ │ 客流计数器001 │ │ │
│ │ │ 工牌001 │ │ │
│ │ │ Beacon001 │ │ │
│ │ └─────────────────┘ │ │
│ └───────────────────────────────────────┘ │
│ │
│ 产品Product── 独立维度,与子系统无关 │
│ ├── 客流计数器PEOPLE_COUNTER
│ ├── GPS工牌JT808
│ └── 3D11摄像头CAMERA_3D11
└─────────────────────────────────────────────┘
```
**关键区分**
- **产品** = 设备的"型号"(决定物模型、编解码器、协议)
- **子系统** = 设备的"归属"(决定业务逻辑、规则、告警通知对象)
- 同一产品的设备可以属于不同子系统
---
## 四、API 设计
### 4.1 子系统管理
| 路径 | 方法 | 权限 | 说明 |
|------|------|------|------|
| `/iot/subsystem/create` | POST | `iot:subsystem:create` | 创建子系统 |
| `/iot/subsystem/update` | PUT | `iot:subsystem:update` | 更新子系统 |
| `/iot/subsystem/delete` | DELETE | `iot:subsystem:delete` | 删除子系统(需无设备) |
| `/iot/subsystem/get` | GET | `iot:subsystem:query` | 获取子系统详情 |
| `/iot/subsystem/page` | GET | `iot:subsystem:query` | 分页查询 |
| `/iot/subsystem/simple-list` | GET | 无 | 下拉列表(仅启用,含 id/name/code |
**创建请求**
```json
{
"name": "保洁子系统",
"code": "clean",
"description": "保洁业务相关设备管理",
"icon": "clean-icon",
"status": 1,
"projectId": null
}
```
### 4.2 设备归属操作
| 路径 | 方法 | 权限 | 说明 |
|------|------|------|------|
| `/iot/device/bindSubsystem` | PUT | `iot:device:update` | 单设备绑定子系统 |
| `/iot/device/batchBindSubsystem` | PUT | `iot:device:update` | 批量绑定子系统 |
**请求体**
```json
// 单设备
{ "deviceId": 1001, "subsystemId": 5 }
// 批量
{ "deviceIds": [1001, 1002, 1003], "subsystemId": 5 }
```
### 4.3 设备查询增强
现有设备分页查询(`/iot/device/page`)新增 `subsystemId` 过滤参数:
```
GET /iot/device/page?subsystemId=5&productId=10&status=1
```
### 4.4 子系统设备统计
| 路径 | 方法 | 说明 |
|------|------|------|
| `/iot/subsystem/device-count` | GET | 各子系统设备数量统计 |
| `/iot/subsystem/{id}/devices` | GET | 查询子系统下所有设备(支持分页) |
---
## 五、Service 层设计
### 5.1 IotSubsystemService
```java
interface IotSubsystemService {
Long createSubsystem(IotSubsystemSaveReqVO req);
void updateSubsystem(IotSubsystemSaveReqVO req);
void deleteSubsystem(Long id); // 校验无设备引用
IotSubsystemDO getSubsystem(Long id);
PageResult<IotSubsystemDO> getSubsystemPage(IotSubsystemPageReqVO req);
List<IotSubsystemDO> getSubsystemSimpleList();
Map<Long, Integer> getDeviceCountMap(); // subsystemId → deviceCount
}
```
### 5.2 设备服务增强
```java
// IotDeviceService 新增方法
void bindDeviceToSubsystem(Long deviceId, Long subsystemId);
void batchBindDevicesToSubsystem(Collection<Long> deviceIds, Long subsystemId);
```
---
## 六、与规则引擎的集成
### 6.1 规则链绑定子系统
规则链RuleChain新增 `subsystemId` 字段,规则只对该子系统下的设备生效:
```sql
ALTER TABLE iot_rule_chain ADD COLUMN subsystem_id BIGINT COMMENT '关联子系统(空=全局规则)';
```
规则触发时匹配逻辑:
```
消息到达 → 获取设备 subsystemId
→ 查找该 subsystemId 绑定的规则链
→ 同时查找 subsystemId=NULL 的全局规则链
→ 合并执行
```
### 6.2 告警按子系统隔离
告警配置新增 `subsystemId`,告警记录冗余 `subsystemId`
- 告警通知对象可按子系统配置不同接收人
- 告警统计支持按子系统维度聚合
---
## 七、与现有系统兼容
### 7.1 设备分组group_ids保留
`group_ids``subsystem_id` 是不同维度:
- **子系统** = 业务归属(决定规则/告警/权限)
- **分组** = 管理标签(灵活分类,多对多)
两者并存,互不冲突。
### 7.2 存量数据迁移
存量设备的 `subsystem_id` 初始为 NULL。提供管理后台批量分配功能
1. 按产品批量选择设备
2. 分配到指定子系统
3. 未分配的设备在查询时显示"未归属"标签
### 7.3 保洁业务适配
现有保洁清洗规则CleanRule的设备通过 `AreaDeviceApi` Feign 调用 Ops 模块关联。升级后:
- 保洁相关设备统一归属"保洁子系统"
- 区域-设备关系仍由 Ops 模块管理IoT 不越界)
- `subsystemId` 用于规则链匹配,不替代 Ops 的区域概念

View File

@@ -0,0 +1,140 @@
# 03-物模型规范 v2
> 增强:属性三元分类 + 派生物模型 + 数据类型扩展
---
## 一、属性三元分类(借鉴 ThingsBoard
### 1.1 设计动机
现有系统所有属性不区分来源和可见性,设备上报的状态、平台计算的指标、下发给设备的配置混在一起。引入 **AttributeScope** 三元分类:
| Scope | 写入方 | 读取方 | 典型场景 |
| ---------- | ------- | ------- | ------------------ |
| **CLIENT** | 设备上报 | 平台 | 固件版本、信号强度、电量、传感器数据 |
| **SERVER** | 平台/规则引擎 | 平台 | 计算指标、标签、运维备注 |
| **SHARED** | 平台 | 设备 + 平台 | 配置参数、阈值、开关、定时计划 |
### 1.2 枚举定义
```java
enum AttributeScope {
CLIENT(1, "设备端属性"), // 设备上报,平台只读
SERVER(2, "服务端属性"), // 平台私有,设备不可见
SHARED(3, "共享属性"); // 平台写,设备可订阅
}
```
### 1.3 存储变更
**Redis Hash Key 变更**
```
// v1.0(现有)
iot:device_property:{deviceId} → {identifier: value}
// v2.0(升级)
iot:device_property:{deviceId}:client → {identifier: value}
iot:device_property:{deviceId}:server → {identifier: value}
iot:device_property:{deviceId}:shared → {identifier: value}
```
**TDengine 不变**:时序数据仍按 `product_property_{productId}` 超级表存储scope 作为普通列TINYINT标记。
### 1.4 物模型属性定义扩展
```java
ThingModelProperty {
// ... 现有字段保留 ...
String scope; // 【新增】"client" / "server" / "shared",默认 "client"
}
```
---
## 二、派生物模型(借鉴 JetLinks
### 2.1 设计动机
同一产品下的设备可能存在差异化配置。例如同型号客流计数器A 设备多装了温湿度传感器。现有系统设备只能使用产品的物模型,无法个性化扩展。
### 2.2 实现方案
**设备表新增字段**
```sql
ALTER TABLE iot_device ADD COLUMN derive_metadata JSON COMMENT '设备派生物模型(覆盖/扩展产品物模型)';
```
**合并策略**
```java
// 获取设备完整物模型 = 产品物模型 + 设备派生物模型
List<ThingModelDO> getEffectiveThingModel(Long deviceId) {
List<ThingModelDO> productModels = getByProductId(device.getProductId()); // 产品基础
ThingModelOverride derive = device.getDeriveMetadata(); // 设备扩展
if (derive == null) return productModels;
return mergeThingModel(productModels, derive, MergeOption.DEVICE_OVERRIDE);
}
```
**合并规则**
- 相同 identifier设备定义覆盖产品定义`DEVICE_OVERRIDE`
- 新 identifier追加到合并结果
- 产品有但设备无:保留产品定义
### 2.3 派生物模型 JSON 结构
```json
{
"properties": [
{
"identifier": "humidity",
"name": "湿度",
"accessMode": "r",
"dataType": "float",
"dataSpecs": {"min": "0", "max": "100", "step": "0.1", "unit": "%"}
}
],
"services": [],
"events": []
}
```
---
## 三、数据类型增强
### 3.1 新增 TIMESTAMP 类型
| 物模型类型 | TDengine 类型 | 说明 |
|-----------|-------------|------|
| `timestamp` | TIMESTAMP | 毫秒级时间戳(新增) |
### 3.2 struct/array 支持嵌套校验
现有系统 struct/array 存为 VARCHAR(1024)无法校验内部结构。v2.0 增强:
- struct 定义包含子字段的完整 DataSpecs
- 入库前按定义校验 JSON 结构
- 查询时支持 JsonPath 提取嵌套字段
---
## 四、物模型缓存增强
### 4.1 双级缓存
```
L1: Caffeine 本地缓存productId → List<ThingModelDO>5 分钟 TTL
L2: RedisSpring Cache现有方案保留
```
### 4.2 派生物模型缓存
```
Key: iot:device_thing_model:{deviceId}
Value: 合并后的完整物模型 JSON
TTL: 10 分钟
驱逐: 设备 deriveMetadata 更新时清除
```

View File

@@ -0,0 +1,374 @@
# 04-规则引擎方案
> 融合 JetLinks SceneRule DAG 框架 + ThingsBoard 脚本节点思路
---
## 一、设计目标
将现有三条硬编码链路DataRule/SceneRule/CleanRule统一为 **一套可配置的 DAG 规则引擎**,通过 SPI 扩展支持任意触发器、条件、动作组合,同时引入脚本节点实现运行时灵活数据转换。
---
## 二、核心架构
```
设备消息IotDeviceMessage
↓ 消息总线消费
IotRuleEngineMessageHandler统一入口替代现有 4 个消费者)
↓ 按 productId + deviceId + subsystemId 匹配规则链(从全量缓存查)
┌─────────────────────────────────────────┐
│ RuleChain规则链 DAG
│ │
│ [Trigger] → [Condition] → [Branch] │
│ ↓ ↓ │
│ [ShakeLimit] [ShakeLimit] │
│ ↓ ↓ │
│ [Action1] [Action2] │
│ ↓ ↓ │
│ [Action3] [Script] │
│ ↓ ↓ │
│ [Notify] [Alarm] │
└─────────────────────────────────────────┘
```
---
## 三、数据模型
### 3.1 规则链RuleChain
```sql
CREATE TABLE iot_rule_chain (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(128) NOT NULL,
description TEXT,
status TINYINT DEFAULT 1, -- 0=禁用, 1=启用
type VARCHAR(32) NOT NULL, -- SCENE(场景联动) / DATA(数据转发) / CUSTOM
first_node_id BIGINT, -- 入口节点
debug_mode BOOLEAN DEFAULT FALSE, -- 调试模式(记录每步输入输出)
tenant_id BIGINT,
-- 审计字段 ...
);
```
### 3.2 规则节点RuleNode
```sql
CREATE TABLE iot_rule_node (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
rule_chain_id BIGINT NOT NULL, -- FK
name VARCHAR(128),
type VARCHAR(64) NOT NULL, -- Provider 类型标识
configuration JSON NOT NULL, -- 节点配置(多态 JSON
position_x INT, -- 画布坐标(可视化编排用)
position_y INT,
);
```
### 3.3 规则连线RuleLink
```sql
CREATE TABLE iot_rule_link (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
rule_chain_id BIGINT NOT NULL,
source_node_id BIGINT NOT NULL, -- 源节点
target_node_id BIGINT NOT NULL, -- 目标节点
relation_type VARCHAR(32) NOT NULL, -- Success / Failure / True / False / 自定义
condition JSON, -- 可选:连线条件(表达式)
sort_order INT DEFAULT 0, -- 排序(同源多出边时)
);
```
---
## 四、SPI Provider 体系
### 4.1 核心接口
```java
// 触发器 Provider
interface TriggerProvider {
String getType(); // "device_property" / "timer" / "manual"
boolean matches(IotDeviceMessage msg, JsonNode config);
void register(RuleChain chain, JsonNode config); // 定时器注册等
void unregister(RuleChain chain);
}
// 动作 Provider
interface ActionProvider {
String getType(); // "device_control" / "script" / "notify" / "alarm" / "http_push"
ActionResult execute(RuleContext ctx, JsonNode config);
}
// 条件评估器
interface ConditionEvaluator {
String getType(); // "expression" / "script" / "time_range"
boolean evaluate(RuleContext ctx, JsonNode config);
}
```
### 4.2 内置 Provider 清单
**触发器Trigger**
| 类型标识 | 实现类 | 来源 | 说明 |
|---------|--------|------|------|
| `device_state` | DeviceStateTrigger | 迁移自现有 | 设备上下线 |
| `device_property` | DevicePropertyTrigger | 迁移自现有 | 属性上报 |
| `device_event` | DeviceEventTrigger | 迁移自现有 | 事件上报 |
| `device_service` | DeviceServiceTrigger | 迁移自现有 | 服务调用回复 |
| `timer` | TimerTrigger | 迁移自现有 | CRON 定时 |
| `manual` | ManualTrigger | **新增** | 手动/API 触发 |
**动作Action**
| 类型标识 | 实现类 | 来源 | 说明 |
|---------|--------|------|------|
| `device_property_set` | DevicePropertySetAction | 迁移自现有 | 设备属性设置 |
| `device_service_invoke` | DeviceServiceInvokeAction | 迁移自现有 | 设备服务调用 |
| `alarm_trigger` | AlarmTriggerAction | 重构 | 触发告警(新状态机) |
| `alarm_clear` | AlarmClearAction | **新增** | 清除告警 |
| `notify` | NotifyAction | **新增** | 发送通知SMS/邮件/站内信/Webhook |
| `http_push` | HttpPushAction | 迁移自 DataSink | HTTP 数据推送 |
| `mq_push` | MqPushAction | 迁移自 DataSink | MQ 数据推送RocketMQ/Kafka/RabbitMQ |
| `redis_push` | RedisPushAction | 迁移自 DataSink | Redis 数据推送 |
| `tcp_push` | TcpPushAction | 迁移自 DataSink | TCP 数据推送 |
| `script` | ScriptAction | **新增** | 脚本执行(数据转换/富化) |
| `delay` | DelayAction | **新增** | 延迟执行 |
| `enrich` | EnrichAction | **新增** | 数据富化(读取属性/时序) |
| `log` | LogAction | **新增** | 日志记录 |
**条件评估器Condition**
| 类型标识 | 实现类 | 说明 |
|---------|--------|------|
| `expression` | ExpressionCondition | Aviator 表达式(替代 SpEL |
| `script` | ScriptCondition | 脚本条件(复杂逻辑) |
| `time_range` | TimeRangeCondition | 时间范围(当日时间/日期区间) |
| `device_state` | DeviceStateCondition | 设备在线/离线状态 |
---
## 五、脚本引擎设计
### 5.1 选型Aviator
| 维度 | Aviator | QLExpress | TBEL |
| ---- | -------------- | --------- | --------- |
| 性能 | 编译为字节码,接近 Java | 解释执行 | ANTLR4 编译 |
| 安全 | 内置沙箱(白名单函数) | 需手动限制 | 语言层面安全 |
| 生态 | 国内广泛使用,中文文档丰富 | 阿里开源 | TB 自有 |
| 学习成本 | 类 Java 表达式 | 类 Java | 私有语法 |
| 依赖 | 单 JAR无额外依赖 | 单 JAR | 需 TB 框架 |
**推荐 Aviator**:编译缓存 + 类型安全 + 沙箱 + 国内生态好。
### 5.2 脚本节点配置
```json
{
"type": "script",
"config": {
"engine": "aviator",
"script": "let result = msg.temperature * 1.8 + 32; return seq.map('fahrenheit', result, 'celsius', msg.temperature);",
"timeout": 3000,
"outputTo": "msg"
}
}
```
### 5.3 沙箱安全配置
```java
AviatorEvaluatorInstance engine = AviatorEvaluator.newInstance();
// 禁用系统函数
engine.setOption(Options.DISABLE_ASSIGNMENT, true);
// 限制最大循环次数
engine.setOption(Options.MAX_LOOP_COUNT, 1000);
// 编译缓存
engine.useLRUExpressionCache(256);
```
---
## 六、抖动抑制ShakeLimit
### 6.1 参数模型(借鉴 JetLinks
```java
class ShakeLimitConfig {
boolean enabled; // 是否启用
int time; // 时间窗口(秒)
int threshold; // 触发阈值(窗口内达到 N 次才触发)
boolean alarmFirst; // true=取第一条false=取最后一条
boolean continuous; // 连续模式:不满足条件时重置计数器
boolean rolling; // 滚动窗口 vs 固定窗口
}
```
### 6.2 实现方式
不依赖 Reactor现有系统不是响应式使用 **Guava RateLimiter + ScheduledExecutorService** 实现:
```
消息到达 → ShakeLimitFilter
├── 按 deviceId 分组,维护 ConcurrentHashMap<deviceId, WindowState>
├── WindowState: count, firstMsg, lastMsg, windowStartTime
├── 固定窗口windowStartTime + time 秒内累计
├── 滚动窗口:每条消息重置 windowStartTime
├── count >= threshold → 输出 firstMsg 或 lastMsg
└── continuous=true 且不满足条件 → 重置 count=0
```
### 6.3 配置示例
```json
{
"shakeLimit": {
"enabled": true,
"time": 10,
"threshold": 3,
"alarmFirst": true,
"continuous": true,
"rolling": false
}
}
```
含义10 秒窗口内连续 3 次触发条件才真正执行动作,取第一条消息,中间不满足条件则重置计数。
---
## 七、数据富化节点Enrich
借鉴 ThingsBoard Enrichment 节点,在规则执行过程中动态读取上下文数据:
| 富化类型 | 配置 | 说明 |
|---------|------|------|
| `device_property` | deviceId + identifiers | 读取设备最新属性(从 Redis |
| `device_history` | deviceId + identifier + timeRange | 读取属性历史(从 TDengine |
| `related_device` | relationType + identifiers | 读取关联设备属性 |
| `asset_info` | assetId 或 relationType | 读取所属资产信息 |
富化结果注入 `RuleContext.metadata`,后续节点可引用。
---
## 八、分支执行逻辑
### 8.1 条件分组(保留 JetLinks 模式)
```json
{
"branches": [
{
"name": "高温告警",
"conditionGroups": [
[{"identifier": "temperature", "operator": ">", "value": "40"}]
],
"shakeLimit": {"enabled": true, "time": 30, "threshold": 3},
"actions": [{"type": "alarm_trigger", "config": {...}}],
"executeAnyway": false
},
{
"name": "温度通知",
"conditionGroups": [
[{"identifier": "temperature", "operator": ">", "value": "30"}]
],
"actions": [{"type": "notify", "config": {...}}],
"executeAnyway": true
}
]
}
```
### 8.2 执行语义
- `executeAnyway=false`默认前一分支命中后跳过当前分支if/else-if 语义)
- `executeAnyway=true`:无论前面分支是否命中都执行(重叠触发)
---
## 九、与现有系统的迁移映射
| 现有概念 | v2.0 对应 | 迁移方式 |
|---------|----------|---------|
| SceneRule | RuleChaintype=SCENE | 自动转换triggers→Trigger 节点conditions→Branch 条件actions→Action 节点 |
| DataRule + DataSink | RuleChaintype=DATA | 自动转换sourceConfigs→Trigger 配置sinkIds→各类 Push Action |
| CleanRule4 个 Processor | RuleChaintype=SCENE+ Script Action | 手动迁移:硬编码逻辑抽取为配置+脚本 |
| IotDeviceMessageSubscriber | 保留,但从中分离规则触发 | 设备消息处理保留,规则触发委托给 RuleEngine |
---
## 十、调试模式
规则链 `debug_mode=true` 时:
- 每个节点执行前后记录 `RuleContext` 快照
- 写入 TDengine `rule_debug_log`
- 前端可回放执行路径,查看每步的输入/输出/耗时
- 生产环境建议关闭(性能开销)
---
## 十一、工程评审决议2026-04-10
以下决议来自 /plan-eng-review 工程评审,需严格执行:
### 11.1 独立 Maven 模块
规则引擎从 `iot-core` 拆出为独立模块 `viewsh-module-iot-rule`,仅 `iot-server` 依赖,`iot-gateway` 不依赖。避免网关 JAR 引入 Aviator 等不需要的依赖。
### 11.2 链级故障隔离
统一消费者(`IotRuleEngineMessageHandler`)处理一条消息时,每条规则链的执行**必须独立 try-catch**
```java
for (RuleChain chain : matchedChains) {
try {
ruleEngine.execute(chain, context);
} catch (Exception e) {
log.error("规则链 {} 执行失败", chain.getId(), e);
metrics.recordRuleFailure(chain.getId());
// 不中断,继续执行下一条规则链
}
}
```
### 11.3 脚本完整沙箱(安全红线)
Aviator 脚本节点**必须**启用以下安全配置:
```java
AviatorEvaluatorInstance engine = AviatorEvaluator.newInstance();
engine.setOption(Options.DISABLE_ASSIGNMENT, true); // 禁止赋值到外部变量
engine.setOption(Options.MAX_LOOP_COUNT, 1000); // 最大循环 1000 次
engine.useLRUExpressionCache(256); // 编译缓存
// 执行超时3 秒
Future<?> future = executor.submit(() -> expression.execute(env));
try {
future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
scriptBlacklist.recordFailure(scriptId);
}
```
**黑名单机制**:脚本连续失败超过 3 次,加入黑名单 60 秒,期间直接跳过不执行。
### 11.4 规则链全量缓存
```java
// 启动时加载所有启用规则链到内存(含节点+连线)
Map<Long, CompiledRuleChain> ruleChainCache = new ConcurrentHashMap<>();
// 按 subsystemId + productId + deviceId 建立索引
Map<String, List<Long>> ruleChainIndex; // key → ruleChainIds
// CRUD 时驱逐并重新加载
@CacheEvict + reload
```
不使用延迟加载(@Cacheable),避免首次消息处理的延迟和 TTL 窗口期数据不一致。

View File

@@ -0,0 +1,192 @@
# 05-告警体系设计
> 融合 JetLinks 两级存储 + ThingsBoard 完整状态机
---
## 一、告警状态机5 状态)
```
┌──── 持续触发 ────┐
↓ │
──→ ACTIVE ──→ ACTIVE更新 endTs/details
├──→ ACKNOWLEDGED用户确认
│ │
│ └──→ CLEARED规则自动/手动清除)
└──→ CLEARED规则自动/手动清除)
└──→ RESOLVED最终关闭
```
| 状态 | 含义 | 触发方式 |
|------|------|---------|
| ACTIVE | 告警激活中 | 规则引擎 AlarmTriggerAction |
| ACKNOWLEDGED | 已确认 | 用户手动确认 |
| CLEARED | 已清除 | 规则引擎 AlarmClearAction 或手动 |
| RESOLVED | 已解决 | 用户最终关闭 |
### 1.1 与现有系统对比
| v1.0 | v2.0 |
|------|------|
| `processStatus: Boolean` | 4 状态枚举 |
| 无确认步骤 | ACKNOWLEDGED 支持 |
| 手动恢复 | 规则自动 CLEARED |
| 无最终关闭 | RESOLVED 归档 |
---
## 二、两级存储模型(借鉴 JetLinks
### 2.1 AlarmRecordMySQL当前状态
每个 `(设备 + 告警配置)` 组合最多一条记录,通过幂等 upsert 更新。
```sql
CREATE TABLE iot_alarm_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
record_key VARCHAR(64) NOT NULL, -- MD5(deviceId + "-" + alarmConfigId),幂等键
alarm_config_id BIGINT NOT NULL,
alarm_name VARCHAR(128), -- 冗余
severity TINYINT NOT NULL, -- 1=CRITICAL 2=MAJOR 3=MINOR 4=WARNING 5=INFO
state VARCHAR(16) NOT NULL, -- ACTIVE / ACKNOWLEDGED / CLEARED / RESOLVED
device_id BIGINT NOT NULL,
product_id BIGINT,
scene_rule_id BIGINT,
start_ts DATETIME NOT NULL, -- 首次触发时间
end_ts DATETIME, -- 最近触发时间
clear_ts DATETIME, -- 清除时间
ack_ts DATETIME, -- 确认时间
resolve_ts DATETIME, -- 解决时间
details JSON, -- 告警详情(可累积)
trigger_count INT DEFAULT 1, -- 持续触发次数
propagated_to JSON, -- 传播到的资产列表
process_remark TEXT, -- 处理备注
tenant_id BIGINT,
-- 审计字段 ...
UNIQUE KEY uk_record_key (record_key)
);
```
**幂等策略**(工程评审决议):保持 BIGINT 自增 ID与全局一致通过 `record_key = MD5(deviceId + "-" + alarmConfigId)` 唯一索引实现幂等。写入时使用 `INSERT ... ON DUPLICATE KEY UPDATE`
### 2.2 AlarmHistoryTDengine时序归档
每次告警状态变化都追加一条记录,用于趋势分析和审计。
```sql
CREATE STABLE alarm_history (
ts TIMESTAMP,
alarm_record_id BIGINT, -- 关联 AlarmRecord.id
alarm_config_id BIGINT,
severity TINYINT,
state NCHAR(16), -- 变化后的状态
trigger_data NCHAR(2048), -- 触发的设备消息 JSON
details NCHAR(2048), -- 告警详情快照
operator NCHAR(64), -- 操作人(系统/用户)
remark NCHAR(256) -- 备注
) TAGS (
device_id BIGINT
);
```
---
## 三、告警严重度5 级)
```java
enum AlarmSeverity {
CRITICAL(1, "紧急"), // 需立即处理
MAJOR(2, "重要"), // 尽快处理
MINOR(3, "次要"), // 计划处理
WARNING(4, "警告"), // 需关注
INFO(5, "信息"); // 仅记录
}
```
支持动态严重度:规则引擎中可根据条件动态设置(如温度 >50 为 CRITICAL>40 为 MAJOR
---
## 四、告警传播
### 4.1 传播机制
告警创建时,沿 EntityRelationContains/Manages向上遍历`propagated_to` 字段记录传播路径:
```json
{
"propagated_to": [
{"type": "ASSET", "id": 100, "name": "卫生间"},
{"type": "ASSET", "id": 50, "name": "1楼"},
{"type": "ASSET", "id": 10, "name": "楼宇A"}
]
}
```
### 4.2 资产级告警聚合
Dashboard 查询某资产下的告警时,通过 `propagated_to` 字段的 JSON 包含查询实现。
---
## 五、通知集成(完善现有 TODO
### 5.1 通知通道
| 通道 | 实现 | 配置 |
|------|------|------|
| 短信 | SmsSendApi现有 Feign | 模板 ID + 变量映射 |
| 邮件 | MailSendApi现有 Feign | 模板 + 收件人列表 |
| 站内信 | NotifyMessageSendApi现有 Feign | 模板 + 用户列表 |
| Webhook | **新增** HttpPushAction | URL + Headers + Body 模板 |
| 企业微信/钉钉 | **新增** 扩展 NotifyProvider | 通过 SPI 扩展 |
### 5.2 通知模板变量
| 变量 | 说明 |
|------|------|
| `${alarm.name}` | 告警名称 |
| `${alarm.severity}` | 严重度 |
| `${alarm.triggerCount}` | 触发次数 |
| `${device.name}` | 设备名称 |
| `${device.nickname}` | 设备备注名 |
| `${asset.name}` | 所属资产名称 |
| `${trigger.value}` | 触发值 |
| `${trigger.threshold}` | 阈值 |
---
## 六、竞态保护(借鉴 JetLinks
### 6.1 告警缓存
```java
class AlarmRecordCache {
byte state; // 当前状态0=无告警, 1=ACTIVE, 2=CLEARED
long alarmTime; // 最近触发时间
long clearTime; // 最近清除时间
}
```
Redis 缓存 Key`iot:alarm:cache:{recordId}`
### 6.2 有效性判断
```java
// 触发告警:必须在清除时间之后(防止旧消息重新触发已清除的告警)
boolean isEffectiveTrigger(long timestamp) {
return timestamp > cache.clearTime || !cache.isActive();
}
// 清除告警:必须在最近触发时间之后
boolean isEffectiveClear(long timestamp) {
return timestamp > cache.alarmTime && cache.isActive();
}
```
### 6.3 分布式锁
对同一 `recordId` 的并发操作使用 Redis 分布式锁(`SETNX iot:alarm:lock:{recordId}`),超时 5 秒自动释放。

View File

@@ -0,0 +1,216 @@
# 06-设备影子与 RPC
> 参考ThingsBoard 属性三元分类 + 持久化 RPC 状态机
---
## 一、设计目标
解决两个核心问题:
1. **属性同步**:设备配置下发后,平台如何知道设备是否已应用?
2. **离线指令**:设备不在线时下发的指令如何不丢失?
---
## 二、Shared 属性同步机制
### 2.1 流程
```
平台设置 Shared 属性(如 reportInterval=30
↓ 写入 Redis + MySQL
↓ 检查设备在线?
├── 在线 → 通过消息总线下发 thing.config.push
│ 设备收到后应用配置,上报 CLIENT 属性确认
└── 离线 → 标记为 pending
设备上线时,推送全部 pending 的 Shared 属性
```
### 2.2 Shared 属性表
```sql
-- 复用现有 Redis Hash增加 scope 维度
-- Key: iot:device_property:{deviceId}:shared
-- Hash Field: identifier
-- Hash Value: {value, updateTime, synced} synced 标记设备是否已同步
```
### 2.3 上线自动同步
`IotDeviceMessageSubscriber.forceDeviceOnline()` 中增加逻辑:
```java
if (设备首次上线或重新上线) {
List<SharedAttribute> pendingAttrs = getUnsyncedSharedAttributes(deviceId);
if (!pendingAttrs.isEmpty()) {
sendConfigPush(deviceId, pendingAttrs);
}
}
```
---
## 三、持久化 RPC 状态机
### 3.1 RPC 状态枚举
```java
enum RpcStatus {
QUEUED(0), // 已入队(设备离线时暂存)
SENT(10), // 已发送到网关
DELIVERED(20), // 网关确认已推送到设备
SUCCESS(30), // 设备回复成功
FAILURE(40), // 设备回复失败
TIMEOUT(50), // 超时未响应
EXPIRED(60), // 过期未发送(设备长期离线)
CANCELED(70); // 手动取消
}
```
### 3.2 状态流转
```
创建 RPC → QUEUED
├── 设备在线 → SENT → 设备回复 → SUCCESS / FAILURE
│ → 超时 → TIMEOUT
├── 设备离线 → 保持 QUEUED
│ 设备上线 → 检查未过期的 QUEUED RPC → SENT → ...
└── 过期时间到 → EXPIRED
```
### 3.3 RPC 持久化表
```sql
CREATE TABLE iot_device_rpc (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
device_id BIGINT NOT NULL,
method VARCHAR(64) NOT NULL, -- 服务标识符
params JSON, -- 调用参数
status TINYINT NOT NULL DEFAULT 0,
response JSON, -- 设备响应
request_id VARCHAR(64), -- 关联消息 requestId
persisted BOOLEAN DEFAULT TRUE, -- 是否持久化false=瞬态,设备离线直接丢弃)
expiration_time DATETIME, -- 过期时间
timeout_ms INT DEFAULT 30000, -- 单次超时(毫秒)
retry_count INT DEFAULT 0, -- 已重试次数
max_retries INT DEFAULT 3, -- 最大重试次数
create_time DATETIME,
update_time DATETIME,
tenant_id BIGINT,
);
```
### 3.4 RPC 执行流程
```java
// 1. 创建 RPC
IotDeviceRpcDO rpc = createRpc(deviceId, method, params, persisted, timeout, expiration);
// 2. 检查设备在线
if (isDeviceOnline(deviceId)) {
// 立即发送
sendRpcToDevice(rpc);
updateStatus(rpc.id, SENT);
// 注册超时检查
scheduleTimeout(rpc.id, timeout);
} else if (rpc.isPersisted()) {
// 持久化等待
// 设备上线时由 IotDeviceMessageSubscriber 触发 pendingRpcCheck
} else {
// 瞬态 RPC设备离线直接失败
updateStatus(rpc.id, FAILURE);
}
// 3. 设备回复
void onRpcResponse(String requestId, Object response) {
IotDeviceRpcDO rpc = findByRequestId(requestId);
rpc.setResponse(response);
updateStatus(rpc.id, SUCCESS);
cancelTimeout(rpc.id);
}
// 4. 超时处理
void onRpcTimeout(Long rpcId) {
IotDeviceRpcDO rpc = getById(rpcId);
if (rpc.getRetryCount() < rpc.getMaxRetries()) {
rpc.setRetryCount(rpc.getRetryCount() + 1);
sendRpcToDevice(rpc); // 重试
} else {
updateStatus(rpcId, TIMEOUT);
}
}
```
### 3.5 设备上线时的 Pending RPC 处理
```java
// IotDeviceMessageSubscriber 中,设备上线后
void onDeviceOnline(Long deviceId) {
List<IotDeviceRpcDO> pendingRpcs = rpcService.getByDeviceIdAndStatus(deviceId, QUEUED);
// 【工程评审决议】限速:防止长期离线设备上线后一次性涌入大量指令导致设备过载
// 按创建时间排序,每秒最多发送 5 条
pendingRpcs.sort(Comparator.comparing(IotDeviceRpcDO::getCreateTime));
RateLimiter limiter = RateLimiter.create(5.0); // 5 条/秒
for (IotDeviceRpcDO rpc : pendingRpcs) {
if (rpc.getExpirationTime() != null && rpc.getExpirationTime().isBefore(now())) {
updateStatus(rpc.getId(), EXPIRED); // 已过期
} else {
limiter.acquire(); // 限速等待
sendRpcToDevice(rpc);
updateStatus(rpc.getId(), SENT);
}
}
}
```
---
## 四、RPC 过期清理 Job
```java
@XxlJob("rpcExpirationCheckJob")
void checkExpiredRpcs() {
List<IotDeviceRpcDO> expiredRpcs = rpcService.getExpiredQueuedRpcs(now());
for (IotDeviceRpcDO rpc : expiredRpcs) {
updateStatus(rpc.getId(), EXPIRED);
}
}
```
建议每 5 分钟执行一次。
---
## 五、API 接口
| 路径 | 方法 | 说明 |
|------|------|------|
| `/iot/device/rpc/send` | POST | 发送 RPC同步等待或异步 |
| `/iot/device/rpc/get` | GET | 查询 RPC 状态 |
| `/iot/device/rpc/page` | GET | 分页查询设备 RPC 历史 |
| `/iot/device/rpc/cancel` | PUT | 取消未发送的 RPC |
| `/iot/device/shadow/shared` | GET | 获取设备 Shared 属性 |
| `/iot/device/shadow/shared` | PUT | 设置 Shared 属性(触发同步) |
---
## 六、与 Feign API 集成
现有 `IotDeviceControlApi.invokeService()` 增强为支持 RPC 模式:
```java
IotDeviceServiceInvokeReqDTO {
// ... 现有字段 ...
Boolean persisted; // 【新增】是否持久化(默认 true
LocalDateTime expiration; // 【新增】过期时间
Integer maxRetries; // 【新增】最大重试次数(默认 3
}
IotDeviceServiceInvokeRespDTO {
// ... 现有字段 ...
Long rpcId; // 【新增】RPC 记录 ID异步时返回
String rpcStatus; // 【新增】RPC 当前状态
}
```

View File

@@ -0,0 +1,243 @@
# 07-数据存储方案
> 存储策略插件化 + 写入缓冲 + TDengine 优化
---
## 一、存储策略插件化(借鉴 JetLinks
### 1.1 设计动机
现有系统所有产品设备数据统一存 TDengine无法针对不同场景选择最优存储。引入策略模式支持产品级配置。
### 1.2 策略接口
```java
interface ThingsDataStorageStrategy {
String getId(); // "tdengine-column" / "tdengine-row"
int getOrder(); // 优先级
void saveProperties(Long deviceId, Map<String, Object> properties, LocalDateTime reportTime);
List<PropertyRecord> queryHistory(Long deviceId, String identifier, TimeRange range, int limit);
Map<String, Object> getLatest(Long deviceId);
void defineTable(Long productId, List<ThingModelDO> thingModels);
void alterTable(Long productId, List<ThingModelDO> oldModels, List<ThingModelDO> newModels);
}
```
### 1.3 内置策略
| 策略 ID | 说明 | 适用场景 |
| ----------------- | --------------- | -------------- |
| `tdengine-column` | 列模式(现有方案),每属性一列 | 属性固定、查询频繁 |
| `tdengine-row` | 行模式,每属性一行 | 属性动态变化、物模型频繁变更 |
### 1.4 列模式 vs 行模式
**列模式(现有,保留为默认)**
```sql
CREATE STABLE product_property_{productId} (
ts TIMESTAMP, report_time TIMESTAMP,
temperature INT, humidity FLOAT, status TINYINT -- 每属性一列
) TAGS (device_id BIGINT);
```
- 优点:查询快,单行包含所有属性
- 缺点:物模型变更需 ALTER TABLETDengine 不支持缩短字段
**行模式(新增)**
```sql
CREATE STABLE product_property_row_{productId} (
ts TIMESTAMP, report_time TIMESTAMP,
identifier NCHAR(64), -- 属性标识符
value_int INT, -- 整型值
value_float FLOAT, -- 浮点值
value_double DOUBLE, -- 双精度值
value_bool TINYINT, -- 布尔值
value_str NCHAR(1024), -- 字符串值
scope TINYINT -- 属性域1=CLIENT, 2=SERVER, 3=SHARED
) TAGS (device_id BIGINT);
```
- 优点:物模型变更无需 DDL属性可任意扩展
- 缺点:查询需按 identifier 过滤,同时查多属性需 PIVOT
### 1.5 产品级配置
```sql
ALTER TABLE iot_product ADD COLUMN store_policy VARCHAR(32) DEFAULT 'tdengine-column';
```
`ThingsDataStorageRouter` 按 productId 路由到对应策略:
```java
ThingsDataStorageStrategy getStrategy(Long productId) {
String policy = productService.getProduct(productId).getStorePolicy();
return strategyMap.get(policy); // 策略注册表
}
```
---
## 二、写入缓冲(借鉴 JetLinks PersistenceBuffer
### 2.1 设计动机
设备高频上报时(如 1Hz × 1000 台),每条消息直接写 TDengine 会产生大量小批次 INSERT。引入写入缓冲批量合并后写入。
### 2.2 PersistenceBuffer 设计
```java
class PersistenceBuffer<T> {
// 配置
int bufferSize = 200; // 数量触发阈值
Duration bufferTimeout = Duration.ofSeconds(3); // 时间触发阈值
int parallelism = 4; // 并行写出线程数
// 内部结构
BlockingQueue<T> queue; // 内存队列
ScheduledExecutorService timer; // 定时刷新
// 写入方法
void write(T item) {
queue.add(item);
if (queue.size() >= bufferSize) {
flush();
}
}
// 批量刷出
void flush() {
List<T> batch = new ArrayList<>();
queue.drainTo(batch, bufferSize);
if (!batch.isEmpty()) {
batchWriter.accept(batch); // 回调:批量写入 TDengine
}
}
}
```
### 2.3 应用点
| 缓冲实例 | 写入目标 | bufferSize | timeout |
|---------|---------|-----------|---------|
| 属性写入缓冲 | TDengine `product_property_*` | 200 | 3s |
| 消息日志缓冲 | TDengine `device_message` | 500 | 5s |
| 告警历史缓冲 | TDengine `alarm_history` | 100 | 3s |
| 规则调试日志缓冲 | TDengine `rule_debug_log` | 100 | 5s |
---
## 三、TDengine 优化
### 3.1 消息超级表优化
现有 `device_message` 表的 `params``data` 字段为 `NCHAR(2048)`,大消息会截断。
优化方案:
```sql
-- 增大到 4096覆盖绝大多数场景
ALTER STABLE device_message MODIFY COLUMN params NCHAR(4096);
ALTER STABLE device_message MODIFY COLUMN data NCHAR(4096);
```
### 3.2 新增超级表
```sql
-- 告警历史(两级告警的时序部分)
CREATE STABLE IF NOT EXISTS alarm_history (
ts TIMESTAMP,
alarm_record_id NCHAR(64),
alarm_config_id BIGINT,
severity TINYINT,
state NCHAR(16),
trigger_data NCHAR(4096),
details NCHAR(2048),
operator NCHAR(64),
remark NCHAR(256)
) TAGS (device_id BIGINT);
-- 规则调试日志
CREATE STABLE IF NOT EXISTS rule_debug_log (
ts TIMESTAMP,
rule_chain_id BIGINT,
node_id BIGINT,
node_type NCHAR(64),
input_data NCHAR(4096),
output_data NCHAR(4096),
duration_ms INT,
success BOOL,
error_msg NCHAR(512)
) TAGS (device_id BIGINT);
```
### 3.3 数据保留策略
| 超级表 | 建议保留期 | 配置方式 |
|--------|----------|---------|
| `device_message` | 90 天 | TDengine `KEEP 90d` |
| `product_property_*` | 365 天 | TDengine `KEEP 365d` |
| `alarm_history` | 365 天 | TDengine `KEEP 365d` |
| `rule_debug_log` | 7 天 | TDengine `KEEP 7d` |
---
## 四、Redis 缓存优化
### 4.1 修复生产隐患
| 问题 | 现状 | 修复 |
|------|------|------|
| `keys()` 全量扫描 | `SignalLossRuleProcessor``keys("iot:clean:signal:loss:*")` | 改用 `SCAN` 游标遍历 |
| 部分缓存无 TTL | `iot:device_property:*``iot:device_server_id` 常驻 | 设备删除时主动清理 |
| 告警缓存竞态 | 无并发保护 | 分布式锁 + RecordCache |
### 4.2 新增 Redis Key
| Key | 类型 | 用途 | TTL |
|-----|------|------|-----|
| `iot:alarm:cache:{recordId}` | Hash | 告警状态缓存 | 7 天 |
| `iot:alarm:lock:{recordId}` | String | 告警操作分布式锁 | 5s |
| `iot:device_property:{did}:shared` | Hash | Shared 属性 | 常驻 |
| `iot:device_property:{did}:server` | Hash | Server 属性 | 常驻 |
| `iot:rpc:pending:{deviceId}` | List | 待发送 RPC 队列(辅助索引) | 7 天 |
---
## 五、可观测性埋点Micrometer
### 5.1 核心指标
| 指标名 | 类型 | 标签 | 说明 |
|--------|------|------|------|
| `iot.device.online` | Gauge | protocol | 在线设备数 |
| `iot.message.received` | Counter | method, protocol | 上行消息数 |
| `iot.message.sent` | Counter | method | 下行消息数 |
| `iot.rule.execution` | Timer | ruleChainId, nodeType | 规则节点执行耗时 |
| `iot.rule.trigger` | Counter | ruleChainId, result | 规则触发次数(命中/未命中) |
| `iot.alarm.active` | Gauge | severity | 活跃告警数 |
| `iot.property.write` | Counter | productId | 属性写入次数 |
| `iot.buffer.size` | Gauge | bufferName | 缓冲区当前大小 |
| `iot.rpc.status` | Counter | status | RPC 状态分布 |
### 5.2 集成方式
```java
@Component
class IotMetrics {
private final MeterRegistry registry;
// 设备上线
void recordDeviceOnline(String protocol) {
Metrics.gauge("iot.device.online", Tags.of("protocol", protocol), onlineCount);
}
// 规则执行耗时
void recordRuleExecution(Long chainId, String nodeType, Duration duration) {
Timer.builder("iot.rule.execution")
.tag("ruleChainId", String.valueOf(chainId))
.tag("nodeType", nodeType)
.register(registry)
.record(duration);
}
}
```
Prometheus 端点:`/actuator/prometheus`Grafana Dashboard 可直接对接。

View File

@@ -0,0 +1,179 @@
# 08-协议与编解码扩展
> 参考JetLinks 协议包热加载 + Codec SPI 增强
---
## 一、设计目标
将现有的"每种设备写 Java Codec 编译部署"模式,升级为 **Codec SPI 注册 + 协议包热加载**,新设备接入从"写代码"变为"填配置 + 可选脚本"。
---
## 二、Codec SPI 增强
### 2.1 现有接口保留
```java
interface IotDeviceMessageCodec {
byte[] encode(IotDeviceMessage message);
IotDeviceMessage decode(byte[] bytes);
String type();
}
```
### 2.2 增强Codec 自注册 + 元数据
```java
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface CodecMeta {
String type(); // 类型标识(如 "PEOPLE_COUNTER"
String name(); // 显示名称(如 "客流计数器"
String protocol(); // 适用协议("mqtt" / "http" / "tcp" / "all"
String description() default "";
}
// 示例
@CodecMeta(type = "PEOPLE_COUNTER", name = "客流计数器", protocol = "http")
public class IotPeopleCounterCodec implements IotDeviceMessageCodec { ... }
```
### 2.3 Codec 注册表
```java
@Component
class CodecRegistry {
Map<String, IotDeviceMessageCodec> codecs = new ConcurrentHashMap<>();
@PostConstruct
void init() {
// 1. 扫描 @CodecMeta 注解的 Spring Bean
// 2. 扫描外部 JAR 中的实现
}
void register(IotDeviceMessageCodec codec) { ... }
void unregister(String type) { ... }
IotDeviceMessageCodec get(String type) { ... }
List<CodecInfo> listAll() { ... } // 前端下拉列表
}
```
---
## 三、协议包热加载(借鉴 JetLinks
### 3.1 架构
```
协议包 JAR 文件
↓ 上传到文件管理器(或配置本地路径/HTTP URL
CodecJarLoader独立 ClassLoader 加载)
↓ 扫描 META-INF/services/IotDeviceMessageCodec 或 @CodecMeta
↓ 实例化并注册到 CodecRegistry
网关运行时使用新 Codec无需重启
```
### 3.2 实现
```java
class CodecJarLoader {
IotDeviceMessageCodec loadFromJar(String jarPath) {
// 独立 ClassLoader隔离依赖
URLClassLoader classLoader = new URLClassLoader(
new URL[]{new File(jarPath).toURI().toURL()},
getClass().getClassLoader() // 父加载器
);
// SPI 发现
ServiceLoader<IotDeviceMessageCodec> loader =
ServiceLoader.load(IotDeviceMessageCodec.class, classLoader);
for (IotDeviceMessageCodec codec : loader) {
codecRegistry.register(codec);
log.info("Loaded codec: {} from {}", codec.type(), jarPath);
}
}
void unloadJar(String jarPath) {
// 注销 codec + 关闭 ClassLoader
}
}
```
### 3.3 协议包 JAR 规范
```
my-custom-codec.jar
├── META-INF/
│ └── services/
│ └── com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec
│ → com.example.MyCustomCodec全限定类名
└── com/example/
└── MyCustomCodec.class
```
### 3.4 热加载管理 API
| 路径 | 方法 | 说明 |
|------|------|------|
| `/iot/codec/list` | GET | 列出所有已注册 Codec |
| `/iot/codec/upload` | POST | 上传协议包 JAR 并热加载 |
| `/iot/codec/unload` | DELETE | 卸载指定协议包 |
| `/iot/codec/reload` | PUT | 重新加载指定协议包 |
---
## 四、脚本化透传编解码(借鉴 JetLinks TransparentCodec
### 4.1 设计动机
对于简单的 JSON 格式差异(如字段名映射),不需要写 JAR用脚本配置即可。
### 4.2 脚本 Codec 配置
```json
{
"type": "SCRIPT_CODEC",
"config": {
"decodeScript": "let props = {}; props.temperature = raw.temp; props.humidity = raw.humi; return {method: 'thing.property.post', params: props};",
"encodeScript": "return JSON.stringify({cmd: msg.method, data: msg.params});"
}
}
```
### 4.3 实现
```java
@CodecMeta(type = "SCRIPT_CODEC", name = "脚本编解码", protocol = "all")
class ScriptCodec implements IotDeviceMessageCodec {
private AviatorEvaluatorInstance engine;
private Expression decodeExpr;
private Expression encodeExpr;
void init(JsonNode config) {
decodeExpr = engine.compile(config.get("decodeScript").asText());
encodeExpr = engine.compile(config.get("encodeScript").asText());
}
IotDeviceMessage decode(byte[] bytes) {
Map<String, Object> raw = JsonUtils.parseMap(bytes);
Map<String, Object> result = (Map) decodeExpr.execute(Map.of("raw", raw));
return IotDeviceMessage.requestOf(result.get("method"), result.get("params"));
}
}
```
---
## 五、迁移兼容
### 5.1 现有 7 个 Codec 不变
所有现有 CodecAlink/Camera3D11/HenghuaD5/PeopleCounter/JT808/TcpBinary/TcpJson继续保留为内置实现添加 `@CodecMeta` 注解即可自动注册。
### 5.2 产品配置不变
`iot_product.codec_type` 字段继续使用字符串标识,与 `CodecRegistry` 中的 `type` 对应。新增 Codec 后,前端下拉列表自动包含新选项。