feat(aiot): 新增方案B Service实现

- AiAlertServiceImpl: AI告警服务实现,支持MQTT告警入库
- AiAlgoTemplateServiceImpl: 算法模板CRUD和级联更新
- AiConfigSnapshotServiceImpl: 配置版本快照和回滚
- AiEdgeDeviceServiceImpl: 边缘设备心跳和离线检测
- AiRedisConfigServiceImpl: Redis配置同步,对齐ai_edge格式

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 13:31:49 +08:00
parent 27eaffb9d7
commit 4c70eec7e8
5 changed files with 1064 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
package com.genersoft.iot.vmp.aiot.service.impl;
import com.genersoft.iot.vmp.aiot.bean.AiAlert;
import com.genersoft.iot.vmp.aiot.dao.AiAlertMapper;
import com.genersoft.iot.vmp.aiot.service.IAiAlertService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Slf4j
@Service
public class AiAlertServiceImpl implements IAiAlertService {
@Autowired
private AiAlertMapper alertMapper;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public void save(AiAlert alert) {
if (alert.getReceivedAt() == null) {
alert.setReceivedAt(LocalDateTime.now().format(FORMATTER));
}
// 防止重复插入
AiAlert existing = alertMapper.queryByAlertId(alert.getAlertId());
if (existing != null) {
log.debug("[AiAlert] 告警已存在, 跳过: alertId={}", alert.getAlertId());
return;
}
alertMapper.add(alert);
log.info("[AiAlert] 告警入库: alertId={}, type={}, camera={}", alert.getAlertId(), alert.getAlertType(), alert.getCameraId());
}
@Override
public AiAlert queryByAlertId(String alertId) {
return alertMapper.queryByAlertId(alertId);
}
@Override
public PageInfo<AiAlert> queryList(String cameraId, String alertType, String startTime, String endTime, int page, int count) {
PageHelper.startPage(page, count);
List<AiAlert> list = alertMapper.queryList(cameraId, alertType, startTime, endTime);
return new PageInfo<>(list);
}
@Override
public void delete(String alertId) {
alertMapper.deleteByAlertId(alertId);
}
@Override
public void deleteBatch(List<String> alertIds) {
if (alertIds != null && !alertIds.isEmpty()) {
alertMapper.deleteByAlertIds(alertIds);
}
}
@Override
public Map<String, Object> statistics(String startTime) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("by_type", alertMapper.statisticsByType(startTime));
result.put("by_camera", alertMapper.statisticsByCamera(startTime));
return result;
}
}

View File

@@ -0,0 +1,132 @@
package com.genersoft.iot.vmp.aiot.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.aiot.bean.AiAlgoTemplate;
import com.genersoft.iot.vmp.aiot.bean.AiAlgorithm;
import com.genersoft.iot.vmp.aiot.bean.AiRoiAlgoBind;
import com.genersoft.iot.vmp.aiot.dao.AiAlgoTemplateMapper;
import com.genersoft.iot.vmp.aiot.dao.AiAlgorithmMapper;
import com.genersoft.iot.vmp.aiot.dao.AiRoiAlgoBindMapper;
import com.genersoft.iot.vmp.aiot.service.IAiAlgoTemplateService;
import com.genersoft.iot.vmp.aiot.service.IAiRedisConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Slf4j
@Service
public class AiAlgoTemplateServiceImpl implements IAiAlgoTemplateService {
@Autowired
private AiAlgoTemplateMapper templateMapper;
@Autowired
private AiRoiAlgoBindMapper bindMapper;
@Autowired
private AiAlgorithmMapper algorithmMapper;
@Autowired
private IAiRedisConfigService redisConfigService;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
@Transactional
public void save(AiAlgoTemplate template) {
String now = LocalDateTime.now().format(FORMATTER);
template.setUpdateTime(now);
if (!ObjectUtils.isEmpty(template.getTemplateId())) {
AiAlgoTemplate existing = templateMapper.queryByTemplateId(template.getTemplateId());
if (existing != null) {
templateMapper.updateByTemplateId(template);
// 级联更新引用此模板的绑定
cascadeUpdate(template.getTemplateId());
log.info("[AiTemplate] 更新模板并级联: templateId={}", template.getTemplateId());
return;
}
}
if (ObjectUtils.isEmpty(template.getTemplateId())) {
template.setTemplateId(UUID.randomUUID().toString());
}
template.setCreateTime(now);
templateMapper.add(template);
log.info("[AiTemplate] 创建模板: templateId={}", template.getTemplateId());
}
@Override
@Transactional
public void delete(String templateId) {
// 检查是否有绑定引用
List<AiRoiAlgoBind> refs = bindMapper.queryByTemplateId(templateId);
if (refs != null && !refs.isEmpty()) {
throw new IllegalArgumentException("模板正在被" + refs.size() + "个绑定引用,无法删除。请先解除引用。");
}
templateMapper.deleteByTemplateId(templateId);
log.info("[AiTemplate] 删除模板: templateId={}", templateId);
}
@Override
public AiAlgoTemplate queryByTemplateId(String templateId) {
return templateMapper.queryByTemplateId(templateId);
}
@Override
public List<AiAlgoTemplate> queryList(String algoCode) {
return templateMapper.queryList(algoCode);
}
@Override
@Transactional
public void cascadeUpdate(String templateId) {
AiAlgoTemplate template = templateMapper.queryByTemplateId(templateId);
if (template == null) return;
List<AiRoiAlgoBind> binds = bindMapper.queryByTemplateId(templateId);
if (binds == null || binds.isEmpty()) return;
List<String> affectedBindIds = new ArrayList<>();
for (AiRoiAlgoBind bind : binds) {
String effectiveParams = mergeParams(template.getParams(), bind.getParamOverride());
AiAlgorithm algo = algorithmMapper.queryByCode(bind.getAlgoCode());
String algoName = algo != null ? algo.getAlgoName() : "";
String targetClass = algo != null ? algo.getTargetClass() : "person";
redisConfigService.writeBindToRedis(bind, algoName, targetClass, effectiveParams);
affectedBindIds.add(bind.getBindId());
}
redisConfigService.publishConfigUpdate("bind", affectedBindIds);
log.info("[AiTemplate] 级联更新完成: templateId={}, 影响绑定数={}", templateId, affectedBindIds.size());
}
/**
* 参数合并template.params 为基础param_override 覆盖
*/
@SuppressWarnings("unchecked")
public static String mergeParams(String templateParams, String paramOverride) {
if (ObjectUtils.isEmpty(templateParams)) {
return paramOverride != null ? paramOverride : "{}";
}
if (ObjectUtils.isEmpty(paramOverride)) {
return templateParams;
}
try {
Map<String, Object> base = JSON.parseObject(templateParams, LinkedHashMap.class);
Map<String, Object> override = JSON.parseObject(paramOverride, LinkedHashMap.class);
base.putAll(override);
return JSON.toJSONString(base);
} catch (Exception e) {
log.warn("[AiTemplate] 参数合并失败, 使用模板参数: templateParams={}, override={}", templateParams, paramOverride);
return templateParams;
}
}
}

