修改位置分发逻辑

This commit is contained in:
lin
2025-11-17 16:04:47 +08:00
parent 44cdcb8352
commit 469804b8d3
16 changed files with 149 additions and 92 deletions

View File

@@ -76,11 +76,6 @@ public class VideoManagerConstants {
*/
public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE";
/**
* 请求同步三方组织结构
*/
public static final String VM_MSG_GROUP_LIST_REQUEST = "VM_MSG_GROUP_LIST_REQUEST";
/**
* 同步三方组织结构回复
*/

View File

@@ -68,4 +68,22 @@ public class MobilePosition {
* 创建时间
*/
private String createTime;
@Override
public String toString() {
return "MobilePosition{" +
"deviceId='" + deviceId + '\'' +
", channelId=" + channelId +
", channelDeviceId='" + channelDeviceId + '\'' +
", deviceName='" + deviceName + '\'' +
", time='" + time + '\'' +
", longitude=" + longitude +
", latitude=" + latitude +
", altitude=" + altitude +
", speed=" + speed +
", direction=" + direction +
", reportSource='" + reportSource + '\'' +
", createTime='" + createTime + '\'' +
'}';
}
}

View File

@@ -89,13 +89,6 @@ public class GroupController {
return groupService.getPath(deviceId, businessGroup);
}
@Operation(summary = "从第三方同步组织结构")
@ResponseBody
@GetMapping("/sync")
public void sync(){
groupService.sync();
}
// @Operation(summary = "根据分组Id查询分组")
// @Parameter(name = "groupDeviceId", description = "分组节点编号", required = true)
// @ResponseBody

View File

@@ -31,8 +31,6 @@ public interface IGroupService {
Group queryGroupByAlias(String groupAlias);
void sync();
Map<String, Group> queryGroupByAliasMap();
void saveByAlias(Collection<Group> groups);

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
@@ -16,7 +16,6 @@ import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -30,7 +29,7 @@ import java.util.*;
*/
@Service
@Slf4j
public class GroupServiceImpl implements IGroupService, CommandLineRunner {
public class GroupServiceImpl implements IGroupService {
@Autowired
private GroupMapper groupManager;
@@ -47,14 +46,6 @@ public class GroupServiceImpl implements IGroupService, CommandLineRunner {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
// 启动后请求组织结构同步
@Override
public void run(String... args) throws Exception {
String key = VideoManagerConstants.VM_MSG_GROUP_LIST_REQUEST;
log.info("[redis发送通知] 发送 同步组织结构请求 {}", key);
redisTemplate.convertAndSend(key, "");
}
@Override
public void add(Group group) {
Assert.notNull(group, "参数不可为NULL");
@@ -323,15 +314,6 @@ public class GroupServiceImpl implements IGroupService, CommandLineRunner {
return groupManager.queryGroupByAlias(groupAlias);
}
@Override
public void sync() {
try {
this.run();
}catch (Exception e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "同步失败: " + e.getMessage());
}
}
@Override
public Map<String, Group> queryGroupByAliasMap() {
return groupManager.queryGroupByAliasMap();
@@ -340,6 +322,7 @@ public class GroupServiceImpl implements IGroupService, CommandLineRunner {
@Override
@Transactional
public void saveByAlias(Collection<Group> groups) {
log.info("[存储分组数据] {}", JSONObject.toJSONString(groups));
// 清空别名数据
groupManager.deleteHasAlias();
// 写入新数据

View File

@@ -71,6 +71,8 @@ public interface IMediaServerService {
void delete(MediaServer mediaServer);
MediaServer getOneFromCluster(String mediaServerId);
MediaServer getDefaultMediaServer();
MediaServerLoad getLoad(MediaServer mediaServerItem);

View File

@@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import jakarta.validation.constraints.NotNull;
@@ -443,6 +444,22 @@ public class MediaServerServiceImpl implements IMediaServerService {
return (MediaServer) redisTemplate.opsForHash().get(key, mediaServerId);
}
/**
* 获取集群中的节点信息不区分所属的wvp
*/
@Override
public MediaServer getOneFromCluster(String mediaServerId) {
if (mediaServerId == null) {
return null;
}
String scanKey = String.format("%s_*", VideoManagerConstants.MEDIA_SERVER_PREFIX);
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
if (values.isEmpty()) {
return null;
}
return (MediaServer) values.get(0);
}
@Override
public MediaServer getDefaultMediaServer() {
@@ -605,7 +622,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
public void delete(MediaServer mediaServer) {
mediaServerMapper.delOne(mediaServer.getId(), userSetting.getServerId());
redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), mediaServer.getId());
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId();
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId();
redisTemplate.delete(key);
// 发送节点移除通知
MediaServerDeleteEvent event = new MediaServerDeleteEvent(this);

View File

@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -21,7 +20,10 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -71,6 +73,7 @@ public class RedisGroupMsgListener implements MessageListener {
}
}
if (messageDataList.isEmpty()) {
log.warn("[REDIS消息-业务分组同步回复] 处理队列时发现队列为空");
return;
}
// 按照别名获取所有业务分组
@@ -78,10 +81,11 @@ public class RedisGroupMsgListener implements MessageListener {
Map<String, Group> aliasGroupToSave = new LinkedHashMap<>();
for (Message msg : messageDataList) {
try {
log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", new String(msg.getBody()));
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
for (int i = 0; i < groupMessages.size(); i++) {
RedisGroupMessage groupMessage = groupMessages.get(i);
log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", groupMessage.toString());
log.info("[REDIS消息-业务分组同步回复] 待处理数量: {}", groupMessages.size());
for (RedisGroupMessage groupMessage : groupMessages) {
// 此处使用别名作为判断依据别名此处常常是分组在第三方系统里的唯一ID
if (groupMessage.getGroupAlias() == null || ObjectUtils.isEmpty(groupMessage.getGroupName())
|| ObjectUtils.isEmpty(groupMessage.getTopGroupGAlias())) {
@@ -90,7 +94,7 @@ public class RedisGroupMsgListener implements MessageListener {
}
boolean isTop = groupMessage.getTopGroupGAlias().equals(groupMessage.getGroupAlias());
Group group = aliasGroupMap.get(groupMessage.getGroupAlias());
if (group == null ) {
if (group == null) {
group = new Group();
String deviceId = buildGroupDeviceId(isTop);
group.setDeviceId(deviceId);
@@ -125,16 +129,15 @@ public class RedisGroupMsgListener implements MessageListener {
}
group.setParentId(null);
group.setParentDeviceId(parentGroup.getDeviceId());
}else {
} else {
group.setParentId(null);
group.setParentDeviceId(topGroup.getDeviceId());
}
}else {
} else {
group.setParentId(null);
group.setBusinessGroup(group.getDeviceId());
group.setParentDeviceId(null);
}
group.setUpdateTime(DateUtil.getNow());
aliasGroupToSave.put(group.getAlias(), group);
}
@@ -143,7 +146,7 @@ public class RedisGroupMsgListener implements MessageListener {
} catch (ControllerException e) {
log.warn("[REDIS消息-业务分组同步回复] 失败, \r\n{}", e.getMsg());
}catch (Exception e) {
} catch (Exception e) {
log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody()));
log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e);
}

View File

@@ -176,7 +176,7 @@ public class ServerController {
@GetMapping(value = "/media_server/media_info")
@ResponseBody
public MediaInfo getMediaInfo(String app, String stream, String mediaServerId) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
MediaServer mediaServer = mediaServerService.getOneFromCluster(mediaServerId);
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "流媒体不存在");
}

View File

@@ -0,0 +1,16 @@
package com.genersoft.iot.vmp.web.custom.bean;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
public class SYMember {
private String no;
private String unicodeNo;
private String blockId;
private String unitNo;
private String terminalMemberStatus;
}

View File

@@ -40,6 +40,8 @@ public class CameraChannelService implements CommandLineRunner {
private final String REDIS_GPS_MESSAGE = "VM_MSG_MOBILE_GPS";
private final String REDIS_CHANNEL_MESSAGE = "VM_MSG_MOBILE_CHANNEL";
private final String REDIS_MEMBER_STATUS_MESSAGE = "VM_MSG_MEMBER_STATUS_CHANNEL";
private final String MOBILE_CHANNEL_PREFIX = "nationalStandardMobileTerminal_";
@Autowired
private CommonGBChannelMapper channelMapper;
@@ -124,6 +126,11 @@ public class CameraChannelService implements CommandLineRunner {
List<CommonGBChannel> resultListForOnline = new ArrayList<>();
List<CommonGBChannel> resultListForOffline = new ArrayList<>();
List<SYMember> memberList = new ArrayList<>();
List<CommonGBChannel> addMemberList = new ArrayList<>();
switch (event.getMessageType()) {
case UPDATE:
List<CommonGBChannel> oldChannelList = event.getOldChannels();
@@ -152,9 +159,15 @@ public class CameraChannelService implements CommandLineRunner {
resultListForUpdate.add(channel);
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
}
}
}else {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
}
}
}else {
CommonGBChannel oldChannel = oldChannelMap.get(channel.getGbDeviceId());
@@ -183,14 +196,23 @@ public class CameraChannelService implements CommandLineRunner {
for (CommonGBChannel channel : channels) {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
CameraChannel cameraChannel = channelMapper.queryCameraChannelById(channel.getGbId());
SYMember member = getMember(cameraChannel.getGbDeviceId());
if (event.getMessageType() == ChannelEvent.ChannelEventMessageType.ON) {
cameraChannel.setGbStatus("ON");
resultListForOnline.add(cameraChannel);
if (member != null) {
member.setTerminalMemberStatus("ONLINE");
memberList.add(member);
}
}else {
cameraChannel.setGbStatus("OFF");
resultListForOffline.add(cameraChannel);
if (member != null) {
member.setTerminalMemberStatus("OFFLINE");
memberList.add(member);
}
}
}
}
break;
@@ -198,6 +220,9 @@ public class CameraChannelService implements CommandLineRunner {
for (CommonGBChannel channel : channels) {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
resultListForAdd.add(channel);
if ("ON".equals(channel.getGbStatus())) {
addMemberList.add(channel);
}
}
}
break;
@@ -221,6 +246,33 @@ public class CameraChannelService implements CommandLineRunner {
if (!resultListForOffline.isEmpty()) {
sendChannelMessage(resultListForOffline, ChannelEvent.ChannelEventMessageType.OFF);
}
if (!memberList.isEmpty()) {
sendMemberStatusMessage(memberList);
}
if (!addMemberList.isEmpty()) {
// 对于在线的终端进行延迟检查和发送
String key = UUID.randomUUID().toString();
dynamicTask.startDelay(key, () -> {
List<SYMember> members = new ArrayList<>();
for (CommonGBChannel commonGBChannel : addMemberList) {
SYMember member = getMember(commonGBChannel.getGbDeviceId());
if (member == null) {
continue;
}
member.setTerminalMemberStatus("ONLINE");
members.add(member);
}
if (!members.isEmpty()) {
sendMemberStatusMessage(members);
}
}, 5000);
}
}
private void sendMemberStatusMessage(List<SYMember> memberList) {
String jsonString = JSONObject.toJSONString(memberList);
log.info("[SY-redis发送通知] 发送 状态变化 {}: {}", REDIS_MEMBER_STATUS_MESSAGE, jsonString);
redisTemplate.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString);
}
private void sendChannelMessage(List<CommonGBChannel> channelList, ChannelEvent.ChannelEventMessageType type) {
@@ -239,28 +291,40 @@ public class CameraChannelService implements CommandLineRunner {
@EventListener
public void onApplicationEvent(MobilePositionEvent event) {
MobilePosition mobilePosition = event.getMobilePosition();
Integer channelId = mobilePosition.getChannelId();
CameraChannel cameraChannel = channelMapper.queryCameraChannelById(channelId);
// 非移动设备类型 不发送
if (cameraChannel == null || cameraChannel.getGbPtzType() == null || cameraChannel.getGbPtzType() != 99) {
// 从redis补充信息
SYMember member = getMember(mobilePosition.getChannelDeviceId());
if (member == null) {
log.info("[SY-redis发送通知-移动设备位置信息] 缓存未获取 {}", mobilePosition.toString());
return;
}
// 发送redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("time", mobilePosition.getTime());
jsonObject.put("deviceId", mobilePosition.getChannelDeviceId());
jsonObject.put("gpsDate", mobilePosition.getTime());
jsonObject.put("unicodeNo", member.getUnicodeNo());
jsonObject.put("memberNo", member.getNo());
jsonObject.put("unitNo", member.getUnitNo());
jsonObject.put("longitude", mobilePosition.getLongitude());
jsonObject.put("latitude", mobilePosition.getLatitude());
jsonObject.put("altitude", mobilePosition.getAltitude());
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
jsonObject.put("topGroupGAlias", cameraChannel.getTopGroupGAlias());
jsonObject.put("groupAlias", cameraChannel.getGroupAlias());
log.info("[SY-redis发送通知] 发送 移动设备位置信息移动位置 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
jsonObject.put("blockId", member.getBlockId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplate.convertAndSend(REDIS_GPS_MESSAGE, jsonObject);
}
private SYMember getMember(String deviceId) {
// 从redis补充信息
String key = MOBILE_CHANNEL_PREFIX + deviceId;
String memberJsonString = (String) redisTemplate.opsForValue().get(key);
if (memberJsonString == null) {
return null;
}
return JSONObject.parseObject(memberJsonString, SYMember.class);
}
public PageInfo<CameraChannel> queryList(Integer page, Integer count, String groupAlias, Boolean status, String geoCoordSys) {
// 构建组织结构信息

View File

@@ -61,9 +61,3 @@ export function queryTree(params) {
}
})
}
export function sync() {
return request({
method: 'get',
url: `/api/group/sync`,
})
}

View File

@@ -64,16 +64,6 @@ const actions = {
reject(error)
})
})
},
sync({ commit }) {
return new Promise((resolve, reject) => {
sync().then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
}
}

View File

@@ -16,9 +16,8 @@
type="info"
style="text-align: left"
/>
<div v-if="edit" style="font-size: 14px;position: absolute;left: 270px;z-index: 100;" >
<div v-if="edit" style="margin-top: 10px; font-size: 14px;position: absolute;left: 270px;z-index: 100;" >
显示编号 <el-checkbox v-model="showCode" />
<el-button type="text" style="margin-left: 10px" :loading="groupSyncLoading" @click="groupSync">同步</el-button>
</div>
<vue-easy-tree
@@ -149,8 +148,7 @@ export default {
count: this.defaultCount | 15,
total: 0,
groupList: [],
channelList: [],
groupSyncLoading: false
channelList: []
}
},
created() {
@@ -507,20 +505,6 @@ export default {
id: data.gbId
})
},
groupSync: function() {
this.groupSyncLoading = true
this.$store.dispatch('group/sync').then(data => {
this.$message.success({
showClose: true,
message: '同步消息已经发送, 3秒后自动刷新'
})
setTimeout(() => {
this.refresh('')
}, 3000)
}).finally(() => {
this.groupSyncLoading = false
})
},
contextmenuEventHandlerForLi(event, data) {
console.log(data)
const allMenuItem = []

View File

@@ -623,8 +623,8 @@ export default {
let mapTileConfig = this.mapTileList[this.mapTileIndex]
this.mapTileIndex = index
window.coordinateSystem = this.mapTileList[this.mapTileIndex].coordinateSystem
tileLayer.getSource().clear()
tileLayer.getSource().setUrl(this.mapTileList[index].tilesUrl)
tileLayer.getSource().refresh()
if (mapTileConfig.coordinateSystem !== this.mapTileList[this.mapTileIndex].coordinateSystem) {
// 发送通知
this.$emit('coordinateSystemChange', this.mapTileList[this.mapTileIndex].coordinateSystem)

View File

@@ -160,7 +160,7 @@ export default {
longitudeStr: 'longitude',
latitudeStr: 'latitude',
mapTileList: [],
diffPixels: 30,
diffPixels: 120,
zoomValue: 10,
showDrawThin: false,
quicklyDrawThinLoading: false,