优化redis 消息处理

This commit is contained in:
lin
2025-11-14 17:09:55 +08:00
parent c597567763
commit fea157436b
5 changed files with 31 additions and 9 deletions

View File

@@ -8,9 +8,6 @@ public class ServerInfo {
private String ip; private String ip;
private int port; private int port;
/**
* 现在使用的线程数
*/
private String createTime; private String createTime;
public static ServerInfo create(String ip, int port) { public static ServerInfo create(String ip, int port) {

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Group; import com.genersoft.iot.vmp.gb28181.bean.Group;
import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage; import com.genersoft.iot.vmp.gb28181.bean.RedisGroupMessage;
import com.genersoft.iot.vmp.gb28181.service.IGroupService; import com.genersoft.iot.vmp.gb28181.service.IGroupService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@@ -37,7 +38,7 @@ public class RedisGroupMsgListener implements MessageListener {
private IGroupService groupService; private IGroupService groupService;
@Resource @Resource
private IStreamPushService streamPushService; private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@@ -49,6 +50,10 @@ public class RedisGroupMsgListener implements MessageListener {
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 业务分组同步回复] key {} {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody())); log.info("[REDIS: 业务分组同步回复] key {} {}", VideoManagerConstants.VM_MSG_GROUP_LIST_RESPONSE, new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
} }
@@ -77,7 +82,7 @@ public class RedisGroupMsgListener implements MessageListener {
List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class); List<RedisGroupMessage> groupMessages = JSON.parseArray(new String(msg.getBody()), RedisGroupMessage.class);
for (int i = 0; i < groupMessages.size(); i++) { for (int i = 0; i < groupMessages.size(); i++) {
RedisGroupMessage groupMessage = groupMessages.get(i); RedisGroupMessage groupMessage = groupMessages.get(i);
log.info("[REDIS消息-业务分组同步回复] {}", groupMessage.toString()); log.info("[REDIS消息-业务分组同步回复] 处理数据: {}", groupMessage.toString());
if (!userSetting.isUseAliasForGroupSync()) { if (!userSetting.isUseAliasForGroupSync()) {
if (groupMessage.getGroupGbId() == null) { if (groupMessage.getGroupGbId() == null) {
log.warn("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString()); log.warn("[REDIS消息-业务分组同步回复] 分组编号未设置,{}", groupMessage.toString());
@@ -156,11 +161,12 @@ public class RedisGroupMsgListener implements MessageListener {
group.setUpdateTime(DateUtil.getNow()); group.setUpdateTime(DateUtil.getNow());
if (group.getId() > 0) { if (group.getId() > 0) {
log.info("[REDIS消息-业务分组同步回复] 更新入库, {}", JSON.toJSONString(group));
groupService.update(group); groupService.update(group);
}else { }else {
log.info("[REDIS消息-业务分组同步回复] 新增入库, {}", JSON.toJSONString(group));
groupService.add(group); groupService.add(group);
} }
} }
} }

View File

@@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage; import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
@@ -19,6 +21,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.conf.security.JwtUtils.userSetting;
/** /**
* @Auther: JiangFeng * @Auther: JiangFeng
* @Date: 2022/8/16 11:32 * @Date: 2022/8/16 11:32
@@ -36,10 +40,20 @@ public class RedisPushStreamListMsgListener implements MessageListener {
@Resource @Resource
private IStreamPushService streamPushService; private IStreamPushService streamPushService;
@Resource
private IRedisCatchStorage redisCatchStorage;
@Resource
private UserSetting userSetting;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
String serverId = redisCatchStorage.chooseOneServer(null);
if (!userSetting.getServerId().equals(serverId)) {
return;
}
log.info("[REDIS: 推流设备列表更新] {}", new String(message.getBody())); log.info("[REDIS: 推流设备列表更新] {}", new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
} }

View File

@@ -521,7 +521,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public String chooseOneServer(String serverId) { public String chooseOneServer(String serverId) {
String key = VideoManagerConstants.WVP_SERVER_LIST; String key = VideoManagerConstants.WVP_SERVER_LIST;
redisTemplate.opsForZSet().remove(key, serverId); if (serverId != null) {
redisTemplate.opsForZSet().remove(key, serverId);
}
Set<Object> range = redisTemplate.opsForZSet().range(key, 0, 0); Set<Object> range = redisTemplate.opsForZSet().range(key, 0, 0);
if (range == null || range.isEmpty()) { if (range == null || range.isEmpty()) {
return null; return null;

View File

@@ -182,10 +182,13 @@ public class CameraChannelService implements CommandLineRunner {
case VLOST: case VLOST:
for (CommonGBChannel channel : channels) { for (CommonGBChannel channel : channels) {
if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) { if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) {
CameraChannel cameraChannel = channelMapper.queryCameraChannelById(channel.getGbId());
if (event.getMessageType() == ChannelEvent.ChannelEventMessageType.ON) { if (event.getMessageType() == ChannelEvent.ChannelEventMessageType.ON) {
resultListForOnline.add(channel); cameraChannel.setGbStatus("ON");
resultListForOnline.add(cameraChannel);
}else { }else {
resultListForOffline.add(channel); cameraChannel.setGbStatus("OFF");
resultListForOffline.add(cameraChannel);
} }
} }