View File

@@ -0,0 +1,351 @@
package com.genersoft.iot.vmp.aiot.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.aiot.bean.*;
import com.genersoft.iot.vmp.aiot.dao.AiConfigSnapshotMapper;
import com.genersoft.iot.vmp.aiot.dao.AiRoiAlgoBindMapper;
import com.genersoft.iot.vmp.aiot.dao.AiRoiMapper;
import com.genersoft.iot.vmp.aiot.service.IAiConfigService;
import com.genersoft.iot.vmp.aiot.service.IAiConfigSnapshotService;
import com.genersoft.iot.vmp.aiot.service.IAiRedisConfigService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Slf4j
@Service
public class AiConfigSnapshotServiceImpl implements IAiConfigSnapshotService {
@Autowired
private AiConfigSnapshotMapper snapshotMapper;
@Autowired
private AiRoiMapper roiMapper;
@Autowired
private AiRoiAlgoBindMapper bindMapper;
@Lazy
@Autowired
private IAiConfigService configService;
@Autowired
private IAiRedisConfigService redisConfigService;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public AiConfigSnapshot createSnapshot(String scopeType, String scopeId, String cameraId,
String snapshot, String changeType, String changeDesc, String createdBy) {
int maxVersion = snapshotMapper.getMaxVersion(scopeType, scopeId);
AiConfigSnapshot s = new AiConfigSnapshot();
s.setVersion(maxVersion + 1);
s.setScopeType(scopeType);
s.setScopeId(scopeId);
s.setCameraId(cameraId);
s.setSnapshot(snapshot);
s.setChangeType(changeType);
s.setChangeDesc(changeDesc);
s.setCreatedBy(createdBy);
s.setCreatedAt(LocalDateTime.now().format(FORMATTER));
snapshotMapper.add(s);
log.info("[AiSnapshot] 创建快照: scope={}/{}, version={}, type={}", scopeType, scopeId, s.getVersion(), changeType);
return s;
}
@Override
public PageInfo<AiConfigSnapshot> queryVersions(String scopeType, String scopeId, String cameraId, int page, int count) {
PageHelper.startPage(page, count);
List<AiConfigSnapshot> list = snapshotMapper.queryList(scopeType, scopeId, cameraId);
return new PageInfo<>(list);
}
@Override
public AiConfigSnapshot queryById(Long id) {
return snapshotMapper.queryById(id);
}
@Override
@Transactional
public void rollbackCamera(String cameraId, Integer targetVersion, String operator) {
AiConfigSnapshot snapshot = snapshotMapper.queryByVersion("CAMERA", cameraId, targetVersion);
if (snapshot == null) {
throw new IllegalArgumentException("快照不存在: CAMERA/" + cameraId + "/v" + targetVersion);
}
JSONObject config = JSON.parseObject(snapshot.getSnapshot());
JSONArray rois = config.getJSONArray("rois");
// 删除当前所有ROI和绑定
List<AiRoi> currentRois = roiMapper.queryAllByCameraId(cameraId);
for (AiRoi roi : currentRois) {
bindMapper.deleteByRoiId(roi.getRoiId());
roiMapper.deleteByRoiId(roi.getRoiId());
}
// 从快照恢复
String now = LocalDateTime.now().format(FORMATTER);
if (rois != null) {
for (int i = 0; i < rois.size(); i++) {
JSONObject roiJson = rois.getJSONObject(i);
AiRoi roi = new AiRoi();
roi.setRoiId(roiJson.getString("roi_id"));
roi.setCameraId(cameraId);
roi.setRoiType(roiJson.getString("roi_type"));
roi.setName(roiJson.getString("name"));
roi.setCoordinates(roiJson.containsKey("coordinates") ?
JSON.toJSONString(roiJson.get("coordinates")) : null);
roi.setEnabled(roiJson.getBooleanValue("enabled") ? 1 : 0);
roi.setPriority(roiJson.getIntValue("priority"));
roi.setCreateTime(now);
roi.setUpdateTime(now);
roiMapper.add(roi);
JSONArray algos = roiJson.getJSONArray("algorithms");
if (algos != null) {
for (int j = 0; j < algos.size(); j++) {
JSONObject algoJson = algos.getJSONObject(j);
AiRoiAlgoBind bind = new AiRoiAlgoBind();
bind.setBindId(algoJson.containsKey("bind_id") ?
algoJson.getString("bind_id") : UUID.randomUUID().toString());
bind.setRoiId(roi.getRoiId());
bind.setAlgoCode(algoJson.getString("algo_code"));
bind.setParams(algoJson.containsKey("params") ?
JSON.toJSONString(algoJson.get("params")) : null);
bind.setEnabled(algoJson.getBooleanValue("enabled") ? 1 : 0);
bind.setPriority(algoJson.getIntValue("priority"));
bind.setTemplateId(algoJson.getString("template_id"));
bind.setParamOverride(algoJson.getString("param_override"));
bind.setCreateTime(now);
bind.setUpdateTime(now);
bindMapper.add(bind);
}
}
}
}
// 生成回滚快照
Map<String, Object> newConfig = configService.exportConfig(cameraId);
createSnapshot("CAMERA", cameraId, cameraId, JSON.toJSONString(newConfig),
"ROLLBACK", "回滚到版本" + targetVersion, operator);
// 推送到Redis
redisConfigService.syncCameraConfigToRedis(cameraId);
List<String> ids = new ArrayList<>();
ids.add(cameraId);
redisConfigService.publishConfigUpdate("full", ids);
log.info("[AiSnapshot] 摄像头配置回滚完成: cameraId={}, targetVersion={}", cameraId, targetVersion);
}
@Override
@Transactional
public void rollbackRoi(String roiId, Integer targetVersion, String operator) {
AiConfigSnapshot snapshot = snapshotMapper.queryByVersion("ROI", roiId, targetVersion);
if (snapshot == null) {
throw new IllegalArgumentException("快照不存在: ROI/" + roiId + "/v" + targetVersion);
}
JSONObject roiJson = JSON.parseObject(snapshot.getSnapshot());
String cameraId = snapshot.getCameraId();
// 删除当前ROI的所有绑定
bindMapper.deleteByRoiId(roiId);
// 更新ROI
String now = LocalDateTime.now().format(FORMATTER);
AiRoi roi = roiMapper.queryByRoiId(roiId);
if (roi == null) {
roi = new AiRoi();
roi.setRoiId(roiId);
roi.setCameraId(cameraId);
roi.setCreateTime(now);
}
roi.setRoiType(roiJson.getString("roi_type"));
roi.setName(roiJson.getString("name"));
roi.setCoordinates(roiJson.containsKey("coordinates") ?
JSON.toJSONString(roiJson.get("coordinates")) : null);
roi.setEnabled(roiJson.getBooleanValue("enabled") ? 1 : 0);
roi.setPriority(roiJson.getIntValue("priority"));
roi.setUpdateTime(now);
if (roi.getId() != null) {
roiMapper.update(roi);
} else {
roiMapper.add(roi);
}
// 恢复绑定
JSONArray algos = roiJson.getJSONArray("algorithms");
if (algos != null) {
for (int j = 0; j < algos.size(); j++) {
JSONObject algoJson = algos.getJSONObject(j);
AiRoiAlgoBind bind = new AiRoiAlgoBind();
bind.setBindId(algoJson.containsKey("bind_id") ?
algoJson.getString("bind_id") : UUID.randomUUID().toString());
bind.setRoiId(roiId);
bind.setAlgoCode(algoJson.getString("algo_code"));
bind.setParams(algoJson.containsKey("params") ?
JSON.toJSONString(algoJson.get("params")) : null);
bind.setEnabled(algoJson.getBooleanValue("enabled") ? 1 : 0);
bind.setPriority(algoJson.getIntValue("priority"));
bind.setTemplateId(algoJson.getString("template_id"));
bind.setParamOverride(algoJson.getString("param_override"));
bind.setCreateTime(now);
bind.setUpdateTime(now);
bindMapper.add(bind);
}
}
// 生成回滚快照
createSnapshot("ROI", roiId, cameraId, snapshot.getSnapshot(),
"ROLLBACK", "回滚到版本" + targetVersion, operator);
// 推送到Redis
redisConfigService.syncCameraConfigToRedis(cameraId);
List<String> ids = new ArrayList<>();
ids.add(roiId);
redisConfigService.publishConfigUpdate("roi", ids);
log.info("[AiSnapshot] ROI配置回滚完成: roiId={}, targetVersion={}", roiId, targetVersion);
}
@Override
@Transactional
public void rollbackBind(String bindId, Integer targetVersion, String operator) {
AiConfigSnapshot snapshot = snapshotMapper.queryByVersion("BIND", bindId, targetVersion);
if (snapshot == null) {
throw new IllegalArgumentException("快照不存在: BIND/" + bindId + "/v" + targetVersion);
}
JSONObject bindJson = JSON.parseObject(snapshot.getSnapshot());
String cameraId = snapshot.getCameraId();
String now = LocalDateTime.now().format(FORMATTER);
AiRoiAlgoBind bind = bindMapper.queryByBindId(bindId);
if (bind == null) {
bind = new AiRoiAlgoBind();
bind.setBindId(bindId);
bind.setRoiId(bindJson.getString("roi_id"));
bind.setAlgoCode(bindJson.getString("algo_code"));
bind.setCreateTime(now);
}
bind.setParams(bindJson.containsKey("params") ? JSON.toJSONString(bindJson.get("params")) : null);
bind.setEnabled(bindJson.getBooleanValue("enabled") ? 1 : 0);
bind.setPriority(bindJson.getIntValue("priority"));
bind.setTemplateId(bindJson.getString("template_id"));
bind.setParamOverride(bindJson.getString("param_override"));
bind.setUpdateTime(now);
if (bind.getId() != null) {
bindMapper.updateByBindId(bind);
} else {
bindMapper.add(bind);
}
// 生成回滚快照
createSnapshot("BIND", bindId, cameraId, snapshot.getSnapshot(),
"ROLLBACK", "回滚到版本" + targetVersion, operator);
// 推送到Redis
redisConfigService.syncCameraConfigToRedis(cameraId);
List<String> ids = new ArrayList<>();
ids.add(bindId);
redisConfigService.publishConfigUpdate("bind", ids);
log.info("[AiSnapshot] 绑定配置回滚完成: bindId={}, targetVersion={}", bindId, targetVersion);
}
@Override
public Map<String, Object> diff(String scopeType, String scopeId, Integer versionA, Integer versionB) {
AiConfigSnapshot snapshotA = snapshotMapper.queryByVersion(scopeType, scopeId, versionA);
AiConfigSnapshot snapshotB = snapshotMapper.queryByVersion(scopeType, scopeId, versionB);
if (snapshotA == null || snapshotB == null) {
throw new IllegalArgumentException("快照不存在");
}
JSONObject jsonA = JSON.parseObject(snapshotA.getSnapshot());
JSONObject jsonB = JSON.parseObject(snapshotB.getSnapshot());
Map<String, Object> result = new LinkedHashMap<>();
result.put("scope_type", scopeType);
result.put("scope_id", scopeId);
result.put("version_a", versionA);
result.put("version_b", versionB);
List<Map<String, Object>> changes = new ArrayList<>();
compareJson("", jsonA, jsonB, changes);
result.put("changes", changes);
return result;
}
@Override
public Map<String, Object> preview(String cameraId) {
// DB当前配置
Map<String, Object> dbConfig = configService.exportConfig(cameraId);
// Redis当前配置
Map<String, Object> redisConfig = redisConfigService.readConfigFromRedis(cameraId);
Map<String, Object> result = new LinkedHashMap<>();
result.put("camera_id", cameraId);
result.put("db_config", dbConfig);
result.put("redis_config", redisConfig);
// 简单的diff
List<Map<String, Object>> changes = new ArrayList<>();
compareJson("", JSON.parseObject(JSON.toJSONString(dbConfig)),
JSON.parseObject(JSON.toJSONString(redisConfig)), changes);
result.put("changes", changes);
return result;
}
/**
* 递归比较两个JSON对象收集差异
*/
private void compareJson(String path, JSONObject a, JSONObject b, List<Map<String, Object>> changes) {
Set<String> allKeys = new LinkedHashSet<>();
if (a != null) allKeys.addAll(a.keySet());
if (b != null) allKeys.addAll(b.keySet());
for (String key : allKeys) {
String currentPath = path.isEmpty() ? key : path + "." + key;
Object valA = a != null ? a.get(key) : null;
Object valB = b != null ? b.get(key) : null;
if (valA == null && valB != null) {
Map<String, Object> change = new LinkedHashMap<>();
change.put("path", currentPath);
change.put("old_value", null);
change.put("new_value", valB);
changes.add(change);
} else if (valA != null && valB == null) {
Map<String, Object> change = new LinkedHashMap<>();
change.put("path", currentPath);
change.put("old_value", valA);
change.put("new_value", null);
changes.add(change);
} else if (valA instanceof JSONObject && valB instanceof JSONObject) {
compareJson(currentPath, (JSONObject) valA, (JSONObject) valB, changes);
} else if (!Objects.equals(String.valueOf(valA), String.valueOf(valB))) {
Map<String, Object> change = new LinkedHashMap<>();
change.put("path", currentPath);
change.put("old_value", valA);
change.put("new_value", valB);
changes.add(change);
}
}
}
}

