Merge branch '648540858:master' into develop-add-api-key

This commit is contained in:
ancienter
2024-04-09 09:52:23 +08:00
committed by GitHub
57 changed files with 741 additions and 414 deletions

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -92,4 +93,10 @@ public interface IDeviceChannelService {
* 修改通道的码流类型
*/
void updateChannelStreamIdentification(DeviceChannel channel);
List<DeviceChannel> queryChaneListByDeviceId(String deviceId);
void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition);
void stopPlay(String deviceId, String channelId);
}

View File

@@ -68,4 +68,5 @@ public interface IPlayService {
void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
void stopPlay(Device device, String channelId);
}

View File

@@ -1,5 +1,8 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.utils.DateUtil;
public class GPSMsgInfo {
/**
@@ -39,6 +42,18 @@ public class GPSMsgInfo {
private boolean stored;
public static GPSMsgInfo getInstance(MobilePosition mobilePosition) {
GPSMsgInfo gpsMsgInfo = new GPSMsgInfo();
gpsMsgInfo.setId(mobilePosition.getChannelId());
gpsMsgInfo.setAltitude(mobilePosition.getAltitude() + "");
gpsMsgInfo.setLng(mobilePosition.getLongitude());
gpsMsgInfo.setLat(mobilePosition.getLatitude());
gpsMsgInfo.setSpeed(mobilePosition.getSpeed());
gpsMsgInfo.setDirection(mobilePosition.getDirection() + "");
gpsMsgInfo.setTime(DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
return gpsMsgInfo;
}
public String getId() {
return id;

View File

@@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
public class RequestStopPushStreamMsg {
private SendRtpItem sendRtpItem;
private String platformName;
private int platFormIndex;
public SendRtpItem getSendRtpItem() {
return sendRtpItem;
}
public void setSendRtpItem(SendRtpItem sendRtpItem) {
this.sendRtpItem = sendRtpItem;
}
public String getPlatformName() {
return platformName;
}
public void setPlatformName(String platformName) {
this.platformName = platformName;
}
public int getPlatFormIndex() {
return platFormIndex;
}
public void setPlatFormIndex(int platFormIndex) {
this.platFormIndex = platFormIndex;
}
public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) {
RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg();
streamMsg.setSendRtpItem(sendRtpItem);
streamMsg.setPlatformName(platformName);
streamMsg.setPlatFormIndex(platFormIndex);
return streamMsg;
}
}

View File

@@ -6,7 +6,17 @@ package com.genersoft.iot.vmp.service.bean;
public class WvpRedisMsgCmd {
/**
* 请求获取推流信息
*/
public static final String GET_SEND_ITEM = "GetSendItem";
/**
* 请求推流的请求
*/
public static final String REQUEST_PUSH_STREAM = "RequestPushStream";
/**
* 停止推流的请求
*/
public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream";
}

View File

