支持从第三方同步分组信息
This commit is contained in:
@@ -85,6 +85,11 @@ public class VideoManagerConstants {
|
||||
*/
|
||||
public static final String VM_MSG_GROUP_LIST_RESPONSE = "VM_MSG_GROUP_LIST_RESPONSE";
|
||||
|
||||
/**
|
||||
* 同步三方组织结构回复
|
||||
*/
|
||||
public static final String VM_MSG_GROUP_LIST_CHANGE = "VM_MSG_GROUP_LIST_CHANGE";
|
||||
|
||||
/**
|
||||
* redis 消息通知设备推流到平台
|
||||
*/
|
||||
|
||||
@@ -209,4 +209,14 @@ public class UserSetting {
|
||||
*/
|
||||
private boolean disableDateHeader = false;
|
||||
|
||||
/**
|
||||
* 同步业务分组时自动生成分组国标编号的模板,不配置则默认参考当前的sip域信息生成
|
||||
*/
|
||||
private String groupSyncDeviceTemplate;
|
||||
|
||||
/**
|
||||
* 与第三方进行分组同步时使用别名而不是分组ID, 如果没有设置此项为true,那么分组编号就是必须传递的。如果是设置为true则,自动为别名的分组生成新的编号
|
||||
*/
|
||||
private boolean useAliasForGroupSync = false;
|
||||
|
||||
}
|
||||
|
||||
@@ -34,6 +34,11 @@ public class RedisMsgListenConfig {
|
||||
@Autowired
|
||||
private RedisPushStreamListMsgListener pushStreamListMsgListener;
|
||||
|
||||
@Autowired
|
||||
private RedisGroupMsgListener groupMsgListener;
|
||||
|
||||
@Autowired
|
||||
private RedisGroupChangeListener groupChangeListener;
|
||||
|
||||
@Autowired
|
||||
private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
|
||||
@@ -64,6 +69,8 @@ public class RedisMsgListenConfig {
|
||||
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
|
||||
container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
|
||||
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
|
||||
container.addMessageListener(groupMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE));
|
||||
container.addMessageListener(groupChangeListener, new PatternTopic(VideoManagerConstants.VM_MSG_GROUP_LIST_CHANGE));
|
||||
return container;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,12 +58,19 @@ public class Group implements Comparable<Group>{
|
||||
*/
|
||||
@Schema(description = "更新时间")
|
||||
private String updateTime;
|
||||
|
||||
/**
|
||||
* 行政区划
|
||||
*/
|
||||
@Schema(description = "行政区划")
|
||||
private String civilCode;
|
||||
|
||||
/**
|
||||
* 别名
|
||||
*/
|
||||
@Schema(description = "别名, 此别名为唯一值,可以对接第三方是存储对方的ID")
|
||||
private String alias;
|
||||
|
||||
public static Group getInstance(DeviceChannel channel) {
|
||||
GbCode gbCode = GbCode.decode(channel.getDeviceId());
|
||||
if (gbCode == null || (!gbCode.getTypeCode().equals("215") && !gbCode.getTypeCode().equals("216"))) {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.genersoft.iot.vmp.gb28181.bean;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@@ -36,13 +35,33 @@ public class RedisGroupMessage {
|
||||
*/
|
||||
private String parentGAlias;
|
||||
|
||||
/**
|
||||
* 分组所属业务分组国标ID
|
||||
*/
|
||||
private String topGroupGbId;
|
||||
|
||||
public static void main(String[] args) {
|
||||
RedisGroupMessage redisGroupMessage = new RedisGroupMessage();
|
||||
redisGroupMessage.setGroupAlias("100000001");
|
||||
redisGroupMessage.setGroupName("消防大队");
|
||||
System.out.println(JSON.toJSONString(redisGroupMessage));
|
||||
/**
|
||||
* 分组所属业务分组别名
|
||||
*/
|
||||
private String topGroupGAlias;
|
||||
|
||||
/**
|
||||
* 分组变化消息中的消息类型,取值为 add update delete
|
||||
*/
|
||||
private String messageType;
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RedisGroupMessage{" +
|
||||
"groupGbId='" + groupGbId + '\'' +
|
||||
", groupAlias='" + groupAlias + '\'' +
|
||||
", groupName='" + groupName + '\'' +
|
||||
", groupCivilCode='" + groupCivilCode + '\'' +
|
||||
", parentGroupGbId='" + parentGroupGbId + '\'' +
|
||||
", parentGAlias='" + parentGAlias + '\'' +
|
||||
", topGroupGbId='" + topGroupGbId + '\'' +
|
||||
", topGroupGAlias='" + topGroupGAlias + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -286,4 +286,7 @@ public interface GroupMapper {
|
||||
|
||||
@Delete("DELETE FROM wvp_platform_group WHERE group_id = #{groupId}")
|
||||
void deletePlatformGroup(@Param("groupId") int groupId);
|
||||
|
||||
@Select("SELECT * from wvp_common_group WHERE alias = #{alias} ")
|
||||
Group queryGroupByAlias(@Param("alias") String alias);
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ public interface IGroupService {
|
||||
|
||||
List<GroupTree> queryForTree(String query, Integer parent, Boolean hasChannel);
|
||||
|
||||
void syncFromChannel();
|
||||
|
||||
boolean delete(int id);
|
||||
|
||||
boolean batchAdd(List<Group> groupList);
|
||||
@@ -26,4 +24,6 @@ public interface IGroupService {
|
||||
List<Group> getPath(String deviceId, String businessGroup);
|
||||
|
||||
PageInfo<Group> queryList(Integer page, Integer count, String query);
|
||||
|
||||
Group queryGroupByAlias(String groupAlias);
|
||||
}
|
||||
|
||||
@@ -186,11 +186,6 @@ public class GroupServiceImpl implements IGroupService {
|
||||
return groupTrees;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void syncFromChannel() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public boolean delete(int id) {
|
||||
@@ -307,4 +302,9 @@ public class GroupServiceImpl implements IGroupService {
|
||||
List<Group> all = groupManager.query(query, null, null);
|
||||
return new PageInfo<>(all);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Group queryGroupByAlias(String groupAlias) {
|
||||
return groupManager.queryGroupByAlias(groupAlias);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,12 @@ package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.service.IMapService;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.MapConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -12,14 +15,22 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 第三方地图适配
|
||||
* 第三方平台适配
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class MapServiceImplForSy implements IMapService {
|
||||
public class SyServiceImpl implements IMapService, CommandLineRunner {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
// 启动后请求组织结构同步
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
String key = VideoManagerConstants.VM_MSG_GROUP_LIST_REQUEST;
|
||||
log.info("[redis发送通知] 发送 同步组织结构请求 {}", key);
|
||||
redisTemplate.convertAndSend(key, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<MapConfig> getConfig() {
|
||||
@@ -0,0 +1,272 @@
|
||||
package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Group;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
|
||||
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* @Auther: JiangFeng
|
||||
* @Date: 2022/8/16 11:32
|
||||
* @Description: 接收redis发送的推流设备列表更新通知
|
||||
* 监听: SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE
|
||||
* 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]'
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisGroupChangeListener implements MessageListener {
|
||||
|
||||
@Resource
|
||||
private IGroupService groupService;
|
||||
|
||||
@Resource
|
||||
private IStreamPushService streamPushService;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@Autowired
|
||||
private SipConfig sipConfig;
|
||||
|
||||
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] bytes) {
|
||||
log.info("[REDIS: 分组信息更新] key: {}, : {}", VideoManagerConstants.VM_MSG_GROUP_LIST_CHANGE, new String(message.getBody()));
|
||||
taskQueue.offer(message);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 100)
|
||||
public void executeTaskQueue() {
|
||||
if (taskQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<Message> messageDataList = new ArrayList<>();
|
||||
int size = taskQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
Message msg = taskQueue.poll();
|
||||
if (msg != null) {
|
||||
messageDataList.add(msg);
|
||||
}
|
||||
}
|
||||
if (messageDataList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Message msg : messageDataList) {
|
||||
try {
|
||||
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
|
||||
for (int i = 0; i < groupMessages.size(); i++) {
|
||||
RedisGroupMessage groupMessage = groupMessages.get(i);
|
||||
log.info("[REDIS消息-分组信息更新] {}", groupMessage.toString());
|
||||
switch (groupMessage.getMessageType()){
|
||||
case "add":
|
||||
if (!userSetting.isUseAliasForGroupSync()) {
|
||||
if (groupMessage.getGroupGbId() == null) {
|
||||
log.info("[REDIS消息-分组信息新增] 分组编号未设置,{}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
|
||||
if (group != null) {
|
||||
log.info("[REDIS消息-分组信息新增] 失败 {},编号已经存在", groupMessage.getGroupGbId());
|
||||
continue;
|
||||
}
|
||||
if (ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group = new Group();
|
||||
group.setDeviceId(groupMessage.getGroupGbId());
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
group.setParentDeviceId(groupMessage.getParentGroupGbId());
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
group.setCreateTime(DateUtil.getNow());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.add(group);
|
||||
|
||||
}else {
|
||||
// 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID
|
||||
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
|
||||
log.info("[REDIS消息-分组信息新增] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
|
||||
if (group != null) {
|
||||
log.info("[REDIS消息-分组信息新增] 失败 {},别名已经存在", groupMessage.getGroupGbId());
|
||||
continue;
|
||||
}
|
||||
group = new Group();
|
||||
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
|
||||
String deviceId = buildGroupDeviceId(isTop);
|
||||
group.setDeviceId(deviceId);
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
|
||||
if (!isTop) {
|
||||
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias());
|
||||
if (topGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 业务分组信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
|
||||
if (parentGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setParentId(parentGroup.getId());
|
||||
group.setParentDeviceId(parentGroup.getDeviceId());
|
||||
|
||||
}
|
||||
group.setCreateTime(DateUtil.getNow());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.add(group);
|
||||
}
|
||||
|
||||
break;
|
||||
case "update":
|
||||
if (!userSetting.isUseAliasForGroupSync()) {
|
||||
if (groupMessage.getGroupGbId() == null) {
|
||||
log.info("[REDIS消息-分组信息更新] 分组编号未设置,{}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
|
||||
if (group == null) {
|
||||
log.info("[REDIS消息-分组信息更新] 失败 {},编号不存在", groupMessage.getGroupGbId());
|
||||
continue;
|
||||
}
|
||||
group.setDeviceId(groupMessage.getGroupGbId());
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
group.setParentDeviceId(groupMessage.getParentGroupGbId());
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.update(group);
|
||||
}else {
|
||||
// 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID
|
||||
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
|
||||
if (group == null ) {
|
||||
log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias());
|
||||
continue;
|
||||
}
|
||||
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
|
||||
String deviceId = buildGroupDeviceId(isTop);
|
||||
group.setDeviceId(deviceId);
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
|
||||
if (!isTop) {
|
||||
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
Group topGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup());
|
||||
if (topGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 业务分组不存在, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setBusinessGroup(topGroup.getDeviceId());
|
||||
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
|
||||
if (parentGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setParentId(parentGroup.getId());
|
||||
group.setParentDeviceId(parentGroup.getDeviceId());
|
||||
|
||||
}
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.update(group);
|
||||
}
|
||||
break;
|
||||
case "delete":
|
||||
if (!userSetting.isUseAliasForGroupSync()) {
|
||||
if (groupMessage.getGroupGbId() == null) {
|
||||
log.info("[REDIS消息-分组信息删除] 分组编号未设置,{}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
|
||||
if (group == null) {
|
||||
log.info("[REDIS消息-分组信息删除] 失败 {},编号不存在", groupMessage.getGroupGbId());
|
||||
continue;
|
||||
}
|
||||
groupService.delete(group.getId());
|
||||
}else {
|
||||
// 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID
|
||||
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
|
||||
if (group == null) {
|
||||
log.info("[REDIS消息-分组信息更新] 失败 {},别名不存在", groupMessage.getGroupAlias());
|
||||
continue;
|
||||
}
|
||||
groupService.delete(group.getId());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.info("[REDIS消息-分组信息更新] 未识别的消息类型 {},目前支持的消息类型为 add、update、delete", groupMessage.getMessageType());
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody()));
|
||||
log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成分组国标编号
|
||||
*/
|
||||
private String buildGroupDeviceId(boolean isTop) {
|
||||
try {
|
||||
String deviceTemplate = userSetting.getGroupSyncDeviceTemplate();
|
||||
if (ObjectUtils.isEmpty(deviceTemplate) || !deviceTemplate.contains("%s")) {
|
||||
String domain = sipConfig.getDomain();
|
||||
if (domain.length() != 10) {
|
||||
domain = sipConfig.getId().substring(0, 10);
|
||||
}
|
||||
deviceTemplate = domain + "%s0%s";
|
||||
}
|
||||
String codeType = "216";
|
||||
if (isTop) {
|
||||
codeType = "215";
|
||||
}
|
||||
return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true));
|
||||
}catch (Exception e) {
|
||||
log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
213
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java
Executable file
213
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java
Executable file
@@ -0,0 +1,213 @@
|
||||
package com.genersoft.iot.vmp.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Group;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
|
||||
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
/**
|
||||
* @Auther: JiangFeng
|
||||
* @Date: 2022/8/16 11:32
|
||||
* @Description: 接收redis发送的推流设备列表更新通知
|
||||
* 监听: SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE
|
||||
* 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]'
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RedisGroupMsgListener implements MessageListener {
|
||||
|
||||
@Resource
|
||||
private IGroupService groupService;
|
||||
|
||||
@Resource
|
||||
private IStreamPushService streamPushService;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@Autowired
|
||||
private SipConfig sipConfig;
|
||||
|
||||
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] bytes) {
|
||||
log.info("[REDIS: 业务分组同步回复] key: {}, : {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody()));
|
||||
taskQueue.offer(message);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 100)
|
||||
public void executeTaskQueue() {
|
||||
if (taskQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<Message> messageDataList = new ArrayList<>();
|
||||
int size = taskQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
Message msg = taskQueue.poll();
|
||||
if (msg != null) {
|
||||
messageDataList.add(msg);
|
||||
}
|
||||
}
|
||||
if (messageDataList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (Message msg : messageDataList) {
|
||||
try {
|
||||
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
|
||||
for (int i = 0; i < groupMessages.size(); i++) {
|
||||
RedisGroupMessage groupMessage = groupMessages.get(i);
|
||||
log.info("[REDIS消息-业务分组同步回复] {}", groupMessage.toString());
|
||||
if (!userSetting.isUseAliasForGroupSync()) {
|
||||
if (groupMessage.getGroupGbId() == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByDeviceId(groupMessage.getGroupGbId());
|
||||
if (group == null ) {
|
||||
if (ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGbId()) ){
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group = new Group();
|
||||
group.setDeviceId(groupMessage.getGroupGbId());
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
group.setParentDeviceId(groupMessage.getParentGroupGbId());
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
group.setCreateTime(DateUtil.getNow());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.add(group);
|
||||
|
||||
}else {
|
||||
group.setDeviceId(groupMessage.getGroupGbId());
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
group.setParentDeviceId(groupMessage.getParentGroupGbId());
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.update(group);
|
||||
}
|
||||
}else {
|
||||
log.info("[REDIS消息-业务分组同步回复] 使用别名作为唯一ID解析分组消息");
|
||||
// 此处使用别名作为判断依据,别名此处常常是分组在第三方系统里的唯一ID
|
||||
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|
||||
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息关键字段缺失, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
Group group = groupService.queryGroupByAlias(groupMessage.getGroupAlias());
|
||||
if (group == null ) {
|
||||
group = new Group();
|
||||
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
|
||||
String deviceId = buildGroupDeviceId(isTop);
|
||||
group.setDeviceId(deviceId);
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
|
||||
if (!isTop) {
|
||||
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
Group topGroup = groupService.queryGroupByAlias(groupMessage.getTopGroupGAlias());
|
||||
if (topGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 业务分组信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setBusinessGroup(groupMessage.getTopGroupGbId());
|
||||
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
|
||||
if (parentGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setParentId(parentGroup.getId());
|
||||
group.setParentDeviceId(parentGroup.getDeviceId());
|
||||
|
||||
}
|
||||
group.setCreateTime(DateUtil.getNow());
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.add(group);
|
||||
|
||||
}else {
|
||||
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
|
||||
String deviceId = buildGroupDeviceId(isTop);
|
||||
group.setDeviceId(deviceId);
|
||||
group.setAlias(groupMessage.getGroupAlias());
|
||||
|
||||
if (!isTop) {
|
||||
if (ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias()) || ObjectUtils.isEmpty(groupMessage.getParentGAlias())) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 消息缺失业务分组别名或者父节点别名, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
|
||||
Group topGroup = groupService.queryGroupByDeviceId(group.getBusinessGroup());
|
||||
if (topGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 业务分组不存在, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setBusinessGroup(topGroup.getDeviceId());
|
||||
Group parentGroup = groupService.queryGroupByAlias(groupMessage.getParentGAlias());
|
||||
if (parentGroup == null) {
|
||||
log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点信息未入库, {}", groupMessage.toString());
|
||||
continue;
|
||||
}
|
||||
group.setParentId(parentGroup.getId());
|
||||
group.setParentDeviceId(parentGroup.getDeviceId());
|
||||
|
||||
}
|
||||
group.setUpdateTime(DateUtil.getNow());
|
||||
groupService.update(group);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody()));
|
||||
log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成分组国标编号
|
||||
*/
|
||||
private String buildGroupDeviceId(boolean isTop) {
|
||||
try {
|
||||
String deviceTemplate = userSetting.getGroupSyncDeviceTemplate();
|
||||
if (ObjectUtils.isEmpty(deviceTemplate) || !deviceTemplate.contains("%s")) {
|
||||
String domain = sipConfig.getDomain();
|
||||
if (domain.length() != 10) {
|
||||
domain = sipConfig.getId().substring(0, 10);
|
||||
}
|
||||
deviceTemplate = domain + "%s0%s";
|
||||
}
|
||||
String codeType = "216";
|
||||
if (isTop) {
|
||||
codeType = "215";
|
||||
}
|
||||
return String.format(deviceTemplate, codeType, RandomStringUtils.secureStrong().next(6, false, true));
|
||||
}catch (Exception e) {
|
||||
log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -431,6 +431,7 @@ create table IF NOT EXISTS wvp_common_group
|
||||
create_time varchar(50) NOT NULL,
|
||||
update_time varchar(50) NOT NULL,
|
||||
civil_code varchar(50) default null,
|
||||
alias varchar(255) default null,
|
||||
constraint uk_common_group_device_platform unique (device_id)
|
||||
);
|
||||
|
||||
|
||||
@@ -432,6 +432,7 @@ create table IF NOT EXISTS wvp_common_group
|
||||
create_time varchar(50) NOT NULL,
|
||||
update_time varchar(50) NOT NULL,
|
||||
civil_code varchar(50) default null,
|
||||
alias varchar(255) default null,
|
||||
constraint uk_common_group_device_platform unique (device_id)
|
||||
);
|
||||
|
||||
|
||||
@@ -87,9 +87,15 @@ BEGIN
|
||||
|
||||
IF NOT EXISTS (SELECT column_name FROM information_schema.columns
|
||||
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_channel' and column_name = 'map_level')
|
||||
THEN
|
||||
ALTER TABLE wvp_device_channel ADD map_level integer default 0;
|
||||
END IF;
|
||||
THEN
|
||||
ALTER TABLE wvp_device_channel ADD map_level integer default 0;
|
||||
END IF;
|
||||
|
||||
IF NOT EXISTS (SELECT column_name FROM information_schema.columns
|
||||
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_common_group' and column_name = 'alias')
|
||||
THEN
|
||||
ALTER TABLE wvp_common_group ADD alias varchar(255) default null;
|
||||
END IF;
|
||||
END; //
|
||||
call wvp_20250924();
|
||||
DROP PROCEDURE wvp_20250924;
|
||||
|
||||
@@ -40,3 +40,4 @@ ALTER table wvp_media_server ADD COLUMN IF NOT EXISTS mp4_ssl_port integer;
|
||||
|
||||
ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS enable_broadcast integer default 0;
|
||||
ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS map_level integer default 0;
|
||||
ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default null;
|
||||
|
||||
Reference in New Issue
Block a user