View File

@@ -0,0 +1,85 @@
package com.genersoft.iot.vmp.aiot.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.aiot.bean.AiEdgeDevice;
import com.genersoft.iot.vmp.aiot.dao.AiEdgeDeviceMapper;
import com.genersoft.iot.vmp.aiot.service.IAiEdgeDeviceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Slf4j
@Service
public class AiEdgeDeviceServiceImpl implements IAiEdgeDeviceService {
@Autowired
private AiEdgeDeviceMapper deviceMapper;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public void saveOrUpdateHeartbeat(String deviceId, String payload) {
String now = LocalDateTime.now().format(FORMATTER);
try {
JSONObject json = JSON.parseObject(payload);
JSONObject status = json.getJSONObject("status");
AiEdgeDevice device = deviceMapper.queryByDeviceId(deviceId);
if (device == null) {
device = new AiEdgeDevice();
device.setDeviceId(deviceId);
device.setStatus("online");
device.setLastHeartbeat(now);
device.setUptimeSeconds(status != null ? status.getLong("uptime_seconds") : null);
device.setFramesProcessed(status != null ? status.getLong("frames_processed") : null);
device.setAlertsGenerated(status != null ? status.getLong("alerts_generated") : null);
device.setStreamStats(status != null && status.containsKey("stream_stats") ?
status.getJSONObject("stream_stats").toJSONString() : null);
device.setUpdatedAt(now);
deviceMapper.add(device);
log.info("[AiEdgeDevice] 新设备上线: deviceId={}", deviceId);
} else {
device.setStatus("online");
device.setLastHeartbeat(now);
device.setUptimeSeconds(status != null ? status.getLong("uptime_seconds") : null);
device.setFramesProcessed(status != null ? status.getLong("frames_processed") : null);
device.setAlertsGenerated(status != null ? status.getLong("alerts_generated") : null);
device.setStreamStats(status != null && status.containsKey("stream_stats") ?
status.getJSONObject("stream_stats").toJSONString() : null);
device.setUpdatedAt(now);
deviceMapper.updateByDeviceId(device);
log.debug("[AiEdgeDevice] 心跳更新: deviceId={}", deviceId);
}
} catch (Exception e) {
log.error("[AiEdgeDevice] 处理心跳失败: deviceId={}", deviceId, e);
}
}
@Override
public AiEdgeDevice queryByDeviceId(String deviceId) {
return deviceMapper.queryByDeviceId(deviceId);
}
@Override
public List<AiEdgeDevice> queryAll() {
return deviceMapper.queryAll();
}
@Override
@Scheduled(fixedRate = 90000) // 每90秒检查一次
public void checkOffline() {
String now = LocalDateTime.now().format(FORMATTER);
String threshold = LocalDateTime.now().minusSeconds(90).format(FORMATTER);
int count = deviceMapper.markOffline(threshold, now);
if (count > 0) {
log.warn("[AiEdgeDevice] 标记{}台设备为离线", count);
}
}
}

