From 77594dae3d30e8093af34f882a69cc34bc638bd7 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sat, 27 Sep 2025 21:03:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BB=8E=E7=AC=AC=E4=B8=89?= =?UTF-8?q?=E6=96=B9=E5=90=8C=E6=AD=A5=E5=88=86=E7=BB=84=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 5 + .../genersoft/iot/vmp/conf/UserSetting.java | 10 + .../vmp/conf/redis/RedisMsgListenConfig.java | 7 + .../genersoft/iot/vmp/gb28181/bean/Group.java | 7 + .../vmp/gb28181/bean/RedisGroupMessage.java | 35 ++- .../iot/vmp/gb28181/dao/GroupMapper.java | 3 + .../vmp/gb28181/service/IGroupService.java | 4 +- .../service/impl/GroupServiceImpl.java | 10 +- ...rviceImplForSy.java => SyServiceImpl.java} | 15 +- .../redisMsg/RedisGroupChangeListener.java | 272 ++++++++++++++++++ .../redisMsg/RedisGroupMsgListener.java | 213 ++++++++++++++ 数据库/2.7.4/初始化-mysql-2.7.4.sql | 1 + .../2.7.4/初始化-postgresql-kingbase-2.7.4.sql | 1 + 数据库/2.7.4/更新-mysql-2.7.4.sql | 12 +- .../2.7.4/更新-postgresql-kingbase-2.7.4.sql | 1 + 15 files changed, 576 insertions(+), 20 deletions(-) rename src/main/java/com/genersoft/iot/vmp/service/impl/{MapServiceImplForSy.java => SyServiceImpl.java} (83%) create mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupChangeListener.java create mode 100755 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 6ca7d9611..3c304567a 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -85,6 +85,11 @@ public class VideoManagerConstants { */ public static final String VM_MSG_GROUP_LIST_RESPONSE = "VM_MSG_GROUP_LIST_RESPONSE"; + /** + * 同步三方组织结构回复 + */ + public static final String VM_MSG_GROUP_LIST_CHANGE = "VM_MSG_GROUP_LIST_CHANGE"; + /** * redis 消息通知设备推流到平台 */ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 26302e782..77beba74a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -209,4 +209,14 @@ public class UserSetting { */ private boolean disableDateHeader = false; + /** + * 同步业务分组时自动生成分组国标编号的模板,不配置则默认参考当前的sip域信息生成 + */ + private String groupSyncDeviceTemplate; + + /** + * 与第三方进行分组同步时使用别名而不是分组ID, 如果没有设置此项为true,那么分组编号就是必须传递的。如果是设置为true则,自动为别名的分组生成新的编号 + */ + private boolean useAliasForGroupSync = false; + } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index bc8b5f6a1..bbb94b345 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -34,6 +34,11 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamListMsgListener pushStreamListMsgListener; + @Autowired + private RedisGroupMsgListener groupMsgListener; + + @Autowired + private RedisGroupChangeListener groupChangeListener; @Autowired private RedisCloseStreamMsgListener redisCloseStreamMsgListener; @@ -64,6 +69,8 @@ public class RedisMsgListenConfig { container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); + container.addMessageListener(groupMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE)); + container.addMessageListener(groupChangeListener, new PatternTopic(VideoManagerConstants.VM_MSG_GROUP_LIST_CHANGE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Group.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Group.java index e50c1ad50..0d6729d31 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Group.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Group.java @@ -58,12 +58,19 @@ public class Group implements Comparable{ */ @Schema(description = "更新时间") private String updateTime; + /** * 行政区划 */ @Schema(description = "行政区划") private String civilCode; + /** + * 别名 + */ + @Schema(description = "别名, 此别名为唯一值,可以对接第三方是存储对方的ID") + private String alias; + public static Group getInstance(DeviceChannel channel) { GbCode gbCode = GbCode.decode(channel.getDeviceId()); if (gbCode == null || (!gbCode.getTypeCode().equals("215") && !gbCode.getTypeCode().equals("216"))) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RedisGroupMessage.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RedisGroupMessage.java index e4ec0b701..2691dc1b2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RedisGroupMessage.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RedisGroupMessage.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.bean; -import com.alibaba.fastjson2.JSON; import lombok.Data; @Data @@ -36,13 +35,33 @@ public class RedisGroupMessage { */ private String parentGAlias; + /** + * 分组所属业务分组国标ID + */ + private String topGroupGbId; - public static void main(String[] args) { - RedisGroupMessage redisGroupMessage = new RedisGroupMessage(); - redisGroupMessage.setGroupAlias("100000001"); - redisGroupMessage.setGroupName("消防大队"); - System.out.println(JSON.toJSONString(redisGroupMessage)); + /** + * 分组所属业务分组别名 + */ + private String topGroupGAlias; + + /** + * 分组变化消息中的消息类型,取值为 add update delete + */ + private String messageType; + + + @Override + public String toString() { + return "RedisGroupMessage{" + + "groupGbId='" + groupGbId + '\'' + + ", groupAlias='" + groupAlias + '\'' + + ", groupName='" + groupName + '\'' + + ", groupCivilCode='" + groupCivilCode + '\'' + + ", parentGroupGbId='" + parentGroupGbId + '\'' + + ", parentGAlias='" + parentGAlias + '\'' + + ", topGroupGbId='" + topGroupGbId + '\'' + + ", topGroupGAlias='" + topGroupGAlias + '\'' + + '}'; } - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/GroupMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/GroupMapper.java index 5eaf9002f..7d6bb48fc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/GroupMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/GroupMapper.java @@ -286,4 +286,7 @@ public interface GroupMapper { @Delete("DELETE FROM wvp_platform_group WHERE group_id = #{groupId}") void deletePlatformGroup(@Param("groupId") int groupId); + + @Select("SELECT * from wvp_common_group WHERE alias = #{alias} ") + Group queryGroupByAlias(@Param("alias") String alias); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGroupService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGroupService.java index 79a405434..ad3463c99 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGroupService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGroupService.java @@ -17,8 +17,6 @@ public interface IGroupService { List queryForTree(String query, Integer parent, Boolean hasChannel); - void syncFromChannel(); - boolean delete(int id); boolean batchAdd(List groupList); @@ -26,4 +24,6 @@ public interface IGroupService { List getPath(String deviceId, String businessGroup); PageInfo queryList(Integer page, Integer count, String query); + + Group queryGroupByAlias(String groupAlias); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java index 72b6f1f4f..5c9f13f4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java @@ -186,11 +186,6 @@ public class GroupServiceImpl implements IGroupService { return groupTrees; } - @Override - public void syncFromChannel() { - - } - @Override @Transactional public boolean delete(int id) { @@ -307,4 +302,9 @@ public class GroupServiceImpl implements IGroupService { List all = groupManager.query(query, null, null); return new PageInfo<>(all); } + + @Override + public Group queryGroupByAlias(String groupAlias) { + return groupManager.queryGroupByAlias(groupAlias); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MapServiceImplForSy.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SyServiceImpl.java similarity index 83% rename from src/main/java/com/genersoft/iot/vmp/service/impl/MapServiceImplForSy.java rename to src/main/java/com/genersoft/iot/vmp/service/impl/SyServiceImpl.java index 68c8b6f66..aa64cfa6b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MapServiceImplForSy.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SyServiceImpl.java @@ -2,9 +2,12 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.IMapService; import com.genersoft.iot.vmp.vmanager.bean.MapConfig; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @@ -12,14 +15,22 @@ import java.util.ArrayList; import java.util.List; /** - * 第三方地图适配 + * 第三方平台适配 */ +@Slf4j @Service -public class MapServiceImplForSy implements IMapService { +public class SyServiceImpl implements IMapService, CommandLineRunner { @Autowired private RedisTemplate redisTemplate; + // 启动后请求组织结构同步 + @Override + public void run(String... args) throws Exception { + String key = VideoManagerConstants.VM_MSG_GROUP_LIST_REQUEST; + log.info("[redis发送通知] 发送 同步组织结构请求 {}", key); + redisTemplate.convertAndSend(key, ""); + } @Override public List getConfig() { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupChangeListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupChangeListener.java new file mode 100755 index 000000000..f82c80c66 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupChangeListener.java @@ -0,0 +1,272 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Group; +import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage; +import com.genersoft.iot.vmp.gb28181.service.IGroupService; +import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; +import com.genersoft.iot.vmp.utils.DateUtil; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 接收redis发送的推流设备列表更新通知 + * 监听: SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE + * 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]' + */ +@Slf4j +@Component +public class RedisGroupChangeListener implements MessageListener { + + @Resource + private IGroupService groupService; + + @Resource + private IStreamPushService streamPushService; + + @Autowired + private UserSetting userSetting; + + @Autowired + private SipConfig sipConfig; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Override + public void onMessage(Message message, byte[] bytes) { + log.info("[REDIS: 分组信息更新] key: {}, : {}", VideoManagerConstants.VM_MSG_GROUP_LIST_CHANGE, new String(message.getBody())); + taskQueue.offer(message); + } + + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + List groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class); + for (int i = 0; i < groupMessages.size(); i++) { + RedisGroupMessage groupMessage = groupMessages.get(i); + log.info("[REDIS消息-分组信息更新] {}", groupMessage.toString()); + switch (groupMessage.getMessageType()){ + case "add": + if (!userSetting.isUseAliasForGroupSync()) { + if (groupMessage.getGroupGbId() == null) { + log.info("[REDIS消息-分组信息新增] 分组编号未设置,{}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId()); + if (group != null) { + log.info("[REDIS消息-分组信息新增] 失败 {},编号已经存在", groupMessage.getGroupGbId()); + continue; + } + if (ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){ + log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + group = new Group(); + group.setDeviceId(groupMessage.getGroupGbId()); + group.setAlias(groupMessage.getGroupAlias()); + group.setParentDeviceId(groupMessage.getParentGroupGbId()); + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + group.setCreateTime(DateUtil.getNow()); + group.setUpdateTime(DateUtil.getNow()); + groupService.add(group); + + }else { + // 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID + if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) { + log.info("[REDIS消息-分组信息新增] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias()); + if (group != null) { + log.info("[REDIS消息-分组信息新增] 失败 {},别名已经存在", groupMessage.getGroupGbId()); + continue; + } + group = new Group(); + boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias()); + String deviceId = buildGroupDeviceId(isTop); + group.setDeviceId(deviceId); + group.setAlias(groupMessage.getGroupAlias()); + + if (!isTop) { + if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString()); + continue; + } + + Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias()); + if (topGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 业务分组信息未入库, {}", groupMessage.toString()); + continue; + } + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias()); + if (parentGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString()); + continue; + } + group.setParentId(parentGroup.getId()); + group.setParentDeviceId(parentGroup.getDeviceId()); + + } + group.setCreateTime(DateUtil.getNow()); + group.setUpdateTime(DateUtil.getNow()); + groupService.add(group); + } + + break; + case "update": + if (!userSetting.isUseAliasForGroupSync()) { + if (groupMessage.getGroupGbId() == null) { + log.info("[REDIS消息-分组信息更新] 分组编号未设置,{}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId()); + if (group == null) { + log.info("[REDIS消息-分组信息更新] 失败 {},编号不存在", groupMessage.getGroupGbId()); + continue; + } + group.setDeviceId(groupMessage.getGroupGbId()); + group.setAlias(groupMessage.getGroupAlias()); + group.setParentDeviceId(groupMessage.getParentGroupGbId()); + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + group.setUpdateTime(DateUtil.getNow()); + groupService.update(group); + }else { + // 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID + if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias()); + if (group == null ) { + log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias()); + continue; + } + boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias()); + String deviceId = buildGroupDeviceId(isTop); + group.setDeviceId(deviceId); + group.setAlias(groupMessage.getGroupAlias()); + + if (!isTop) { + if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString()); + continue; + } + + Group topGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup()); + if (topGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 业务分组不存在, {}", groupMessage.toString()); + continue; + } + group.setBusinessGroup(topGroup.getDeviceId()); + Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias()); + if (parentGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString()); + continue; + } + group.setParentId(parentGroup.getId()); + group.setParentDeviceId(parentGroup.getDeviceId()); + + } + group.setUpdateTime(DateUtil.getNow()); + groupService.update(group); + } + break; + case "delete": + if (!userSetting.isUseAliasForGroupSync()) { + if (groupMessage.getGroupGbId() == null) { + log.info("[REDIS消息-分组信息删除] 分组编号未设置,{}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId()); + if (group == null) { + log.info("[REDIS消息-分组信息删除] 失败 {},编号不存在", groupMessage.getGroupGbId()); + continue; + } + groupService.delete(group.getId()); + }else { + // 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID + if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias()); + if (group == null) { + log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias()); + continue; + } + groupService.delete(group.getId()); + } + break; + default: + log.info("[REDIS消息-分组信息更新] 未识别的消息类型 {},目前支持的消息类型为 add、update、delete", groupMessage.getMessageType()); + } + } + + } catch (Exception e) { + log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody())); + log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e); + } + } + + } + + /** + * 生成分组国标编号 + */ + private String buildGroupDeviceId(boolean isTop) { + try { + String deviceTemplate = userSetting.getGroupSyncDeviceTemplate(); + if (ObjectUtils.isEmpty(deviceTemplate) || !deviceTemplate.contains("%s")) { + String domain = sipConfig.getDomain(); + if (domain.length() != 10) { + domain = sipConfig.getId().substring(0, 10); + } + deviceTemplate = domain + "%s0%s"; + } + String codeType = "216"; + if (isTop) { + codeType = "215"; + } + return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true)); + }catch (Exception e) { + log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e); + return null; + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java new file mode 100755 index 000000000..c5ff20dda --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java @@ -0,0 +1,213 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Group; +import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage; +import com.genersoft.iot.vmp.gb28181.service.IGroupService; +import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; +import com.genersoft.iot.vmp.utils.DateUtil; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 接收redis发送的推流设备列表更新通知 + * 监听: SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE + * 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]' + */ +@Slf4j +@Component +public class RedisGroupMsgListener implements MessageListener { + + @Resource + private IGroupService groupService; + + @Resource + private IStreamPushService streamPushService; + + @Autowired + private UserSetting userSetting; + + @Autowired + private SipConfig sipConfig; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Override + public void onMessage(Message message, byte[] bytes) { + log.info("[REDIS: 业务分组同步回复] key: {}, : {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody())); + taskQueue.offer(message); + } + + @Scheduled(fixedDelay = 100) + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + List groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class); + for (int i = 0; i < groupMessages.size(); i++) { + RedisGroupMessage groupMessage = groupMessages.get(i); + log.info("[REDIS消息-业务分组同步回复] {}", groupMessage.toString()); + if (!userSetting.isUseAliasForGroupSync()) { + if (groupMessage.getGroupGbId() == null) { + log.info("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId()); + if (group == null ) { + if (ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){ + log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + group = new Group(); + group.setDeviceId(groupMessage.getGroupGbId()); + group.setAlias(groupMessage.getGroupAlias()); + group.setParentDeviceId(groupMessage.getParentGroupGbId()); + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + group.setCreateTime(DateUtil.getNow()); + group.setUpdateTime(DateUtil.getNow()); + groupService.add(group); + + }else { + group.setDeviceId(groupMessage.getGroupGbId()); + group.setAlias(groupMessage.getGroupAlias()); + group.setParentDeviceId(groupMessage.getParentGroupGbId()); + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + group.setUpdateTime(DateUtil.getNow()); + groupService.update(group); + } + }else { + log.info("[REDIS消息-业务分组同步回复] 使用别名作为唯一ID解析分组消息"); + // 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID + if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName()) + || ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString()); + continue; + } + Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias()); + if (group == null ) { + group = new Group(); + boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias()); + String deviceId = buildGroupDeviceId(isTop); + group.setDeviceId(deviceId); + group.setAlias(groupMessage.getGroupAlias()); + + if (!isTop) { + if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString()); + continue; + } + + Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias()); + if (topGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 业务分组信息未入库, {}", groupMessage.toString()); + continue; + } + group.setBusinessGroup(groupMessage.getTopGroupGbId()); + Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias()); + if (parentGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString()); + continue; + } + group.setParentId(parentGroup.getId()); + group.setParentDeviceId(parentGroup.getDeviceId()); + + } + group.setCreateTime(DateUtil.getNow()); + group.setUpdateTime(DateUtil.getNow()); + groupService.add(group); + + }else { + boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias()); + String deviceId = buildGroupDeviceId(isTop); + group.setDeviceId(deviceId); + group.setAlias(groupMessage.getGroupAlias()); + + if (!isTop) { + if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) { + log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString()); + continue; + } + + Group topGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup()); + if (topGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 业务分组不存在, {}", groupMessage.toString()); + continue; + } + group.setBusinessGroup(topGroup.getDeviceId()); + Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias()); + if (parentGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString()); + continue; + } + group.setParentId(parentGroup.getId()); + group.setParentDeviceId(parentGroup.getDeviceId()); + + } + group.setUpdateTime(DateUtil.getNow()); + groupService.update(group); + } + } + } + + } catch (Exception e) { + log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody())); + log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e); + } + } + + } + + /** + * 生成分组国标编号 + */ + private String buildGroupDeviceId(boolean isTop) { + try { + String deviceTemplate = userSetting.getGroupSyncDeviceTemplate(); + if (ObjectUtils.isEmpty(deviceTemplate) || !deviceTemplate.contains("%s")) { + String domain = sipConfig.getDomain(); + if (domain.length() != 10) { + domain = sipConfig.getId().substring(0, 10); + } + deviceTemplate = domain + "%s0%s"; + } + String codeType = "216"; + if (isTop) { + codeType = "215"; + } + return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true)); + }catch (Exception e) { + log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e); + return null; + } + } +} diff --git a/数据库/2.7.4/初始化-mysql-2.7.4.sql b/数据库/2.7.4/初始化-mysql-2.7.4.sql index 5e45b6ee1..c9e42d2ad 100644 --- a/数据库/2.7.4/初始化-mysql-2.7.4.sql +++ b/数据库/2.7.4/初始化-mysql-2.7.4.sql @@ -431,6 +431,7 @@ create table IF NOT EXISTS wvp_common_group create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, civil_code varchar(50) default null, + alias varchar(255) default null, constraint uk_common_group_device_platform unique (device_id) ); diff --git a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql index 9d4b87ed6..2ad8d746e 100644 --- a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql @@ -432,6 +432,7 @@ create table IF NOT EXISTS wvp_common_group create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, civil_code varchar(50) default null, + alias varchar(255) default null, constraint uk_common_group_device_platform unique (device_id) ); diff --git a/数据库/2.7.4/更新-mysql-2.7.4.sql b/数据库/2.7.4/更新-mysql-2.7.4.sql index cac775d8d..8d5b51240 100644 --- a/数据库/2.7.4/更新-mysql-2.7.4.sql +++ b/数据库/2.7.4/更新-mysql-2.7.4.sql @@ -87,9 +87,15 @@ BEGIN IF NOT EXISTS (SELECT column_name FROM information_schema.columns WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_channel' and column_name = 'map_level') - THEN - ALTER TABLE wvp_device_channel ADD map_level integer default 0; - END IF; + THEN + ALTER TABLE wvp_device_channel ADD map_level integer default 0; + END IF; + + IF NOT EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_common_group' and column_name = 'alias') + THEN + ALTER TABLE wvp_common_group ADD alias varchar(255) default null; + END IF; END; // call wvp_20250924(); DROP PROCEDURE wvp_20250924; diff --git a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql index 4a7684c45..6febd35e4 100644 --- a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql @@ -40,3 +40,4 @@ ALTER table wvp_media_server ADD COLUMN IF NOT EXISTS mp4_ssl_port integer; ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS enable_broadcast integer default 0; ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS map_level integer default 0; +ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default null;