diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java new file mode 100644 index 0000000..123e6cc --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java @@ -0,0 +1,193 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单到岗事件消费者 + *

+ * 订阅 IoT 模块发布的保洁工单到岗事件 + *

+ * RocketMQ 配置: + * - Topic: ops.order.arrive + * - ConsumerGroup: ops-clean-order-arrive-group + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops.order.arrive", + consumerGroup = "ops-clean-order-arrive-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class CleanOrderArriveEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:arrive:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + /** + * 设备当前工单缓存 Key 模式 + */ + private static final String ORDER_CACHE_KEY_PATTERN = "ops:clean:device:order:%s"; + + /** + * 工单缓存 TTL(秒) + */ + private static final int ORDER_CACHE_TTL_SECONDS = 3600; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleOrderArrive(event); + + } catch (Exception e) { + log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单到岗事件处理失败", e); + } + } + + /** + * 处理工单到岗 + */ + private void handleOrderArrive(CleanOrderArriveEventDTO event) { + log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + // 1. 查询工单 + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + // 2. 检查工单状态 + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canStartWorking()) { + log.warn("[CleanOrderArriveEventHandler] 工单状态不允许到岗: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + // 3. 更新工单的设备信息(扩展字段) + order.setAssigneeDeviceId(event.getDeviceId()); + order.setAssigneeDeviceKey(event.getDeviceKey()); + opsOrderMapper.updateById(order); + + // 4. 构建状态转换请求 + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.ARRIVED) + .operatorType(OperatorTypeEnum.SYSTEM) + .reason("蓝牙信标自动到岗确认") + .payload(payload) + .build(); + + // 5. 通过生命周期管理器执行状态转换(DISPATCHED/CONFIRMED -> ARRIVED) + orderLifecycleManager.transition(request); + + // 6. 更新 Redis 缓存(设备当前工单) + cacheDeviceCurrentOrder(event); + + log.info("[CleanOrderArriveEventHandler] 工单到岗成功: eventId={}, orderId={}, deviceId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId()); + } + + /** + * 缓存设备当前工单 + *

+ * 供 IoT 模块的规则处理器查询 + */ + private void cacheDeviceCurrentOrder(CleanOrderArriveEventDTO event) { + try { + String cacheKey = String.format(ORDER_CACHE_KEY_PATTERN, event.getDeviceId()); + + // 构建缓存数据 + StringBuilder cacheData = new StringBuilder(); + cacheData.append("{"); + cacheData.append("\"orderId\":").append(event.getOrderId()).append(","); + cacheData.append("\"status\":\"").append(WorkOrderStatusEnum.ARRIVED.getStatus()).append("\","); + cacheData.append("\"areaId\":").append(event.getAreaId()); + + // 如果有信标 MAC,也缓存 + if (event.getTriggerData() != null && event.getTriggerData().containsKey("beaconMac")) { + cacheData.append(",\"beaconMac\":\"").append(event.getTriggerData().get("beaconMac")).append("\""); + } + + cacheData.append("}"); + + // 写入 Redis + stringRedisTemplate.opsForValue().set( + cacheKey, + cacheData.toString(), + ORDER_CACHE_TTL_SECONDS, + TimeUnit.SECONDS + ); + + log.debug("[CleanOrderArriveEventHandler] 设备工单缓存已更新: deviceId={}, orderId={}", + event.getDeviceId(), event.getOrderId()); + } catch (Exception e) { + log.error("[CleanOrderArriveEventHandler] 设备工单缓存更新失败: deviceId={}", event.getDeviceId(), e); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java new file mode 100644 index 0000000..8a19eb3 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java @@ -0,0 +1,217 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderAuditEventDTO; +import com.viewsh.module.ops.infrastructure.log.context.BusinessLogContext; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogScope; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; +import com.viewsh.module.ops.infrastructure.log.publisher.BusinessLogPublisher; +import com.viewsh.module.iot.api.device.IotDeviceControlApi; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单审计事件消费者 + *

+ * 订阅 IoT 模块��布的保洁工单审计事件 + * 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等) + *

+ * RocketMQ 配置: + * - Topic: ops.order.audit + * - ConsumerGroup: ops-clean-order-audit-group + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops.order.audit", + consumerGroup = "ops-clean-order-audit-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class CleanOrderAuditEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:audit:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private BusinessLogPublisher businessLogPublisher; + + @Resource + private IotDeviceControlApi iotDeviceControlApi; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderAuditEventDTO event = objectMapper.readValue(message, CleanOrderAuditEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderAuditEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleAuditEvent(event); + + } catch (Exception e) { + log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e); + // 审计日志失败不抛出异常,避免影响主流程 + } + } + + /** + * 处理审计事件 + */ + private void handleAuditEvent(CleanOrderAuditEventDTO event) { + log.debug("[CleanOrderAuditEventHandler] 收到审计事件: eventId={}, auditType={}, message={}", + event.getEventId(), event.getAuditType(), event.getMessage()); + + // 1. 确定日志级别和类型 + LogType logType = determineLogType(event.getAuditType()); + boolean isSuccess = determineSuccess(event.getAuditType()); + + // 2. 构建业务日志上下文 + BusinessLogContext logContext = BusinessLogContext.builder() + .type(logType) + .scope(LogScope.ORDER) + .description(event.getMessage()) + .targetId(event.getOrderId()) + .targetType("order") + .operatorType("SYSTEM") + .success(isSuccess) + .build(); + + // 3. 添加扩展信息 + if (event.getDeviceId() != null) { + logContext.putExtra("deviceId", event.getDeviceId()); + } + if (event.getDeviceKey() != null) { + logContext.putExtra("deviceKey", event.getDeviceKey()); + } + if (event.getAreaId() != null) { + logContext.putExtra("areaId", event.getAreaId()); + } + if (event.getAuditType() != null) { + logContext.putExtra("auditType", event.getAuditType()); + } + if (event.getData() != null) { + event.getData().forEach(logContext::putExtra); + } + + // 4. 发布业务日志 + if (isSuccess) { + businessLogPublisher.publishSuccess(logContext); + } else { + businessLogPublisher.publishFailure(logContext, event.getMessage()); + } + + log.debug("[CleanOrderAuditEventHandler] 审计日志已记录: eventId={}, auditType={}", + event.getEventId(), event.getAuditType()); + + // 5. 如果是 TTS 请求,调用 IoT 模块下发语音 + if ("TTS_REQUEST".equals(event.getAuditType()) && event.getDeviceId() != null) { + handleTtsRequest(event); + } + } + + /** + * 处理 TTS 请求 + *

+ * 调用 IoT 模块的设备控制接口,下发语音播报到工牌设备 + * + * @param event 审计事件 + */ + private void handleTtsRequest(CleanOrderAuditEventDTO event) { + try { + // 1. 从审计数据中提取 TTS 文本 + String ttsText = null; + if (event.getData() != null && event.getData().containsKey("tts")) { + ttsText = (String) event.getData().get("tts"); + } + + if (ttsText == null || ttsText.isEmpty()) { + log.warn("[CleanOrderAuditEventHandler] TTS 文本为空,跳过下发: eventId={}", event.getEventId()); + return; + } + + // 2. 构建服务调用请求 + Map params = new HashMap<>(); + params.put("text", ttsText); + params.put("volume", 80); // 默认音量 80% + + IotDeviceServiceInvokeReqDTO reqDTO = IotDeviceServiceInvokeReqDTO.builder() + .deviceId(event.getDeviceId()) + .identifier("playVoice") // 语音播报服务标识符 + .params(params) + .timeoutSeconds(30) + .build(); + + // 3. 调用 IoT 模块 RPC 接口 + var result = iotDeviceControlApi.invokeService(reqDTO); + + if (result.getData() != null && result.getData().getSuccess()) { + log.info("[CleanOrderAuditEventHandler] TTS 下发成功: eventId={}, deviceId={}, text={}", + event.getEventId(), event.getDeviceId(), ttsText); + } else { + log.warn("[CleanOrderAuditEventHandler] TTS 下发失败: eventId={}, deviceId={}, error={}", + event.getEventId(), event.getDeviceId(), + result.getData() != null ? result.getData().getErrorMsg() : "Unknown error"); + } + + } catch (Exception e) { + log.error("[CleanOrderAuditEventHandler] TTS 下发异常: eventId={}, deviceId={}", + event.getEventId(), event.getDeviceId(), e); + // TTS 失败不影响主流程,仅记录日志 + } + } + + /** + * 确定日志类型 + */ + private LogType determineLogType(String auditType) { + if (auditType.startsWith("BEACON_") || auditType.contains("BEACON")) { + return LogType.DEVICE; + } else if (auditType.equals("TTS_REQUEST")) { + return LogType.NOTIFICATION; + } else { + return LogType.SYSTEM; + } + } + + /** + * 确定是否成功 + */ + private boolean determineSuccess(String auditType) { + return !auditType.endsWith("_WARNING") && !auditType.endsWith("_SUPPRESSED") && !auditType.endsWith("_REJECTED"); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java new file mode 100644 index 0000000..386df6c --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java @@ -0,0 +1,181 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单完成事件消费者 + *

+ * 订阅 IoT 模块发布的保洁工单完成事件 + *

+ * RocketMQ 配置: + * - Topic: ops.order.complete + * - ConsumerGroup: ops-clean-order-complete-group + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops.order.complete", + consumerGroup = "ops-clean-order-complete-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class CleanOrderCompleteEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:complete:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + /** + * 设备当前工单缓存 Key 模式 + */ + private static final String ORDER_CACHE_KEY_PATTERN = "ops:clean:device:order:%s"; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleOrderComplete(event); + + } catch (Exception e) { + log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单完成事件处理失败", e); + } + } + + /** + * 处理工单完成 + */ + private void handleOrderComplete(CleanOrderCompleteEventDTO event) { + log.info("[CleanOrderCompleteEventHandler] 收到完成事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + // 1. 查询工单 + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + // 2. 检查工单状态 + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canComplete()) { + log.warn("[CleanOrderCompleteEventHandler] 工单状态不允许完成: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + // 3. 计算作业时长 + String remark = buildCompletionRemark(event); + + // 4. 构建状态转换请求 + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.COMPLETED) + .operatorType(OperatorTypeEnum.SYSTEM) + .reason(remark) + .payload(payload) + .build(); + + // 5. 通过生命周期管理器执行状态转换(ARRIVED -> COMPLETED) + orderLifecycleManager.completeOrder(event.getOrderId(), null, remark); + + // 6. 清除 Redis 缓存(设备当前工单) + clearDeviceCurrentOrder(event.getDeviceId()); + + log.info("[CleanOrderCompleteEventHandler] 工单完成成功: eventId={}, orderId={}, duration={}ms", + event.getEventId(), event.getOrderId(), + event.getTriggerData() != null ? event.getTriggerData().get("durationMs") : "N/A"); + } + + /** + * 构建完成备注 + */ + private String buildCompletionRemark(CleanOrderCompleteEventDTO event) { + StringBuilder remark = new StringBuilder(); + remark.append("信号丢失超时自动完成"); + + if (event.getTriggerData() != null) { + Object durationMs = event.getTriggerData().get("durationMs"); + if (durationMs != null) { + long durationMinutes = ((Number) durationMs).longValue() / 60000; + remark.append(",作业时长: ").append(durationMinutes).append("分钟"); + } + } + + return remark.toString(); + } + + /** + * 清除设备当前工单缓存 + */ + private void clearDeviceCurrentOrder(Long deviceId) { + try { + String cacheKey = String.format(ORDER_CACHE_KEY_PATTERN, deviceId); + stringRedisTemplate.delete(cacheKey); + + log.debug("[CleanOrderCompleteEventHandler] 设备工单缓存已清除: deviceId={}", deviceId); + } catch (Exception e) { + log.error("[CleanOrderCompleteEventHandler] 设备工单缓存清除失败: deviceId={}", deviceId, e); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java new file mode 100644 index 0000000..3fa6c4a --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java @@ -0,0 +1,148 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.dal.dataobject.dto.OpsOrderCreateReqDTO; +import com.viewsh.module.ops.enums.PriorityEnum; +import com.viewsh.module.ops.enums.SourceTypeEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderCreateEventDTO; +import com.viewsh.module.ops.service.order.OpsOrderService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单创建事件消费者 + *

+ * 订阅 IoT 模块发布的保洁工单创建事件 + *

+ * RocketMQ 配置: + * - Topic: ops.order.create + * - ConsumerGroup: ops-clean-order-create-group + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops.order.create", + consumerGroup = "ops-clean-order-create-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class CleanOrderCreateEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:create:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderService opsOrderService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderCreateEventDTO event = objectMapper.readValue(message, CleanOrderCreateEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderCreateEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleOrderCreate(event); + + } catch (Exception e) { + log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单创建事件处理失败", e); + } + } + + /** + * 处理工单创建 + */ + private void handleOrderCreate(CleanOrderCreateEventDTO event) { + log.info("[CleanOrderCreateEventHandler] 收到工单创建事件: eventId={}, areaId={}, triggerSource={}", + event.getEventId(), event.getAreaId(), event.getTriggerSource()); + + // 1. 构建创建请求 + OpsOrderCreateReqDTO createReq = new OpsOrderCreateReqDTO(); + createReq.setOrderType(event.getOrderType()); + createReq.setSourceType(SourceTypeEnum.TRAFFIC.getType()); // 系统触发 + createReq.setTitle(generateOrderTitle(event)); + createReq.setDescription(generateOrderDescription(event)); + createReq.setPriority(PriorityEnum.fromPriority(event.getPriority() != null ? + Integer.parseInt(event.getPriority()) : 2).getPriority()); + createReq.setAreaId(event.getAreaId()); + + // 2. 创建工单 + Long orderId = opsOrderService.createOrder(createReq); + + // 3. 更新工单的触发信息(集成字段) + opsOrderService.updateIntegrationFields( + orderId, + event.getTriggerSource(), + event.getTriggerDeviceId(), + event.getTriggerDeviceKey() + ); + + log.info("[CleanOrderCreateEventHandler] 工单创建成功: eventId={}, orderId={}, areaId={}", + event.getEventId(), orderId, event.getAreaId()); + } + + /** + * 生成工单标题 + */ + private String generateOrderTitle(CleanOrderCreateEventDTO event) { + if ("IOT_TRAFFIC".equals(event.getTriggerSource())) { + return "客流阈值触发保洁"; + } else if ("IOT_BEACON".equals(event.getTriggerSource())) { + return "信标检测触发保洁"; + } else { + return "IoT设备触发保洁"; + } + } + + /** + * 生成工单描述 + */ + private String generateOrderDescription(CleanOrderCreateEventDTO event) { + StringBuilder desc = new StringBuilder(); + desc.append("触发来源: ").append(event.getTriggerSource()).append("\n"); + desc.append("触发设备: ").append(event.getTriggerDeviceKey()).append("\n"); + + if (event.getTriggerData() != null) { + if (event.getTriggerData().containsKey("actualCount")) { + desc.append("当前客流: ").append(event.getTriggerData().get("actualCount")).append("\n"); + } + if (event.getTriggerData().containsKey("threshold")) { + desc.append("触发阈值: ").append(event.getTriggerData().get("threshold")).append("\n"); + } + } + + return desc.toString(); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java new file mode 100644 index 0000000..2dd3e21 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java @@ -0,0 +1,64 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 保洁工单到岗事件 DTO + *

+ * 由 IoT 模块发布,Ops 模块消费 + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderArriveEventDTO { + + /** + * 事件ID(UUID,用于幂等性控制) + */ + private String eventId; + + /** + * 工单类型(CLEAN=保洁) + */ + private String orderType; + + /** + * 工单ID + */ + private Long orderId; + + /** + * 设备ID(保洁员工牌设备ID) + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源(IOT_BEACON=蓝牙信标检测) + */ + private String triggerSource; + + /** + * 触发数据(JSON 格式的附加信息) + *

+ * 示例:{"beaconMac":"F0:C8:60:1D:10:BB","rssi":-65,"windowSnapshot":[-70,-68,-65,-64,-66]} + */ + private Map triggerData; +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java new file mode 100644 index 0000000..0b498b0 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java @@ -0,0 +1,70 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 保洁工单审计事件 DTO + *

+ * 由 IoT 模块发布,Ops 模块消费 + * 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等) + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderAuditEventDTO { + + /** + * 事件ID(UUID,用于幂等性控制) + */ + private String eventId; + + /** + * 工单ID(可选,部分审计事件可能没有工单ID) + */ + private Long orderId; + + /** + * 审计类型 + *

+ * - BEACON_ARRIVE_CONFIRMED: 蓝牙信标到岗确认 + * - BEACON_LEAVE_WARNING_SENT: 离开区域警告已发送 + * - COMPLETE_SUPPRESSED_INVALID: 作业时长不足,抑制自动完成 + * - BEACON_COMPLETE_REQUESTED: 信号丢失超时自动完成请求 + * - TTS_REQUEST: TTS 语音播报请求 + * - ARRIVE_REJECTED: 到岗请求被拒绝(状态不符) + */ + private String auditType; + + /** + * 设备ID + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 消息内容 + */ + private String message; + + /** + * 审计数据(JSON 格式的附加信息) + */ + private Map data; +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java new file mode 100644 index 0000000..4634e76 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java @@ -0,0 +1,64 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 保洁工单完成事件 DTO + *

+ * 由 IoT 模块发布,Ops 模块消费 + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderCompleteEventDTO { + + /** + * 事件ID(UUID,用于幂等性控制) + */ + private String eventId; + + /** + * 工单类型(CLEAN=保洁) + */ + private String orderType; + + /** + * 工单ID + */ + private Long orderId; + + /** + * 设备ID(保洁员工牌设备ID) + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源(IOT_SIGNAL_LOSS=信号丢失超时) + */ + private String triggerSource; + + /** + * 触发数据(JSON 格式的附加信息) + *

+ * 示例:{"durationMs":1800000,"lastLossTime":1704067200000,"completionReason":"SIGNAL_LOSS_TIMEOUT"} + */ + private Map triggerData; +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java new file mode 100644 index 0000000..9ed480c --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java @@ -0,0 +1,65 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 保洁工单创建事件 DTO + *

+ * 由 IoT 模块发布,Ops 模块消费 + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderCreateEventDTO { + + /** + * 事件ID(UUID,用于幂等性控制) + */ + private String eventId; + + /** + * 工单类型(CLEAN=保洁) + */ + private String orderType; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源(IOT_TRAFFIC=客流阈值/IOT_BEACON=蓝牙信标/IOT_SIGNAL_LOSS=信号丢失超时) + */ + private String triggerSource; + + /** + * 触发设备ID + */ + private Long triggerDeviceId; + + /** + * 触发设备Key + */ + private String triggerDeviceKey; + + /** + * 优先级(0=P0紧急 1=P1重要 2=P2普通) + */ + private String priority; + + /** + * 触发数据(JSON 格式的附加信息) + *

+ * 客流阈值触发示例:{"actualCount":150,"baseValue":1000,"threshold":100,"exceededCount":50} + * 信标检测触发示例:{"rssi":-65,"beaconMac":"F0:C8:60:1D:10:BB"} + */ + private Map triggerData; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/pom.xml b/viewsh-module-ops/viewsh-module-ops-biz/pom.xml index 43af83e..ba0d6fd 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/pom.xml +++ b/viewsh-module-ops/viewsh-module-ops-biz/pom.xml @@ -60,6 +60,12 @@ viewsh-spring-boot-starter-mq + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/log/OpsOrderCleanLogDO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/log/OpsOrderCleanLogDO.java new file mode 100644 index 0000000..e809d99 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/log/OpsOrderCleanLogDO.java @@ -0,0 +1,82 @@ +package com.viewsh.module.ops.dal.dataobject.log; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.viewsh.framework.mybatis.core.dataobject.BaseDO; +import com.baomidou.mybatisplus.annotation.KeySequence; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; +import lombok.*; + +import java.time.LocalDateTime; +import java.util.Map; + +/** + * 保洁业务日志 DO + * + * @author lzh + */ +@TableName(value = "ops_order_clean_log", autoResultMap = true) +@KeySequence("ops_order_clean_log_seq") +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OpsOrderCleanLogDO extends BaseDO { + + /** + * 日志ID + */ + @TableId + private Long id; + /** + * 事件发生时间 + */ + private LocalDateTime eventTime; + /** + * 日志级别(INFO=信息/WARN=警告/ERROR=错误) + */ + private String eventLevel; + /** + * 领域(RULE=规则引擎/DISPATCH=调度/BADGE=工牌/BEACON=信标/SYSTEM=系统) + * + * 枚举 {@link com.viewsh.module.ops.enums.EventDomainEnum} + */ + private String eventDomain; + /** + * 事件类型 + */ + private String eventType; + /** + * 关联工单ID + * + * 关联 {@link com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO#getId()} + */ + private Long opsOrderId; + /** + * 区域ID + * + * 关联 {@link com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO#getId()} + */ + private Long areaId; + /** + * 保洁员ID + */ + private Long cleanerId; + /** + * 设备ID(工牌/信标) + */ + private Long deviceId; + /** + * 可读日志内容 + */ + private String eventMessage; + /** + * 结构化上下文 + */ + @TableField(typeHandler = JacksonTypeHandler.class) + private Map eventPayload; + +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/workorder/OpsOrderDO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/workorder/OpsOrderDO.java index dd38169..3a78e02 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/workorder/OpsOrderDO.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/workorder/OpsOrderDO.java @@ -118,6 +118,38 @@ public class OpsOrderDO extends BaseDO { * Flowable流程实例ID(预留) */ private String flowInstanceId; + /** + * 触发来源(IOT_TRAFFIC=客流阈值/IOT_BEACON=蓝牙信标/IOT_SIGNAL_LOSS=信号丢失超时) + *

+ * 记录工单是由IoT设备的哪个检测规则触发的 + */ + private String triggerSource; + /** + * 触发规则ID(关联 ops_area_device_relation.id) + *

+ * 记录具体是哪个设备关联配置触发的工单 + */ + private Long triggerRuleId; + /** + * 触发设备ID(关联 iot_device.id) + *

+ * 记录触发工单的IoT设备(如客流计数器、蓝牙信标) + */ + private Long triggerDeviceId; + /** + * 触发设备Key(冗余,便于查询) + */ + private String triggerDeviceKey; + /** + * 受理人工牌设备ID(关联 iot_device.id) + *

+ * 记录分配处理此工单的保洁员的工牌设备ID,用于自动到岗/完成检测 + */ + private Long assigneeDeviceId; + /** + * 受理人工牌设备Key(冗余,便于查询) + */ + private String assigneeDeviceKey; // ==================== 便捷方法 ==================== diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/log/OpsOrderCleanLogMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/log/OpsOrderCleanLogMapper.java new file mode 100644 index 0000000..ee19935 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/log/OpsOrderCleanLogMapper.java @@ -0,0 +1,46 @@ +package com.viewsh.module.ops.dal.mysql.log; + +import com.viewsh.framework.mybatis.core.mapper.BaseMapperX; +import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX; +import com.viewsh.module.ops.dal.dataobject.log.OpsOrderCleanLogDO; +import org.apache.ibatis.annotations.Mapper; + +import java.util.List; + +/** + * 保洁业务日志 Mapper + * + * @author lzh + */ +@Mapper +public interface OpsOrderCleanLogMapper extends BaseMapperX { + + /** + * 根据工单ID查询日志 + */ + default List selectListByOpsOrderId(Long opsOrderId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderCleanLogDO::getOpsOrderId, opsOrderId) + .orderByDesc(OpsOrderCleanLogDO::getEventTime)); + } + + /** + * 根据保洁员查询日志 + */ + default List selectListByCleanerId(Long cleanerId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderCleanLogDO::getCleanerId, cleanerId) + .orderByDesc(OpsOrderCleanLogDO::getEventTime)); + } + + /** + * 根据事件领域和类型查询日志 + */ + default List selectListByDomainAndType(String eventDomain, String eventType) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderCleanLogDO::getEventDomain, eventDomain) + .eq(OpsOrderCleanLogDO::getEventType, eventType) + .orderByDesc(OpsOrderCleanLogDO::getEventTime)); + } + +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderService.java index ebe0866..2e30f78 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderService.java @@ -106,4 +106,16 @@ public interface OpsOrderService { */ void cancelOrder(Long orderId, String reason, OperatorTypeEnum operatorType, Long operatorId); + /** + * 更新工单集成字段(IoT 设备触发信息) + *

+ * 用于更新工单的触发来源、触发设备、受理人工牌等集成相关字段 + * + * @param orderId 工单ID + * @param triggerSource 触发来源(IOT_TRAFFIC/IOT_BEACON/IOT_SIGNAL_LOSS) + * @param triggerDeviceId 触发设备ID + * @param triggerDeviceKey 触发设备Key + */ + void updateIntegrationFields(Long orderId, String triggerSource, Long triggerDeviceId, String triggerDeviceKey); + } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java index d9db6bc..f4d6c42 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java @@ -281,6 +281,29 @@ public class OpsOrderServiceImpl implements OpsOrderService { log.info("取消工单成功: orderId={}, reason={}", orderId, reason); } + @Override + @Transactional(rollbackFor = Exception.class) + public void updateIntegrationFields(Long orderId, String triggerSource, Long triggerDeviceId, String triggerDeviceKey) { + // 1. 查询工单 + OpsOrderDO order = opsOrderMapper.selectById(orderId); + if (order == null) { + throw new RuntimeException("工单不存在: " + orderId); + } + + // 2. 更新集成字段 + OpsOrderDO updateObj = new OpsOrderDO(); + updateObj.setId(orderId); + updateObj.setTriggerSource(triggerSource); + updateObj.setTriggerDeviceId(triggerDeviceId); + updateObj.setTriggerDeviceKey(triggerDeviceKey); + + // 3. 执行更新 + opsOrderMapper.updateById(updateObj); + + log.info("更新工单集成字段成功: orderId={}, triggerSource={}, triggerDeviceId={}", + orderId, triggerSource, triggerDeviceId); + } + /** * 生成工单编号 * 格式:WO + yyyyMMddHHmmss + 3位随机数