View File

@@ -0,0 +1,425 @@
package com.genersoft.iot.vmp.aiot.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.genersoft.iot.vmp.aiot.bean.AiAlgorithm;
import com.genersoft.iot.vmp.aiot.bean.AiAlgoTemplate;
import com.genersoft.iot.vmp.aiot.bean.AiRoi;
import com.genersoft.iot.vmp.aiot.bean.AiRoiAlgoBind;
import com.genersoft.iot.vmp.aiot.dao.AiAlgorithmMapper;
import com.genersoft.iot.vmp.aiot.dao.AiAlgoTemplateMapper;
import com.genersoft.iot.vmp.aiot.dao.AiRoiAlgoBindMapper;
import com.genersoft.iot.vmp.aiot.dao.AiRoiMapper;
import com.genersoft.iot.vmp.aiot.service.IAiRedisConfigService;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class AiRedisConfigServiceImpl implements IAiRedisConfigService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private AiRoiMapper roiMapper;
@Autowired
private AiRoiAlgoBindMapper bindMapper;
@Autowired
private AiAlgorithmMapper algorithmMapper;
@Autowired
private AiAlgoTemplateMapper templateMapper;
@Autowired
private CommonGBChannelMapper channelMapper;
@Autowired
private IMediaServerService mediaServerService;
private static final long KEY_EXPIRE_SECONDS = 3600;
@Override
public void writeRoiToRedis(AiRoi roi) {
String key = "config:roi:" + roi.getRoiId();
Map<String, String> hash = new LinkedHashMap<>();
hash.put("roi_id", roi.getRoiId());
hash.put("camera_id", roi.getCameraId());
hash.put("roi_type", roi.getRoiType() != null ? roi.getRoiType() : "polygon");
// coordinates: 转为Python list格式字符串ai_edge用eval解析
hash.put("coordinates", toPythonListStr(roi.getCoordinates()));
hash.put("enabled", roi.getEnabled() != null && roi.getEnabled() == 1 ? "True" : "False");
hash.put("priority", String.valueOf(roi.getPriority() != null ? roi.getPriority() : 0));
stringRedisTemplate.opsForHash().putAll(key, hash);
stringRedisTemplate.expire(key, KEY_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("[AiRedis] 写入ROI配置: key={}", key);
}
@Override
public void writeBindToRedis(AiRoiAlgoBind bind, String algoName, String targetClass, String effectiveParams) {
String key = "config:bind:" + bind.getBindId();
Map<String, String> hash = new LinkedHashMap<>();
hash.put("bind_id", bind.getBindId());
hash.put("roi_id", bind.getRoiId());
hash.put("algo_code", bind.getAlgoCode());
// params: 转为Python dict格式字符串ai_edge用eval解析
hash.put("params", toPythonDictStr(effectiveParams));
hash.put("priority", String.valueOf(bind.getPriority() != null ? bind.getPriority() : 0));
hash.put("enabled", bind.getEnabled() != null && bind.getEnabled() == 1 ? "True" : "False");
hash.put("algo_name", algoName != null ? algoName : "");
hash.put("target_class", targetClass != null ? targetClass : "person");
stringRedisTemplate.opsForHash().putAll(key, hash);
stringRedisTemplate.expire(key, KEY_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("[AiRedis] 写入绑定配置: key={}", key);
}
@Override
public void writeCameraToRedis(String cameraId, String rtspUrl, String cameraName, String location) {
String key = "config:camera:" + cameraId;
Map<String, String> hash = new LinkedHashMap<>();
hash.put("camera_id", cameraId);
hash.put("rtsp_url", rtspUrl != null ? rtspUrl : "");
hash.put("camera_name", cameraName != null ? cameraName : "");
hash.put("enabled", "True");
hash.put("location", location != null ? location : "");
stringRedisTemplate.opsForHash().putAll(key, hash);
stringRedisTemplate.expire(key, KEY_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("[AiRedis] 写入摄像头配置: key={}", key);
}
@Override
public void deleteRoiFromRedis(String roiId) {
String key = "config:roi:" + roiId;
stringRedisTemplate.delete(key);
log.debug("[AiRedis] 删除ROI配置: key={}", key);
}
@Override
public void deleteBindFromRedis(String bindId) {
String key = "config:bind:" + bindId;
stringRedisTemplate.delete(key);
log.debug("[AiRedis] 删除绑定配置: key={}", key);
}
@Override
public void publishConfigUpdate(String type, List<String> ids) {
Map<String, Object> message = new LinkedHashMap<>();
message.put("type", type);
message.put("ids", ids);
// affected_items: ai_edge 的 _handle_config_update 只看这个字段
// "all" 会同时清除 cameras 和 rois 缓存
List<String> affectedItems = new ArrayList<>();
switch (type) {
case "full":
affectedItems.add("all");
break;
case "roi":
affectedItems.add("roi");
break;
case "bind":
affectedItems.add("roi"); // bind变更也需要刷新roi关联的算法
break;
case "camera":
affectedItems.add("camera");
break;
default:
affectedItems.add("all");
}
message.put("affected_items", affectedItems);
message.put("version", "1.0.0");
message.put("timestamp", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")));
String json = JSON.toJSONString(message);
stringRedisTemplate.convertAndSend("config_update", json);
log.info("[AiRedis] 发布config_update通知: type={}, affected_items={}, ids={}", type, affectedItems, ids);
}
@Override
public Map<String, Object> readConfigFromRedis(String cameraId) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("camera_id", cameraId);
// 扫描所有ROI keys筛选属于该摄像头的
Set<String> roiKeys = stringRedisTemplate.keys("config:roi:*");
List<Map<String, Object>> rois = new ArrayList<>();
List<String> roiIds = new ArrayList<>();
if (roiKeys != null) {
for (String key : roiKeys) {
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
if (cameraId.equals(entries.get("camera_id"))) {
Map<String, Object> roiMap = new LinkedHashMap<>();
for (Map.Entry<Object, Object> e : entries.entrySet()) {
roiMap.put(e.getKey().toString(), e.getValue());
}
rois.add(roiMap);
roiIds.add(entries.get("roi_id").toString());
}
}
}
// 扫描所有bind keys筛选属于该摄像头ROI的
Set<String> bindKeys = stringRedisTemplate.keys("config:bind:*");
List<Map<String, Object>> binds = new ArrayList<>();
if (bindKeys != null) {
for (String key : bindKeys) {
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
String bindRoiId = entries.get("roi_id") != null ? entries.get("roi_id").toString() : "";
if (roiIds.contains(bindRoiId)) {
Map<String, Object> bindMap = new LinkedHashMap<>();
for (Map.Entry<Object, Object> e : entries.entrySet()) {
bindMap.put(e.getKey().toString(), e.getValue());
}
binds.add(bindMap);
}
}
}
result.put("rois", rois);
result.put("binds", binds);
return result;
}
@Override
public void syncCameraConfigToRedis(String cameraId) {
List<AiRoi> rois = roiMapper.queryAllByCameraId(cameraId);
List<String> roiIds = new ArrayList<>();
List<String> bindIds = new ArrayList<>();
// 1. 清理该摄像头关联的旧Redis keys防止已删除的ROI/bind残留
cleanStaleCameraKeys(cameraId);
// 2. 写入摄像头配置含通道名称和RTSP地址
writeCameraConfigFromChannel(cameraId, rois);
// 3. 写入ROI和绑定配置
for (AiRoi roi : rois) {
writeRoiToRedis(roi);
roiIds.add(roi.getRoiId());
List<AiRoiAlgoBind> binds = bindMapper.queryByRoiId(roi.getRoiId());
for (AiRoiAlgoBind bind : binds) {
AiAlgorithm algo = algorithmMapper.queryByCode(bind.getAlgoCode());
String algoName = algo != null ? algo.getAlgoName() : "";
String targetClass = algo != null ? algo.getTargetClass() : "person";
String effectiveParams = resolveEffectiveParams(bind);
writeBindToRedis(bind, algoName, targetClass, effectiveParams);
bindIds.add(bind.getBindId());
}
}
log.info("[AiRedis] 全量同步摄像头配置完成: cameraId={}, ROIs={}, Binds={}", cameraId, roiIds.size(), bindIds.size());
}
/**
* 清理该摄像头在Redis中的旧ROI和bind keys避免已删除的配置残留
*/
private void cleanStaleCameraKeys(String cameraId) {
// 扫描所有 config:roi:* 找到属于该摄像头的,全部删除(后面会重新写入当前有效的)
Set<String> roiKeys = stringRedisTemplate.keys("config:roi:*");
List<String> staleRoiIds = new ArrayList<>();
if (roiKeys != null) {
for (String key : roiKeys) {
Object cam = stringRedisTemplate.opsForHash().get(key, "camera_id");
if (cameraId.equals(cam)) {
Object roiId = stringRedisTemplate.opsForHash().get(key, "roi_id");
if (roiId != null) {
staleRoiIds.add(roiId.toString());
}
stringRedisTemplate.delete(key);
}
}
}
// 删除属于这些ROI的bind keys
if (!staleRoiIds.isEmpty()) {
Set<String> bindKeys = stringRedisTemplate.keys("config:bind:*");
if (bindKeys != null) {
for (String key : bindKeys) {
Object roiId = stringRedisTemplate.opsForHash().get(key, "roi_id");
if (roiId != null && staleRoiIds.contains(roiId.toString())) {
stringRedisTemplate.delete(key);
}
}
}
}
log.debug("[AiRedis] 清理旧keys完成: cameraId={}, 旧ROIs={}", cameraId, staleRoiIds.size());
}
/**
* 从通道信息写入摄像头Redis配置
*/
private void writeCameraConfigFromChannel(String cameraId, List<AiRoi> rois) {
String cameraName = "";
String rtspUrl = "";
// 从ROI关联的通道获取名称
if (!rois.isEmpty()) {
AiRoi firstRoi = rois.get(0);
if (firstRoi.getChannelDbId() != null) {
try {
CommonGBChannel channel = channelMapper.queryById(firstRoi.getChannelDbId());
if (channel != null) {
cameraName = channel.getGbName() != null ? channel.getGbName() : "";
}
} catch (Exception e) {
log.warn("[AiRedis] 查询通道信息失败: channelDbId={}", firstRoi.getChannelDbId());
}
}
}
// 构建RTSP代理地址通过ZLM媒体服务器
// cameraId格式为 {app}/{stream}ZLM的RTSP路径直接使用该格式
try {
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
if (mediaServer != null && mediaServer.getRtspPort() != 0) {
rtspUrl = String.format("rtsp://%s:%s/%s",
mediaServer.getStreamIp() != null ? mediaServer.getStreamIp() : mediaServer.getIp(),
mediaServer.getRtspPort(), cameraId);
} else if (mediaServer != null) {
rtspUrl = String.format("http://%s:%s/%s.live.flv",
mediaServer.getStreamIp() != null ? mediaServer.getStreamIp() : mediaServer.getIp(),
mediaServer.getHttpPort(), cameraId);
}
} catch (Exception e) {
log.warn("[AiRedis] 获取媒体服务器信息失败RTSP地址为空: {}", e.getMessage());
}
writeCameraToRedis(cameraId, rtspUrl, cameraName, "");
}
/**
* 解析绑定的有效参数:如果有模板引用,则合并模板参数和覆盖参数
*/
private String resolveEffectiveParams(AiRoiAlgoBind bind) {
if (bind.getTemplateId() != null && !bind.getTemplateId().isEmpty()) {
AiAlgoTemplate template = templateMapper.queryByTemplateId(bind.getTemplateId());
if (template != null) {
return AiAlgoTemplateServiceImpl.mergeParams(template.getParams(), bind.getParamOverride());
}
}
return bind.getParams() != null ? bind.getParams() : "{}";
}
/**
* 将坐标JSON转为Python list格式
* 输入: [{"x":0.28,"y":0.56},{"x":0.01,"y":0.80}] (前端格式)
* 输出: [[0.28, 0.56], [0.01, 0.80]] (Python可eval的格式)
* 输入: [[0.1,0.2],[0.3,0.4]] (已是数组格式)
* 输出: [[0.1, 0.2], [0.3, 0.4]] (直接返回)
*/
private String toPythonListStr(String jsonCoordinates) {
if (jsonCoordinates == null || jsonCoordinates.isEmpty()) {
return "[]";
}
try {
JSONArray arr = JSON.parseArray(jsonCoordinates);
if (arr == null || arr.isEmpty()) {
return "[]";
}
// 检测是否为 [{x:..., y:...}] 格式
Object first = arr.get(0);
if (first instanceof com.alibaba.fastjson2.JSONObject) {
StringBuilder sb = new StringBuilder("[");
for (int i = 0; i < arr.size(); i++) {
if (i > 0) sb.append(", ");
com.alibaba.fastjson2.JSONObject point = arr.getJSONObject(i);
sb.append("[").append(point.get("x")).append(", ").append(point.get("y")).append("]");
}
sb.append("]");
return sb.toString();
}
} catch (Exception e) {
log.debug("[AiRedis] 坐标格式转换跳过,原样返回: {}", jsonCoordinates);
}
// 已是 [[x,y],...] 格式,直接返回
return jsonCoordinates;
}
/**
* 将JSON格式的参数字符串转为Python dict格式
* 输入: {"confirm_leave_sec":10,"cooldown_sec":300}
* 输出: {'confirm_leave_sec': 10, 'cooldown_sec': 300}
*/
private String toPythonDictStr(String jsonParams) {
if (jsonParams == null || jsonParams.isEmpty()) {
return "{}";
}
try {
// 解析JSON然后重新格式化为Python风格
Map<String, Object> map = JSON.parseObject(jsonParams, LinkedHashMap.class);
StringBuilder sb = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (!first) sb.append(", ");
first = false;
sb.append("'").append(entry.getKey()).append("': ");
Object val = entry.getValue();
if (val instanceof String) {
sb.append("'").append(val).append("'");
} else if (val instanceof Boolean) {
sb.append((Boolean) val ? "True" : "False");
} else if (val instanceof List) {
sb.append(toPythonValue(val));
} else {
sb.append(val);
}
}
sb.append("}");
return sb.toString();
} catch (Exception e) {
log.warn("[AiRedis] JSON转Python dict失败, 原样返回: {}", jsonParams);
return jsonParams;
}
}
@SuppressWarnings("unchecked")
private String toPythonValue(Object val) {
if (val == null) return "None";
if (val instanceof String) return "'" + val + "'";
if (val instanceof Boolean) return (Boolean) val ? "True" : "False";
if (val instanceof List) {
List<Object> list = (List<Object>) val;
StringBuilder sb = new StringBuilder("[");
boolean first = true;
for (Object item : list) {
if (!first) sb.append(", ");
first = false;
sb.append(toPythonValue(item));
}
sb.append("]");
return sb.toString();
}
if (val instanceof Map) {
Map<String, Object> map = (Map<String, Object>) val;
StringBuilder sb = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (!first) sb.append(", ");
first = false;
sb.append("'").append(entry.getKey()).append("': ").append(toPythonValue(entry.getValue()));
}
sb.append("}");
return sb.toString();
}
return val.toString();
}
}