重新分组redis同步功能

This commit is contained in:
lin
2025-11-14 19:02:34 +08:00
parent 30aa3192d0
commit 44cdcb8352
6 changed files with 166 additions and 238 deletions

View File

@@ -5,10 +5,6 @@ import lombok.Data;
@Data
public class RedisGroupMessage {
/**
* 分组国标ID
*/
private String groupGbId;
/**
* 分组别名
@@ -20,26 +16,12 @@ public class RedisGroupMessage {
*/
private String groupName;
/**
* 分组所属的行政区划
*/
private String groupCivilCode;
/**
* 分组所属父分组国标ID
*/
private String parentGroupGbId;
/**
* 分组所属父分组别名
*/
private String parentGAlias;
/**
* 分组所属业务分组国标ID
*/
private String topGroupGbId;
/**
* 分组所属业务分组别名
*/
@@ -54,13 +36,9 @@ public class RedisGroupMessage {
@Override
public String toString() {
return "RedisGroupMessage{" +
"groupGbId='" + groupGbId + '\'' +
", groupAlias='" + groupAlias + '\'' +
", groupName='" + groupName + '\'' +
", groupCivilCode='" + groupCivilCode + '\'' +
", parentGroupGbId='" + parentGroupGbId + '\'' +
", parentGAlias='" + parentGAlias + '\'' +
", topGroupGbId='" + topGroupGbId + '\'' +
", topGroupGAlias='" + topGroupGAlias + '\'' +
'}';
}

View File

@@ -320,4 +320,19 @@ public interface GroupMapper {
" GROUP BY coalesce(wdc.gb_parent_id, wdc.parent_id)" +
"</script>")
List<CameraCount> queryCountWithChild(List<CameraGroup> groupList);
@Select("SELECT * from wvp_common_group where alias is not null")
@MapKey("alias")
Map<String, Group> queryGroupByAliasMap();
@Delete("DELETE FROM wvp_common_group where alias is not null")
void deleteHasAlias();
@Update(" UPDATE wvp_common_group g1" +
" JOIN wvp_common_group g2" +
" ON g1.parent_device_id = g2.device_id" +
" SET g1.parent_id = g2.id" +
" WHERE g1.alias IS NOT NULL;")
void fixParentId();
}

View File

@@ -4,7 +4,9 @@ import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.GroupTree;
import com.github.pagehelper.PageInfo;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public interface IGroupService {
@@ -31,4 +33,7 @@ public interface IGroupService {
void sync();
Map<String, Group> queryGroupByAliasMap();
void saveByAlias(Collection<Group> groups);
}

View File

@@ -331,4 +331,20 @@ public class GroupServiceImpl implements IGroupService, CommandLineRunner {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "同步失败: " + e.getMessage());
}
}
@Override
public Map<String, Group> queryGroupByAliasMap() {
return groupManager.queryGroupByAliasMap();
}
@Override
@Transactional
public void saveByAlias(Collection<Group> groups) {
// 清空别名数据
groupManager.deleteHasAlias();
// 写入新数据
groupManager.batchAdd(new ArrayList<>(groups));
// 修复数据丢失的parentID
groupManager.fixParentId();
}
}

View File

