From fea157436bf72873f19370cd38cd4f198a13a881 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Fri, 14 Nov 2025 17:09:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96redis=20=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/common/ServerInfo.java | 3 --- .../service/redisMsg/RedisGroupMsgListener.java | 12 +++++++++--- .../redisMsg/RedisPushStreamListMsgListener.java | 14 ++++++++++++++ .../vmp/storager/impl/RedisCatchStorageImpl.java | 4 +++- .../web/custom/service/CameraChannelService.java | 7 +++++-- 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java index fb1941a13..7cff1bf70 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java @@ -8,9 +8,6 @@ public class ServerInfo { private String ip; private int port; - /** - * 现在使用的线程数 - */ private String createTime; public static ServerInfo create(String ip, int port) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java index bfa4a5d6e..9be35ac71 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Group; import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage; import com.genersoft.iot.vmp.gb28181.service.IGroupService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import jakarta.annotation.Resource; @@ -37,7 +38,7 @@ public class RedisGroupMsgListener implements MessageListener { private IGroupService groupService; @Resource - private IStreamPushService streamPushService; + private IRedisCatchStorage redisCatchStorage; @Autowired private UserSetting userSetting; @@ -49,6 +50,10 @@ public class RedisGroupMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { + String serverId = redisCatchStorage.chooseOneServer(null); + if (!userSetting.getServerId().equals(serverId)) { + return; + } log.info("[REDIS: 业务分组同步回复] key: {}, : {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody())); taskQueue.offer(message); } @@ -77,7 +82,7 @@ public class RedisGroupMsgListener implements MessageListener { List groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class); for (int i = 0; i < groupMessages.size(); i++) { RedisGroupMessage groupMessage = groupMessages.get(i); - log.info("[REDIS消息-业务分组同步回复] {}", groupMessage.toString()); + log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", groupMessage.toString()); if (!userSetting.isUseAliasForGroupSync()) { if (groupMessage.getGroupGbId() == null) { log.warn("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString()); @@ -156,11 +161,12 @@ public class RedisGroupMsgListener implements MessageListener { group.setUpdateTime(DateUtil.getNow()); if (group.getId() > 0) { + log.info("[REDIS消息-业务分组同步回复] 更新入库, {}", JSON.toJSONString(group)); groupService.update(group); }else { + log.info("[REDIS消息-业务分组同步回复] 新增入库, {}", JSON.toJSONString(group)); groupService.add(group); } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java index 954aa37f0..56273e976 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java @@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; @@ -19,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; +import static com.genersoft.iot.vmp.conf.security.JwtUtils.userSetting; + /** * @Auther: JiangFeng * @Date: 2022/8/16 11:32 @@ -36,10 +40,20 @@ public class RedisPushStreamListMsgListener implements MessageListener { @Resource private IStreamPushService streamPushService; + @Resource + private IRedisCatchStorage redisCatchStorage; + + @Resource + private UserSetting userSetting; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void onMessage(Message message, byte[] bytes) { + String serverId = redisCatchStorage.chooseOneServer(null); + if (!userSetting.getServerId().equals(serverId)) { + return; + } log.info("[REDIS: 推流设备列表更新]: {}", new String(message.getBody())); taskQueue.offer(message); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 99dd5bed1..e6acf543b 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -521,7 +521,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public String chooseOneServer(String serverId) { String key = VideoManagerConstants.WVP_SERVER_LIST; - redisTemplate.opsForZSet().remove(key, serverId); + if (serverId != null) { + redisTemplate.opsForZSet().remove(key, serverId); + } Set range = redisTemplate.opsForZSet().range(key, 0, 0); if (range == null || range.isEmpty()) { return null; diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java index 9bf701843..b8b9c7c17 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java @@ -182,10 +182,13 @@ public class CameraChannelService implements CommandLineRunner { case VLOST: for (CommonGBChannel channel : channels) { if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) { + CameraChannel cameraChannel = channelMapper.queryCameraChannelById(channel.getGbId()); if (event.getMessageType() == ChannelEvent.ChannelEventMessageType.ON) { - resultListForOnline.add(channel); + cameraChannel.setGbStatus("ON"); + resultListForOnline.add(cameraChannel); }else { - resultListForOffline.add(channel); + cameraChannel.setGbStatus("OFF"); + resultListForOffline.add(cameraChannel); } }