@@ -1,16 +1,21 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -35,7 +40,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class);
@Autowired
private IRedisCatchStorage redisCatchStorage;
private EventPublisher eventPublisher;
@Autowired
private IInviteStreamService inviteStreamService;
@@ -46,6 +51,15 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private DeviceMobilePositionMapper deviceMobilePositionMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override
public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) {
if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) {
@@ -84,7 +98,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
public void updateChannel(String deviceId, DeviceChannel channel) {
String channelId = channel.getChannelId();
channel.setDeviceId(deviceId);
// StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {
channel.setStreamId(inviteInfo.getStreamInfo().getStream());
@@ -280,4 +293,69 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
channelMapper.updateChannelStreamIdentification(channel);
}
@Override
public List<DeviceChannel> queryChaneListByDeviceId(String deviceId) {
return channelMapper.queryAllChannels(deviceId);
}
@Override
public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) {
if (userSetting.getSavePositionHistory()) {
deviceMobilePositionMapper.insertNewPosition(mobilePosition);
}
if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) {
deviceChannel.setChannelId(null);
}
if (deviceChannel.getGpsTime() == null) {
deviceChannel.setGpsTime(DateUtil.getNow());
}
int updated = channelMapper.updatePosition(deviceChannel);
if (updated == 0) {
return;
}
List<DeviceChannel> deviceChannels = new ArrayList<>();
if (deviceChannel.getChannelId() == null) {
// 有的设备这里上报的deviceId与通道Id是一样这种情况更新设备下的全部通道
List<DeviceChannel> deviceChannelsInDb = queryChaneListByDeviceId(device.getDeviceId());
deviceChannels.addAll(deviceChannelsInDb);
}else {
deviceChannels.add(deviceChannel);
}
if (deviceChannels.isEmpty()) {
return;
}
if (deviceChannels.size() > 100) {
logger.warn("[更新通道位置信息后发送通知] 设备可能是平台,上报的位置信息未标明通道编号," +
"导致所有通道被更新位置, deviceId:{}", device.getDeviceId());
}
for (DeviceChannel channel : deviceChannels) {
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
mobilePosition.setChannelId(channel.getChannelId());
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
logger.error("[向上级转发移动位置失败] ", e);
}
// 发送redis消息。 通知位置信息的变化
JSONObject jsonObject = new JSONObject();
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
jsonObject.put("serial", mobilePosition.getDeviceId());
jsonObject.put("code", mobilePosition.getChannelId());
jsonObject.put("longitude", mobilePosition.getLongitude());
jsonObject.put("latitude", mobilePosition.getLatitude());
jsonObject.put("altitude", mobilePosition.getAltitude());
jsonObject.put("direction", mobilePosition.getDirection());
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
}
}
@Override
public void stopPlay(String deviceId, String channelId) {
channelMapper.stopPlay(deviceId, channelId);
}
}

View File

@@ -269,6 +269,8 @@ public class DeviceServiceImpl implements IDeviceService {
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
// 设置最小值为30
dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
catalogSubscribeTask.run();
return true;
}
@@ -302,6 +304,7 @@ public class DeviceServiceImpl implements IDeviceService {
int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
// 刷新订阅
dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000);
mobilePositionSubscribeTask.run();
return true;
}

View File

@@ -34,7 +34,6 @@ import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -123,9 +122,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private CloudRecordServiceMapper cloudRecordServiceMapper;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
@@ -1170,7 +1166,7 @@ public class PlayServiceImpl implements IPlayService {
dynamicTask.startDelay(key, ()->{
logger.info("[语音广播]等待invite消息超时{}/{}", device.getDeviceId(), channelId);
stopAudioBroadcast(device.getDeviceId(), channelId);
}, 2000);
}, 10*1000);
}, eventResultForError -> {
// 发送失败
logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
@@ -1409,6 +1405,14 @@ public class PlayServiceImpl implements IPlayService {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"),
sendRtpItem.isTcpActive()?"被动发流": param.get("dst_url") + ":" + param.get("dst_port"));
if (sendRtpItem.getPlayType() == InviteStreamType.PUSH && correlationInfo instanceof ParentPlatform) {
ParentPlatform platform = (ParentPlatform)correlationInfo;
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getChannelId(), platform.getServerGBId(), platform.getName(), userSetting.getServerId(),
sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
} else {
logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) {
@@ -1584,4 +1588,26 @@ public class PlayServiceImpl implements IPlayService {
});
}
@Override
public void stopPlay(Device device, String channelId) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
if (inviteInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
}
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId);
cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
storager.stopPlay(device.getDeviceId(), channelId);
channelService.stopPlay(device.getDeviceId(), channelId);
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
}
}
}

View File

@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
@@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -333,8 +331,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}
System.out.println("addStreamProxyToZlm====");
System.out.println(result);
if (result != null && result.getInteger("code") == 0) {
JSONObject data = result.getJSONObject("data");
if (data == null) {

View File

@@ -133,7 +133,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM:
RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent());
requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break;
default:
break;
@@ -397,6 +400,19 @@ public class RedisGbPlayMsgListener implements MessageListener {
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
* 发送请求推流的消息
*/
public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) {
String key = UUID.randomUUID().toString();
WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg));
JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
@@ -423,4 +439,36 @@ public class RedisGbPlayMsgListener implements MessageListener {
return null;
}
}
/**
* 处理收到的请求推流的请求
*/
private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) {
SendRtpItem sendRtpItem = streamMsg.getSendRtpItem();
if (sendRtpItem == null) {
logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL");
return;
}
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo == null) {
// TODO 回复错误
return;
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 发送redis消息
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}

