diff --git a/docker/media/config.ini b/docker/media/config.ini index 4c1319a2d..7311621b9 100644 --- a/docker/media/config.ini +++ b/docker/media/config.ini @@ -27,7 +27,7 @@ enable_ffmpeg_log=0 flowThreshold=1024 listen_ip=:: maxStreamWaitMS=15000 -mediaServerId=polaris +mediaServerId=zlmediakit-local mergeWriteMS=0 resetWhenRePlay=1 streamNoneReaderDelayMS=20000 diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/config/AiServiceConfig.java b/src/main/java/com/genersoft/iot/vmp/aiot/config/AiServiceConfig.java index 546a71632..0b3f5f9fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/config/AiServiceConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/config/AiServiceConfig.java @@ -14,4 +14,9 @@ public class AiServiceConfig { private int pushTimeout = 10000; private boolean enabled = false; + + // Local direct call (no Java HTTP). Use a local python script to push payload. + private boolean localCallEnabled = false; + private String localPython = "python"; + private String localScriptPath = "scripts/edge_local_sync.py"; } diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/controller/AiConfigController.java b/src/main/java/com/genersoft/iot/vmp/aiot/controller/AiConfigController.java index dce8c31c7..a65103dc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/controller/AiConfigController.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/controller/AiConfigController.java @@ -44,6 +44,12 @@ public class AiConfigController { return configService.pushConfig(cameraId); } + @Operation(summary = "一次性推送全部配置到本地Edge(HTTP直推,本地调试用)") + @PostMapping("/push-all") + public Map pushAllConfig() { + return configService.pushAllConfig(); + } + @Operation(summary = "导出通道完整配置JSON") @GetMapping("/export") public Map exportConfig(@RequestParam String cameraId) { diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiAlgoBindMapper.java b/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiAlgoBindMapper.java index 29890de45..7d6e2a294 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiAlgoBindMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiAlgoBindMapper.java @@ -42,4 +42,7 @@ public interface AiRoiAlgoBindMapper { "INNER JOIN wvp_ai_roi r ON b.roi_id = r.roi_id " + "WHERE r.camera_id=#{cameraId} ORDER BY b.priority DESC, b.id") List queryByCameraId(@Param("cameraId") String cameraId); + + @Select("SELECT * FROM wvp_ai_roi_algo_bind ORDER BY priority DESC, id") + List queryAll(); } diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiMapper.java b/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiMapper.java index 0061bb671..86b01a559 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/dao/AiRoiMapper.java @@ -47,6 +47,9 @@ public interface AiRoiMapper { @Select("SELECT * FROM wvp_ai_roi WHERE camera_id=#{cameraId} ORDER BY priority DESC") List queryAllByCameraId(@Param("cameraId") String cameraId); + @Select("SELECT * FROM wvp_ai_roi ORDER BY priority DESC, id DESC") + List queryAll(); + @Select("SELECT DISTINCT camera_id FROM wvp_ai_roi WHERE device_id=#{deviceId}") List queryDistinctCameraIdsByDeviceId(@Param("deviceId") String deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/service/IAiConfigService.java b/src/main/java/com/genersoft/iot/vmp/aiot/service/IAiConfigService.java index 6cfc47782..4ecb8a138 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/service/IAiConfigService.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/service/IAiConfigService.java @@ -11,4 +11,9 @@ public interface IAiConfigService { * @return 推送结果,含版本号 */ Map pushConfig(String cameraId); + + /** + * 本地调试:一次性同步全部配置到边缘端(HTTP) + */ + Map pushAllConfig(); } diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiAlgorithmServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiAlgorithmServiceImpl.java index b1b08d0fe..9309e40a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiAlgorithmServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiAlgorithmServiceImpl.java @@ -41,11 +41,11 @@ public class AiAlgorithmServiceImpl implements IAiAlgorithmService { // algoCode -> {algoName, targetClass, description, paramSchema} PRESET_ALGORITHMS.put("leave_post", new String[]{ "离岗检测", "person", "检测人员是否在岗,支持工作时间段配置", - "{\"confirm_on_duty_sec\":{\"type\":\"int\",\"default\":10,\"min\":1},\"confirm_leave_sec\":{\"type\":\"int\",\"default\":10,\"min\":1},\"cooldown_sec\":{\"type\":\"int\",\"default\":300,\"min\":0},\"working_hours\":{\"type\":\"list\",\"default\":[]}}" + "{\"confirm_on_duty_sec\":{\"type\":\"int\",\"default\":10,\"min\":1},\"confirm_leave_sec\":{\"type\":\"int\",\"default\":10,\"min\":1},\"cooldown_sec\":{\"type\":\"int\",\"default\":600,\"min\":0},\"working_hours\":{\"type\":\"list\",\"default\":[]}}" }); PRESET_ALGORITHMS.put("intrusion", new String[]{ "周界入侵检测", "person", "检测人员进入指定区域", - "{\"cooldown_seconds\":{\"type\":\"int\",\"default\":120,\"min\":0},\"confirm_seconds\":{\"type\":\"int\",\"default\":5,\"min\":1}}" + "{\"cooldown_seconds\":{\"type\":\"int\",\"default\":300,\"min\":0},\"confirm_seconds\":{\"type\":\"int\",\"default\":5,\"min\":1}}" }); // 人群聚集检测暂时注释,边缘端未启用 // PRESET_ALGORITHMS.put("crowd_detection", new String[]{ diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiConfigServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiConfigServiceImpl.java index b630ccf5d..3c6461273 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiConfigServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiConfigServiceImpl.java @@ -6,16 +6,29 @@ import com.genersoft.iot.vmp.aiot.bean.AiAlgorithm; import com.genersoft.iot.vmp.aiot.bean.AiConfigSnapshot; import com.genersoft.iot.vmp.aiot.bean.AiRoi; import com.genersoft.iot.vmp.aiot.bean.AiRoiAlgoBind; +import com.genersoft.iot.vmp.aiot.config.AiServiceConfig; import com.genersoft.iot.vmp.aiot.dao.AiAlgorithmMapper; import com.genersoft.iot.vmp.aiot.dao.AiRoiAlgoBindMapper; import com.genersoft.iot.vmp.aiot.dao.AiRoiMapper; import com.genersoft.iot.vmp.aiot.service.IAiConfigService; import com.genersoft.iot.vmp.aiot.service.IAiConfigSnapshotService; import com.genersoft.iot.vmp.aiot.service.IAiRedisConfigService; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import org.springframework.http.MediaType; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpEntity; import java.util.*; @@ -35,6 +48,12 @@ public class AiConfigServiceImpl implements IAiConfigService { @Autowired private IAiRedisConfigService redisConfigService; + @Autowired + private AiServiceConfig aiServiceConfig; + + @Autowired + private StreamProxyMapper streamProxyMapper; + @Lazy @Autowired private IAiConfigSnapshotService snapshotService; @@ -119,14 +138,285 @@ public class AiConfigServiceImpl implements IAiConfigService { log.warn("[AiConfig] 摄像头 {} 未关联边缘设备,跳过聚合配置推送", cameraId); } - // 6. 返回推送结果 + // 6. 本地调试:同步到 Edge HTTP 接口(保留原 Redis 流程) + boolean httpSyncOk = pushConfigToLocalEdge(cameraId, config); + + // 7. 返回推送结果 Map result = new LinkedHashMap<>(); result.put("camera_id", cameraId); result.put("version", snapshot.getVersion()); result.put("push_status", "success"); result.put("message", "配置已推送到Redis并通知边缘端"); + result.put("http_sync", httpSyncOk); log.info("[AiConfig] 配置推送完成: cameraId={}, version={}", cameraId, snapshot.getVersion()); return result; } + + + @Override + public Map pushAllConfig() { + List rois = roiMapper.queryAll(); + List binds = bindMapper.queryAll(); + + Map payload = buildPayloadFromFlat(rois, binds); + payload.put("sync_mode", "full"); + boolean httpSyncOk = pushPayloadToLocalEdge(payload); + + Map result = new LinkedHashMap<>(); + result.put("rois", rois.size()); + result.put("binds", binds.size()); + result.put("http_sync", httpSyncOk); + result.put("message", "已推送全部ROI和算法绑定到Edge"); + log.info("[AiConfig] 全量推送完成 rois={}, binds={}, httpSync={}", rois.size(), binds.size(), httpSyncOk); + return result; + } + + private Map buildPayloadFromFlat(List rois, List binds) { + Map payload = new LinkedHashMap<>(); + List> roiList = new ArrayList<>(); + List> bindList = new ArrayList<>(); + Set cameraIds = new LinkedHashSet<>(); + + for (AiRoi roi : rois) { + cameraIds.add(roi.getCameraId()); + Map roiOut = new LinkedHashMap<>(); + roiOut.put("roi_id", roi.getRoiId()); + roiOut.put("camera_id", roi.getCameraId()); + roiOut.put("roi_type", roi.getRoiType()); + try { + roiOut.put("coordinates", objectMapper.readValue(roi.getCoordinates(), Object.class)); + } catch (Exception e) { + roiOut.put("coordinates", roi.getCoordinates()); + } + roiOut.put("enabled", roi.getEnabled() == 1); + roiOut.put("priority", roi.getPriority()); + roiList.add(roiOut); + } + + for (AiRoiAlgoBind bind : binds) { + Map bindOut = new LinkedHashMap<>(); + bindOut.put("bind_id", bind.getBindId()); + bindOut.put("roi_id", bind.getRoiId()); + bindOut.put("algo_code", bind.getAlgoCode()); + bindOut.put("enabled", bind.getEnabled() == 1); + bindOut.put("priority", bind.getPriority()); + try { + bindOut.put("params", objectMapper.readValue(bind.getParams(), Object.class)); + } catch (Exception e) { + bindOut.put("params", bind.getParams()); + } + AiAlgorithm algo = algorithmMapper.queryByCode(bind.getAlgoCode()); + if (algo != null) { + bindOut.put("algo_name", algo.getAlgoName()); + bindOut.put("target_class", algo.getTargetClass()); + } + bindList.add(bindOut); + } + + payload.put("camera_ids", new ArrayList<>(cameraIds)); + payload.put("rois", roiList); + payload.put("binds", bindList); + + // 构建 cameras 列表(含 rtsp_url),Edge 需要这些信息启动视频流 + // 只保留在 wvp_stream_proxy 中存在且有有效 srcUrl 的摄像头 + List> cameraList = new ArrayList<>(); + Set validCameraIds = new LinkedHashSet<>(); + for (String cameraId : cameraIds) { + String[] parts = cameraId.split("/", 2); + if (parts.length == 2) { + StreamProxy proxy = streamProxyMapper.selectOneByAppAndStream(parts[0], parts[1]); + if (proxy != null && proxy.getSrcUrl() != null && !proxy.getSrcUrl().isEmpty()) { + Map camOut = new LinkedHashMap<>(); + camOut.put("camera_id", cameraId); + camOut.put("enabled", true); + camOut.put("rtsp_url", proxy.getSrcUrl()); + camOut.put("camera_name", parts[0] + "/" + parts[1]); + cameraList.add(camOut); + validCameraIds.add(cameraId); + } else { + log.warn("[AiConfig] 跳过无效摄像头(无stream_proxy记录): {}", cameraId); + } + } else { + log.warn("[AiConfig] 跳过无效摄像头(camera_id格式错误): {}", cameraId); + } + } + payload.put("cameras", cameraList); + + // 过滤掉不存在的摄像头对应的 ROI 和 Bind + if (validCameraIds.size() < cameraIds.size()) { + Set staleCameraIds = new LinkedHashSet<>(cameraIds); + staleCameraIds.removeAll(validCameraIds); + log.info("[AiConfig] 过滤掉已失效的摄像头: {}", staleCameraIds); + + // 收集有效ROI的roi_id,用于过滤bindList + Set validRoiIds = new HashSet<>(); + roiList.removeIf(roi -> { + String camId = (String) roi.get("camera_id"); + boolean valid = validCameraIds.contains(camId); + if (valid) { + validRoiIds.add(roi.get("roi_id")); + } + return !valid; + }); + bindList.removeIf(bind -> !validRoiIds.contains(bind.get("roi_id"))); + + payload.put("camera_ids", new ArrayList<>(validCameraIds)); + payload.put("rois", roiList); + payload.put("binds", bindList); + } + + return payload; + } + + private boolean pushPayloadToLocalEdge(Map payload) { + String url = aiServiceConfig.getUrl() + "/debug/sync"; + String jsonBody = JSON.toJSONString(payload); + log.info("[AiConfig] 推送到Edge: url={}, body_len={}", url, jsonBody.length()); + if (jsonBody.length() <= 1024) { + log.info("[AiConfig] 推送body: {}", jsonBody); + } + + if (aiServiceConfig.isLocalCallEnabled()) { + boolean ok = pushPayloadByLocalScript(url, jsonBody); + log.info("[AiConfig] 本地脚本推送结果: ok={}", ok); + return ok; + } + + RestTemplate restTemplate = new RestTemplate(); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity entity = new HttpEntity<>(jsonBody, headers); + try { + Map resp = restTemplate.postForObject(url, entity, Map.class); + boolean ok = resp != null && Boolean.TRUE.equals(resp.get("ok")); + log.info("[AiConfig] HTTP推送结果: ok={}, url={}, body_len={}", ok, url, jsonBody.length()); + return ok; + } catch (Exception e) { + log.warn("[AiConfig] HTTP推送失败: {}, 尝试本地脚本", e.getMessage()); + boolean ok2 = pushPayloadByLocalScript(url, jsonBody); + log.info("[AiConfig] 本地脚本推送结果: ok={}", ok2); + return ok2; + } + } + + private boolean pushPayloadByLocalScript(String url, String jsonBody) { + try { + Path tmp = Files.createTempFile("edge-sync-", ".json"); + Files.writeString(tmp, jsonBody, StandardCharsets.UTF_8); + + String python = aiServiceConfig.getLocalPython(); + String script = aiServiceConfig.getLocalScriptPath(); + ProcessBuilder pb = new ProcessBuilder(python, script, url, tmp.toString()); + pb.redirectErrorStream(true); + Process p = pb.start(); + + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append("\n"); + } + } + + boolean finished = p.waitFor(aiServiceConfig.getPushTimeout(), TimeUnit.MILLISECONDS); + if (!finished) { + p.destroyForcibly(); + log.warn("[AiConfig] ??????: {} ms", aiServiceConfig.getPushTimeout()); + return false; + } + + int code = p.exitValue(); + log.info("[AiConfig] ???????: {}, ??: {}", code, output.toString().trim()); + return code == 0; + } catch (Exception e) { + log.warn("[AiConfig] ????????: {}", e.getMessage()); + return false; + } + } + +@SuppressWarnings("unchecked") + private boolean pushConfigToLocalEdge(String cameraId, Map config) { + if (!aiServiceConfig.isEnabled()) { + return false; + } + try { + Map payload = new LinkedHashMap<>(); + payload.put("camera_ids", java.util.Collections.singletonList(cameraId)); + + List> rois = new ArrayList<>(); + List> binds = new ArrayList<>(); + Object roisObj = config.get("rois"); + if (roisObj instanceof List) { + for (Object roiItem : (List) roisObj) { + if (!(roiItem instanceof Map)) { + continue; + } + Map roiMap = (Map) roiItem; + Map roiOut = new LinkedHashMap<>(); + roiOut.put("roi_id", roiMap.get("roi_id")); + roiOut.put("camera_id", cameraId); + roiOut.put("roi_type", roiMap.get("roi_type")); + roiOut.put("coordinates", roiMap.get("coordinates")); + roiOut.put("enabled", roiMap.getOrDefault("enabled", true)); + roiOut.put("priority", roiMap.getOrDefault("priority", 0)); + rois.add(roiOut); + + Object algosObj = roiMap.get("algorithms"); + if (algosObj instanceof List) { + for (Object algoItem : (List) algosObj) { + if (!(algoItem instanceof Map)) { + continue; + } + Map algoMap = (Map) algoItem; + Map bindOut = new LinkedHashMap<>(); + bindOut.put("bind_id", algoMap.get("bind_id")); + bindOut.put("roi_id", roiMap.get("roi_id")); + bindOut.put("algo_code", algoMap.get("algo_code")); + bindOut.put("enabled", algoMap.getOrDefault("enabled", true)); + bindOut.put("priority", algoMap.getOrDefault("priority", 0)); + bindOut.put("params", algoMap.get("params")); + bindOut.put("target_class", algoMap.get("target_class")); + binds.add(bindOut); + } + } + } + } + + payload.put("rois", rois); + payload.put("binds", binds); + + // 构建 cameras 列表(含 rtsp_url) + String[] parts = cameraId.split("/", 2); + if (parts.length == 2) { + StreamProxy proxy = streamProxyMapper.selectOneByAppAndStream(parts[0], parts[1]); + if (proxy != null) { + Map camOut = new LinkedHashMap<>(); + camOut.put("camera_id", cameraId); + camOut.put("rtsp_url", proxy.getSrcUrl()); + camOut.put("camera_name", cameraId); + camOut.put("enabled", true); + payload.put("cameras", java.util.Collections.singletonList(camOut)); + } + } + + log.info("[AiConfig] 构建Payload: cameraId={}, rois={}, binds={}", cameraId, rois.size(), binds.size()); + + String url = aiServiceConfig.getUrl() + "/debug/sync"; + String jsonBody = JSON.toJSONString(payload); + log.info("[AiConfig] 发送JSON: url={}, body_len={}", url, jsonBody.length()); + + RestTemplate restTemplate = new RestTemplate(); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity entity = new HttpEntity<>(jsonBody, headers); + Map resp = restTemplate.postForObject(url, entity, Map.class); + boolean ok = resp != null && Boolean.TRUE.equals(resp.get("ok")); + log.info("[AiConfig] 本地 HTTP 同步结果: ok={}, url={}", ok, url); + return ok; + } catch (Exception e) { + log.warn("[AiConfig] 本地 HTTP 同步失败: {}", e.getMessage()); + return false; + } + } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 90255171b..385477fd7 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -116,11 +116,11 @@ user-settings: ai: service: # FastAPI边缘端地址 - url: http://localhost:8090 + url: http://127.0.0.1:9001 # 推送超时ms push-timeout: 10000 # 暂未对接时设为false - enabled: false + enabled: true mqtt: # MQTT推送开关 enabled: false