diff --git a/开发者文档/03-IoT领域/升级设计方案/00-升级设计总览.md b/开发者文档/03-IoT领域/升级设计方案/00-升级设计总览.md new file mode 100644 index 0000000..7b3b0e2 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/00-升级设计总览.md @@ -0,0 +1,96 @@ +# IoT 模块 v2.0 升级设计总览 + +> 设计日期:2026-04-10 | 参考项目:JetLinks Community 2.11 + ThingsBoard 4.3 +> 基于 viewsh-module-iot 现有 419 Java 文件源码分析 + 两大开源平台深度对比 + +--- + +## 一、升级目标 + +将 viewsh-module-iot 从"能用"升级为"好用、可扩展、可运维"的企业级 IoT 平台,核心目标: + +1. **规则引擎通用化** — 从三条硬编码链路进化为可视化 DAG 编排 + 脚本节点 +2. **子系统模型** — 引入空间/资产层级,设备归属子系统,支持任意业务拓扑 +3. **设备影子** — 属性三元分类 + 持久化 RPC,解决离线指令可靠投递 +4. **物模型增强** — 设备级派生物模型,支持个性化覆盖产品定义 +5. **告警体系重构** — 两级存储 + 完整状态机 + 关系传播 +6. **基础设施补齐** — 可观测性、写入缓冲、协议热加载、存储策略插件化 + +--- + +## 二、设计原则 + +| 原则 | 说明 | +|------|------| +| **南向屏蔽,北向统一** | 协议层允许混乱,业务层必须干净(继承现有铁律) | +| **JetLinks 为主骨架** | 规则引擎、条件引擎、SPI 扩展采用 JetLinks 模式 | +| **ThingsBoard 补关键设计** | 属性三元分类、RPC 状态机、EntityRelation、告警状态机 | +| **保留现有消息总线** | Local/Redis/RocketMQ 三实现不动,消费端接入新引擎 | +| **渐进式迁移** | 新旧规则可并行运行,不要求一次性切换 | + +--- + +## 三、文档索引 + +| 编号 | 文档 | 内容 | +|------|------|------| +| 01 | [[01-整体架构设计]] | 模块划分、分层架构、依赖关系、部署拓扑 | +| 02 | [[02-子系统与设备归属模型]] | 租户→项目→子系统→设备层级、设备归属管理 | +| 03 | [[03-物模型规范v2]] | 属性三元分类、派生物模型、数据类型增强 | +| 04 | [[04-规则引擎方案]] | DAG 编排、SPI Provider、脚本节点、抖动抑制 | +| 05 | [[05-告警体系设计]] | 两级存储、状态机、传播机制、通知集成 | +| 06 | [[06-设备影子与RPC]] | Shared 属性同步、持久化 RPC 状态机、离线指令队列 | +| 07 | [[07-数据存储方案]] | 存储策略插件化、写入缓冲、Key 压缩、TDengine 优化 | +| 08 | [[08-协议与编解码扩展]] | 协议包热加载、Codec SPI、透传编解码 | + +--- + +## 四、融合策略速查表 + +| 设计点 | 取自 | 说明 | +| ------ | -------------------------- | ------------------------------------------ | +| 规则引擎框架 | JetLinks SceneRule DAG | 串行/并行/分支编排 | +| 条件引擎 | JetLinks ReactorQL 思路 | 编译期过滤器,支持指标对比 | +| 脚本节点 | TB TBEL 思路 | Aviator/QLExpress 实现,Action Provider 注册 | +| 抖动抑制 | JetLinks ShakeLimit | 7 参数模型,参数化配置 | +| 消息总线 | 保留现有 | Local/Redis/RocketMQ 三实现 | +| 子系统模型 | 简化层级:租户→项目→子系统→设备 | 设备 FK 归属子系统,本次只做设备-子系统关系 | +| 设备影子 | TB 属性三元分类 + RPC | Client/Server/Shared + 持久化状态机 | +| 告警系统 | JetLinks 两级存储 + TB 状态机 | Record + History + Create→Update→Clear→ACK | +| 物模型继承 | JetLinks 派生物模型 | 设备级覆盖产品定义 | +| 存储策略 | JetLinks 策略模式 | 产品级配置存储后端 | +| 协议扩展 | JetLinks 协议热加载 | JAR + ClassLoader 隔离 | +| SPI 扩展 | JetLinks Provider + TB 注解 | 动态注册 + 元数据自描述 | +| 可观测性 | JetLinks + TB Micrometer | 连接数/吞吐量/规则耗时 | +| 写入缓冲 | JetLinks PersistenceBuffer | 内存+文件双层批量写入 | + +--- + +## 五、不引入的设计 + +| 设计 | 来源 | 不引入原因 | +| ------------ | -------- | -------------------------------- | +| Actor 系统 | TB | 过重,当前规模不需要设备级 Actor 隔离 | +| 多租户分区路由 | TB | 单租户内部系统,纯开销 | +| Edge 同步 | TB | 无边缘计算需求 | +| 完整 87 节点 | TB | 20-25 个覆盖核心场景 | +| 全栈响应式 | JetLinks | 现有 Spring MVC 生态稳定,不做全量迁移 | +| ReactorQL 原版 | JetLinks | 依赖 Project Reactor,改用表达式引擎替代核心思路 | +| EntityRelation 图关系 | TB | 过度设计,改用简洁的 subsystem_id FK 归属 | + +--- + +## 六、工程评审决议(2026-04-10) + +以下决议来自 `/plan-eng-review`,已更新到各设计文档中: + +| # | 决议 | 影响文档 | +|---|------|---------| +| 1 | 分层图删除"实体关系",替换为"子系统服务" | 01-整体架构 | +| 2 | 规则引擎拆为独立 Maven 模块 `viewsh-module-iot-rule`,gateway 不依赖 | 01-整体架构 | +| 3 | 规则链执行器加链级 try-catch 隔离,单链异常不影响其他链 | 04-规则引擎 | +| 4 | Aviator 脚本完整沙箱:超时 3s + MAX_LOOP_COUNT 1000 + 连续失败黑名单 | 04-规则引擎 | +| 5 | `iot_subsystem.code` 改为 NOT NULL,确保唯一索引有效 | 02-子系统 | +| 6 | `iot_alarm_record` 保持 BIGINT 自增 ID + `record_key` UK 幂等 | 05-告警 | +| 7 | 规则链全量缓存(启动加载)+ 变更时驱逐重载,不用延迟加载 | 04-规则引擎 | +| 8 | RPC 上线补发加限速(5 条/秒),防止设备过载 | 06-设备影子 | diff --git a/开发者文档/03-IoT领域/升级设计方案/01-整体架构设计.md b/开发者文档/03-IoT领域/升级设计方案/01-整体架构设计.md new file mode 100644 index 0000000..247f7df --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/01-整体架构设计.md @@ -0,0 +1,170 @@ +# 01-整体架构设计 + +> IoT 模块 v2.0 | 基于 JetLinks + ThingsBoard 融合方案 + +--- + +## 一、目标架构分层 + +``` +┌─────────────────────────────────────────────────────┐ +│ 前端 / API 层 │ +│ Admin UI(Vue) │ Open API(REST) │ WebSocket │ +├─────────────────────────────────────────────────────┤ +│ 业务管理层(Manager) │ +│ 产品管理 │ 设备管理 │ 子系统管理 │ 规则管理 │ 告警管理 │ +│ OTA管理 │ 统计分析 │ 通知管理 │ 用户权限 │ │ +├─────────────────────────────────────────────────────┤ +│ 规则引擎层(Rule Engine) │ +│ DAG 编排器 │ 条件评估器 │ 脚本引擎 │ 抖动抑制 │ +│ 触发器 SPI │ 动作 SPI │ 数据富化 │ 告警处理器 │ +├─────────────────────────────────────────────────────┤ +│ 核心服务层(Core Services) │ +│ 设备影子 │ 属性服务 │ 时序服务 │ RPC 服务 │ +│ 物模型服务 │ 子系统服务 │ 事件总线 │ 通知服务 │ +├─────────────────────────────────────────────────────┤ +│ 消息总线(Message Bus) │ +│ Local │ Redis Stream │ RocketMQ │ +├─────────────────────────────────────────────────────┤ +│ 设备接入层(Gateway) │ +│ MQTT Broker │ EMQX Bridge │ HTTP Server │ TCP Server │ +│ Codec SPI(Alink/JT808/Camera3D11/...) │ +├─────────────────────────────────────────────────────┤ +│ 存储层(Storage) │ +│ MySQL(业务实体) │ TDengine(时序数据) │ Redis(缓存) │ +└─────────────────────────────────────────────────────┘ +``` + +--- + +## 二、模块划分(v2.0) + +``` +viewsh-module-iot/ +├── viewsh-module-iot-api/ # [契约层] 不变 +│ DTO、枚举、Feign 接口 +│ +├── viewsh-module-iot-core/ # [核心层] 增强 +│ ├── messagebus/ # 消息总线(保留现有三实现) +│ ├── integration/ # 集成事件(保留) +│ ├── shadow/ # 【新增】设备影子核心接口 +│ │ ├── AttributeScope.java # 属性三元分类枚举 +│ │ ├── DeviceShadow.java # 影子服务接口 +│ │ └── RpcCommand.java # RPC 指令模型 +│ └── util/ # 工具类(保留+增强) +│ +├── viewsh-module-iot-rule/ # 【新增·独立模块】规则引擎 +│ ├── model/ # RuleModel、RuleNode、RuleLink +│ ├── spi/ # TriggerProvider、ActionProvider +│ ├── engine/ # RuleEngine 执行器(链级 try-catch 隔离) +│ ├── script/ # 脚本引擎(Aviator + 完整沙箱) +│ └── shakelimit/ # 抖动抑制 +│ +├── viewsh-module-iot-server/ # [业务层] 重构 +│ ├── controller/ # REST API +│ ├── service/ +│ │ ├── product/ # 产品管理(保留) +│ │ ├── device/ # 设备管理(增强:影子集成) +│ │ ├── subsystem/ # 【新增】子系统管理 +│ │ ├── thingmodel/ # 物模型(增强:派生物模型) +│ │ ├── shadow/ # 【新增】设备影子实现 +│ │ ├── rule/ # 规则引擎(重构) +│ │ │ ├── scene/ # 场景联动(DAG 编排) +│ │ │ ├── trigger/ # 触发器实现 +│ │ │ ├── action/ # 动作实现 +│ │ │ ├── condition/ # 条件评估器 +│ │ │ ├── script/ # 脚本引擎实现 +│ │ │ └── shakelimit/ # 抖动抑制 +│ │ ├── alert/ # 告警(重构:两级+状态机) +│ │ ├── ota/ # OTA(保留+完善) +│ │ ├── notify/ # 【新增】通知服务 +│ │ └── statistics/ # 统计(增强) +│ ├── dal/ # 数据访问 +│ │ ├── mysql/ # 关系库 +│ │ ├── tdengine/ # 时序库(策略模式增强) +│ │ └── redis/ # 缓存 +│ └── framework/ # 框架层 +│ ├── observe/ # 【新增】可观测性(Micrometer) +│ └── buffer/ # 【新增】写入缓冲(PersistenceBuffer) +│ +└── viewsh-module-iot-gateway/ # [接入层] 增强 + ├── protocol/ # 协议层(保留) + ├── codec/ # 编解码(增强:SPI 热加载) + └── service/ # 网关服务 +``` + +--- + +## 三、核心依赖关系(v2.0) + +``` +gateway ──────┐ + ├── core(消息总线、影子接口、工具) +server ───────┤ └── api(DTO、枚举、Feign) + │ + └── rule(独立模块,仅 server 依赖,gateway 不依赖) + └── aviator(脚本引擎) + +server 额外依赖: + ├── viewsh-module-iot-rule(规则引擎) + ├── viewsh-module-system-api(用户、租户) + ├── viewsh-framework(框架层) + └── micrometer(可观测性) +``` + +--- + +## 四、新增实体一览 + +| 实体 | 表名 | 说明 | +|------|------|------| +| **IotProjectDO** | `iot_project` | 项目(架构预留,本次不开放 API) | +| **IotSubsystemDO** | `iot_subsystem` | 子系统(设备归属单元) | +| **IotAlarmHistoryDO** | TDengine `alarm_history` | 告警时序归档 | +| **IotDeviceRpcDO** | `iot_device_rpc` | 持久化 RPC 指令 | +| **IotRuleChainDO** | `iot_rule_chain` | 规则链(DAG 图定义) | +| **IotRuleNodeDO** | `iot_rule_node` | 规则节点 | + +--- + +## 五、关键技术选型变更 + +| 领域 | v1.0(现有) | v2.0(目标) | +|------|-------------|-------------| +| 规则引擎条件 | SpEL 字符串表达式 | Aviator 表达式引擎(编译缓存+类型安全) | +| 脚本执行 | 无 | Aviator/QLExpress(轻量+沙箱) | +| 规则编排 | 平铺动作列表 | DAG(RuleModel,参考 JetLinks) | +| 告警存储 | 单表 MySQL | MySQL(AlarmRecord 当前)+ TDengine(AlarmHistory 时序) | +| 设备属性 | Redis Hash(无分类) | Redis Hash + AttributeScope 三元分类 | +| RPC | 单次下发,丢失不重试 | 持久化 RPC + 状态机(QUEUED→SENT→DELIVERED→SUCCESS) | +| 设备组织 | groupIds JSON 数组 | 子系统归属(subsystem_id FK)+ 分组保留并存 | +| 监控指标 | 无 | Micrometer(连接数/吞吐量/规则耗时/队列深度) | + +--- + +## 六、迁移策略 + +### 阶段一:基础设施(1-2 天) +- 新增 `iot_project`(预留)、`iot_subsystem` 表,设备表增加 `subsystem_id` +- 新增 `iot_device_rpc` 表 +- 引入 Micrometer 依赖,添加核心埋点 +- 引入 Aviator 依赖 + +### 阶段二:核心服务(2-3 天) +- 实现子系统 CRUD + 设备归属绑定 +- 实现设备影子服务(属性三元分类 + RPC 状态机) +- 物模型增强(派生物模型) +- 告警重构(两级存储 + 状态机) + +### 阶段三:规则引擎(3-5 天) +- 实现 RuleModel DAG 编排框架 +- 实现 TriggerProvider/ActionProvider SPI +- 迁移现有触发器(设备状态/属性/事件/定时) +- 迁移现有动作(设备控制/服务调用/告警/数据转发) +- 新增脚本节点、数据富化节点 +- 新增抖动抑制 + +### 阶段四:兼容迁移(1-2 天) +- 现有 SceneRule/DataRule 配置数据迁移工具 +- 旧规则 → 新 DAG 转换器 +- 新旧规则并行运行验证 diff --git a/开发者文档/03-IoT领域/升级设计方案/02-子系统与设备归属模型.md b/开发者文档/03-IoT领域/升级设计方案/02-子系统与设备归属模型.md new file mode 100644 index 0000000..abed254 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/02-子系统与设备归属模型.md @@ -0,0 +1,261 @@ +# 02-子系统与设备归属模型 + +--- + +## 一、组织层级架构 + +``` +租户(Tenant) ← 已有,TenantBaseDO + └── 项目(Project) ← 架构预留,本次不实现 + └── 子系统(Subsystem) ← 本次实现 + └── 设备(Device) ← 本次实现归属关系 +``` + +### 1.1 本次升级范围 + +- **实现**:子系统(Subsystem)实体 + 设备归属子系统 +- **预留**:项目(Project)层级,表结构和字段预留,但不开放 API +- **不做**:通用 EntityRelation 图关系(过度设计,当前不需要) + +### 1.2 设计原则 + +- 一个设备**必须且只能**归属一个子系统(`subsystem_id` NOT NULL FK) +- 一个子系统属于一个项目(`project_id` FK,本次可为空) +- 子系统和项目都在租户隔离范围内 +- 产品(Product)是设备的"型号模板",与子系统无关,维持现有设计不变 + +--- + +## 二、数据模型 + +### 2.1 项目表(架构预留) + +```sql +CREATE TABLE iot_project ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(128) NOT NULL COMMENT '项目名称', + description TEXT COMMENT '项目描述', + icon VARCHAR(256) COMMENT '项目图标', + status TINYINT NOT NULL DEFAULT 1 COMMENT '状态(0=禁用 1=启用)', + sort INT DEFAULT 0 COMMENT '排序', + -- 租户 + 审计 + tenant_id BIGINT NOT NULL, + creator VARCHAR(64), + create_time DATETIME DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64), + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + deleted BIT DEFAULT 0, + UNIQUE KEY uk_name_tenant (name, tenant_id, deleted) +) COMMENT '项目'; +``` + +### 2.2 子系统表(本次实现) + +```sql +CREATE TABLE iot_subsystem ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(128) NOT NULL COMMENT '子系统名称', + code VARCHAR(64) NOT NULL COMMENT '子系统编码(业务标识,如 security / energy / clean)', + description TEXT COMMENT '子系统描述', + icon VARCHAR(256) COMMENT '子系统图标', + status TINYINT NOT NULL DEFAULT 1 COMMENT '状态(0=禁用 1=启用)', + sort INT DEFAULT 0 COMMENT '排序', + project_id BIGINT COMMENT '所属项目 ID(预留,本次可为空)', + config JSON COMMENT '子系统配置(扩展 JSON)', + -- 租户 + 审计 + tenant_id BIGINT NOT NULL, + creator VARCHAR(64), + create_time DATETIME DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64), + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + deleted BIT DEFAULT 0, + UNIQUE KEY uk_name_tenant (name, tenant_id, deleted), + UNIQUE KEY uk_code_tenant (code, tenant_id, deleted) +) COMMENT '子系统'; +``` + +### 2.3 设备表变更 + +```sql +-- 设备新增子系统归属字段 +ALTER TABLE iot_device ADD COLUMN subsystem_id BIGINT COMMENT '所属子系统 ID'; +-- 索引 +ALTER TABLE iot_device ADD INDEX idx_subsystem_id (subsystem_id); +``` + +**字段说明**: +- `subsystem_id`:设备归属的子系统 ID,FK 关联 `iot_subsystem.id` +- 新设备创建时必须指定子系统(或后续批量分配) +- 存量设备的 `subsystem_id` 为 NULL,通过迁移工具或管理后台补录 + +--- + +## 三、实体关系总览 + +``` +┌─────────────────────────────────────────────┐ +│ 租户(Tenant) │ +│ │ +│ ┌─── 项目A ──────────────────────────────┐ │ +│ │ │ │ +│ │ ┌── 安防子系统 ─────┐ ┌── 能耗子系统 ──┐│ │ +│ │ │ 摄像头001 │ │ 电表001 ││ │ +│ │ │ 门禁001 │ │ 水表001 ││ │ +│ │ └─────────────────┘ └──────────────┘│ │ +│ └───────────────────────────────────────┘ │ +│ │ +│ ┌─── 项目B ──────────────────────────────┐ │ +│ │ ┌── 保洁子系统 ─────┐ │ │ +│ │ │ 客流计数器001 │ │ │ +│ │ │ 工牌001 │ │ │ +│ │ │ Beacon001 │ │ │ +│ │ └─────────────────┘ │ │ +│ └───────────────────────────────────────┘ │ +│ │ +│ 产品(Product)── 独立维度,与子系统无关 │ +│ ├── 客流计数器(PEOPLE_COUNTER) │ +│ ├── GPS工牌(JT808) │ +│ └── 3D11摄像头(CAMERA_3D11) │ +└─────────────────────────────────────────────┘ +``` + +**关键区分**: +- **产品** = 设备的"型号"(决定物模型、编解码器、协议) +- **子系统** = 设备的"归属"(决定业务逻辑、规则、告警通知对象) +- 同一产品的设备可以属于不同子系统 + +--- + +## 四、API 设计 + +### 4.1 子系统管理 + +| 路径 | 方法 | 权限 | 说明 | +|------|------|------|------| +| `/iot/subsystem/create` | POST | `iot:subsystem:create` | 创建子系统 | +| `/iot/subsystem/update` | PUT | `iot:subsystem:update` | 更新子系统 | +| `/iot/subsystem/delete` | DELETE | `iot:subsystem:delete` | 删除子系统(需无设备) | +| `/iot/subsystem/get` | GET | `iot:subsystem:query` | 获取子系统详情 | +| `/iot/subsystem/page` | GET | `iot:subsystem:query` | 分页查询 | +| `/iot/subsystem/simple-list` | GET | 无 | 下拉列表(仅启用,含 id/name/code) | + +**创建请求**: +```json +{ + "name": "保洁子系统", + "code": "clean", + "description": "保洁业务相关设备管理", + "icon": "clean-icon", + "status": 1, + "projectId": null +} +``` + +### 4.2 设备归属操作 + +| 路径 | 方法 | 权限 | 说明 | +|------|------|------|------| +| `/iot/device/bindSubsystem` | PUT | `iot:device:update` | 单设备绑定子系统 | +| `/iot/device/batchBindSubsystem` | PUT | `iot:device:update` | 批量绑定子系统 | + +**请求体**: +```json +// 单设备 +{ "deviceId": 1001, "subsystemId": 5 } + +// 批量 +{ "deviceIds": [1001, 1002, 1003], "subsystemId": 5 } +``` + +### 4.3 设备查询增强 + +现有设备分页查询(`/iot/device/page`)新增 `subsystemId` 过滤参数: + +``` +GET /iot/device/page?subsystemId=5&productId=10&status=1 +``` + +### 4.4 子系统设备统计 + +| 路径 | 方法 | 说明 | +|------|------|------| +| `/iot/subsystem/device-count` | GET | 各子系统设备数量统计 | +| `/iot/subsystem/{id}/devices` | GET | 查询子系统下所有设备(支持分页) | + +--- + +## 五、Service 层设计 + +### 5.1 IotSubsystemService + +```java +interface IotSubsystemService { + Long createSubsystem(IotSubsystemSaveReqVO req); + void updateSubsystem(IotSubsystemSaveReqVO req); + void deleteSubsystem(Long id); // 校验无设备引用 + IotSubsystemDO getSubsystem(Long id); + PageResult getSubsystemPage(IotSubsystemPageReqVO req); + List getSubsystemSimpleList(); + Map getDeviceCountMap(); // subsystemId → deviceCount +} +``` + +### 5.2 设备服务增强 + +```java +// IotDeviceService 新增方法 +void bindDeviceToSubsystem(Long deviceId, Long subsystemId); +void batchBindDevicesToSubsystem(Collection deviceIds, Long subsystemId); +``` + +--- + +## 六、与规则引擎的集成 + +### 6.1 规则链绑定子系统 + +规则链(RuleChain)新增 `subsystemId` 字段,规则只对该子系统下的设备生效: + +```sql +ALTER TABLE iot_rule_chain ADD COLUMN subsystem_id BIGINT COMMENT '关联子系统(空=全局规则)'; +``` + +规则触发时匹配逻辑: +``` +消息到达 → 获取设备 subsystemId + → 查找该 subsystemId 绑定的规则链 + → 同时查找 subsystemId=NULL 的全局规则链 + → 合并执行 +``` + +### 6.2 告警按子系统隔离 + +告警配置新增 `subsystemId`,告警记录冗余 `subsystemId`: +- 告警通知对象可按子系统配置不同接收人 +- 告警统计支持按子系统维度聚合 + +--- + +## 七、与现有系统兼容 + +### 7.1 设备分组(group_ids)保留 + +`group_ids` 与 `subsystem_id` 是不同维度: +- **子系统** = 业务归属(决定规则/告警/权限) +- **分组** = 管理标签(灵活分类,多对多) + +两者并存,互不冲突。 + +### 7.2 存量数据迁移 + +存量设备的 `subsystem_id` 初始为 NULL。提供管理后台批量分配功能: +1. 按产品批量选择设备 +2. 分配到指定子系统 +3. 未分配的设备在查询时显示"未归属"标签 + +### 7.3 保洁业务适配 + +现有保洁清洗规则(CleanRule)的设备通过 `AreaDeviceApi` Feign 调用 Ops 模块关联。升级后: +- 保洁相关设备统一归属"保洁子系统" +- 区域-设备关系仍由 Ops 模块管理(IoT 不越界) +- `subsystemId` 用于规则链匹配,不替代 Ops 的区域概念 diff --git a/开发者文档/03-IoT领域/升级设计方案/03-物模型规范v2.md b/开发者文档/03-IoT领域/升级设计方案/03-物模型规范v2.md new file mode 100644 index 0000000..360fb58 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/03-物模型规范v2.md @@ -0,0 +1,140 @@ +# 03-物模型规范 v2 + +> 增强:属性三元分类 + 派生物模型 + 数据类型扩展 + +--- + +## 一、属性三元分类(借鉴 ThingsBoard) + +### 1.1 设计动机 + +现有系统所有属性不区分来源和可见性,设备上报的状态、平台计算的指标、下发给设备的配置混在一起。引入 **AttributeScope** 三元分类: + +| Scope | 写入方 | 读取方 | 典型场景 | +| ---------- | ------- | ------- | ------------------ | +| **CLIENT** | 设备上报 | 平台 | 固件版本、信号强度、电量、传感器数据 | +| **SERVER** | 平台/规则引擎 | 平台 | 计算指标、标签、运维备注 | +| **SHARED** | 平台 | 设备 + 平台 | 配置参数、阈值、开关、定时计划 | + +### 1.2 枚举定义 + +```java +enum AttributeScope { + CLIENT(1, "设备端属性"), // 设备上报,平台只读 + SERVER(2, "服务端属性"), // 平台私有,设备不可见 + SHARED(3, "共享属性"); // 平台写,设备可订阅 +} +``` + +### 1.3 存储变更 + +**Redis Hash Key 变更**: + +``` +// v1.0(现有) +iot:device_property:{deviceId} → {identifier: value} + +// v2.0(升级) +iot:device_property:{deviceId}:client → {identifier: value} +iot:device_property:{deviceId}:server → {identifier: value} +iot:device_property:{deviceId}:shared → {identifier: value} +``` + +**TDengine 不变**:时序数据仍按 `product_property_{productId}` 超级表存储,scope 作为普通列(TINYINT)标记。 + +### 1.4 物模型属性定义扩展 + +```java +ThingModelProperty { + // ... 现有字段保留 ... + String scope; // 【新增】"client" / "server" / "shared",默认 "client" +} +``` + +--- + +## 二、派生物模型(借鉴 JetLinks) + +### 2.1 设计动机 + +同一产品下的设备可能存在差异化配置。例如同型号客流计数器,A 设备多装了温湿度传感器。现有系统设备只能使用产品的物模型,无法个性化扩展。 + +### 2.2 实现方案 + +**设备表新增字段**: + +```sql +ALTER TABLE iot_device ADD COLUMN derive_metadata JSON COMMENT '设备派生物模型(覆盖/扩展产品物模型)'; +``` + +**合并策略**: + +```java +// 获取设备完整物模型 = 产品物模型 + 设备派生物模型 +List getEffectiveThingModel(Long deviceId) { + List productModels = getByProductId(device.getProductId()); // 产品基础 + ThingModelOverride derive = device.getDeriveMetadata(); // 设备扩展 + if (derive == null) return productModels; + return mergeThingModel(productModels, derive, MergeOption.DEVICE_OVERRIDE); +} +``` + +**合并规则**: +- 相同 identifier:设备定义覆盖产品定义(`DEVICE_OVERRIDE`) +- 新 identifier:追加到合并结果 +- 产品有但设备无:保留产品定义 + +### 2.3 派生物模型 JSON 结构 + +```json +{ + "properties": [ + { + "identifier": "humidity", + "name": "湿度", + "accessMode": "r", + "dataType": "float", + "dataSpecs": {"min": "0", "max": "100", "step": "0.1", "unit": "%"} + } + ], + "services": [], + "events": [] +} +``` + +--- + +## 三、数据类型增强 + +### 3.1 新增 TIMESTAMP 类型 + +| 物模型类型 | TDengine 类型 | 说明 | +|-----------|-------------|------| +| `timestamp` | TIMESTAMP | 毫秒级时间戳(新增) | + +### 3.2 struct/array 支持嵌套校验 + +现有系统 struct/array 存为 VARCHAR(1024),无法校验内部结构。v2.0 增强: +- struct 定义包含子字段的完整 DataSpecs +- 入库前按定义校验 JSON 结构 +- 查询时支持 JsonPath 提取嵌套字段 + +--- + +## 四、物模型缓存增强 + +### 4.1 双级缓存 + +``` +L1: Caffeine 本地缓存(productId → List,5 分钟 TTL) +L2: Redis(Spring Cache,现有方案保留) +``` + +### 4.2 派生物模型缓存 + +``` +Key: iot:device_thing_model:{deviceId} +Value: 合并后的完整物模型 JSON +TTL: 10 分钟 +驱逐: 设备 deriveMetadata 更新时清除 +``` diff --git a/开发者文档/03-IoT领域/升级设计方案/04-规则引擎方案.md b/开发者文档/03-IoT领域/升级设计方案/04-规则引擎方案.md new file mode 100644 index 0000000..f319dd9 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/04-规则引擎方案.md @@ -0,0 +1,374 @@ +# 04-规则引擎方案 + +> 融合 JetLinks SceneRule DAG 框架 + ThingsBoard 脚本节点思路 + +--- + +## 一、设计目标 + +将现有三条硬编码链路(DataRule/SceneRule/CleanRule)统一为 **一套可配置的 DAG 规则引擎**,通过 SPI 扩展支持任意触发器、条件、动作组合,同时引入脚本节点实现运行时灵活数据转换。 + +--- + +## 二、核心架构 + +``` +设备消息(IotDeviceMessage) + ↓ 消息总线消费 +IotRuleEngineMessageHandler(统一入口,替代现有 4 个消费者) + ↓ 按 productId + deviceId + subsystemId 匹配规则链(从全量缓存查) + ↓ +┌─────────────────────────────────────────┐ +│ RuleChain(规则链 DAG) │ +│ │ +│ [Trigger] → [Condition] → [Branch] │ +│ ↓ ↓ │ +│ [ShakeLimit] [ShakeLimit] │ +│ ↓ ↓ │ +│ [Action1] [Action2] │ +│ ↓ ↓ │ +│ [Action3] [Script] │ +│ ↓ ↓ │ +│ [Notify] [Alarm] │ +└─────────────────────────────────────────┘ +``` + +--- + +## 三、数据模型 + +### 3.1 规则链(RuleChain) + +```sql +CREATE TABLE iot_rule_chain ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + name VARCHAR(128) NOT NULL, + description TEXT, + status TINYINT DEFAULT 1, -- 0=禁用, 1=启用 + type VARCHAR(32) NOT NULL, -- SCENE(场景联动) / DATA(数据转发) / CUSTOM + first_node_id BIGINT, -- 入口节点 + debug_mode BOOLEAN DEFAULT FALSE, -- 调试模式(记录每步输入输出) + tenant_id BIGINT, + -- 审计字段 ... +); +``` + +### 3.2 规则节点(RuleNode) + +```sql +CREATE TABLE iot_rule_node ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + rule_chain_id BIGINT NOT NULL, -- FK + name VARCHAR(128), + type VARCHAR(64) NOT NULL, -- Provider 类型标识 + configuration JSON NOT NULL, -- 节点配置(多态 JSON) + position_x INT, -- 画布坐标(可视化编排用) + position_y INT, +); +``` + +### 3.3 规则连线(RuleLink) + +```sql +CREATE TABLE iot_rule_link ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + rule_chain_id BIGINT NOT NULL, + source_node_id BIGINT NOT NULL, -- 源节点 + target_node_id BIGINT NOT NULL, -- 目标节点 + relation_type VARCHAR(32) NOT NULL, -- Success / Failure / True / False / 自定义 + condition JSON, -- 可选:连线条件(表达式) + sort_order INT DEFAULT 0, -- 排序(同源多出边时) +); +``` + +--- + +## 四、SPI Provider 体系 + +### 4.1 核心接口 + +```java +// 触发器 Provider +interface TriggerProvider { + String getType(); // "device_property" / "timer" / "manual" + boolean matches(IotDeviceMessage msg, JsonNode config); + void register(RuleChain chain, JsonNode config); // 定时器注册等 + void unregister(RuleChain chain); +} + +// 动作 Provider +interface ActionProvider { + String getType(); // "device_control" / "script" / "notify" / "alarm" / "http_push" + ActionResult execute(RuleContext ctx, JsonNode config); +} + +// 条件评估器 +interface ConditionEvaluator { + String getType(); // "expression" / "script" / "time_range" + boolean evaluate(RuleContext ctx, JsonNode config); +} +``` + +### 4.2 内置 Provider 清单 + +**触发器(Trigger):** + +| 类型标识 | 实现类 | 来源 | 说明 | +|---------|--------|------|------| +| `device_state` | DeviceStateTrigger | 迁移自现有 | 设备上下线 | +| `device_property` | DevicePropertyTrigger | 迁移自现有 | 属性上报 | +| `device_event` | DeviceEventTrigger | 迁移自现有 | 事件上报 | +| `device_service` | DeviceServiceTrigger | 迁移自现有 | 服务调用回复 | +| `timer` | TimerTrigger | 迁移自现有 | CRON 定时 | +| `manual` | ManualTrigger | **新增** | 手动/API 触发 | + +**动作(Action):** + +| 类型标识 | 实现类 | 来源 | 说明 | +|---------|--------|------|------| +| `device_property_set` | DevicePropertySetAction | 迁移自现有 | 设备属性设置 | +| `device_service_invoke` | DeviceServiceInvokeAction | 迁移自现有 | 设备服务调用 | +| `alarm_trigger` | AlarmTriggerAction | 重构 | 触发告警(新状态机) | +| `alarm_clear` | AlarmClearAction | **新增** | 清除告警 | +| `notify` | NotifyAction | **新增** | 发送通知(SMS/邮件/站内信/Webhook) | +| `http_push` | HttpPushAction | 迁移自 DataSink | HTTP 数据推送 | +| `mq_push` | MqPushAction | 迁移自 DataSink | MQ 数据推送(RocketMQ/Kafka/RabbitMQ) | +| `redis_push` | RedisPushAction | 迁移自 DataSink | Redis 数据推送 | +| `tcp_push` | TcpPushAction | 迁移自 DataSink | TCP 数据推送 | +| `script` | ScriptAction | **新增** | 脚本执行(数据转换/富化) | +| `delay` | DelayAction | **新增** | 延迟执行 | +| `enrich` | EnrichAction | **新增** | 数据富化(读取属性/时序) | +| `log` | LogAction | **新增** | 日志记录 | + +**条件评估器(Condition):** + +| 类型标识 | 实现类 | 说明 | +|---------|--------|------| +| `expression` | ExpressionCondition | Aviator 表达式(替代 SpEL) | +| `script` | ScriptCondition | 脚本条件(复杂逻辑) | +| `time_range` | TimeRangeCondition | 时间范围(当日时间/日期区间) | +| `device_state` | DeviceStateCondition | 设备在线/离线状态 | + +--- + +## 五、脚本引擎设计 + +### 5.1 选型:Aviator + +| 维度 | Aviator | QLExpress | TBEL | +| ---- | -------------- | --------- | --------- | +| 性能 | 编译为字节码,接近 Java | 解释执行 | ANTLR4 编译 | +| 安全 | 内置沙箱(白名单函数) | 需手动限制 | 语言层面安全 | +| 生态 | 国内广泛使用,中文文档丰富 | 阿里开源 | TB 自有 | +| 学习成本 | 类 Java 表达式 | 类 Java | 私有语法 | +| 依赖 | 单 JAR,无额外依赖 | 单 JAR | 需 TB 框架 | + +**推荐 Aviator**:编译缓存 + 类型安全 + 沙箱 + 国内生态好。 + +### 5.2 脚本节点配置 + +```json +{ + "type": "script", + "config": { + "engine": "aviator", + "script": "let result = msg.temperature * 1.8 + 32; return seq.map('fahrenheit', result, 'celsius', msg.temperature);", + "timeout": 3000, + "outputTo": "msg" + } +} +``` + +### 5.3 沙箱安全配置 + +```java +AviatorEvaluatorInstance engine = AviatorEvaluator.newInstance(); +// 禁用系统函数 +engine.setOption(Options.DISABLE_ASSIGNMENT, true); +// 限制最大循环次数 +engine.setOption(Options.MAX_LOOP_COUNT, 1000); +// 编译缓存 +engine.useLRUExpressionCache(256); +``` + +--- + +## 六、抖动抑制(ShakeLimit) + +### 6.1 参数模型(借鉴 JetLinks) + +```java +class ShakeLimitConfig { + boolean enabled; // 是否启用 + int time; // 时间窗口(秒) + int threshold; // 触发阈值(窗口内达到 N 次才触发) + boolean alarmFirst; // true=取第一条,false=取最后一条 + boolean continuous; // 连续模式:不满足条件时重置计数器 + boolean rolling; // 滚动窗口 vs 固定窗口 +} +``` + +### 6.2 实现方式 + +不依赖 Reactor(现有系统不是响应式),使用 **Guava RateLimiter + ScheduledExecutorService** 实现: + +``` +消息到达 → ShakeLimitFilter + ├── 按 deviceId 分组,维护 ConcurrentHashMap + ├── WindowState: count, firstMsg, lastMsg, windowStartTime + ├── 固定窗口:windowStartTime + time 秒内累计 + ├── 滚动窗口:每条消息重置 windowStartTime + ├── count >= threshold → 输出 firstMsg 或 lastMsg + └── continuous=true 且不满足条件 → 重置 count=0 +``` + +### 6.3 配置示例 + +```json +{ + "shakeLimit": { + "enabled": true, + "time": 10, + "threshold": 3, + "alarmFirst": true, + "continuous": true, + "rolling": false + } +} +``` + +含义:10 秒窗口内连续 3 次触发条件才真正执行动作,取第一条消息,中间不满足条件则重置计数。 + +--- + +## 七、数据富化节点(Enrich) + +借鉴 ThingsBoard Enrichment 节点,在规则执行过程中动态读取上下文数据: + +| 富化类型 | 配置 | 说明 | +|---------|------|------| +| `device_property` | deviceId + identifiers | 读取设备最新属性(从 Redis) | +| `device_history` | deviceId + identifier + timeRange | 读取属性历史(从 TDengine) | +| `related_device` | relationType + identifiers | 读取关联设备属性 | +| `asset_info` | assetId 或 relationType | 读取所属资产信息 | + +富化结果注入 `RuleContext.metadata`,后续节点可引用。 + +--- + +## 八、分支执行逻辑 + +### 8.1 条件分组(保留 JetLinks 模式) + +```json +{ + "branches": [ + { + "name": "高温告警", + "conditionGroups": [ + [{"identifier": "temperature", "operator": ">", "value": "40"}] + ], + "shakeLimit": {"enabled": true, "time": 30, "threshold": 3}, + "actions": [{"type": "alarm_trigger", "config": {...}}], + "executeAnyway": false + }, + { + "name": "温度通知", + "conditionGroups": [ + [{"identifier": "temperature", "operator": ">", "value": "30"}] + ], + "actions": [{"type": "notify", "config": {...}}], + "executeAnyway": true + } + ] +} +``` + +### 8.2 执行语义 + +- `executeAnyway=false`(默认):前一分支命中后跳过当前分支(if/else-if 语义) +- `executeAnyway=true`:无论前面分支是否命中都执行(重叠触发) + +--- + +## 九、与现有系统的迁移映射 + +| 现有概念 | v2.0 对应 | 迁移方式 | +|---------|----------|---------| +| SceneRule | RuleChain(type=SCENE) | 自动转换:triggers→Trigger 节点,conditions→Branch 条件,actions→Action 节点 | +| DataRule + DataSink | RuleChain(type=DATA) | 自动转换:sourceConfigs→Trigger 配置,sinkIds→各类 Push Action | +| CleanRule(4 个 Processor) | RuleChain(type=SCENE)+ Script Action | 手动迁移:硬编码逻辑抽取为配置+脚本 | +| IotDeviceMessageSubscriber | 保留,但从中分离规则触发 | 设备消息处理保留,规则触发委托给 RuleEngine | + +--- + +## 十、调试模式 + +规则链 `debug_mode=true` 时: +- 每个节点执行前后记录 `RuleContext` 快照 +- 写入 TDengine `rule_debug_log` 表 +- 前端可回放执行路径,查看每步的输入/输出/耗时 +- 生产环境建议关闭(性能开销) + +--- + +## 十一、工程评审决议(2026-04-10) + +以下决议来自 /plan-eng-review 工程评审,需严格执行: + +### 11.1 独立 Maven 模块 + +规则引擎从 `iot-core` 拆出为独立模块 `viewsh-module-iot-rule`,仅 `iot-server` 依赖,`iot-gateway` 不依赖。避免网关 JAR 引入 Aviator 等不需要的依赖。 + +### 11.2 链级故障隔离 + +统一消费者(`IotRuleEngineMessageHandler`)处理一条消息时,每条规则链的执行**必须独立 try-catch**: + +```java +for (RuleChain chain : matchedChains) { + try { + ruleEngine.execute(chain, context); + } catch (Exception e) { + log.error("规则链 {} 执行失败", chain.getId(), e); + metrics.recordRuleFailure(chain.getId()); + // 不中断,继续执行下一条规则链 + } +} +``` + +### 11.3 脚本完整沙箱(安全红线) + +Aviator 脚本节点**必须**启用以下安全配置: + +```java +AviatorEvaluatorInstance engine = AviatorEvaluator.newInstance(); +engine.setOption(Options.DISABLE_ASSIGNMENT, true); // 禁止赋值到外部变量 +engine.setOption(Options.MAX_LOOP_COUNT, 1000); // 最大循环 1000 次 +engine.useLRUExpressionCache(256); // 编译缓存 + +// 执行超时:3 秒 +Future future = executor.submit(() -> expression.execute(env)); +try { + future.get(3, TimeUnit.SECONDS); +} catch (TimeoutException e) { + future.cancel(true); + scriptBlacklist.recordFailure(scriptId); +} +``` + +**黑名单机制**:脚本连续失败超过 3 次,加入黑名单 60 秒,期间直接跳过不执行。 + +### 11.4 规则链全量缓存 + +```java +// 启动时加载所有启用规则链到内存(含节点+连线) +Map ruleChainCache = new ConcurrentHashMap<>(); + +// 按 subsystemId + productId + deviceId 建立索引 +Map> ruleChainIndex; // key → ruleChainIds + +// CRUD 时驱逐并重新加载 +@CacheEvict + reload +``` + +不使用延迟加载(@Cacheable),避免首次消息处理的延迟和 TTL 窗口期数据不一致。 diff --git a/开发者文档/03-IoT领域/升级设计方案/05-告警体系设计.md b/开发者文档/03-IoT领域/升级设计方案/05-告警体系设计.md new file mode 100644 index 0000000..b9d3c18 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/05-告警体系设计.md @@ -0,0 +1,192 @@ +# 05-告警体系设计 + +> 融合 JetLinks 两级存储 + ThingsBoard 完整状态机 + +--- + +## 一、告警状态机(5 状态) + +``` + ┌──── 持续触发 ────┐ + ↓ │ + ──→ ACTIVE ──→ ACTIVE(更新 endTs/details) + │ + ├──→ ACKNOWLEDGED(用户确认) + │ │ + │ └──→ CLEARED(规则自动/手动清除) + │ + └──→ CLEARED(规则自动/手动清除) + │ + └──→ RESOLVED(最终关闭) +``` + +| 状态 | 含义 | 触发方式 | +|------|------|---------| +| ACTIVE | 告警激活中 | 规则引擎 AlarmTriggerAction | +| ACKNOWLEDGED | 已确认 | 用户手动确认 | +| CLEARED | 已清除 | 规则引擎 AlarmClearAction 或手动 | +| RESOLVED | 已解决 | 用户最终关闭 | + +### 1.1 与现有系统对比 + +| v1.0 | v2.0 | +|------|------| +| `processStatus: Boolean` | 4 状态枚举 | +| 无确认步骤 | ACKNOWLEDGED 支持 | +| 手动恢复 | 规则自动 CLEARED | +| 无最终关闭 | RESOLVED 归档 | + +--- + +## 二、两级存储模型(借鉴 JetLinks) + +### 2.1 AlarmRecord(MySQL,当前状态) + +每个 `(设备 + 告警配置)` 组合最多一条记录,通过幂等 upsert 更新。 + +```sql +CREATE TABLE iot_alarm_record ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + record_key VARCHAR(64) NOT NULL, -- MD5(deviceId + "-" + alarmConfigId),幂等键 + alarm_config_id BIGINT NOT NULL, + alarm_name VARCHAR(128), -- 冗余 + severity TINYINT NOT NULL, -- 1=CRITICAL 2=MAJOR 3=MINOR 4=WARNING 5=INFO + state VARCHAR(16) NOT NULL, -- ACTIVE / ACKNOWLEDGED / CLEARED / RESOLVED + device_id BIGINT NOT NULL, + product_id BIGINT, + scene_rule_id BIGINT, + start_ts DATETIME NOT NULL, -- 首次触发时间 + end_ts DATETIME, -- 最近触发时间 + clear_ts DATETIME, -- 清除时间 + ack_ts DATETIME, -- 确认时间 + resolve_ts DATETIME, -- 解决时间 + details JSON, -- 告警详情(可累积) + trigger_count INT DEFAULT 1, -- 持续触发次数 + propagated_to JSON, -- 传播到的资产列表 + process_remark TEXT, -- 处理备注 + tenant_id BIGINT, + -- 审计字段 ... + UNIQUE KEY uk_record_key (record_key) +); +``` + +**幂等策略**(工程评审决议):保持 BIGINT 自增 ID(与全局一致),通过 `record_key = MD5(deviceId + "-" + alarmConfigId)` 唯一索引实现幂等。写入时使用 `INSERT ... ON DUPLICATE KEY UPDATE`。 + +### 2.2 AlarmHistory(TDengine,时序归档) + +每次告警状态变化都追加一条记录,用于趋势分析和审计。 + +```sql +CREATE STABLE alarm_history ( + ts TIMESTAMP, + alarm_record_id BIGINT, -- 关联 AlarmRecord.id + alarm_config_id BIGINT, + severity TINYINT, + state NCHAR(16), -- 变化后的状态 + trigger_data NCHAR(2048), -- 触发的设备消息 JSON + details NCHAR(2048), -- 告警详情快照 + operator NCHAR(64), -- 操作人(系统/用户) + remark NCHAR(256) -- 备注 +) TAGS ( + device_id BIGINT +); +``` + +--- + +## 三、告警严重度(5 级) + +```java +enum AlarmSeverity { + CRITICAL(1, "紧急"), // 需立即处理 + MAJOR(2, "重要"), // 尽快处理 + MINOR(3, "次要"), // 计划处理 + WARNING(4, "警告"), // 需关注 + INFO(5, "信息"); // 仅记录 +} +``` + +支持动态严重度:规则引擎中可根据条件动态设置(如温度 >50 为 CRITICAL,>40 为 MAJOR)。 + +--- + +## 四、告警传播 + +### 4.1 传播机制 + +告警创建时,沿 EntityRelation(Contains/Manages)向上遍历,在 `propagated_to` 字段记录传播路径: + +```json +{ + "propagated_to": [ + {"type": "ASSET", "id": 100, "name": "卫生间"}, + {"type": "ASSET", "id": 50, "name": "1楼"}, + {"type": "ASSET", "id": 10, "name": "楼宇A"} + ] +} +``` + +### 4.2 资产级告警聚合 + +Dashboard 查询某资产下的告警时,通过 `propagated_to` 字段的 JSON 包含查询实现。 + +--- + +## 五、通知集成(完善现有 TODO) + +### 5.1 通知通道 + +| 通道 | 实现 | 配置 | +|------|------|------| +| 短信 | SmsSendApi(现有 Feign) | 模板 ID + 变量映射 | +| 邮件 | MailSendApi(现有 Feign) | 模板 + 收件人列表 | +| 站内信 | NotifyMessageSendApi(现有 Feign) | 模板 + 用户列表 | +| Webhook | **新增** HttpPushAction | URL + Headers + Body 模板 | +| 企业微信/钉钉 | **新增** 扩展 NotifyProvider | 通过 SPI 扩展 | + +### 5.2 通知模板变量 + +| 变量 | 说明 | +|------|------| +| `${alarm.name}` | 告警名称 | +| `${alarm.severity}` | 严重度 | +| `${alarm.triggerCount}` | 触发次数 | +| `${device.name}` | 设备名称 | +| `${device.nickname}` | 设备备注名 | +| `${asset.name}` | 所属资产名称 | +| `${trigger.value}` | 触发值 | +| `${trigger.threshold}` | 阈值 | + +--- + +## 六、竞态保护(借鉴 JetLinks) + +### 6.1 告警缓存 + +```java +class AlarmRecordCache { + byte state; // 当前状态(0=无告警, 1=ACTIVE, 2=CLEARED) + long alarmTime; // 最近触发时间 + long clearTime; // 最近清除时间 +} +``` + +Redis 缓存 Key:`iot:alarm:cache:{recordId}` + +### 6.2 有效性判断 + +```java +// 触发告警:必须在清除时间之后(防止旧消息重新触发已清除的告警) +boolean isEffectiveTrigger(long timestamp) { + return timestamp > cache.clearTime || !cache.isActive(); +} + +// 清除告警:必须在最近触发时间之后 +boolean isEffectiveClear(long timestamp) { + return timestamp > cache.alarmTime && cache.isActive(); +} +``` + +### 6.3 分布式锁 + +对同一 `recordId` 的并发操作使用 Redis 分布式锁(`SETNX iot:alarm:lock:{recordId}`),超时 5 秒自动释放。 diff --git a/开发者文档/03-IoT领域/升级设计方案/06-设备影子与RPC.md b/开发者文档/03-IoT领域/升级设计方案/06-设备影子与RPC.md new file mode 100644 index 0000000..6b8a3c3 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/06-设备影子与RPC.md @@ -0,0 +1,216 @@ +# 06-设备影子与 RPC + +> 参考:ThingsBoard 属性三元分类 + 持久化 RPC 状态机 + +--- + +## 一、设计目标 + +解决两个核心问题: +1. **属性同步**:设备配置下发后,平台如何知道设备是否已应用? +2. **离线指令**:设备不在线时下发的指令如何不丢失? + +--- + +## 二、Shared 属性同步机制 + +### 2.1 流程 + +``` +平台设置 Shared 属性(如 reportInterval=30) + ↓ 写入 Redis + MySQL + ↓ 检查设备在线? + ├── 在线 → 通过消息总线下发 thing.config.push + │ 设备收到后应用配置,上报 CLIENT 属性确认 + └── 离线 → 标记为 pending + 设备上线时,推送全部 pending 的 Shared 属性 +``` + +### 2.2 Shared 属性表 + +```sql +-- 复用现有 Redis Hash,增加 scope 维度 +-- Key: iot:device_property:{deviceId}:shared +-- Hash Field: identifier +-- Hash Value: {value, updateTime, synced} synced 标记设备是否已同步 +``` + +### 2.3 上线自动同步 + +`IotDeviceMessageSubscriber.forceDeviceOnline()` 中增加逻辑: + +```java +if (设备首次上线或重新上线) { + List pendingAttrs = getUnsyncedSharedAttributes(deviceId); + if (!pendingAttrs.isEmpty()) { + sendConfigPush(deviceId, pendingAttrs); + } +} +``` + +--- + +## 三、持久化 RPC 状态机 + +### 3.1 RPC 状态枚举 + +```java +enum RpcStatus { + QUEUED(0), // 已入队(设备离线时暂存) + SENT(10), // 已发送到网关 + DELIVERED(20), // 网关确认已推送到设备 + SUCCESS(30), // 设备回复成功 + FAILURE(40), // 设备回复失败 + TIMEOUT(50), // 超时未响应 + EXPIRED(60), // 过期未发送(设备长期离线) + CANCELED(70); // 手动取消 +} +``` + +### 3.2 状态流转 + +``` +创建 RPC → QUEUED + ├── 设备在线 → SENT → 设备回复 → SUCCESS / FAILURE + │ → 超时 → TIMEOUT + ├── 设备离线 → 保持 QUEUED + │ 设备上线 → 检查未过期的 QUEUED RPC → SENT → ... + └── 过期时间到 → EXPIRED +``` + +### 3.3 RPC 持久化表 + +```sql +CREATE TABLE iot_device_rpc ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + device_id BIGINT NOT NULL, + method VARCHAR(64) NOT NULL, -- 服务标识符 + params JSON, -- 调用参数 + status TINYINT NOT NULL DEFAULT 0, + response JSON, -- 设备响应 + request_id VARCHAR(64), -- 关联消息 requestId + persisted BOOLEAN DEFAULT TRUE, -- 是否持久化(false=瞬态,设备离线直接丢弃) + expiration_time DATETIME, -- 过期时间 + timeout_ms INT DEFAULT 30000, -- 单次超时(毫秒) + retry_count INT DEFAULT 0, -- 已重试次数 + max_retries INT DEFAULT 3, -- 最大重试次数 + create_time DATETIME, + update_time DATETIME, + tenant_id BIGINT, +); +``` + +### 3.4 RPC 执行流程 + +```java +// 1. 创建 RPC +IotDeviceRpcDO rpc = createRpc(deviceId, method, params, persisted, timeout, expiration); + +// 2. 检查设备在线 +if (isDeviceOnline(deviceId)) { + // 立即发送 + sendRpcToDevice(rpc); + updateStatus(rpc.id, SENT); + // 注册超时检查 + scheduleTimeout(rpc.id, timeout); +} else if (rpc.isPersisted()) { + // 持久化等待 + // 设备上线时由 IotDeviceMessageSubscriber 触发 pendingRpcCheck +} else { + // 瞬态 RPC,设备离线直接失败 + updateStatus(rpc.id, FAILURE); +} + +// 3. 设备回复 +void onRpcResponse(String requestId, Object response) { + IotDeviceRpcDO rpc = findByRequestId(requestId); + rpc.setResponse(response); + updateStatus(rpc.id, SUCCESS); + cancelTimeout(rpc.id); +} + +// 4. 超时处理 +void onRpcTimeout(Long rpcId) { + IotDeviceRpcDO rpc = getById(rpcId); + if (rpc.getRetryCount() < rpc.getMaxRetries()) { + rpc.setRetryCount(rpc.getRetryCount() + 1); + sendRpcToDevice(rpc); // 重试 + } else { + updateStatus(rpcId, TIMEOUT); + } +} +``` + +### 3.5 设备上线时的 Pending RPC 处理 + +```java +// IotDeviceMessageSubscriber 中,设备上线后 +void onDeviceOnline(Long deviceId) { + List pendingRpcs = rpcService.getByDeviceIdAndStatus(deviceId, QUEUED); + + // 【工程评审决议】限速:防止长期离线设备上线后一次性涌入大量指令导致设备过载 + // 按创建时间排序,每秒最多发送 5 条 + pendingRpcs.sort(Comparator.comparing(IotDeviceRpcDO::getCreateTime)); + RateLimiter limiter = RateLimiter.create(5.0); // 5 条/秒 + + for (IotDeviceRpcDO rpc : pendingRpcs) { + if (rpc.getExpirationTime() != null && rpc.getExpirationTime().isBefore(now())) { + updateStatus(rpc.getId(), EXPIRED); // 已过期 + } else { + limiter.acquire(); // 限速等待 + sendRpcToDevice(rpc); + updateStatus(rpc.getId(), SENT); + } + } +} +``` + +--- + +## 四、RPC 过期清理 Job + +```java +@XxlJob("rpcExpirationCheckJob") +void checkExpiredRpcs() { + List expiredRpcs = rpcService.getExpiredQueuedRpcs(now()); + for (IotDeviceRpcDO rpc : expiredRpcs) { + updateStatus(rpc.getId(), EXPIRED); + } +} +``` + +建议每 5 分钟执行一次。 + +--- + +## 五、API 接口 + +| 路径 | 方法 | 说明 | +|------|------|------| +| `/iot/device/rpc/send` | POST | 发送 RPC(同步等待或异步) | +| `/iot/device/rpc/get` | GET | 查询 RPC 状态 | +| `/iot/device/rpc/page` | GET | 分页查询设备 RPC 历史 | +| `/iot/device/rpc/cancel` | PUT | 取消未发送的 RPC | +| `/iot/device/shadow/shared` | GET | 获取设备 Shared 属性 | +| `/iot/device/shadow/shared` | PUT | 设置 Shared 属性(触发同步) | + +--- + +## 六、与 Feign API 集成 + +现有 `IotDeviceControlApi.invokeService()` 增强为支持 RPC 模式: + +```java +IotDeviceServiceInvokeReqDTO { + // ... 现有字段 ... + Boolean persisted; // 【新增】是否持久化(默认 true) + LocalDateTime expiration; // 【新增】过期时间 + Integer maxRetries; // 【新增】最大重试次数(默认 3) +} + +IotDeviceServiceInvokeRespDTO { + // ... 现有字段 ... + Long rpcId; // 【新增】RPC 记录 ID(异步时返回) + String rpcStatus; // 【新增】RPC 当前状态 +} +``` diff --git a/开发者文档/03-IoT领域/升级设计方案/07-数据存储方案.md b/开发者文档/03-IoT领域/升级设计方案/07-数据存储方案.md new file mode 100644 index 0000000..5713d66 --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/07-数据存储方案.md @@ -0,0 +1,243 @@ +# 07-数据存储方案 + +> 存储策略插件化 + 写入缓冲 + TDengine 优化 + +--- + +## 一、存储策略插件化(借鉴 JetLinks) + +### 1.1 设计动机 + +现有系统所有产品设备数据统一存 TDengine,无法针对不同场景选择最优存储。引入策略模式,支持产品级配置。 + +### 1.2 策略接口 + +```java +interface ThingsDataStorageStrategy { + String getId(); // "tdengine-column" / "tdengine-row" + int getOrder(); // 优先级 + + void saveProperties(Long deviceId, Map properties, LocalDateTime reportTime); + List queryHistory(Long deviceId, String identifier, TimeRange range, int limit); + Map getLatest(Long deviceId); + void defineTable(Long productId, List thingModels); + void alterTable(Long productId, List oldModels, List newModels); +} +``` + +### 1.3 内置策略 + +| 策略 ID | 说明 | 适用场景 | +| ----------------- | --------------- | -------------- | +| `tdengine-column` | 列模式(现有方案),每属性一列 | 属性固定、查询频繁 | +| `tdengine-row` | 行模式,每属性一行 | 属性动态变化、物模型频繁变更 | + +### 1.4 列模式 vs 行模式 + +**列模式(现有,保留为默认)**: +```sql +CREATE STABLE product_property_{productId} ( + ts TIMESTAMP, report_time TIMESTAMP, + temperature INT, humidity FLOAT, status TINYINT -- 每属性一列 +) TAGS (device_id BIGINT); +``` +- 优点:查询快,单行包含所有属性 +- 缺点:物模型变更需 ALTER TABLE,TDengine 不支持缩短字段 + +**行模式(新增)**: +```sql +CREATE STABLE product_property_row_{productId} ( + ts TIMESTAMP, report_time TIMESTAMP, + identifier NCHAR(64), -- 属性标识符 + value_int INT, -- 整型值 + value_float FLOAT, -- 浮点值 + value_double DOUBLE, -- 双精度值 + value_bool TINYINT, -- 布尔值 + value_str NCHAR(1024), -- 字符串值 + scope TINYINT -- 属性域(1=CLIENT, 2=SERVER, 3=SHARED) +) TAGS (device_id BIGINT); +``` +- 优点:物模型变更无需 DDL,属性可任意扩展 +- 缺点:查询需按 identifier 过滤,同时查多属性需 PIVOT + +### 1.5 产品级配置 + +```sql +ALTER TABLE iot_product ADD COLUMN store_policy VARCHAR(32) DEFAULT 'tdengine-column'; +``` + +`ThingsDataStorageRouter` 按 productId 路由到对应策略: +```java +ThingsDataStorageStrategy getStrategy(Long productId) { + String policy = productService.getProduct(productId).getStorePolicy(); + return strategyMap.get(policy); // 策略注册表 +} +``` + +--- + +## 二、写入缓冲(借鉴 JetLinks PersistenceBuffer) + +### 2.1 设计动机 + +设备高频上报时(如 1Hz × 1000 台),每条消息直接写 TDengine 会产生大量小批次 INSERT。引入写入缓冲,批量合并后写入。 + +### 2.2 PersistenceBuffer 设计 + +```java +class PersistenceBuffer { + // 配置 + int bufferSize = 200; // 数量触发阈值 + Duration bufferTimeout = Duration.ofSeconds(3); // 时间触发阈值 + int parallelism = 4; // 并行写出线程数 + + // 内部结构 + BlockingQueue queue; // 内存队列 + ScheduledExecutorService timer; // 定时刷新 + + // 写入方法 + void write(T item) { + queue.add(item); + if (queue.size() >= bufferSize) { + flush(); + } + } + + // 批量刷出 + void flush() { + List batch = new ArrayList<>(); + queue.drainTo(batch, bufferSize); + if (!batch.isEmpty()) { + batchWriter.accept(batch); // 回调:批量写入 TDengine + } + } +} +``` + +### 2.3 应用点 + +| 缓冲实例 | 写入目标 | bufferSize | timeout | +|---------|---------|-----------|---------| +| 属性写入缓冲 | TDengine `product_property_*` | 200 | 3s | +| 消息日志缓冲 | TDengine `device_message` | 500 | 5s | +| 告警历史缓冲 | TDengine `alarm_history` | 100 | 3s | +| 规则调试日志缓冲 | TDengine `rule_debug_log` | 100 | 5s | + +--- + +## 三、TDengine 优化 + +### 3.1 消息超级表优化 + +现有 `device_message` 表的 `params` 和 `data` 字段为 `NCHAR(2048)`,大消息会截断。 + +优化方案: +```sql +-- 增大到 4096,覆盖绝大多数场景 +ALTER STABLE device_message MODIFY COLUMN params NCHAR(4096); +ALTER STABLE device_message MODIFY COLUMN data NCHAR(4096); +``` + +### 3.2 新增超级表 + +```sql +-- 告警历史(两级告警的时序部分) +CREATE STABLE IF NOT EXISTS alarm_history ( + ts TIMESTAMP, + alarm_record_id NCHAR(64), + alarm_config_id BIGINT, + severity TINYINT, + state NCHAR(16), + trigger_data NCHAR(4096), + details NCHAR(2048), + operator NCHAR(64), + remark NCHAR(256) +) TAGS (device_id BIGINT); + +-- 规则调试日志 +CREATE STABLE IF NOT EXISTS rule_debug_log ( + ts TIMESTAMP, + rule_chain_id BIGINT, + node_id BIGINT, + node_type NCHAR(64), + input_data NCHAR(4096), + output_data NCHAR(4096), + duration_ms INT, + success BOOL, + error_msg NCHAR(512) +) TAGS (device_id BIGINT); +``` + +### 3.3 数据保留策略 + +| 超级表 | 建议保留期 | 配置方式 | +|--------|----------|---------| +| `device_message` | 90 天 | TDengine `KEEP 90d` | +| `product_property_*` | 365 天 | TDengine `KEEP 365d` | +| `alarm_history` | 365 天 | TDengine `KEEP 365d` | +| `rule_debug_log` | 7 天 | TDengine `KEEP 7d` | + +--- + +## 四、Redis 缓存优化 + +### 4.1 修复生产隐患 + +| 问题 | 现状 | 修复 | +|------|------|------| +| `keys()` 全量扫描 | `SignalLossRuleProcessor` 用 `keys("iot:clean:signal:loss:*")` | 改用 `SCAN` 游标遍历 | +| 部分缓存无 TTL | `iot:device_property:*`、`iot:device_server_id` 常驻 | 设备删除时主动清理 | +| 告警缓存竞态 | 无并发保护 | 分布式锁 + RecordCache | + +### 4.2 新增 Redis Key + +| Key | 类型 | 用途 | TTL | +|-----|------|------|-----| +| `iot:alarm:cache:{recordId}` | Hash | 告警状态缓存 | 7 天 | +| `iot:alarm:lock:{recordId}` | String | 告警操作分布式锁 | 5s | +| `iot:device_property:{did}:shared` | Hash | Shared 属性 | 常驻 | +| `iot:device_property:{did}:server` | Hash | Server 属性 | 常驻 | +| `iot:rpc:pending:{deviceId}` | List | 待发送 RPC 队列(辅助索引) | 7 天 | + +--- + +## 五、可观测性埋点(Micrometer) + +### 5.1 核心指标 + +| 指标名 | 类型 | 标签 | 说明 | +|--------|------|------|------| +| `iot.device.online` | Gauge | protocol | 在线设备数 | +| `iot.message.received` | Counter | method, protocol | 上行消息数 | +| `iot.message.sent` | Counter | method | 下行消息数 | +| `iot.rule.execution` | Timer | ruleChainId, nodeType | 规则节点执行耗时 | +| `iot.rule.trigger` | Counter | ruleChainId, result | 规则触发次数(命中/未命中) | +| `iot.alarm.active` | Gauge | severity | 活跃告警数 | +| `iot.property.write` | Counter | productId | 属性写入次数 | +| `iot.buffer.size` | Gauge | bufferName | 缓冲区当前大小 | +| `iot.rpc.status` | Counter | status | RPC 状态分布 | + +### 5.2 集成方式 + +```java +@Component +class IotMetrics { + private final MeterRegistry registry; + + // 设备上线 + void recordDeviceOnline(String protocol) { + Metrics.gauge("iot.device.online", Tags.of("protocol", protocol), onlineCount); + } + + // 规则执行耗时 + void recordRuleExecution(Long chainId, String nodeType, Duration duration) { + Timer.builder("iot.rule.execution") + .tag("ruleChainId", String.valueOf(chainId)) + .tag("nodeType", nodeType) + .register(registry) + .record(duration); + } +} +``` + +Prometheus 端点:`/actuator/prometheus`,Grafana Dashboard 可直接对接。 diff --git a/开发者文档/03-IoT领域/升级设计方案/08-协议与编解码扩展.md b/开发者文档/03-IoT领域/升级设计方案/08-协议与编解码扩展.md new file mode 100644 index 0000000..8fb214c --- /dev/null +++ b/开发者文档/03-IoT领域/升级设计方案/08-协议与编解码扩展.md @@ -0,0 +1,179 @@ +# 08-协议与编解码扩展 + +> 参考:JetLinks 协议包热加载 + Codec SPI 增强 + +--- + +## 一、设计目标 + +将现有的"每种设备写 Java Codec 编译部署"模式,升级为 **Codec SPI 注册 + 协议包热加载**,新设备接入从"写代码"变为"填配置 + 可选脚本"。 + +--- + +## 二、Codec SPI 增强 + +### 2.1 现有接口保留 + +```java +interface IotDeviceMessageCodec { + byte[] encode(IotDeviceMessage message); + IotDeviceMessage decode(byte[] bytes); + String type(); +} +``` + +### 2.2 增强:Codec 自注册 + 元数据 + +```java +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@interface CodecMeta { + String type(); // 类型标识(如 "PEOPLE_COUNTER") + String name(); // 显示名称(如 "客流计数器") + String protocol(); // 适用协议("mqtt" / "http" / "tcp" / "all") + String description() default ""; +} + +// 示例 +@CodecMeta(type = "PEOPLE_COUNTER", name = "客流计数器", protocol = "http") +public class IotPeopleCounterCodec implements IotDeviceMessageCodec { ... } +``` + +### 2.3 Codec 注册表 + +```java +@Component +class CodecRegistry { + Map codecs = new ConcurrentHashMap<>(); + + @PostConstruct + void init() { + // 1. 扫描 @CodecMeta 注解的 Spring Bean + // 2. 扫描外部 JAR 中的实现 + } + + void register(IotDeviceMessageCodec codec) { ... } + void unregister(String type) { ... } + IotDeviceMessageCodec get(String type) { ... } + List listAll() { ... } // 前端下拉列表 +} +``` + +--- + +## 三、协议包热加载(借鉴 JetLinks) + +### 3.1 架构 + +``` +协议包 JAR 文件 + ↓ 上传到文件管理器(或配置本地路径/HTTP URL) +CodecJarLoader(独立 ClassLoader 加载) + ↓ 扫描 META-INF/services/IotDeviceMessageCodec 或 @CodecMeta + ↓ 实例化并注册到 CodecRegistry + ↓ +网关运行时使用新 Codec,无需重启 +``` + +### 3.2 实现 + +```java +class CodecJarLoader { + + IotDeviceMessageCodec loadFromJar(String jarPath) { + // 独立 ClassLoader,隔离依赖 + URLClassLoader classLoader = new URLClassLoader( + new URL[]{new File(jarPath).toURI().toURL()}, + getClass().getClassLoader() // 父加载器 + ); + + // SPI 发现 + ServiceLoader loader = + ServiceLoader.load(IotDeviceMessageCodec.class, classLoader); + + for (IotDeviceMessageCodec codec : loader) { + codecRegistry.register(codec); + log.info("Loaded codec: {} from {}", codec.type(), jarPath); + } + } + + void unloadJar(String jarPath) { + // 注销 codec + 关闭 ClassLoader + } +} +``` + +### 3.3 协议包 JAR 规范 + +``` +my-custom-codec.jar +├── META-INF/ +│ └── services/ +│ └── com.viewsh.module.iot.gateway.codec.IotDeviceMessageCodec +│ → com.example.MyCustomCodec(全限定类名) +└── com/example/ + └── MyCustomCodec.class +``` + +### 3.4 热加载管理 API + +| 路径 | 方法 | 说明 | +|------|------|------| +| `/iot/codec/list` | GET | 列出所有已注册 Codec | +| `/iot/codec/upload` | POST | 上传协议包 JAR 并热加载 | +| `/iot/codec/unload` | DELETE | 卸载指定协议包 | +| `/iot/codec/reload` | PUT | 重新加载指定协议包 | + +--- + +## 四、脚本化透传编解码(借鉴 JetLinks TransparentCodec) + +### 4.1 设计动机 + +对于简单的 JSON 格式差异(如字段名映射),不需要写 JAR,用脚本配置即可。 + +### 4.2 脚本 Codec 配置 + +```json +{ + "type": "SCRIPT_CODEC", + "config": { + "decodeScript": "let props = {}; props.temperature = raw.temp; props.humidity = raw.humi; return {method: 'thing.property.post', params: props};", + "encodeScript": "return JSON.stringify({cmd: msg.method, data: msg.params});" + } +} +``` + +### 4.3 实现 + +```java +@CodecMeta(type = "SCRIPT_CODEC", name = "脚本编解码", protocol = "all") +class ScriptCodec implements IotDeviceMessageCodec { + private AviatorEvaluatorInstance engine; + private Expression decodeExpr; + private Expression encodeExpr; + + void init(JsonNode config) { + decodeExpr = engine.compile(config.get("decodeScript").asText()); + encodeExpr = engine.compile(config.get("encodeScript").asText()); + } + + IotDeviceMessage decode(byte[] bytes) { + Map raw = JsonUtils.parseMap(bytes); + Map result = (Map) decodeExpr.execute(Map.of("raw", raw)); + return IotDeviceMessage.requestOf(result.get("method"), result.get("params")); + } +} +``` + +--- + +## 五、迁移兼容 + +### 5.1 现有 7 个 Codec 不变 + +所有现有 Codec(Alink/Camera3D11/HenghuaD5/PeopleCounter/JT808/TcpBinary/TcpJson)继续保留为内置实现,添加 `@CodecMeta` 注解即可自动注册。 + +### 5.2 产品配置不变 + +`iot_product.codec_type` 字段继续使用字符串标识,与 `CodecRegistry` 中的 `type` 对应。新增 Codec 后,前端下拉列表自动包含新选项。