View File

@@ -2,12 +2,10 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
@@ -25,7 +23,6 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -73,7 +70,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
push.getGbId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
@@ -86,26 +83,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}
if (push.isSelf()) {
// 停止向上级推流
String streamId = sendRtpItem.getStream();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}
}
}

View File

@@ -88,7 +88,8 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
streamPushItemForSave.add(streamPushItem);
allGBId.put(streamPushItem.getGbId(), streamPushItem);
} else {
if (allGBId.containsKey(streamPushItem.getGbId())) {
if (allGBId.containsKey(streamPushItem.getGbId())
&& (!allGBId.get(streamPushItem.getGbId()).getApp().equals(streamPushItem.getApp()) || !allGBId.get(streamPushItem.getGbId()).getStream().equals(streamPushItem.getStream()))) {
GbStream gbStream = allGBId.get(streamPushItem.getGbId());
logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());

View File

@@ -1,11 +1,7 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,52 +37,52 @@ public class RedisStreamMsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[收到redis 流变化]消息解析失败");
continue;
}
String serverId = steamMsgJson.getString("serverId");
if (userSetting.getServerId().equals(serverId)) {
// 自己发送的消息忽略即可
continue;
}
logger.info("[收到redis 流变化] {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");
String stream = steamMsgJson.getString("stream");
boolean register = steamMsgJson.getBoolean("register");
String mediaServerId = steamMsgJson.getString("mediaServerId");
OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
onStreamChangedHookParam.setSeverId(serverId);
onStreamChangedHookParam.setApp(app);
onStreamChangedHookParam.setStream(stream);
onStreamChangedHookParam.setRegist(register);
onStreamChangedHookParam.setMediaServerId(mediaServerId);
onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
onStreamChangedHookParam.setAliveSecond(0L);
onStreamChangedHookParam.setTotalReaderCount("0");
onStreamChangedHookParam.setOriginType(0);
onStreamChangedHookParam.setOriginTypeStr("0");
onStreamChangedHookParam.setOriginTypeStr("unknown");
if (register) {
zlmMediaListManager.addPush(onStreamChangedHookParam);
}else {
zlmMediaListManager.removeMedia(app, stream);
}
}catch (Exception e) {
logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
logger.error("[REDIS消息-流变化] 异常内容: ", e);
}
}
});
}
// boolean isEmpty = taskQueue.isEmpty();
// taskQueue.offer(message);
// if (isEmpty) {
// taskExecutor.execute(() -> {
// while (!taskQueue.isEmpty()) {
// Message msg = taskQueue.poll();
// try {
// JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
// if (steamMsgJson == null) {
// logger.warn("[收到redis 流变化]消息解析失败");
// continue;
// }
// String serverId = steamMsgJson.getString("serverId");
//
// if (userSetting.getServerId().equals(serverId)) {
// // 自己发送的消息忽略即可
// continue;
// }
// logger.info("[收到redis 流变化] {}", new String(message.getBody()));
// String app = steamMsgJson.getString("app");
// String stream = steamMsgJson.getString("stream");
// boolean register = steamMsgJson.getBoolean("register");
// String mediaServerId = steamMsgJson.getString("mediaServerId");
// OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
// onStreamChangedHookParam.setSeverId(serverId);
// onStreamChangedHookParam.setApp(app);
// onStreamChangedHookParam.setStream(stream);
// onStreamChangedHookParam.setRegist(register);
// onStreamChangedHookParam.setMediaServerId(mediaServerId);
// onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
// onStreamChangedHookParam.setAliveSecond(0L);
// onStreamChangedHookParam.setTotalReaderCount("0");
// onStreamChangedHookParam.setOriginType(0);
// onStreamChangedHookParam.setOriginTypeStr("0");
// onStreamChangedHookParam.setOriginTypeStr("unknown");
// if (register) {
// zlmMediaListManager.addPush(onStreamChangedHookParam);
// }else {
// zlmMediaListManager.removeMedia(app, stream);
// }
// }catch (Exception e) {
// logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
// logger.error("[REDIS消息-流变化] 异常内容: ", e);
// }
// }
// });
// }
}
}