@@ -76,154 +76,95 @@ public class RedisGroupChangeListener implements MessageListener {
for (int i = 0; i < groupMessages.size(); i++) {
RedisGroupMessage groupMessage = groupMessages.get(i);
log.info("[REDIS消息-分组信息更新] {}", groupMessage.toString());
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
switch (groupMessage.getMessageType()){
case "add":
if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) {
log.info("[REDIS消息-分组信息新增] 分组编号未设置,{}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
if (group != null) {
log.info("[REDIS消息-分组信息新增] 失败 {},编号已经存在", groupMessage.getGroupGbId());
continue;
}
if (ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){
log.info("[REDIS消息-分组信息新增] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
group = new Group();
group.setDeviceId(groupMessage.getGroupGbId());
group.setAlias(groupMessage.getGroupAlias());
group.setParentDeviceId(groupMessage.getParentGroupGbId());
group.setBusinessGroup(groupMessage.getTopGroupGbId());
group.setCreateTime(DateUtil.getNow());
group.setUpdateTime(DateUtil.getNow());
groupService.add(group);
}else {
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-分组信息新增] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
if (group != null) {
log.info("[REDIS消息-分组信息新增] 失败 {},别名已经存在", groupMessage.getGroupGbId());
continue;
}
group = new Group();
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
String deviceId = buildGroupDeviceId(isTop);
group.setDeviceId(deviceId);
group.setAlias(groupMessage.getGroupAlias());
group.setName(groupMessage.getGroupName());
if (!isTop) {
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) ) {
log.info("[REDIS消息-分组信息新增] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
continue;
}
Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias());
if (topGroup == null) {
log.info("[REDIS消息-分组信息新增] 业务分组信息未入库, {}", groupMessage.toString());
continue;
}
group.setBusinessGroup(topGroup.getDeviceId());
group.setParentId(topGroup.getId());
}
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
if (parentGroup == null) {
log.info("[REDIS消息-分组信息新增] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
continue;
}
group.setParentId(parentGroup.getId());
group.setParentDeviceId(parentGroup.getDeviceId());
}
group.setCreateTime(DateUtil.getNow());
group.setUpdateTime(DateUtil.getNow());
groupService.add(group);
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-分组信息新增] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
if (group != null) {
log.info("[REDIS消息-分组信息新增] 失败 {},别名已经存在", groupMessage.getGroupAlias());
continue;
}
group = new Group();
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
String deviceId = buildGroupDeviceId(isTop);
group.setDeviceId(deviceId);
group.setAlias(groupMessage.getGroupAlias());
group.setName(groupMessage.getGroupName());
if (!isTop) {
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) ) {
log.info("[REDIS消息-分组信息新增] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
continue;
}
Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias());
if (topGroup == null) {
log.info("[REDIS消息-分组信息新增] 业务分组信息未入库, {}", groupMessage.toString());
continue;
}
group.setBusinessGroup(topGroup.getDeviceId());
group.setParentId(topGroup.getId());
}
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
if (parentGroup == null) {
log.info("[REDIS消息-分组信息新增] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
continue;
}
group.setParentId(parentGroup.getId());
group.setParentDeviceId(parentGroup.getDeviceId());
}
group.setCreateTime(DateUtil.getNow());
group.setUpdateTime(DateUtil.getNow());
groupService.add(group);
break;
case "update":
if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) {
log.info("[REDIS消息-分组信息更新] 分组编号未设置,{}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
if (group == null) {
log.info("[REDIS消息-分组信息更新] 失败 {},编号不存在", groupMessage.getGroupGbId());
continue;
}
group.setDeviceId(groupMessage.getGroupGbId());
group.setAlias(groupMessage.getGroupAlias());
group.setParentDeviceId(groupMessage.getParentGroupGbId());
group.setBusinessGroup(groupMessage.getTopGroupGbId());
group.setUpdateTime(DateUtil.getNow());
groupService.update(group);
}else {
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null) {
log.info("[REDIS消息-分组信息更新] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
if (group == null ) {
log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias());
continue;
}
group.setName(groupMessage.getGroupName());
group.setUpdateTime(DateUtil.getNow());
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
if (parentGroup == null) {
log.info("[REDIS消息-分组信息更新] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
continue;
}
group.setParentId(parentGroup.getId());
group.setParentDeviceId(parentGroup.getDeviceId());
}else {
Group businessGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup());
if (businessGroup == null ) {
log.info("[REDIS消息-分组信息更新] 失败 {},业务分组不存在", groupMessage.getGroupAlias());
continue;
}
group.setParentId(businessGroup.getId());
group.setParentDeviceId(null);
}
groupService.update(group);
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null) {
log.info("[REDIS消息-分组信息更新] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
if (group == null ) {
log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias());
continue;
}
group.setName(groupMessage.getGroupName());
group.setUpdateTime(DateUtil.getNow());
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
if (parentGroup == null) {
log.info("[REDIS消息-分组信息更新] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
continue;
}
group.setParentId(parentGroup.getId());
group.setParentDeviceId(parentGroup.getDeviceId());
}else {
Group businessGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup());
if (businessGroup == null ) {
log.info("[REDIS消息-分组信息更新] 失败 {},业务分组不存在", groupMessage.getGroupAlias());
continue;
}
group.setParentId(businessGroup.getId());
group.setParentDeviceId(null);
}
groupService.update(group);
break;
case "delete":
if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) {
log.info("[REDIS消息-分组信息删除] 分组编号未设置,{}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
if (group == null) {
log.info("[REDIS消息-分组信息删除] 失败 {},编号不存在", groupMessage.getGroupGbId());
continue;
}
groupService.delete(group.getId());
}else {
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null) {
log.info("[REDIS消息-分组信息删除] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
if (group == null) {
log.info("[REDIS消息-分组信息删除] 失败 {},别名不存在", groupMessage.getGroupAlias());
continue;
}
groupService.delete(group.getId());
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null) {
log.info("[REDIS消息-分组信息删除] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
if (group == null) {
log.info("[REDIS消息-分组信息删除] 失败 {},别名不存在", groupMessage.getGroupAlias());
continue;
}
groupService.delete(group.getId());
break;
default:
log.info("[REDIS消息-分组信息改变] 未识别的消息类型 {},目前支持的消息类型为 add、update、delete", groupMessage.getMessageType());

View File

@@ -21,8 +21,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -74,101 +73,73 @@ public class RedisGroupMsgListener implements MessageListener {
if (messageDataList.isEmpty()) {
return;
}
if (userSetting.isUseAliasForGroupSync()) {
log.info("[REDIS消息-业务分组同步回复] 使用别名作为唯一ID解析分组消息");
}
// 按照别名获取所有业务分组
Map<String, Group> aliasGroupMap = groupService.queryGroupByAliasMap();
Map<String, Group> aliasGroupToSave = new LinkedHashMap<>();
for (Message msg : messageDataList) {
try {
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
for (int i = 0; i < groupMessages.size(); i++) {
RedisGroupMessage groupMessage = groupMessages.get(i);
log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", groupMessage.toString());
if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) {
log.warn("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString());
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
Group group = aliasGroupMap.get(groupMessage.getGroupAlias());
if (group == null ) {
group = new Group();
String deviceId = buildGroupDeviceId(isTop);
group.setDeviceId(deviceId);
group.setAlias(groupMessage.getGroupAlias());
group.setName(groupMessage.getGroupName());
group.setCreateTime(DateUtil.getNow());
}
if (!isTop) {
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名, {}", groupMessage.toString());
continue;
}
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
if (group == null ) {
if (ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
continue;
}
group = new Group();
group.setDeviceId(groupMessage.getGroupGbId());
group.setAlias(groupMessage.getGroupAlias());
group.setParentDeviceId(groupMessage.getParentGroupGbId());
group.setBusinessGroup(groupMessage.getTopGroupGbId());
group.setCreateTime(DateUtil.getNow());
group.setUpdateTime(DateUtil.getNow());
groupService.add(group);
}else {
group.setDeviceId(groupMessage.getGroupGbId());
group.setAlias(groupMessage.getGroupAlias());
group.setParentDeviceId(groupMessage.getParentGroupGbId());
group.setBusinessGroup(groupMessage.getTopGroupGbId());
group.setUpdateTime(DateUtil.getNow());
groupService.update(group);
Group topGroup = aliasGroupMap.get(groupMessage.getTopGroupGAlias());
if (topGroup == null) {
topGroup = aliasGroupToSave.get(groupMessage.getTopGroupGAlias());
}
}else {
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
if (topGroup == null) {
log.info("[REDIS消息-业务分组同步回复] 业务分组信息未发送或者未首先发送, {}", groupMessage.toString());
continue;
}
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
if (group == null ) {
group = new Group();
String deviceId = buildGroupDeviceId(isTop);
group.setDeviceId(deviceId);
group.setAlias(groupMessage.getGroupAlias());
group.setName(groupMessage.getGroupName());
group.setCreateTime(DateUtil.getNow());
}
if (!isTop) {
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名, {}", groupMessage.toString());
group.setBusinessGroup(topGroup.getDeviceId());
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = aliasGroupMap.get(groupMessage.getParentGAlias());
if (parentGroup == null) {
parentGroup = aliasGroupToSave.get(groupMessage.getParentGAlias());
}
if (parentGroup == null) {
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点未发送或者未首先发送, {}", groupMessage.toString());
continue;
}
Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias());
if (topGroup == null) {
log.info("[REDIS消息-业务分组同步回复] 业务分组信息未入库, {}", groupMessage.toString());
continue;
}
group.setBusinessGroup(topGroup.getDeviceId());
if (groupMessage.getParentGAlias() != null) {
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
if (parentGroup == null) {
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
continue;
}
group.setParentId(parentGroup.getId());
group.setParentDeviceId(parentGroup.getDeviceId());
}else {
group.setParentId(topGroup.getId());
group.setParentDeviceId(null);
}
group.setParentId(null);
group.setParentDeviceId(parentGroup.getDeviceId());
}else {
group.setParentId(null);
group.setParentDeviceId(null);
}
group.setUpdateTime(DateUtil.getNow());
if (group.getId() > 0) {
log.info("[REDIS消息-业务分组同步回复] 更新入库, {}", JSON.toJSONString(group));
groupService.update(group);
}else {
log.info("[REDIS消息-业务分组同步回复] 新增入库, {}", JSON.toJSONString(group));
groupService.add(group);
group.setParentDeviceId(topGroup.getDeviceId());
}
}else {
group.setParentId(null);
group.setBusinessGroup(group.getDeviceId());
group.setParentDeviceId(null);
}
group.setUpdateTime(DateUtil.getNow());
aliasGroupToSave.put(group.getAlias(), group);
}
// 存储分组数据
groupService.saveByAlias(aliasGroupToSave.values());
} catch (ControllerException e) {
log.warn("[REDIS消息-业务分组同步回复] 失败, \r\n{}", e.getMsg());
@@ -178,6 +149,8 @@ public class RedisGroupMsgListener implements MessageListener {
}
}
}
/**